In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import year, month, countDistinct, lag, col, sum, when
from pyspark.sql.window import Window
from pyspark.sql.functions import isnan, when, count, col
from pyspark.sql.functions import sum, col, round

import pyspark.sql.functions as F
from pyspark.sql.types import *
from datetime import date
# create a SparkSession
spark = SparkSession.builder.appName("myApp").getOrCreate()
spark

In [4]:
Base = spark.read.csv('Donntransformes.csv',header=True)

In [5]:
# Get number of records
print("The data contain %d records." % Base.count())

[Stage 2:>                                                          (0 + 4) / 4]

The data contain 1372581 records.


                                                                                

In [6]:
# Get number of columns
print("The data contain %d columns." % len(Base.columns))

The data contain 29 columns.


In [7]:
# Select randomly 1M records
BaseE = Base.sample(False, 0.5, 42)

In [8]:
print("The data contain %d records." % BaseE.count())



The data contain 686558 records.


                                                                                

In [9]:
BaseE.show()

23/05/19 18:05:08 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


+-----------+----------+------------+--------------------+----------+----------+----------------+----------------+----------------+-----+--------------+-------------------+----------------------+-------------------+-----------+-----------+------------+-------+---------------+--------+-----------+------+------+--------------+-----------+--------------+-----------+---+------+
|code_client|TEL_MOBILE|   code_pret|date_de_deboursement|   produit|code_agent|          agence|montant_demander|montant_accorder|duree|   valider_par|   secteur_activite|code_secteur_du_client|code_secteur_projet|gouvernorat|status_pret|Accor/Payoff|     id|ACCOUNT_OFFICER|INDUSTRY|NATIONALITY|GENDER|SECTOR|MARITAL_STATUS|LR_CARTE_BQ|LR_CUS_MNT_REV|SUBURB_TOWN|AGE|Fraude|
+-----------+----------+------------+--------------------+----------+----------+----------------+----------------+----------------+-----+--------------+-------------------+----------------------+-------------------+-----------+-----------+-------

Data Pre-processing



In [10]:
### Get count of nan or missing values
from pyspark.sql.functions import isnan, when, count, col

BaseE.select([count(when(isnan(c), c)).alias(c) for c in BaseE.columns]).show()



+-----------+----------+---------+--------------------+-------+----------+------+----------------+----------------+-----+-----------+----------------+----------------------+-------------------+-----------+-----------+------------+---+---------------+--------+-----------+------+------+--------------+-----------+--------------+-----------+---+------+
|code_client|TEL_MOBILE|code_pret|date_de_deboursement|produit|code_agent|agence|montant_demander|montant_accorder|duree|valider_par|secteur_activite|code_secteur_du_client|code_secteur_projet|gouvernorat|status_pret|Accor/Payoff| id|ACCOUNT_OFFICER|INDUSTRY|NATIONALITY|GENDER|SECTOR|MARITAL_STATUS|LR_CARTE_BQ|LR_CUS_MNT_REV|SUBURB_TOWN|AGE|Fraude|
+-----------+----------+---------+--------------------+-------+----------+------+----------------+----------------+-----+-----------+----------------+----------------------+-------------------+-----------+-----------+------------+---+---------------+--------+-----------+------+------+-------------

                                                                                

Observations:


This dataset is clean, no missing values.



In [11]:
Base = Base.withColumnRenamed("Accor/Payoff", "Accor_Payoff")


In [12]:
BaseE = BaseE.withColumnRenamed("Accor/Payoff", "Accor_Payoff")

In [13]:
# Schema of the data
BaseE.printSchema()

root
 |-- code_client: string (nullable = true)
 |-- TEL_MOBILE: string (nullable = true)
 |-- code_pret: string (nullable = true)
 |-- date_de_deboursement: string (nullable = true)
 |-- produit: string (nullable = true)
 |-- code_agent: string (nullable = true)
 |-- agence: string (nullable = true)
 |-- montant_demander: string (nullable = true)
 |-- montant_accorder: string (nullable = true)
 |-- duree: string (nullable = true)
 |-- valider_par: string (nullable = true)
 |-- secteur_activite: string (nullable = true)
 |-- code_secteur_du_client: string (nullable = true)
 |-- code_secteur_projet: string (nullable = true)
 |-- gouvernorat: string (nullable = true)
 |-- status_pret: string (nullable = true)
 |-- Accor_Payoff: string (nullable = true)
 |-- id: string (nullable = true)
 |-- ACCOUNT_OFFICER: string (nullable = true)
 |-- INDUSTRY: string (nullable = true)
 |-- NATIONALITY: string (nullable = true)
 |-- GENDER: string (nullable = true)
 |-- SECTOR: string (nullable = true)

