## Entrainement de differents modèles pour calculer les probabilités de retour à l'emploi

In [1]:
from pyspark.sql import SparkSession

### Rechargement du dataset Personnes

In [2]:
%%time
MASTER="yarn"           # Spark distant sur CDP  (Spark pushdown)
# MASTER="local[*]"     # Spark local sur CML

spark = SparkSession.builder.appName("1_entrainement.ipynb").master(MASTER).getOrCreate()


[0;31m7.1.9 and 7.2.17 are the last CDP runtime releases where Spark 2 is supported.
Please migrate your Spark 2 applications to Spark 3.

Updating Spark 2 applications for Spark 3:
https://docs.cloudera.com/runtime/7.2.16/running-spark-applications/topics/spark-update-spark2-spark3.html
[0m
CPU times: user 98.4 ms, sys: 66.6 ms, total: 165 ms
Wall time: 1min 11s


In [3]:
sdf=spark.sql("select * from olivier.personnes")

Hive Session ID = e48777c5-4c54-4ce2-8ede-c9e9c1ccbb72


### Initialisation des experimentation MLFLOW pour l'Atelier 1

In [4]:
sdf.show(5)

[Stage 0:>                                                          (0 + 1) / 1]

+-----+---+-----+----------------+-------------+--------------+--------------------+---------+------------------+----------------+--------------------+--------------+-------------+
|   id|age| sexe|niveau_education|duree_chomage|experience_ant|          competence|formation|taux_chomage_local|secteur_activite|reseau_professionnel|support_social|retour_emploi|
+-----+---+-----+----------------+-------------+--------------+--------------------+---------+------------------+----------------+--------------------+--------------+-------------+
|95167| 42|homme|         LICENSE|            5|             8|    Public librarian|     true| 11.50570330550432|           Group|                  95|             1|            1|
|76849| 39|femme|         BTS/DUT|           17|             6| Associate Professor|     true| 2.208842935285328|             LLC|                  70|             7|            0|
|36869| 18|femme|          MASTER|           10|            13|              Lawyer|     true| 

                                                                                

In [5]:
# mlflow.set_experiment ("Experimentations Atelier 1")

## Lancement d'une Regression logistique de SparkML

In [6]:
sdf.printSchema()

root
 |-- id: long (nullable = true)
 |-- age: long (nullable = true)
 |-- sexe: string (nullable = true)
 |-- niveau_education: string (nullable = true)
 |-- duree_chomage: long (nullable = true)
 |-- experience_ant: long (nullable = true)
 |-- competence: string (nullable = true)
 |-- formation: boolean (nullable = true)
 |-- taux_chomage_local: double (nullable = true)
 |-- secteur_activite: string (nullable = true)
 |-- reseau_professionnel: long (nullable = true)
 |-- support_social: long (nullable = true)
 |-- retour_emploi: long (nullable = true)



In [7]:
sdf=sdf.drop('id')      # Suppression colonne id non requise pour l'étude
sdf.printSchema()

root
 |-- age: long (nullable = true)
 |-- sexe: string (nullable = true)
 |-- niveau_education: string (nullable = true)
 |-- duree_chomage: long (nullable = true)
 |-- experience_ant: long (nullable = true)
 |-- competence: string (nullable = true)
 |-- formation: boolean (nullable = true)
 |-- taux_chomage_local: double (nullable = true)
 |-- secteur_activite: string (nullable = true)
 |-- reseau_professionnel: long (nullable = true)
 |-- support_social: long (nullable = true)
 |-- retour_emploi: long (nullable = true)



#### Extraction des variables categorielles

In [8]:
from pyspark.ml import Pipeline
from pyspark.ml.feature import StringIndexer, OneHotEncoder

In [9]:
variables = sdf.columns[0:-1]   # Toutes les variables sauf la derniere qui est la variable à étudier

In [10]:
# Extract les colonnes categories (types string)
cat_cols=[]
feature_cols=[]
for colname, coltype in sdf.dtypes[1:-1]:
    if ( coltype == 'string'):
        cat_cols += [colname]
    else:
        feature_cols += [colname]


In [11]:
cat_cols

['sexe', 'niveau_education', 'competence', 'secteur_activite']

In [12]:
feature_cols

['duree_chomage',
 'experience_ant',
 'formation',
 'taux_chomage_local',
 'reseau_professionnel',
 'support_social']

#### Categorise (index) et encode les variables à catégoriser

In [13]:
stages = [] 

for colname in cat_cols:

   # Assigne un indice aux variable categorielle
   indexer = StringIndexer(inputCol=colname, outputCol=colname + "_index") 
        
   encoder = OneHotEncoder(inputCol=indexer.getOutputCol(), outputCol=colname + "_vec")  

   stages       += [indexer, encoder]             # A jout des étapes au pipeline de tranformation

#### Transforme toutes les variables en vecteur compréhensible pour la régression

In [14]:
from pyspark.ml.feature import VectorAssembler

assemblerInputs = feature_cols + [c + "_vec" for c in cat_cols]              # Réassemblage les variables

assembler = VectorAssembler(inputCols=assemblerInputs, outputCol="features") # Vectorise les variables 


stages += [assembler]      # Ajout du Vector assembleur eu pipeline de transformation

stages

[StringIndexer_35c7fedcfe60,
 OneHotEncoder_af0db368b021,
 StringIndexer_673eeb876edd,
 OneHotEncoder_72f3f7654ee7,
 StringIndexer_8247688a63fa,
 OneHotEncoder_3461c25882ea,
 StringIndexer_9ba5debed4a5,
 OneHotEncoder_d9be2970f445,
 VectorAssembler_619da16e9ea1]

In [15]:
pipeline = Pipeline(stages=stages)           # Initialisation du pipeline de transformation  
sdf = pipeline.fit(sdf).transform(sdf)       # Transformation

                                                                                

In [16]:
sdf.printSchema()

root
 |-- age: long (nullable = true)
 |-- sexe: string (nullable = true)
 |-- niveau_education: string (nullable = true)
 |-- duree_chomage: long (nullable = true)
 |-- experience_ant: long (nullable = true)
 |-- competence: string (nullable = true)
 |-- formation: boolean (nullable = true)
 |-- taux_chomage_local: double (nullable = true)
 |-- secteur_activite: string (nullable = true)
 |-- reseau_professionnel: long (nullable = true)
 |-- support_social: long (nullable = true)
 |-- retour_emploi: long (nullable = true)
 |-- sexe_index: double (nullable = false)
 |-- sexe_vec: vector (nullable = true)
 |-- niveau_education_index: double (nullable = false)
 |-- niveau_education_vec: vector (nullable = true)
 |-- competence_index: double (nullable = false)
 |-- competence_vec: vector (nullable = true)
 |-- secteur_activite_index: double (nullable = false)
 |-- secteur_activite_vec: vector (nullable = true)
 |-- features: vector (nullable = true)



## Entrainement du modèle de régression logistiques SparkML

#### Echantillonage train/test

In [17]:
(train, test) = sdf.randomSplit([0.7, 0.3], seed=42)
print(train.count())
print(test.count())

                                                                                

13686




5814


                                                                                

#### Entrainement

In [18]:
from pyspark.ml.classification import LogisticRegression

model = LogisticRegression(labelCol= 'retour_emploi', featuresCol='features', maxIter= 10).fit(train)

                                                                                

#### Predictions

In [19]:
predictions = model.transform(test)

In [20]:
predictions.select("retour_emploi", "prediction", "probability").show(5)

+-------------+----------+--------------------+
|retour_emploi|prediction|         probability|
+-------------+----------+--------------------+
|            1|       0.0|[0.51789573209195...|
|            1|       0.0|[0.58739918253130...|
|            1|       0.0|[0.69725823233148...|
|            0|       0.0|[0.60449178609026...|
|            0|       1.0|[0.43923599971406...|
+-------------+----------+--------------------+
only showing top 5 rows



In [21]:
predictions.printSchema()

root
 |-- age: long (nullable = true)
 |-- sexe: string (nullable = true)
 |-- niveau_education: string (nullable = true)
 |-- duree_chomage: long (nullable = true)
 |-- experience_ant: long (nullable = true)
 |-- competence: string (nullable = true)
 |-- formation: boolean (nullable = true)
 |-- taux_chomage_local: double (nullable = true)
 |-- secteur_activite: string (nullable = true)
 |-- reseau_professionnel: long (nullable = true)
 |-- support_social: long (nullable = true)
 |-- retour_emploi: long (nullable = true)
 |-- sexe_index: double (nullable = false)
 |-- sexe_vec: vector (nullable = true)
 |-- niveau_education_index: double (nullable = false)
 |-- niveau_education_vec: vector (nullable = true)
 |-- competence_index: double (nullable = false)
 |-- competence_vec: vector (nullable = true)
 |-- secteur_activite_index: double (nullable = false)
 |-- secteur_activite_vec: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = t

#### Evaluation du modèle

In [22]:
from pyspark.ml.evaluation import BinaryClassificationEvaluator

evaluator = BinaryClassificationEvaluator(labelCol='retour_emploi')
evaluator.evaluate(predictions)

                                                                                

0.5022290520993371

In [23]:
# Confusion Matrix
matrix = predictions.crosstab('retour_emploi', 'prediction')

                                                                                

In [24]:
import pandas as pd

df = matrix.toPandas()


In [25]:
df

Unnamed: 0,retour_emploi_prediction,0.0,1.0
0,1,1455,1466
1,0,1470,1423


In [26]:
print(f'Precision   : {df.iloc[0,2] / (df.iloc[0,2] + df.iloc[1,2])}')
print(f'Recall      : {df.iloc[0,2] / (df.iloc[0,2] + df.iloc[0,1])}')
print(f'Specificity : {df.iloc[1,1] / (df.iloc[1,1] + df.iloc[1,2])}')

Precision   : 0.507442021460713
Recall      : 0.5018829168093119
Specificity : 0.5081230556515728


#### Tuning du modèle et logging avec MLFlow

In [27]:
import mlflow

from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

In [28]:
MAXITER=10                                 # Initialisation des paramètres de tuning

param_grid = {
    'regParam': [0.1, 0.3, 0.5],
    'elasticNetParam': [0.1, 0.5, 0.9]
}


lr = LogisticRegression(labelCol= 'retour_emploi', featuresCol='features', maxIter=MAXITER)

grid = ParamGridBuilder().addGrid(lr.regParam, param_grid['regParam'])\
                         .addGrid(lr.elasticNetParam, param_grid['elasticNetParam'])\
                         .build()

evaluator = BinaryClassificationEvaluator(labelCol='retour_emploi')

tvs = TrainValidationSplit(estimator=lr, estimatorParamMaps=grid, evaluator=evaluator)

model = tvs.fit(train)

                                                                                

In [31]:
best_params = model.bestModel.extractParamMap()

predictions = model.bestModel.transform(test)

auc         = evaluator.evaluate(predictions, {evaluator.metricName: "areaUnderROC"})

matrix      = predictions.crosstab('retour_emploi', 'prediction')
df          = matrix.toPandas()

precision   = df.iloc[0,2] / (df.iloc[0,2] + df.iloc[1,2])
recall      = df.iloc[0,2] / (df.iloc[0,2] + df.iloc[0,1])
specificity = df.iloc[1,1] / (df.iloc[1,1] + df.iloc[1,2])


                                                                                

In [34]:
mlflow.set_experiment("Atelier ML")


with mlflow.start_run():
    for param_name, param_value in best_params.items():
        mlflow.log_param(param_name.name, param_value)
        
    mlflow.log_metric( "auc"        , auc         )
    mlflow.log_metric( "precision"  , precision   )
    mlflow.log_metric( "recall"     , recall      )
    mlflow.log_metric( "specificity", specificity )
    


In [35]:
spark.stop()