#Cas pratique de Machine Learning avec Spark

Il s'agit pour nous dans cet exercice de travailler avec une serie de données issues de cartes de credits,
et de predire grace elles si un individu est un bon creantier (1) ou non (0) en utilisant trois modèles de machine
learning

In [2]:
#Initialiser Spark et importer les librairies requis

In [7]:
# "Trouver" PySpark et demarrer une session Spark
import findspark
findspark.init()
from pyspark.sql import SparkSession
spark = SparkSession.builder.getOrCreate()

In [8]:
#Importer les librairies requis
from pyspark.ml.feature import VectorAssembler, StringIndexer, VectorIndexer, MinMaxScaler
from pyspark.ml import Pipeline
from pyspark.sql.functions import *

from pyspark.ml.classification import LogisticRegression
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.classification import RandomForestClassifier

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
from pyspark.sql.types import *
from pyspark.sql.functions import *

from pyspark.ml.evaluation import MulticlassClassificationEvaluator

import pyspark

1- Chargement des données

Les données sont ressemblées dans un fichier texte ccdefault.csv (dans le dossier data)
dont la première ligne contient le nom des colonnes,
nous les lirons dans un dataframe.


In [15]:
# Lire le fichier csv
data = spark.read.csv("./data/ccdefault.csv", header = True, inferSchema = True)


In [16]:
#Presentation sommaire
print("Nombre d'observations:", data.count())
print("Nombre de colonnes:", len(data.schema.names))
print("Nom des colonnes:", data.schema.names)

Nombre d'observations: 30000
Nombre de colonnes: 25
Nom des colonnes: ['ID', 'LIMIT_BAL', 'SEX', 'EDUCATION', 'MARRIAGE', 'AGE', 'PAY_0', 'PAY_2', 'PAY_3', 'PAY_4', 'PAY_5', 'PAY_6', 'BILL_AMT1', 'BILL_AMT2', 'BILL_AMT3', 'BILL_AMT4', 'BILL_AMT5', 'BILL_AMT6', 'PAY_AMT1', 'PAY_AMT2', 'PAY_AMT3', 'PAY_AMT4', 'PAY_AMT5', 'PAY_AMT6', 'DEFAULT']


In [17]:
print(data.select("ID", "LIMIT_BAL", "SEX", "EDUCATION", "MARRIAGE", "AGE", "PAY_0", "PAY_0", 
                  "BILL_AMT1", "BILL_AMT2", "DEFAULT").show(5))

+---+---------+---+---------+--------+---+-----+-----+---------+---------+-------+
| ID|LIMIT_BAL|SEX|EDUCATION|MARRIAGE|AGE|PAY_0|PAY_0|BILL_AMT1|BILL_AMT2|DEFAULT|
+---+---------+---+---------+--------+---+-----+-----+---------+---------+-------+
|  1|    20000|  2|        2|       1| 24|    2|    2|     3913|     3102|      1|
|  2|   120000|  2|        2|       2| 26|   -1|   -1|     2682|     1725|      1|
|  3|    90000|  2|        2|       2| 34|    0|    0|    29239|    14027|      0|
|  4|    50000|  2|        2|       1| 37|    0|    0|    46990|    48233|      0|
|  5|    50000|  1|        2|       1| 57|   -1|   -1|     8617|     5670|      0|
+---+---------+---+---------+--------+---+-----+-----+---------+---------+-------+
only showing top 5 rows

None


In [18]:
print("Nombre de clients fiable Default == 1:", data.filter(data.DEFAULT == 1).count())
print("Nombre de clients non fiable Default == 0:", data.filter(data.DEFAULT == 0).count())

Nombre de clients fiable Default == 1: 6636
Nombre de clients non fiable Default == 0: 23364


On contaste qu'une très grande majorité de client ne sont pas fiables

#Mise en oeuvre de la pipeline