In [14]:
BaseE.select("code_client").show()


+-----------+
|code_client|
+-----------+
|    2097563|
|    2097615|
|    2551368|
|    2551451|
|    2551527|
|    2551726|
|    2551726|
|    2097834|
|    2551802|
|    2097847|
|    2098019|
|    2551915|
|    2098089|
|    2551922|
|    2551922|
|    2098089|
|    2551963|
|    2551992|
|    2098328|
|    2551992|
+-----------+
only showing top 20 rows



In [15]:
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline


In [16]:
BaseE.select("produit").show()


+----------+
|   produit|
+----------+
|   Mawilni|
|   Mawilni|
|   Mawilni|
|Mouasasaty|
|   Mawilni|
|   Mawilni|
|   Mawilni|
|     Darna|
|   Mawilni|
|     Darna|
|    Taalim|
|   Mawilni|
|   Mawilni|
|   Mawilni|
|     Darna|
|   Mawilni|
|   Mawilni|
|   Mawilni|
|   Mawssem|
|   Mawilni|
+----------+
only showing top 20 rows



from pyspark.ml.feature import OneHotEncoder

# Créer une instance de StringIndexer pour chaque colonne à encoder
indexer = StringIndexer(inputCol="produit", outputCol="produit_indexed")

# Appliquer la transformation d'indexation
indexed = indexer.fit(BaseE).transform(BaseE)

# Créer une instance de OneHotEncoder pour chaque colonne indexée
encoder = OneHotEncoder(inputCols=["produit_indexed"], outputCols=["produitF_encoded"])

# Appliquer la transformation d'encodage one-hot
df_encoded = encoder.fit(indexed).transform(indexed)

In [17]:
# pip install --quiet scikit-learn

In [18]:
# from sklearn.preprocessing import LabelEncoder


In [19]:
from pyspark.ml.feature import StringIndexer

In [20]:

columns_to_encode = ["status_pret", "LR_CARTE_BQ","MARITAL_STATUS","GENDER","NATIONALITY","Accor_Payoff","gouvernorat","secteur_activite","valider_par","duree","agence","produit","date_de_deboursement","code_pret"]

# Create a StringIndexer for each column
string_indexers = [StringIndexer(inputCol=col, outputCol=col+'_F') for col in columns_to_encode]

# Fit and transform the DataFrame for each StringIndexer
indexed_data = BaseE
for indexer in string_indexers:
    indexed_data = indexer.fit(indexed_data).transform(indexed_data)


# Show the transformed DataFrame
indexed_data.show()

23/05/19 18:05:57 WARN DAGScheduler: Broadcasting large task binary with size 21.8 MiB
[Stage 56:>                                                         (0 + 1) / 1]

+-----------+----------+------------+--------------------+----------+----------+----------------+----------------+----------------+-----+--------------+-------------------+----------------------+-------------------+-----------+-----------+------------+-------+---------------+--------+-----------+------+------+--------------+-----------+--------------+-----------+---+------+-------------+-------------+----------------+--------+-------------+--------------+-------------+------------------+-------------+-------+--------+---------+----------------------+-----------+
|code_client|TEL_MOBILE|   code_pret|date_de_deboursement|   produit|code_agent|          agence|montant_demander|montant_accorder|duree|   valider_par|   secteur_activite|code_secteur_du_client|code_secteur_projet|gouvernorat|status_pret|Accor_Payoff|     id|ACCOUNT_OFFICER|INDUSTRY|NATIONALITY|GENDER|SECTOR|MARITAL_STATUS|LR_CARTE_BQ|LR_CUS_MNT_REV|SUBURB_TOWN|AGE|Fraude|status_pret_F|LR_CARTE_BQ_F|MARITAL_STATUS_F|GENDER_F|N

                                                                                

