# TP 3 : Machine learning avec Spark

#### Dans ce TP, on veut créer un modèle de classification entraîné sur les données qui ont été pré-traitées dans les TPs précédents. Pour que tout le monde reparte du même point, on télécharge le dataset prepared_trainingset (ce sont des fichiers parquet) situé dans le répertoire data.

In [29]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType

import org.apache.spark.sql.SQLImplicits
import org.apache.spark.sql.{functions => F}

import org.apache.spark.ml.feature.{HashingTF, IDF, RegexTokenizer, StopWordsRemover, CountVectorizer, CountVectorizerModel}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.VectorAssembler

import org.apache.spark.sql.functions._

import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline,PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}

import org.apache.spark.ml.tuning.TrainValidationSplit


import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.types.IntegerType
import org.apache.spark.sql.SQLImplicits
import org.apache.spark.sql.{functions=>F}
import org.apache.spark.ml.feature.{HashingTF, IDF, RegexTokenizer, StopWordsRemover, CountVectorizer, CountVectorizerModel}
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.sql.functions._
import org.apache.spark.ml.linalg.Vectors
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.t...

In [30]:
    // Des réglages optionnels du job spark. Les réglages par défaut fonctionnent très bien pour ce TP.
    // On vous donne un exemple de setting quand même
    val conf = new SparkConf().setAll(Map(
      "spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12"
    ))

    // Initialisation du SparkSession qui est le point d'entrée vers Spark SQL (donne accès aux dataframes, aux RDD,
    // création de tables temporaires, etc., et donc aux mécanismes de distribution des calculs)
    val spark = SparkSession
      .builder
      .config(conf)
      .appName("TP 3 Spark : Machine Learning Avec Spark")
      .getOrCreate()

    /*******************************************************************************
      *         TP 3 : Machine learning avec Spark
      * Introduction
      * Chargement du DataFrame.
      * Utilisation des données textuelles
      *      Stage 1 : récupérer les mots des textes
      *      Stage 2 : retirer les stop words
      *      Stage 3 : computer la partie TF
      *      Stage 4 : computer la partie IDF
      * Conversion des variables catégorielles en variables numériques
      *      Stage 5 : convertir country2 en quantités numériques
      *      Stage 6 : convertir currency2 en quantités numériques
      *      Stages 7 et 8: One-Hot encoder ces deux catégories
      * Mettre les données sous une forme utilisable par Spark.ML
      *      Stage 9 : assembler tous les features en un unique vecteur
      *      Stage 10 : créer/instancier le modèle de classification
      * Création du Pipeline
      * Entraînement, test, et sauvegarde du modèle
      *      Split des données en training et test sets
      *      Entraînement du modèle
      *      Test du modèle
      * Réglage des hyper-paramètres (a.k.a. tuning) du modèle
      *      Grid search
      *      Test du modèle
      * Supplément
      *
      ********************************************************************************/
    println("\n")
    println("Hello World ! from Trainer")
    println("\n")



Hello World ! from Trainer




conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7813b19d
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@352eebd6


#### Chargement du DataFrame :

In [31]:
//val sc = new SparkContext(sparkConf)
val sqlContext = new org.apache.spark.sql.SQLContext(sc)
import sqlContext.implicits._
val path_to_data : String = "/mnt/d/09_SPARK/git_Flooorent/cours-spark-telecom/data/prepared_trainingset/"
//import org.apache.spark.sql.sqlContext.implicits._
//val DataFrame = sqlContext.read.option("mergeSchema", "true").parquet("/mnt/d/09_SPARK/prepared_trainingset")
//val myDataFrame = spark.read.parquet("/mnt/d/09_SPARK/prepared_trainingset/*.parquet")
//val myDataFrameWithNull = spark.read.parquet(path_to_data+"*.parquet")
//myDataFrameWithNull.printSchema() 
val myDataFrame = spark.read.parquet(path_to_data+"*.parquet")
myDataFrame.printSchema() 

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- deadline2: string (nullable = true)
 |-- created_at2: string (nullable = true)
 |-- launched_at2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)



sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@41e39d2b
import sqlContext.implicits._
path_to_data: String = /mnt/d/09_SPARK/git_Flooorent/cours-spark-telecom/data/prepared_trainingset/
myDataFrame: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]


Let's remove all the null values.

#### Utilisation des données textuelles :
- Stage 1 : récupérer les mots des textes

In [32]:
myDataFrame
    .select("text")
    .show

