# PysPark ML Lib - Prédiction du diabète par régression logistique

> Réalisé par : Abdelmajid EL HOU - Consultant Data <br>
[ePortfolio](https://abdelmajidlh.github.io/ePortfolio/)  | [Github](https://github.com/AbdelmajidLh) | [Linkedin](https://www.linkedin.com/in/aelhou/)

## Contexte & Objectif

Le jeu de données provient du *National Institute of Diabetes and Digestive and Kidney Diseases*. L'objectif est de **prédire**, à partir de mesures diagnostiques, si un patient est diabétique. 

Le dataset est composé uniquement de femmes (> 21 ans) et est disponible sur Kaggle ([lien ici](https://www.kaggle.com/datasets/mathchi/diabetes-data-set)).

## Installation & importation des librairies

In [45]:
# installation des librairies
#! pip install pyspark

from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import BinaryClassificationEvaluator
from pyspark.ml.classification import LogisticRegressionModel
import pandas as ps

## lancer une session Spark (sparkSession)

Ce code crée une nouvelle session `Spark`.

La méthode `SparkSession.builder()` crée un nouveau builder pour construire une session `Spark`. L'appel à `appName("spark")` définit le nom de l'application Spark. Enfin, `getOrCreate()` crée une nouvelle session Spark si elle n'existe pas déjà, ou retourne la session existante si elle existe. La nouvelle session Spark est stockée dans la variable `spark`.

In [4]:
spark = SparkSession.builder.appName("spark").getOrCreate()

## Importation & exploration des données

Le dataset est téléchargé à partir de github. Ces données viennent de Kaggle.



In [5]:
#! git clone https://github.com/education454/diabetes_dataset
df = spark.read.csv('diabetes_dataset/diabetes.csv', header = True, inferSchema=True) # predit le type de colonnes
df.show(6)

Cloning into 'diabetes_dataset'...
remote: Enumerating objects: 6, done.[K
remote: Counting objects: 100% (6/6), done.[K
remote: Compressing objects: 100% (5/5), done.[K
remote: Total 6 (delta 0), reused 0 (delta 0), pack-reused 0[K
Unpacking objects: 100% (6/6), 13.00 KiB | 4.33 MiB/s, done.
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|      0|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|            0|            0|      0|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
| 

la colonne `Outcome` est la variable de sortie. `0 : normal, 1 : diabétique`.

In [6]:
# vérifier le type des colonnes dans le df
df.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)



In [7]:
# Vérifier les dimensions du df
print(df.count(), ':', len(df.columns))

2000 : 9


In [8]:
# Calculer le nombre de malades et de normaux dans le dataset
df.groupBy("Outcome").count().show()

+-------+-----+
|Outcome|count|
+-------+-----+
|      1|  684|
|      0| 1316|
+-------+-----+



* Nous avons 684 malades sur l'ensemble des 2000 individus.

In [9]:
# faire un describe sur le df
df.describe().show()

+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|summary|      Pregnancies|           Glucose|     BloodPressure|    SkinThickness|          Insulin|               BMI|DiabetesPedigreeFunction|               Age|           Outcome|
+-------+-----------------+------------------+------------------+-----------------+-----------------+------------------+------------------------+------------------+------------------+
|  count|             2000|              2000|              2000|             2000|             2000|              2000|                    2000|              2000|              2000|
|   mean|           3.7035|          121.1825|           69.1455|           20.935|           80.254|32.192999999999984|     0.47092999999999974|           33.0905|             0.342|
| stddev|3.306063032730656|32.068635649902916|19.188314815604098|16.103242909926

Le tableau montre les statistiques basiques pour les colonnes numériques. La valeur minimale pour le glucose, l'insuline et la pression sanguine est égale à `0` !. Ces valeurs necessitent d'être nettoyées.

## Nettoyage des données - data cleaning

In [10]:
# Vérifier les valeurs manquantes dans le df
for col in df.columns:
  print(col + ":", df[df[col].isNull()].count())

Pregnancies: 0
Glucose: 0
BloodPressure: 0
SkinThickness: 0
Insulin: 0
BMI: 0
DiabetesPedigreeFunction: 0
Age: 0
Outcome: 0


Notre *dataset* ne contiens aucune valeur manquante.

In [11]:
# créer une fonction pour compter le nombre de valeurs 0 et leur pourcentage par colonne
def count_zeros(df, columns):
  for col in columns:
    num_zeros = df.filter(df[col] == 0).count()
    total_rows = df.count()
    percentage = (num_zeros / total_rows) * 100
    print("{} : {} ({:.2f}%)".format(col, num_zeros, percentage))

In [12]:
liste_cols = ['Glucose', 'Bloodpressure', 'SkinThickness', 'Insulin', 'BMI']
count_zeros(df, liste_cols)

Glucose : 13 (0.65%)
Bloodpressure : 90 (4.50%)
SkinThickness : 573 (28.65%)
Insulin : 956 (47.80%)
BMI : 28 (1.40%)


Ces pourcentages indiquent que la colonne Glucose a le plus faible pourcentage de valeurs nulles, avec seulement 0,65%. La colonne Insulin a le pourcentage le plus élevé de valeurs nulles, avec 47,80%. Les autres colonnes ont des pourcentages de valeurs nulles compris entre 1,40% et 28,65%.

In [13]:
# afficher la valeur moyenne pour chaque colonne et faire le remplacement
## méthode 1 :
for i in df.columns[1:6]:
  mean_val = df.agg({i:'mean'}).first()[0]
  print("la valeur moyenne de la colonne {} est : {}".format(i, int(mean_val)))
  # update the values : si la condition (val ==0) est vrai
  df = df.withColumn(i, when(df[i]==0, int(mean_val)).otherwise(df[i]))

df.show(10)

la valeur moyenne de la colonne Glucose est : 121
la valeur moyenne de la colonne BloodPressure est : 69
la valeur moyenne de la colonne SkinThickness est : 20
la valeur moyenne de la colonne Insulin est : 80
la valeur moyenne de la colonne BMI est : 32
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|
|          0|    135|           68|           42|    250|42.3|                   0.365| 24|      1|
|          1|    139|           62|           

> Les valeurs sont bien remplacées.

## Construire et entrainer le modèle de machine learning
### Feature ingeneering

In [16]:
# calculer la corrélation entre la variable de réponse et les autres variables
for col in df.columns:
  print('La correlation de  {} avec la variable outcome est {}.'.format(col, df.stat.corr('Outcome', col)))
  #print(f'La correlation de  {col} avec la variable outcome est {df.stat.corr('Outcome', col)}.')

La correlation de  Pregnancies avec la variable outcome est 0.22443699263363961.
La correlation de  Glucose avec la variable outcome est 0.48796646527321064.
La correlation de  BloodPressure avec la variable outcome est 0.17171333286446713.
La correlation de  SkinThickness avec la variable outcome est 0.1659010662889893.
La correlation de  Insulin avec la variable outcome est 0.1711763270226193.
La correlation de  BMI avec la variable outcome est 0.2827927569760082.
La correlation de  DiabetesPedigreeFunction avec la variable outcome est 0.1554590791569403.
La correlation de  Age avec la variable outcome est 0.23650924717620253.
La correlation de  Outcome avec la variable outcome est 1.0.


Les résultats de corrélation indiquent que certains des facteurs peuvent avoir un impact significatif sur le résultat de la régression logistique. 

* Les variables `Glucose` et `BMI` ont les **plus fortes corrélations** avec le résultat, ce qui signifie qu'elles sont les plus susceptibles d'avoir un impact sur le résultat de la régression logistique. 
* Les variables `Pregnancies`, `BloodPressure`, `SkinThickness`, `Insulin` et `DiabetesPedigreeFunction` ont des **corrélations plus faibles** avec le résultat, ce qui signifie qu'elles sont moins susceptibles d'avoir un impact sur le résultat de la régression logistique. 
* La variable `Age` a une corrélation **modérée** avec le résultat, ce qui signifie qu'elle peut avoir un impact modéré sur le résultat de la régression logistique.

In [19]:
# creer un vectorAssembler : c'est un feature transformer qui merge les différentes colonnes dans un seul vecteur (features).
inputCols = ['Pregnancies' , 'Glucose' , 'BloodPressure', 'SkinThickness' , 'Insulin' , 'BMI' , 'DiabetesPedigreeFunction' ,'Age'  ]
assembler = VectorAssembler(inputCols= inputCols, outputCol='features')
output_data = assembler.transform(df)

In [20]:
# vérifier si la colonne features est rajoutée au dataframe
output_data.printSchema()

root
 |-- Pregnancies: integer (nullable = true)
 |-- Glucose: integer (nullable = true)
 |-- BloodPressure: integer (nullable = true)
 |-- SkinThickness: integer (nullable = true)
 |-- Insulin: integer (nullable = true)
 |-- BMI: double (nullable = true)
 |-- DiabetesPedigreeFunction: double (nullable = true)
 |-- Age: integer (nullable = true)
 |-- Outcome: integer (nullable = true)
 |-- features: vector (nullable = true)



In [23]:
# afficher les données (3 lignes)
output_data.show(3)

+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|Pregnancies|Glucose|BloodPressure|SkinThickness|Insulin| BMI|DiabetesPedigreeFunction|Age|Outcome|            features|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
|          2|    138|           62|           35|     80|33.6|                   0.127| 47|      1|[2.0,138.0,62.0,3...|
|          0|     84|           82|           31|    125|38.2|                   0.233| 23|      0|[0.0,84.0,82.0,31...|
|          0|    145|           69|           20|     80|44.2|                    0.63| 31|      1|[0.0,145.0,69.0,2...|
+-----------+-------+-------------+-------------+-------+----+------------------------+---+-------+--------------------+
only showing top 3 rows



## Entrainer le modèle

In [25]:
# selectionner les colonnes d'interer
final_df = output_data.select('features', 'Outcome')
final_df.show(2)

+--------------------+-------+
|            features|Outcome|
+--------------------+-------+
|[2.0,138.0,62.0,3...|      1|
|[0.0,84.0,82.0,31...|      0|
+--------------------+-------+
only showing top 2 rows



In [26]:
# split to training (70%) and test (30%)
train, test = final_df.randomSplit([0.7, 0.3])

# créer le modèle
models = LogisticRegression(labelCol='Outcome')

# entrainer le modèle
model = models.fit(train)

In [33]:
# afficher le sommaire
summary = model.summary

In [37]:
## les prédictions
summary.predictions.show()
summary.predictions.describe().show()

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,57.0,60.0,20...|    0.0|[4.00332216107346...|[0.98207237452611...|       0.0|
|[0.0,57.0,60.0,20...|    0.0|[4.00332216107346...|[0.98207237452611...|       0.0|
|[0.0,67.0,76.0,20...|    0.0|[2.49183817491278...|[0.92356766128755...|       0.0|
|[0.0,67.0,76.0,20...|    0.0|[2.49183817491278...|[0.92356766128755...|       0.0|
|[0.0,73.0,69.0,20...|    0.0|[4.30571835570287...|[0.98668839876298...|       0.0|
|[0.0,74.0,52.0,10...|    0.0|[3.72867897545375...|[0.97653908603919...|       0.0|
|[0.0,74.0,52.0,10...|    0.0|[3.72867897545375...|[0.97653908603919...|       0.0|
|[0.0,74.0,52.0,10...|    0.0|[3.72867897545375...|[0.97653908603919...|       0.0|
|[0.0,78.0,88.0,29...|    0.0|[2.83901818968011...|[0.94474823506578...|    

## Evaluation du modèle

`BinaryClassificationEvaluator` de pysparkML est un outil d'évaluation qui permet de **mesurer la performance d'un modèle de classification binaire**. Il fournit des métriques telles que la `précision`, le `rappel (recall`), l'`aire sous la courbe ROC (AUC) : par défaut` et la `précision-rappel`. Ces métriques peuvent être utilisées pour comparer les **performances** des modèles et ainsi déterminer le meilleur modèle à utiliser.

In [39]:
# feed test data in the model and evaluate it
predictions = model.evaluate(test)

In [42]:
# voir les prédictions
predictions.predictions.show(15)

+--------------------+-------+--------------------+--------------------+----------+
|            features|Outcome|       rawPrediction|         probability|prediction|
+--------------------+-------+--------------------+--------------------+----------+
|[0.0,73.0,69.0,20...|      0|[4.30571835570287...|[0.98668839876298...|       0.0|
|[0.0,78.0,88.0,29...|      0|[2.83901818968011...|[0.94474823506578...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.62281665948290...|[0.93231566313563...|       0.0|
|[0.0,84.0,64.0,22...|      0|[2.62281665948290...|[0.93231566313563...|       0.0|
|[0.0,84.0,82.0,31...|      0|[2.77536879386290...|[0.94133019594463...|       0.0|
|[0.0,91.0,68.0,32...|      0|[2.37396197071794...|[0.91482010212770...|       0.0|
|[0.0,91.0,80.0,20...|      0|[2.57197052327726...|[0.92903571915015...|       0.0|
|[0.0,93.0,60.0,25...|      0|[2.87871469953420...|[0.94678414228822...|       0.0|
|[0.0,94.0,69.0,20...|      0|[2.74622565305314...|[0.93969983462034...|    

Dans la plupart des cas (sur les 15 lignes affichées), le modèle de regression prédit bien le résultats.

In [44]:
from pyspark.ml import evaluation
# évaluer le modèle
evaluator = BinaryClassificationEvaluator(rawPredictionCol='rawPrediction', labelCol='Outcome')
evaluator.evaluate(model.transform(test))

0.8306818181818183

Les résultats montrent que le modèle de régression a une **précision** de **83%**, ce qui est assez bon. Cela signifie que le modèle est capable de prédire avec une précision élevée si un individu aura un résultat positif (malade) ou négatif (diabétique).

## Sauvegarder le modèle

> On sauvegarde un modèle de machine learning afin de pouvoir le réutiliser à l'avenir. En effet, une fois que le modèle a été entraîné et qu'il fonctionne de manière optimale, il est important de le sauvegarder afin de pouvoir le réutiliser pour de nouvelles prédictions. Cela permettra d'économiser du temps et des ressources, car il n'est pas nécessaire de retrainer le modèle pour chaque nouvelle prédiction.

In [None]:
model.save("LogReg_model")

## Réutiliser le modèle sauvegardé

In [51]:
model = LogisticRegressionModel.load('LogReg_model')

## Sources :

[School of Disruptive Innovation](https://www.udemy.com/course/data-science-hands-on-diabetes-prediction-with-pyspark-mllib/learn/lecture/20880872#overview)