In [21]:
# Supprimer les colonnes d'origine de indexed_data
indexed_data = indexed_data.drop(*columns_to_encode)

# Afficher le DataFrame résultant
indexed_data.show()

23/05/19 18:05:59 WARN DAGScheduler: Broadcasting large task binary with size 21.8 MiB
[Stage 57:>                                                         (0 + 1) / 1]

+-----------+----------+----------+----------------+----------------+----------------------+-------------------+-------+---------------+--------+------+--------------+-----------+---+------+-------------+-------------+----------------+--------+-------------+--------------+-------------+------------------+-------------+-------+--------+---------+----------------------+-----------+
|code_client|TEL_MOBILE|code_agent|montant_demander|montant_accorder|code_secteur_du_client|code_secteur_projet|     id|ACCOUNT_OFFICER|INDUSTRY|SECTOR|LR_CUS_MNT_REV|SUBURB_TOWN|AGE|Fraude|status_pret_F|LR_CARTE_BQ_F|MARITAL_STATUS_F|GENDER_F|NATIONALITY_F|Accor_Payoff_F|gouvernorat_F|secteur_activite_F|valider_par_F|duree_F|agence_F|produit_F|date_de_deboursement_F|code_pret_F|
+-----------+----------+----------+----------------+----------------+----------------------+-------------------+-------+---------------+--------+------+--------------+-----------+---+------+-------------+-------------+----------------

                                                                                

In [22]:
indexed_data.select("produit_F").show()

+---------+
|produit_F|
+---------+
|      0.0|
|      0.0|
|      0.0|
|      4.0|
|      0.0|
|      0.0|
|      0.0|
|      2.0|
|      0.0|
|      2.0|
|      3.0|
|      0.0|
|      0.0|
|      0.0|
|      2.0|
|      0.0|
|      0.0|
|      0.0|
|      1.0|
|      0.0|
+---------+
only showing top 20 rows



In [23]:
indexed_data.dtypes

[('code_client', 'string'),
 ('TEL_MOBILE', 'string'),
 ('code_agent', 'string'),
 ('montant_demander', 'string'),
 ('montant_accorder', 'string'),
 ('code_secteur_du_client', 'string'),
 ('code_secteur_projet', 'string'),
 ('id', 'string'),
 ('ACCOUNT_OFFICER', 'string'),
 ('INDUSTRY', 'string'),
 ('SECTOR', 'string'),
 ('LR_CUS_MNT_REV', 'string'),
 ('SUBURB_TOWN', 'string'),
 ('AGE', 'string'),
 ('Fraude', 'string'),
 ('status_pret_F', 'double'),
 ('LR_CARTE_BQ_F', 'double'),
 ('MARITAL_STATUS_F', 'double'),
 ('GENDER_F', 'double'),
 ('NATIONALITY_F', 'double'),
 ('Accor_Payoff_F', 'double'),
 ('gouvernorat_F', 'double'),
 ('secteur_activite_F', 'double'),
 ('valider_par_F', 'double'),
 ('duree_F', 'double'),
 ('agence_F', 'double'),
 ('produit_F', 'double'),
 ('date_de_deboursement_F', 'double'),
 ('code_pret_F', 'double')]

In [24]:
from pyspark.sql.types import DoubleType

indexed_data = indexed_data.withColumn("code_client", indexed_data.code_client.cast(DoubleType()))
indexed_data = indexed_data.withColumn("TEL_MOBILE", indexed_data.TEL_MOBILE.cast(DoubleType()))
indexed_data = indexed_data.withColumn("code_agent", indexed_data.code_agent.cast(DoubleType()))
indexed_data = indexed_data.withColumn("Fraude", indexed_data.Fraude.cast('int'))
indexed_data= indexed_data.withColumn("montant_demander", indexed_data.montant_demander.cast(DoubleType()))
indexed_data = indexed_data.withColumn("montant_accorder", indexed_data.montant_accorder.cast(DoubleType()))
indexed_data = indexed_data.withColumn("code_secteur_du_client", indexed_data.code_secteur_du_client.cast(DoubleType()))
indexed_data = indexed_data.withColumn("code_secteur_projet", indexed_data.code_secteur_projet.cast(DoubleType()))
indexed_data = indexed_data.withColumn("id", indexed_data.id.cast(DoubleType()))
indexed_data = indexed_data.withColumn("ACCOUNT_OFFICER", indexed_data.ACCOUNT_OFFICER.cast(DoubleType()))
indexed_data = indexed_data.withColumn("INDUSTRY",indexed_data.INDUSTRY.cast(DoubleType()))
indexed_data = indexed_data.withColumn("SECTOR", indexed_data.SECTOR.cast(DoubleType()))
indexed_data = indexed_data.withColumn("LR_CUS_MNT_REV", indexed_data.LR_CUS_MNT_REV.cast(DoubleType()))
indexed_data = indexed_data.withColumn("SUBURB_TOWN", indexed_data.SUBURB_TOWN.cast(DoubleType()))
indexed_data = indexed_data.withColumn("AGE", indexed_data.AGE.cast('int'))