+--------------------+
|                text|
+--------------------+
|american options ...|
|iheadbones bone c...|
|the fridge magazi...|
|support new men's...|
|can('t) a psychol...|
|fragmented fate e...|
|transport (suspen...|
|the secret life o...|
|cc survival decep...|
|the best protein ...|
|paradise falls pa...|
|the chalet woodsh...|
|vagabond mobile g...|
|southern shakespe...|
|leviathan: montau...|
|the candle tray h...|
|sun skin the miss...|
|7sonic debut stud...|
|the hades pit: a ...|
|the fitness refin...|
+--------------------+
only showing top 20 rows



In [33]:
val tokenizer = new RegexTokenizer()
  .setPattern("\\W+")
  .setGaps(true)
  .setInputCol("text")
  .setOutputCol("tokens")
val countTokens = udf { (words: Seq[String]) => words.length }
//val regexTokenized = tokenizer.transform(myDataFrame.select("text"))
val regexTokenized = tokenizer.transform(myDataFrame)
regexTokenized.select("text", "tokens")
    .withColumn("tokens", countTokens(col("tokens"))).show(false)
regexTokenized
    .select("text","tokens")
    .show

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|text                                                                                                                                                                                                                                               |tokens|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+------+
|american options for greater productivity -- (paper) -- looking to create a hemp based paper/ cardboard recycling manufacturing facility american-options-for-greater-productivity-paper                                                        

tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_df9b0f8a2ea3
countTokens: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,IntegerType,Some(List(ArrayType(StringType,true))))
regexTokenized: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 13 more fields]


In [34]:
regexTokenized.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- deadline2: string (nullable = true)
 |-- created_at2: string (nullable = true)
 |-- launched_at2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)



- Stage 2 : retirer les stop words 

In [35]:
val remover = new StopWordsRemover()
  .setInputCol(tokenizer.getOutputCol) //  .setInputCol("tokens")
  .setOutputCol("filtered")

val dfFiltered     =   remover.transform(regexTokenized) //.toDF().show(true)
dfFiltered.select("tokens").show(10)

