In [2]:
// package paristech

import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.feature.CountVectorizerModel
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}







object Trainer {

  def main(args: Array[String]): Unit = {

    val conf = new SparkConf().setAll(Map(
      "spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12",
      "spark.driver.maxResultSize" -> "2g"
    ))

    val spark = SparkSession
      .builder
      .config(conf)
      .appName("TP Spark : Trainer")
      .getOrCreate()

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)


    /*******************************************************************************
      *
      *       TP 3
      *
      *       - lire le fichier sauvegarder précédemment
      *       - construire les Stages du pipeline, puis les assembler
      *       - trouver les meilleurs hyperparamètres pour l'entraînement du pipeline avec une grid-search
      *       - Sauvegarder le pipeline entraîné
      *
      *       if problems with unimported modules => sbt plugins update
      *
      ********************************************************************************/

    println("hello world ! from Trainer")

    // #### Processing ####
    // Lecture du fichier cleané du TP précédent
    val parquetFileDF = spark.read.parquet("TP3_input/prepared_trainingset")

    // Split en mots
    val tokenizer = new RegexTokenizer()
      .setPattern("\\W+")
      .setGaps(true)
      .setInputCol("text")
      .setOutputCol("tokens")

    // Retrait des Stop Words
    val remover = new StopWordsRemover()
      .setInputCol("tokens")
      .setOutputCol("filtered")

    // Term frequency
    val cvModel: CountVectorizer = new CountVectorizer()
      .setInputCol("filtered")
      .setOutputCol("rawfeatures")

    // Inverse document frequency
    val idf = new IDF()
      .setInputCol("rawfeatures")
      .setOutputCol("tfidf")

    // country2 en numérique
    val indexer = new StringIndexer()
      .setInputCol("country2")
      .setOutputCol("country_indexed")

    // currency 2 en numérique
    val indexer2 = new StringIndexer()
      .setInputCol("currency2")
      .setOutputCol("currency_indexed")

    // One-Hot encoding
    val encoder = new OneHotEncoderEstimator()
      .setInputCols(Array("country_indexed", "currency_indexed"))
      .setOutputCols(Array("country_onehot", "currency_onehot"))

    // #### ML ####
    // Creation du vecteur features
    val assembler = new VectorAssembler()
      .setInputCols(Array("tfidf", "days_campaign", "hours_prepa","goal","country_onehot","currency_onehot"))
      .setOutputCol("features")

    // Classifieur regression logistique
    val lr = new LogisticRegression()
      .setElasticNetParam(0.0)
      .setFitIntercept(true)
      .setFeaturesCol("features")
      .setLabelCol("final_status")
      .setStandardization(true)
      .setPredictionCol("predictions")
      .setRawPredictionCol("raw_predictions")
      .setThresholds(Array(0.7, 0.3))
      .setTol(1.0e-6)
      .setMaxIter(20)

    // Pipeline
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, remover, cvModel,idf , indexer, indexer2, encoder, assembler, lr))

    // Split données en training et test
    val Array(training,test) = parquetFileDF.randomSplit(Array(0.9, 0.1))

    // Entraînement du modèle sur train
    val model = pipeline.fit(training)

    // Calcul de la prédiction
    val dfWithSimplePredictions = model.transform(test)
    dfWithSimplePredictions.groupBy("final_status", "predictions").count.show()

    // Performance du modèle
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("final_status")
      .setPredictionCol("predictions")
      .setMetricName("f1")

    // Taux de bonnes prédictions
    val accuracy = evaluator.evaluate(dfWithSimplePredictions)
    println(s"Accuracy reg log: ${accuracy}")

    // Recherche d'un meilleur modèle par Gridsearch
    val paramGrid = new ParamGridBuilder()
      .addGrid(cvModel.minDF, Array(55.0, 75.0, 95.0))
      .addGrid(lr.regParam, Array(10e-8, 10e-6, 10e-4, 10e-2))
      .addGrid(lr.elasticNetParam,Array(0.0, 0.5, 1.0))
      .build()


    // Evaluation en train / validation
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.7)
      .setParallelism(2)

    // fitting du meilleur modèle sur train
    val model_best = trainValidationSplit.fit(training)

    // prediction sur test
    val dfWithSimplePredictions_best = model_best.transform(test)

    // affichage
    dfWithSimplePredictions_best.select("features", "final_status", "predictions")
      .show()

    // accuracy du meilleur modèle
    val accuracy_best = evaluator.evaluate(dfWithSimplePredictions_best)
    println(s"Accuracy best reg log: ${accuracy_best}")



    // #### enregistrement du meilleur modèle ####
    model.write.overwrite().save("TP3_output/spark-log-model")
    model_best.write.overwrite().save("TP3_output/spark-log-bestmodel")

  }
}