columns_to_encode = ["status_pret", "LR_CARTE_BQ","MARITAL_STATUS","GENDER","NATIONALITY","Accor_Payoff","gouvernorat","secteur_activite","valider_par","duree","agence","produit","date_de_deboursement","code_pret"]


In [25]:
# Check column data types
indexed_data.dtypes

[('code_client', 'double'),
 ('TEL_MOBILE', 'double'),
 ('code_agent', 'double'),
 ('montant_demander', 'double'),
 ('montant_accorder', 'double'),
 ('code_secteur_du_client', 'double'),
 ('code_secteur_projet', 'double'),
 ('id', 'double'),
 ('ACCOUNT_OFFICER', 'double'),
 ('INDUSTRY', 'double'),
 ('SECTOR', 'double'),
 ('LR_CUS_MNT_REV', 'double'),
 ('SUBURB_TOWN', 'double'),
 ('AGE', 'int'),
 ('Fraude', 'int'),
 ('status_pret_F', 'double'),
 ('LR_CARTE_BQ_F', 'double'),
 ('MARITAL_STATUS_F', 'double'),
 ('GENDER_F', 'double'),
 ('NATIONALITY_F', 'double'),
 ('Accor_Payoff_F', 'double'),
 ('gouvernorat_F', 'double'),
 ('secteur_activite_F', 'double'),
 ('valider_par_F', 'double'),
 ('duree_F', 'double'),
 ('agence_F', 'double'),
 ('produit_F', 'double'),
 ('date_de_deboursement_F', 'double'),
 ('code_pret_F', 'double')]

In [26]:
indexed_data.show()

23/05/19 18:06:15 WARN DAGScheduler: Broadcasting large task binary with size 21.8 MiB
[Stage 59:>                                                         (0 + 1) / 1]

+-----------+----------+----------+----------------+----------------+----------------------+-------------------+---------+---------------+--------+------+--------------+-----------+---+------+-------------+-------------+----------------+--------+-------------+--------------+-------------+------------------+-------------+-------+--------+---------+----------------------+-----------+
|code_client|TEL_MOBILE|code_agent|montant_demander|montant_accorder|code_secteur_du_client|code_secteur_projet|       id|ACCOUNT_OFFICER|INDUSTRY|SECTOR|LR_CUS_MNT_REV|SUBURB_TOWN|AGE|Fraude|status_pret_F|LR_CARTE_BQ_F|MARITAL_STATUS_F|GENDER_F|NATIONALITY_F|Accor_Payoff_F|gouvernorat_F|secteur_activite_F|valider_par_F|duree_F|agence_F|produit_F|date_de_deboursement_F|code_pret_F|
+-----------+----------+----------+----------------+----------------+----------------------+-------------------+---------+---------------+--------+------+--------------+-----------+---+------+-------------+-------------+----------

                                                                                

EDA : Feature Engineering

In [27]:
indexed_data.groupBy("Fraude").count().show()
indexed_data.write.option("header",True) \
 .csv("indexed.csv")

                                                                                

+------+------+
|Fraude| count|
+------+------+
|     1|393096|
|     0|293462|
+------+------+



AnalysisException: [PATH_ALREADY_EXISTS] Path file:/data/indexed.csv already exists. Set mode as "overwrite" to overwrite the existing path.

In [28]:

from pyspark.sql.functions import expr