+--------------------+
|              tokens|
+--------------------+
|[american, option...|
|[iheadbones, bone...|
|[the, fridge, mag...|
|[support, new, me...|
|[can, t, a, psych...|
|[fragmented, fate...|
|[transport, suspe...|
|[the, secret, lif...|
|[cc, survival, de...|
|[the, best, prote...|
+--------------------+
only showing top 10 rows



remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_2f4d0da2d085
dfFiltered: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 14 more fields]


- Stage 3 : computer la partie TF

In [36]:
dfFiltered.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- deadline2: string (nullable = true)
 |-- created_at2: string (nullable = true)
 |-- launched_at2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)



In [37]:
// fit a CountVectorizerModel from the corpus
val countVectorizer: CountVectorizer = new CountVectorizer()
  .setInputCol(remover.getOutputCol)
  .setOutputCol("rawFeatures")
  .setMinDF(1)
val tfModel = countVectorizer.fit(dfFiltered)
val featurizedData = tfModel.transform(dfFiltered)
featurizedData.select("filtered", "rawFeatures").show(20)

+--------------------+--------------------+
|            filtered|         rawFeatures|
+--------------------+--------------------+
|[american, option...|(97856,[33,43,91,...|
|[iheadbones, bone...|(97856,[700,874,8...|
|[fridge, magazine...|(97856,[0,7,87,30...|
|[support, new, me...|(97856,[0,60,86,1...|
|[psychological, h...|(97856,[3,10,13,6...|
|[fragmented, fate...|(97856,[88,114,15...|
|[transport, suspe...|(97856,[2,19,44,6...|
|[secret, life, mo...|(97856,[11,19,31,...|
|[cc, survival, de...|(97856,[9,118,288...|
|[best, protein, d...|(97856,[49,79,218...|
|[paradise, falls,...|(97856,[9,10,23,1...|
|[chalet, woodshop...|(97856,[7,33,184,...|
|[vagabond, mobile...|(97856,[4,14,105,...|
|[southern, shakes...|(97856,[49,103,11...|
|[leviathan, monta...|(97856,[32,42,110...|
|[candle, tray, ha...|(97856,[48,90,150...|
|[sun, skin, missi...|(97856,[110,571,8...|
|[7sonic, debut, s...|(97856,[1,18,42,4...|
|[hades, pit, fema...|(97856,[3,52,53,6...|
|[fitness, refiner...|(97856,[33

countVectorizer: org.apache.spark.ml.feature.CountVectorizer = cntVec_59c226a0eac4
tfModel: org.apache.spark.ml.feature.CountVectorizerModel = cntVec_59c226a0eac4
featurizedData: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 15 more fields]


In [38]:
//val hashingTF = new HashingTF()
//    .setInputCol("filtered")
//    .setOutputCol("rawFeatures")
//    .setNumFeatures(20000)
//dfFiltered.getClass
//val featurizedData2 = hashingTF.transform(dfF) //.show(true)
//featurizedData2.show

- Stage 4 : computer la partie IDF

In [39]:
featurizedData.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- deadline2: string (nullable = true)
 |-- created_at2: string (nullable = true)
 |-- launched_at2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)



In [40]:
val idf = new IDF()
        .setInputCol(countVectorizer.getOutputCol) // .setInputCol("rawFeatures")
        .setOutputCol("features")
val idfModel = idf.fit(featurizedData)
val rescaledData = idfModel.transform(featurizedData)

idf: org.apache.spark.ml.feature.IDF = idf_1f9830d082e8
idfModel: org.apache.spark.ml.feature.IDFModel = idf_1f9830d082e8
rescaledData: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 16 more fields]


#### Conversion des variables catégorielles en variables numériques :
- Stage 5 : convertir **country2** en quantités numériques dans une colonne **country_indexed.**

In [41]:
val indexer = new StringIndexer()
          .setInputCol("country2")
          .setOutputCol("country_indexed")

val DFindexed = indexer.fit(rescaledData).transform(rescaledData)
DFindexed.select("country2","country_indexed").distinct().show

+--------+---------------+
|country2|country_indexed|
+--------+---------------+
|      AU|            3.0|
|      IE|            9.0|
|      US|            0.0|
|      GB|            1.0|
|      CA|            2.0|
|      NO|            8.0|
|      DE|           10.0|
|      DK|            7.0|
|      NL|            4.0|
|      NZ|            5.0|
|      SE|            6.0|
+--------+---------------+



indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_faaa50b96c65
DFindexed: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 17 more fields]


- Stage 6 : convertir **currency2** en quantités numériques dans une colonne **currency_indexed.**

In [42]:
val indexerCur = new StringIndexer()
          .setInputCol("currency2")
          .setOutputCol("currency_indexed")

val DFindexedCur = indexerCur.fit(DFindexed).transform(DFindexed)
DFindexedCur.select("currency2","currency_indexed").distinct().show

+---------+----------------+
|currency2|currency_indexed|
+---------+----------------+
|      GBP|             1.0|
|      NZD|             5.0|
|      DKK|             7.0|
|      AUD|             3.0|
|      CAD|             2.0|
|      EUR|             4.0|
|      USD|             0.0|
|      SEK|             6.0|
|      NOK|             8.0|
+---------+----------------+



indexerCur: org.apache.spark.ml.feature.StringIndexer = strIdx_a7c28c7ccada
DFindexedCur: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 18 more fields]


- Stages 7 et 8: One-Hot encoder ces deux catégories avec un "one-hot encoder" en créant les colonnes **country_onehot** et **currency_onehot.** 

In [43]:
val oneHotEncoder = new OneHotEncoderEstimator()
          .setInputCols(Array("country_indexed", "currency_indexed"))
          .setOutputCols(Array("country_onehot", "currency_onehot"))

val DFOne = oneHotEncoder.fit(DFindexedCur).transform(DFindexedCur)
DFOne.select("country_onehot","currency_onehot").show

+--------------+---------------+
|country_onehot|currency_onehot|
+--------------+---------------+
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[4],[1.0])|  (8,[4],[1.0])|
|(10,[1],[1.0])|  (8,[1],[1.0])|
|(10,[1],[1.0])|  (8,[1],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[1],[1.0])|  (8,[1],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[1],[1.0])|  (8,[1],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[0],[1.0])|  (8,[0],[1.0])|
|(10,[1],[1.0])|  (8,[1],[1.0])|
|(10,[1],[1.0])|  (8,[1],[1.0])|
+--------------+---------------+
only showing top 20 rows



oneHotEncoder: org.apache.spark.ml.feature.OneHotEncoderEstimator = oneHotEncoder_167a84521c5d
DFOne: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 20 more fields]


In [44]:
DFOne.printSchema()

root
 |-- project_id: string (nullable = true)
 |-- name: string (nullable = true)
 |-- desc: string (nullable = true)
 |-- goal: integer (nullable = true)
 |-- keywords: string (nullable = true)
 |-- final_status: integer (nullable = true)
 |-- country2: string (nullable = true)
 |-- currency2: string (nullable = true)
 |-- deadline2: string (nullable = true)
 |-- created_at2: string (nullable = true)
 |-- launched_at2: string (nullable = true)
 |-- days_campaign: integer (nullable = true)
 |-- hours_prepa: double (nullable = true)
 |-- text: string (nullable = true)
 |-- tokens: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- filtered: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- rawFeatures: vector (nullable = true)
 |-- features: vector (nullable = true)
 |-- country_indexed: double (nullable = false)
 |-- currency_indexed: double (nullable = false)
 |-- country_onehot: vector (nullable = true)
 |-- currency_onehot: vecto

#### Mettre les données sous une forme utilisable par Spark.ML :

La plupart des algorithmes de machine learning dans Spark requièrent que les colonnes utilisées en input du modèle (les features du modèle) soient regroupées dans une seule colonne qui contient des vecteurs. On veut donc passer de 

| *Feature A* | *Feature B* | *Feature C* | *Label*  |
| :---------: | :---------: | :---------: | :------: |
| 0.5         | 1           | 3.5         | 0        |
| 0.6         | 1           | 1.2         | 1        

à

|*Features*     |*Label*|
|:-------------:|:-----:|
| (0.5, 1, 3.5) | 0     |
| (0.6, 1, 1.2) | 1     |


- Stage 9 : assembler tous les features en un unique vecteur

In [45]:
val assembler = new VectorAssembler()
  .setInputCols(Array("features", "days_campaign", "hours_prepa", "goal", "country_onehot", "currency_onehot"))
  .setOutputCol("join_features")

val DFGroupFeat = assembler.transform(DFOne)
DFGroupFeat.select("join_features").show

+--------------------+
|       join_features|
+--------------------+
|(97877,[33,43,91,...|
|(97877,[700,874,8...|
|(97877,[0,7,87,30...|
|(97877,[0,60,86,1...|
|(97877,[3,10,13,6...|
|(97877,[88,114,15...|
|(97877,[2,19,44,6...|
|(97877,[11,19,31,...|
|(97877,[9,118,288...|
|(97877,[49,79,218...|
|(97877,[9,10,23,1...|
|(97877,[7,33,184,...|
|(97877,[4,14,105,...|
|(97877,[49,103,11...|
|(97877,[32,42,110...|
|(97877,[48,90,150...|
|(97877,[110,571,8...|
|(97877,[1,18,42,4...|
|(97877,[3,52,53,6...|
|(97877,[33,89,260...|
+--------------------+
only showing top 20 rows



assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_3eb77e3f98ae
DFGroupFeat: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 21 more fields]


- Stage 10 : créer/instancier le modèle de classification

In [46]:
val lr = new LogisticRegression()
    .setElasticNetParam(0.0)
    .setFitIntercept(true)
    .setFeaturesCol("features")
    .setLabelCol("final_status")
    .setStandardization(true)
    .setPredictionCol("predictions")
    .setRawPredictionCol("raw_predictions")
    .setThresholds(Array(0.7, 0.3))
    .setTol(1.0e-6)
    .setMaxIter(50) //comparer les deux valeurs et commenter

lr: org.apache.spark.ml.classification.LogisticRegression = logreg_5d9197fccd6f


#### Création du Pipeline :

In [47]:
val myPipeline = new Pipeline().setStages(Array(tokenizer, remover, 
                                                countVectorizer, idf, 
                                                indexer, indexerCur, 
                                                oneHotEncoder, assembler, lr))

myPipeline: org.apache.spark.ml.Pipeline = pipeline_1c315e5f4e56


#### Entraînement, test, et sauvegarde du modèle :
- Split des données en training et test sets

In [48]:
val Array(training, test) = myDataFrame.randomSplit(Array(0.9, 0.1), 98765L)

training: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [project_id: string, name: string ... 12 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [project_id: string, name: string ... 12 more fields]


- Entraînement du modèle

In [49]:
val model1 = myPipeline.fit(training)

model1: org.apache.spark.ml.PipelineModel = pipeline_1c315e5f4e56


- Test du modèle

In [50]:
val dfWithSimplePredictions = model1.transform(test)
dfWithSimplePredictions.groupBy("final_status", "predictions").count.show()
val evaluator = new MulticlassClassificationEvaluator()
                    .setLabelCol("final_status")
                    .setPredictionCol("predictions")
                    .setMetricName("f1")

+------------+-----------+-----+
|final_status|predictions|count|
+------------+-----------+-----+
|           1|        0.0| 1892|
|           0|        1.0| 2310|
|           1|        1.0| 1589|
|           0|        0.0| 4963|
+------------+-----------+-----+



dfWithSimplePredictions: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 24 more fields]
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_7c160f7e701d


In [51]:
println("Modèle simple")
val f1score = evaluator.evaluate(dfWithSimplePredictions)
println("f1score du modele simple sur les donnees = %.3f\n" + f1score)

Modèle simple
f1score du modele simple sur les donnees = %.3f
0.6145469784062658


f1score: Double = 0.6145469784062658


#### Réglage des hyper-paramètres (a.k.a. tuning) du modèle :

La façon de procéder présentée plus haut permet rapidement d'entraîner un modèle et d'avoir une mesure de sa performance. Mais que se passe-t-il si 

    - on souhaite utiliser 300 itérations au maximum plutôt que 50 (i.e. la ligne .setMaxIter(50) ) ? 
    
**Réponse :**

Avec un nombre maximum d'itérations réglé à 300, on constate que les **temps de calculs** deviennent beaucoup plus longs et même parfois le calcul n'est pas finalisé en raison de la consommation importante de **mémoire vive**.

    - on souhaite modifier le paramètre de régularisation du modèle ? 

**Réponse :**

Si on souhaite modifier le paramètre de régularisation du modèle il faut relancer tout le pipeline, ce qui n'est pas pratique et on arrive difficilement à trouver le paramètre optimum en procédant ainsi. On va voir que la méthode de grid search permet de trouver de manière pratique les hyperparamètres optimums pour le modèle.



    - on souhaite modifier le paramètre minDF de la classe CountVectorizer (qui permet de ne prendre que les mots apparaissant dans au moins minDF documents) ?

**Réponse :**

Le fait de négliger les mots qui n'apparaissent que très peu de fois dans le texte devrait permettre de robustifier le modèle, de le rendre plus général. On accélère aussi surtout le processing dans la mesure on ne ne considère pas les mots que n'apparaissent pratiquement pas, on limite le nombre de mots sur lequel on fait le processing, donc **on réduit le temps et la mémoire nécessaires au calculs.**


Il faudrait à chaque fois modifier le(s) paramètre(s) à la main, ré-entraîner le modèle, re-calculer la performance du modèle obtenu sur l'ensemble de test, puis finalement choisir le meilleur modèle (i.e. celui avec la meilleure performance sur les données de test) parmi tous ces modèles entraînés. C'est ce qu'on appelle le réglage des hyper-paramètres ou encore tuning du modèle. Et c'est fastidieux.


La plupart des algorithmes de machine learning possèdent des hyper-paramètres, par exemple le nombre de couches et de neurones dans un réseau de neurones, le nombre d’arbres et leur profondeur maximale dans les random forests, etc. Qui plus est, comme mentionné précédemment avec le paramètre minDF de la classe CountVectorizer, on peut également se retrouver avec des hyper-paramètres au niveau des stages de préprocessing. L'objectif est donc de trouver la meilleure combinaison possible de tous ces hyper-paramètres.

**- Grid search**

Une des techniques pour régler automatiquement les hyper-paramètres est la grid
search qui consiste à : 

   - créer une grille de valeurs à tester pour les hyper-paramètres 
    
   - en chaque point de la grille 
    
        - séparer le training set en un ensemble de training (70%) et un ensemble de validation (30%)
    
        - entraîner un modèle sur le training set
    
        - calculer l’erreur du modèle sur le validation set

   - sélectionner le point de la grille (<=> garder les valeurs d’hyper-paramètres de ce point) où l’erreur de validation est la plus faible i.e. là où le modèle a le mieux appris
    
Pour la régularisation de notre régression logistique on veut tester les valeurs de 10e-8 à 10e-2 par pas de 2.0 en échelle logarithmique (on veut tester les valeurs 10e-8, 10e-6, 10e-4 et 10e-2). Pour le paramètre minDF de CountVectorizer on veut tester les valeurs de 55 à 95 par pas de 20. En chaque point de la grille on veut utiliser 70% des données pour l’entraînement et 30% pour la validation. On veut utiliser le **f1-score** pour comparer les différents modèles en chaque point de la grille. 

Préparer la grid-search pour satisfaire les conditions explicitées ci-dessus puis lancer la grid-search sur le dataset "training" préparé précédemment.

In [52]:
// Réglage des hyper-paramètres du modèle
// par Grid search
val paramGrid = new ParamGridBuilder()
      .addGrid(lr.regParam, Array(10e-8, 10e-6, 10e-4, 10e-2))
      .addGrid(countVectorizer.minDF, Array[Double](55, 75, 95))
      .build()

val cross_valid = new TrainValidationSplit()
      .setEstimator(myPipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.7) // 70% training, 30% validation

val model2 = cross_valid.fit(training)

paramGrid: Array[org.apache.spark.ml.param.ParamMap] =
Array({
	cntVec_59c226a0eac4-minDF: 55.0,
	logreg_5d9197fccd6f-regParam: 1.0E-7
}, {
	cntVec_59c226a0eac4-minDF: 55.0,
	logreg_5d9197fccd6f-regParam: 1.0E-5
}, {
	cntVec_59c226a0eac4-minDF: 55.0,
	logreg_5d9197fccd6f-regParam: 0.001
}, {
	cntVec_59c226a0eac4-minDF: 55.0,
	logreg_5d9197fccd6f-regParam: 0.1
}, {
	cntVec_59c226a0eac4-minDF: 75.0,
	logreg_5d9197fccd6f-regParam: 1.0E-7
}, {
	cntVec_59c226a0eac4-minDF: 75.0,
	logreg_5d9197fccd6f-regParam: 1.0E-5
}, {
	cntVec_59c226a0eac4-minDF: 75.0,
	logreg_5d9197fccd6f-regParam: 0.001
}, {
	cntVec_59c226a0eac4-minDF: 75.0,
	logreg_5d9197fccd6f-regParam: 0.1
}, {
	cntVec_59c226a0eac4-minDF: 95.0,
	logreg_5d9197fccd6f-regParam: 1.0E-7
}, {
	cntVec_59c226a0eac4-minDF: 95.0,
	logreg_5d9197f...

- Test du modèle

On a vu que pour évaluer de façon non biaisée la pertinence du modèle obtenu, il fallait le tester sur des données qu'il n'avait jamais vues pendant son entraînement. Ça vaut également pour les données utilisées pour sélectionner le meilleur modèle de la grid search (training et validation)! C’est pour cela que nous avons construit le dataset de test que nous avons laissé de côté jusque là. 

Appliquer le meilleur modèle trouvé avec la grid-search aux données de test. Mettre les résultats dans le DataFrame dfWithPredictions . Afficher le f1-score du modèle sur les données de test. 

Afficher 

dfWithPredictions.groupBy("final_status", "predictions").count.show()

Sauvegarder le modèle entraîné pour pouvoir le réutiliser plus tard.

In [53]:
val dfWithPredictions: DataFrame = model2.transform(test)
dfWithPredictions.groupBy("final_status", "predictions").count.show()

+------------+-----------+-----+
|final_status|predictions|count|
+------------+-----------+-----+
|           1|        0.0| 1093|
|           0|        1.0| 2941|
|           1|        1.0| 2388|
|           0|        0.0| 4332|
+------------+-----------+-----+



dfWithPredictions: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 24 more fields]


In [54]:
println("Modèle paramétrique (grid search)")
val f1score2 = evaluator.evaluate(dfWithPredictions)
println("f1score du modele paramétrique (avec grid search) sur les données = %.3f".format(f1score2))

Modèle paramétrique (grid search)
f1score du modele paramétrique (avec grid search) sur les données = 0.637


f1score2: Double = 0.6369300415318682


#### Supplément
Pour plus d’information sur la façon dont est parallélisée la méthode de Newton (pour trouver le maximum de la fonction de coût définissant la régression logistique, qui est le log de la vraisemblance) :

- [http://www.slideshare.net/dbtsai/2014-0620-mlor-36132297](http://www.slideshare.net/dbtsai/2014-0620-mlor-36132297)

- [http://www.research.rutgers.edu/~lihong/pub/Zinkevich11Parallelized.pdf](http://www.research.rutgers.edu/~lihong/pub/Zinkevich11Parallelized.pdf)
     
- [https://arxiv.org/pdf/1605.06049v1.pdf](https://arxiv.org/pdf/1605.06049v1.pdf)