import org.apache.spark.SparkConf
import org.apache.spark.sql.SparkSession
import org.apache.log4j.{Level, Logger}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover}
import org.apache.spark.ml.feature.CountVectorizerModel
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.feature.IDF
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.tuning.{CrossValidator, ParamGridBuilder}
import org.apache.spark.ml.tuning.{ParamGridBuilder, ...

In [3]:
val conf = new SparkConf().setAll(Map(
      "spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12",
      "spark.driver.maxResultSize" -> "2g"
    ))

    val spark = SparkSession
      .builder
      .config(conf)
      .appName("TP Spark : Trainer")
      .getOrCreate()

    val rootLogger = Logger.getRootLogger()
    rootLogger.setLevel(Level.ERROR)


    /*******************************************************************************
      *
      *       TP 3
      *
      *       - lire le fichier sauvegarder précédemment
      *       - construire les Stages du pipeline, puis les assembler
      *       - trouver les meilleurs hyperparamètres pour l'entraînement du pipeline avec une grid-search
      *       - Sauvegarder le pipeline entraîné
      *
      *       if problems with unimported modules => sbt plugins update
      *
      ********************************************************************************/

    println("hello world ! from Trainer")

    // #### Processing ####
    // Lecture du fichier cleané du TP précédent
    val parquetFileDF = spark.read.parquet("TP3_input/prepared_trainingset")

    // Split en mots
    val tokenizer = new RegexTokenizer()
      .setPattern("\\W+")
      .setGaps(true)
      .setInputCol("text")
      .setOutputCol("tokens")

    // Retrait des Stop Words
    val remover = new StopWordsRemover()
      .setInputCol("tokens")
      .setOutputCol("filtered")

    // Term frequency
    val cvModel: CountVectorizer = new CountVectorizer()
      .setInputCol("filtered")
      .setOutputCol("rawfeatures")

    // Inverse document frequency
    val idf = new IDF()
      .setInputCol("rawfeatures")
      .setOutputCol("tfidf")

    // country2 en numérique
    val indexer = new StringIndexer()
      .setInputCol("country2")
      .setOutputCol("country_indexed")

    // currency 2 en numérique
    val indexer2 = new StringIndexer()
      .setInputCol("currency2")
      .setOutputCol("currency_indexed")

    // One-Hot encoding
    val encoder = new OneHotEncoderEstimator()
      .setInputCols(Array("country_indexed", "currency_indexed"))
      .setOutputCols(Array("country_onehot", "currency_onehot"))

    // #### ML ####
    // Creation du vecteur features
    val assembler = new VectorAssembler()
      .setInputCols(Array("tfidf", "days_campaign", "hours_prepa","goal","country_onehot","currency_onehot"))
      .setOutputCol("features")

    // Classifieur regression logistique
    val lr = new LogisticRegression()
      .setElasticNetParam(0.0)
      .setFitIntercept(true)
      .setFeaturesCol("features")
      .setLabelCol("final_status")
      .setStandardization(true)
      .setPredictionCol("predictions")
      .setRawPredictionCol("raw_predictions")
      .setThresholds(Array(0.7, 0.3))
      .setTol(1.0e-6)
      .setMaxIter(20)

    // Pipeline
    val pipeline = new Pipeline()
      .setStages(Array(tokenizer, remover, cvModel,idf , indexer, indexer2, encoder, assembler, lr))

    // Split données en training et test
    val Array(training,test) = parquetFileDF.randomSplit(Array(0.9, 0.1))

    // Entraînement du modèle sur train
    val model = pipeline.fit(training)

    // Calcul de la prédiction
    val dfWithSimplePredictions = model.transform(test)
    dfWithSimplePredictions.groupBy("final_status", "predictions").count.show()

    // Performance du modèle
    val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("final_status")
      .setPredictionCol("predictions")
      .setMetricName("f1")

    // Taux de bonnes prédictions
    val accuracy = evaluator.evaluate(dfWithSimplePredictions)
    println(s"Accuracy reg log: ${accuracy}")

    // Recherche d'un meilleur modèle par Gridsearch
    val paramGrid = new ParamGridBuilder()
      .addGrid(cvModel.minDF, Array(55.0, 75.0, 95.0))
      .addGrid(lr.regParam, Array(10e-8, 10e-6, 10e-4, 10e-2))
      .addGrid(lr.elasticNetParam,Array(0.0, 0.5, 1.0))
      .build()


    // Evaluation en train / validation
    val trainValidationSplit = new TrainValidationSplit()
      .setEstimator(pipeline)
      .setEvaluator(evaluator)
      .setEstimatorParamMaps(paramGrid)
      .setTrainRatio(0.7)
      .setParallelism(2)

    // fitting du meilleur modèle sur train
    val model_best = trainValidationSplit.fit(training)

    // prediction sur test
    val dfWithSimplePredictions_best = model_best.transform(test)

    // affichage
    dfWithSimplePredictions_best.select("features", "final_status", "predictions")
      .show()

    // accuracy du meilleur modèle
    val accuracy_best = evaluator.evaluate(dfWithSimplePredictions_best)
    println(s"Accuracy best reg log: ${accuracy_best}")



    // #### enregistrement du meilleur modèle ####
    model.write.overwrite().save("TP3_output/spark-log-model")
    model_best.write.overwrite().save("TP3_output/spark-log-bestmodel")



hello world ! from Trainer
+------------+-----------+-----+
|final_status|predictions|count|
+------------+-----------+-----+
|           1|        0.0| 1798|
|           0|        1.0| 2357|
|           1|        1.0| 1613|
|           0|        0.0| 4987|
+------------+-----------+-----+

Accuracy reg log: 0.6206550633911129
+--------------------+------------+-----------+
|            features|final_status|predictions|
+--------------------+------------+-----------+
|(3856,[0,22,29,71...|           0|        1.0|
|(3856,[27,67,84,7...|           0|        0.0|
|(3856,[105,192,45...|           0|        0.0|
|(3856,[9,89,198,2...|           0|        0.0|
|(3856,[5,31,32,42...|           0|        0.0|
|(3856,[31,83,171,...|           0|        0.0|
|(3856,[13,14,32,5...|           0|        0.0|
|(3856,[4,10,33,10...|           0|        0.0|
|(3856,[27,28,77,2...|           0|        0.0|
|(3856,[16,38,66,6...|           1|        1.0|
|(3856,[11,13,55,6...|           0|        1.0|

conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@43f30f03
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@59ebc348
rootLogger: org.apache.log4j.Logger = org.apache.log4j.spi.RootLogger@9fc12d8
parquetFileDF: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]
tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_0643b980f4c6
remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_35cbf84cac45
cvModel: org.apache.spark.ml.feature.CountVectorizer = cntVec_717d3e696521
idf: org.apache.spark.ml.feature.IDF = idf_5b80d3668cd6
indexer: org.apache.spark.ml.feature.StringIndexer = strIdx_3ed3a5bcdaf4
indexer2: org.apache.spark.ml.feature.StringIndexer = strIdx_d4e60c4fd7c7
encoder: org.apache.spark...