# Créer une session Spark
spark = SparkSession.builder.getOrCreate()

# Supposons que vous ayez un DataFrame appelé "data" contenant les données
# avec une colonne "Fraude" représentant les fraudes (0 pour non fraude, 1 pour fraude)
# et d'autres colonnes si nécessaire.

# Calculer le nombre total d'enregistrements
total_count = indexed_data.count()

# Calculer le nombre de fraudes
fraude_count = indexed_data.filter(col("Fraude") == 1).count()

# Calculer le nombre de non-fraudes
non_fraude_count = total_count - fraude_count

# Calculer le pourcentage de fraude et de non-fraude
pourcentage_fraude = (fraude_count / total_count) * 100
pourcentage_non_fraude = (non_fraude_count / total_count) * 100

# Afficher les résultats
print("Pourcentage de fraude: ", pourcentage_fraude)
print("Pourcentage de non-fraude: ", pourcentage_non_fraude)

[Stage 66:>                                                         (0 + 4) / 4]

Pourcentage de fraude:  57.25605120033559
Pourcentage de non-fraude:  42.74394879966441


                                                                                

In [29]:
import matplotlib.pyplot as plt

# Créer les données pour le graphique
labels = ['Fraude', 'Non-Fraude']
pourcentages = [pourcentage_fraude, pourcentage_non_fraude]

# Créer le graphique à secteurs (pie chart)
plt.pie(pourcentages, labels=labels, autopct='%1.1f%%')

# Ajouter un titre
plt.title('Pourcentage de Fraude et de Non-Fraude')

# Afficher le graphique
plt.show()



ModuleNotFoundError: No module named 'matplotlib'

In [30]:
import pandas as pd

In [31]:
indexed_data.dtypes

[('code_client', 'double'),
 ('TEL_MOBILE', 'double'),
 ('code_agent', 'double'),
 ('montant_demander', 'double'),
 ('montant_accorder', 'double'),
 ('code_secteur_du_client', 'double'),
 ('code_secteur_projet', 'double'),
 ('id', 'double'),
 ('ACCOUNT_OFFICER', 'double'),
 ('INDUSTRY', 'double'),
 ('SECTOR', 'double'),
 ('LR_CUS_MNT_REV', 'double'),
 ('SUBURB_TOWN', 'double'),
 ('AGE', 'int'),
 ('Fraude', 'int'),
 ('status_pret_F', 'double'),
 ('LR_CARTE_BQ_F', 'double'),
 ('MARITAL_STATUS_F', 'double'),
 ('GENDER_F', 'double'),
 ('NATIONALITY_F', 'double'),
 ('Accor_Payoff_F', 'double'),
 ('gouvernorat_F', 'double'),
 ('secteur_activite_F', 'double'),
 ('valider_par_F', 'double'),
 ('duree_F', 'double'),
 ('agence_F', 'double'),
 ('produit_F', 'double'),
 ('date_de_deboursement_F', 'double'),
 ('code_pret_F', 'double')]

In [32]:
from pyspark.ml.feature import VectorAssembler

assembler = VectorAssembler(inputCols=['code_client', 'TEL_MOBILE', 'code_pret_F',
                    'date_de_deboursement_F','produit_F', 
                    'agence_F','montant_demander', 'montant_accorder',
                    'duree_F','valider_par_F', 
                    'secteur_activite_F','code_secteur_du_client', 'code_secteur_projet', 'gouvernorat_F',
                    'status_pret_F','Accor_Payoff_F', 
                    'id','ACCOUNT_OFFICER', 'INDUSTRY', 'NATIONALITY_F',
                    'GENDER_F','SECTOR', 
                    'MARITAL_STATUS_F','LR_CARTE_BQ_F', 'LR_CUS_MNT_REV', 'SUBURB_TOWN',
                    'AGE'], outputCol='features')
data = assembler.transform(indexed_data)

In [33]:
data['features','Fraude'].show(100)


23/05/19 18:07:47 WARN DAGScheduler: Broadcasting large task binary with size 21.8 MiB
[Stage 69:>                                                         (0 + 1) / 1]