Les données seront scindées en deux parties (données d'entrainement et données test).
Les données numeriques seront normalisées.
Cependant les données des champs allant de PAY_0 à PAY_6 sont en realité des données labelisées et devraient être 
factorisées; mais dans notre exercices cela n'est pas vraiment important 
dans cet exercice 


In [20]:
#Abondon de l'ID des colonnes
data = data.select(data.schema.names[1:])

#Scinder le jeux de données en deux parties (entrainement et test)
splits = data.randomSplit([0.75, 0.25])
data_train = splits[0]
data_test = splits[1]

#Convertir les champs qualitatifs (SEX, EDUCATION, MARRIAGE)
categorical_features = data.schema.names[1:4]
catVect = VectorAssembler(inputCols = categorical_features, outputCol = "catFeatures")
catIdx = VectorIndexer(inputCol = catVect.getOutputCol(), outputCol = "idxCatFeatures")

#Normaliser les champs quantitatifs
numerical_features = data.schema.names[0:1] + data.schema.names[4:]
numVect = VectorAssembler(inputCols = numerical_features, outputCol = "numFeatures")
minMax = MinMaxScaler(inputCol = numVect.getOutputCol(), outputCol = "normFeatures")

# Definir le pipeline 
featVect = VectorAssembler(inputCols=["idxCatFeatures", "normFeatures"], outputCol = "features")
pipeline = Pipeline(stages = [catVect, catIdx, numVect, minMax, featVect])
pipeline_object = pipeline.fit(data_train)

#Exécuter les données d'entrainement et de test à travers le pipeline
data_train = pipeline_object.transform(data_train).select("features", col("DEFAULT").alias("label"))
data_test = pipeline_object.transform(data_test).select("features", col("DEFAULT").alias("label"))

21/09/10 19:51:14 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


In [21]:
print(data_train.show(5))
print("Nombre de données pour l'entrainement /le test:", data_train.count(), "/", data_test.count())


+--------------------+-----+
|            features|label|
+--------------------+-----+
|[0.0,1.0,1.0,0.0,...|    1|
|[0.0,1.0,1.0,0.0,...|    0|
|[0.0,1.0,1.0,0.0,...|    0|
|[0.0,1.0,1.0,0.0,...|    0|
|[0.0,1.0,1.0,0.0,...|    0|
+--------------------+-----+
only showing top 5 rows

None
Nombre de données pour l'entrainement /le test: 22458 / 7542


In [22]:
#ENTRAINEMENT ET EVALUATION DES MODELES DE CLASSIFICATION

Définir les fonctions des métriques d'évaluation

In [27]:
accuracy = MulticlassClassificationEvaluator(
    labelCol = "label", predictionCol = "prediction", metricName = "accuracy")
precision = MulticlassClassificationEvaluator(
    labelCol = "label", predictionCol = "prediction", metricName = "weightedPrecision")
recall = MulticlassClassificationEvaluator(
    labelCol = "label", predictionCol = "prediction", metricName = "weightedRecall")

In [33]:
#Logistic regression

logit = LogisticRegression(labelCol = "label", featuresCol = "features", maxIter = 20, regParam = 0.2)
model = logit.fit(data_train)
predictions_df = model.transform(data_test)


print("performance: {:.4}".format(accuracy.evaluate(predictions_df)))
print("Précision pondérée: {:.4}".format(precision.evaluate(predictions_df)))
print("Rappel pondéré: {:.4}".format(recall.evaluate(predictions_df)))

performance: 0.9801
Précision pondérée: 0.9805
Rappel pondéré: 0.9801


In [29]:
#Decision tree
tree = DecisionTreeClassifier(labelCol = "label", featuresCol = "features", maxDepth = 4, maxBins = 32, 
                              minInstancesPerNode = 1, minInfoGain = 0.0, impurity = "gini", seed = 123)
model = tree.fit(data_train)
predictions_df = model.transform(data_test)

print("performance: {:.4}".format(accuracy.evaluate(predictions_df)))
print("Précision pondérée: {:.4}".format(precision.evaluate(predictions_df)))
print("Rappel pondéré: {:.4}".format(recall.evaluate(predictions_df)))

Précision: 1.0
Précision pondérée: 1.0
Rappel pondéré: 1.0


In [34]:
#Random forest
rf = RandomForestClassifier(labelCol = "label", featuresCol = "features", maxDepth = 4, maxBins = 32, 
                            minInstancesPerNode = 1, minInfoGain=0.0, impurity = "gini", numTrees = 10, seed = 123) 
model = rf.fit(data_train)
predictions_df = model.transform(data_test)

print("performance: {:.4}".format(accuracy.evaluate(predictions_df)))
print("Précision pondérée: {:.4}".format(precision.evaluate(predictions_df)))
print("Rappel pondéré: {:.4}".format(recall.evaluate(predictions_df)))

performance: 1.0
Précision pondérée: 1.0
Rappel pondéré: 1.0


In [35]:
#Calculer les métriques (non pondérées) manuellement
tp = int(predictions_df.filter("prediction == 1.0 AND label == 1").count())
fp = int(predictions_df.filter("prediction == 1.0 AND label == 0").count())
tn = int(predictions_df.filter("prediction == 0.0 AND label == 0").count())
fn = int(predictions_df.filter("prediction == 0.0 AND label == 1").count())

print("vrais positifs:", tp)
print("faux positifs:", fp)
print("vrais negatifs:", tn)
print("faux negatifs:", fn)

print("performance: {:.4}".format((tp+tn)/(tp+fp+tn+fn)))
print("Precision: {:.4}".format((tp)/(tp+fp)))
print("Rappel: {:.4}".format((tp)/(tp+fn)))

vrais positifs: 1665
faux positifs: 0
vrais negatifs: 5877
faux negatifs: 0
performance: 1.0
Precision: 1.0
Rappel: 1.0


Au vu des performances mesurées on peut sans contexte dire "Random forest" et le "decision tree" sont indiquées pour 
cet apprentissage, même si la "logistic regression" donne aussi de bons résultats.

Avec ces données, on peut essayer de predire le montant idéal
c'est à dire assez proche de ce que le client souhaite tout en etant certain que celui rembourse. 

On avait aussi la possibilité de faire cet exercice avec la librairie sklearn,
il est curieux de savoir quels resultats l'on aurait obtenus