+--------------------+------+
|            features|Fraude|
+--------------------+------+
|[2097563.0,3.0,38...|     0|
|[2097615.0,3.0,30...|     1|
|[2551368.0,3.0,61...|     0|
|[2551451.0,3.0,97...|     0|
|[2551527.0,3.0,60...|     1|
|[2551726.0,3.0,60...|     0|
|[2551726.0,3.0,22...|     0|
|[2097834.0,3.0,14...|     1|
|[2551802.0,3.0,60...|     1|
|[2097847.0,3.0,18...|     1|
|[2098019.0,3.0,54...|     0|
|[2551915.0,3.0,26...|     1|
|[2098089.0,3.0,55...|     1|
|[2551922.0,3.0,65...|     0|
|[2551922.0,3.0,61...|     1|
|[2098089.0,3.0,30...|     1|
|[2551963.0,3.0,61...|     1|
|[2551992.0,3.0,61...|     0|
|[2098328.0,3.0,32...|     1|
|[2551992.0,3.0,37...|     1|
|[2098342.0,3.0,14...|     1|
|[2552033.0,3.0,61...|     0|
|[2098438.0,3.0,49...|     1|
|[2098438.0,3.0,30...|     1|
|[2552156.0,3.0,62...|     0|
|[2098438.0,3.0,13...|     1|
|[2552184.0,3.0,62...|     1|
|[2098540.0,3.0,27...|     0|
|[2552184.0,3.0,29...|     0|
|[2098540.0,3.0,51...|     0|
|[2098903.

                                                                                

In [34]:
(train, test )= data.randomSplit([0.8, 0.2], seed=10)


Logistic Regression :


In [35]:
from pyspark.ml.classification import LogisticRegression
#Create a Logistic Regression classifier.
logistic = LogisticRegression(labelCol = "Fraude", featuresCol = "features")
# Learn from the training data.
lrModel = logistic.fit(train)

23/05/19 18:08:05 WARN DAGScheduler: Broadcasting large task binary with size 21.9 MiB
23/05/19 18:08:08 ERROR Executor: Exception in task 3.0 in stage 70.0 (TID 113)]
org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$4062/95814728: (struct<code_client:double,TEL_MOBILE:double,code_pret_F:double,date_de_deboursement_F:double,produit_F:double,agence_F:double,montant_demander:double,montant_accorder:double,duree_F:double,valider_par_F:double,secteur_activite_F:double,code_secteur_du_client:double,code_secteur_projet:double,gouvernorat_F:double,status_pret_F:double,Accor_Payoff_F:double,id:double,ACCOUNT_OFFICER:double,INDUSTRY:double,NATIONALITY_F:double,GENDER_F:double,SECTOR:double,MARITAL_STATUS_F:double,LR_CARTE_BQ_F:double,LR_CUS_MNT_REV:double,SUBURB_TOWN:double,AGE_double_VectorAssembler_874f1c3c72ec:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.error

Py4JJavaError: An error occurred while calling o952.fit.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 3 in stage 70.0 failed 1 times, most recent failure: Lost task 3.0 in stage 70.0 (TID 113) (b0e1bb9c76b4 executor driver): org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$4062/95814728: (struct<code_client:double,TEL_MOBILE:double,code_pret_F:double,date_de_deboursement_F:double,produit_F:double,agence_F:double,montant_demander:double,montant_accorder:double,duree_F:double,valider_par_F:double,secteur_activite_F:double,code_secteur_du_client:double,code_secteur_projet:double,gouvernorat_F:double,status_pret_F:double,Accor_Payoff_F:double,id:double,ACCOUNT_OFFICER:double,INDUSTRY:double,NATIONALITY_F:double,GENDER_F:double,SECTOR:double,MARITAL_STATUS_F:double,LR_CARTE_BQ_F:double,LR_CUS_MNT_REV:double,SUBURB_TOWN:double,AGE_double_VectorAssembler_874f1c3c72ec:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1234)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1235)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 32 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2785)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2721)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2720)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2720)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1206)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1206)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2984)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2923)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2912)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:971)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2263)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2358)
	at org.apache.spark.rdd.RDD.$anonfun$fold$1(RDD.scala:1172)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.fold(RDD.scala:1166)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$2(RDD.scala:1259)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1226)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$1(RDD.scala:1212)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:405)
	at org.apache.spark.rdd.RDD.treeAggregate(RDD.scala:1212)
	at org.apache.spark.ml.stat.Summarizer$.getClassificationSummarizers(Summarizer.scala:233)
	at org.apache.spark.ml.classification.LogisticRegression.$anonfun$train$1(LogisticRegression.scala:517)
	at org.apache.spark.ml.util.Instrumentation$.$anonfun$instrumented$1(Instrumentation.scala:191)
	at scala.util.Try$.apply(Try.scala:213)
	at org.apache.spark.ml.util.Instrumentation$.instrumented(Instrumentation.scala:191)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:497)
	at org.apache.spark.ml.classification.LogisticRegression.train(LogisticRegression.scala:287)
	at org.apache.spark.ml.Predictor.fit(Predictor.scala:114)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.lang.Thread.run(Thread.java:750)
Caused by: org.apache.spark.SparkException: [FAILED_EXECUTE_UDF] Failed to execute user defined function (VectorAssembler$$Lambda$4062/95814728: (struct<code_client:double,TEL_MOBILE:double,code_pret_F:double,date_de_deboursement_F:double,produit_F:double,agence_F:double,montant_demander:double,montant_accorder:double,duree_F:double,valider_par_F:double,secteur_activite_F:double,code_secteur_du_client:double,code_secteur_projet:double,gouvernorat_F:double,status_pret_F:double,Accor_Payoff_F:double,id:double,ACCOUNT_OFFICER:double,INDUSTRY:double,NATIONALITY_F:double,GENDER_F:double,SECTOR:double,MARITAL_STATUS_F:double,LR_CARTE_BQ_F:double,LR_CUS_MNT_REV:double,SUBURB_TOWN:double,AGE_double_VectorAssembler_874f1c3c72ec:double>) => struct<type:tinyint,size:int,indices:array<int>,values:array<double>>).
	at org.apache.spark.sql.errors.QueryExecutionErrors$.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala:217)
	at org.apache.spark.sql.errors.QueryExecutionErrors.failedExecuteUserDefinedFunctionError(QueryExecutionErrors.scala)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.sort_addToSorter_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenExec$$anon$1.hasNext(WholeStageCodegenExec.scala:760)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator$$anon$10.hasNext(Iterator.scala:460)
	at scala.collection.Iterator.foreach(Iterator.scala:943)
	at scala.collection.Iterator.foreach$(Iterator.scala:943)
	at scala.collection.AbstractIterator.foreach(Iterator.scala:1431)
	at scala.collection.TraversableOnce.foldLeft(TraversableOnce.scala:199)
	at scala.collection.TraversableOnce.foldLeft$(TraversableOnce.scala:192)
	at scala.collection.AbstractIterator.foldLeft(Iterator.scala:1431)
	at scala.collection.TraversableOnce.aggregate(TraversableOnce.scala:260)
	at scala.collection.TraversableOnce.aggregate$(TraversableOnce.scala:260)
	at scala.collection.AbstractIterator.aggregate(Iterator.scala:1431)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$4(RDD.scala:1234)
	at org.apache.spark.rdd.RDD.$anonfun$treeAggregate$6(RDD.scala:1235)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2(RDD.scala:853)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitions$2$adapted(RDD.scala:853)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:92)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:139)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:554)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1529)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:557)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more
Caused by: org.apache.spark.SparkException: Encountered null while assembling a row with handleInvalid = "error". Consider
removing nulls from dataset or using handleInvalid = "keep" or "skip".
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1(VectorAssembler.scala:291)
	at org.apache.spark.ml.feature.VectorAssembler$.$anonfun$assemble$1$adapted(VectorAssembler.scala:260)
	at scala.collection.IndexedSeqOptimized.foreach(IndexedSeqOptimized.scala:36)
	at scala.collection.IndexedSeqOptimized.foreach$(IndexedSeqOptimized.scala:33)
	at scala.collection.mutable.WrappedArray.foreach(WrappedArray.scala:38)
	at org.apache.spark.ml.feature.VectorAssembler$.assemble(VectorAssembler.scala:260)
	at org.apache.spark.ml.feature.VectorAssembler.$anonfun$transform$6(VectorAssembler.scala:143)
	... 32 more


In [None]:
prediction_LR = lrModel.transform(test)
