In [4]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, ColumnName, DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.datediff
import org.apache.spark.sql.functions.round
import org.apache.spark.sql.functions.from_unixtime
import org.apache.spark.sql.functions.concat_ws
import org.apache.spark.sql.functions.lower



// Des réglages optionnels du job spark. Les réglages par défaut fonctionnent très bien pour ce TP.
// On vous donne un exemple de setting quand même
val conf = new SparkConf().setAll(Map(
  "spark.scheduler.mode" -> "FIFO",
  "spark.speculation" -> "false",
  "spark.reducer.maxSizeInFlight" -> "48m",
  "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
  "spark.kryoserializer.buffer.max" -> "1g",
  "spark.shuffle.file.buffer" -> "32k",
  "spark.default.parallelism" -> "12",
  "spark.sql.shuffle.partitions" -> "12"
))

// Initialisation du SparkSession qui est le point d'entrée vers Spark SQL (donne accès aux dataframes, aux RDD,
// création de tables temporaires, etc., et donc aux mécanismes de distribution des calculs)
val spark = SparkSession
  .builder
  .config(conf)
  .appName("TP Spark : Preprocessor")
  .getOrCreate()
import spark.implicits._ //if don't import here then compiler doesn't understand the $ before "goal, deadline,..."

/** *****************************************************************************
  *
  * TP 2
  *
  *       - Charger un fichier csv dans un dataFrame
  *       - Pre-processing: cleaning, filters, feature engineering => filter, select, drop, na.fill, join, udf, distinct, count, describe, collect
  *       - Sauver le dataframe au format parquet
  *
  * if problems with unimported modules => sbt plugins update
  *
  * *******************************************************************************/
println("\n")
println("Hello World ! from Preprocessor")
println("\n")


val df: DataFrame = spark
  .read
  .option("header", true) // utilise la première ligne du (des) fichier(s) comme header
  .option("inferSchema", "true") // pour inférer le type de chaque colonne (Int, String, etc.)
  .csv("src/main/resources/train/train_clean.csv")

val dfCasted: DataFrame = df
  .withColumn("goal", $"goal".cast("Int"))
  .withColumn("deadline", $"deadline".cast("Int"))
  .withColumn("state_changed_at", $"state_changed_at".cast("Int"))
  .withColumn("created_at", $"created_at".cast("Int"))
  .withColumn("launched_at", $"launched_at".cast("Int"))
  .withColumn("backers_count", $"backers_count".cast("Int"))
  .withColumn("final_status", $"final_status".cast("Int"))


val df2: DataFrame = dfCasted.drop("disable_communication")
val dfNoFutur: DataFrame = df2.drop("backers_count", "state_changed_at")

def cleanCountry(country: String, currency: String): String = {
  if (country == "False")
    currency
  else
    country
}

def cleanCurrency(currency: String): String = {
  if (currency != null && currency.length != 3)
    null
  else
    currency
}

val cleanCountryUdf = udf(cleanCountry _)
val cleanCurrencyUdf = udf(cleanCurrency _)

val dfCountry: DataFrame = dfNoFutur
  .withColumn("country2", cleanCountryUdf($"country", $"currency"))
  .withColumn("currency2", cleanCurrencyUdf($"currency"))
  .drop("country", "currency")

val dfFinal: DataFrame = dfCountry
  .withColumn("final_status", when($"final_status".isNull || $"final_status" =!= 1, 0).otherwise($"final_status"))

val dfCampagne: DataFrame = dfFinal
  .withColumn("days_campaign", datediff(from_unixtime($"deadline"), from_unixtime($"created_at")))
  .withColumn("hours_prepa", round($"launched_at" / 60 - $"created_at" / 60))
  .withColumn("days_campaign", $"days_campaign".cast("Int"))
  .withColumn("hours_prepa", $"hours_prepa".cast("Int"))
  .withColumn("goal", $"goal".cast("Int"))

val dfNoTime: DataFrame = dfCampagne
  .drop("launched_at", "created_at", "deadline")
  .withColumn("text", lower(concat_ws(" ", $"name", $"desc", $"keywords")))

val monDataFrameFinal: DataFrame = dfNoTime
  .withColumn("days_campaign", when($"days_campaign".isNull,-1).otherwise($"days_campaign"))
  .withColumn("hours_prepa", when($"hours_prepa" .isNull,-1).otherwise($"hours_prepa"))
  .withColumn("goal", when($"goal".isNull,-1).otherwise($"goal"))
  .withColumn("currency2", when($"currency2".isNull,"unknown").otherwise($"currency2"))
  .withColumn("country2", when($"country2".isNull,"unknown").otherwise($"country2"))

//dfNoTime.select("days_campaign").groupBy("days_campaign").count().show(100) //to see if days_compaign is converted

monDataFrameFinal.show()
monDataFrameFinal.write.parquet("src/main/resources/preprocessed")




Hello World ! from Preprocessor


+--------------+--------------------+--------------------+-----+--------------------+------------+--------+---------+-------------+-----------+--------------------+
|    project_id|                name|                desc| goal|            keywords|final_status|country2|currency2|days_campaign|hours_prepa|                text|
+--------------+--------------------+--------------------+-----+--------------------+------------+--------+---------+-------------+-----------+--------------------+
|kkst1451568084| drawing for dollars|I like drawing pi...|   20| drawing-for-dollars|           1|      US|      USD|            9|         37|drawing for dolla...|
|kkst1474482071|Sponsor Dereck Bl...|I  Dereck Blackbu...|  300|sponsor-dereck-bl...|           0|      US|      USD|           17|        256|sponsor dereck bl...|
| kkst183622197|       Mr. Squiggles|So I saw darkpony...|   30|        mr-squiggles|           0|      US|      USD|           10|        

import org.apache.spark.SparkConf
import org.apache.spark.sql.{Column, ColumnName, DataFrame, SparkSession}
import org.apache.spark.sql.functions.udf
import org.apache.spark.sql.functions.when
import org.apache.spark.sql.functions.datediff
import org.apache.spark.sql.functions.round
import org.apache.spark.sql.functions.from_unixtime
import org.apache.spark.sql.functions.concat_ws
import org.apache.spark.sql.functions.lower
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@48a86ece
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@d5d32bf
import spark.implicits._
df: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]
dfCasted: org.apache.spark.sql.DataFrame = [project_id: string, name: string ... 12 more fields]
...

In [5]:
import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.{Pipeline}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.{DataFrame, SparkSession}


val conf = new SparkConf().setAll(Map(
  "spark.scheduler.mode" -> "FIFO",
  "spark.speculation" -> "false",
  "spark.reducer.maxSizeInFlight" -> "48m",
  "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
  "spark.kryoserializer.buffer.max" -> "1g",
  "spark.shuffle.file.buffer" -> "32k",
  "spark.default.parallelism" -> "12",
  "spark.sql.shuffle.partitions" -> "12",
  "spark.driver.maxResultSize" -> "2g"
))

val spark = SparkSession
  .builder
  .config(conf)
  .appName("TP Spark : Trainer")
  .getOrCreate()

/*******************************************************************************
  *
  *       TP 3
  *
  *       - lire le fichier sauvegarder précédemment
  *       - construire les Stages du pipeline, puis les assembler
  *       - trouver les meilleurs hyperparamètres pour l'entraînement du pipeline avec une grid-search
  *       - Sauvegarder le pipeline entraîné
  *
  *       if problems with unimported modules => sbt plugins update
  *
  ********************************************************************************/


val df = spark.read.load("src/main/resources/preprocessed")
//df.show(100)

//Stage1: récupérer les mots des textes
val tokenizer = new RegexTokenizer()
  .setPattern("\\W+")
  .setGaps(true)
  .setInputCol("text") //input is a ligne of words
  .setOutputCol("tokens") //output is a list of seperated words

//val regexTokenized = tokenizer.transform(df)

//Stage 2 : retirer les stop words
val remover = new StopWordsRemover()
  .setInputCol("tokens")
  .setOutputCol("removed") //output is a list of seperated words without propositions etc...

//val stopremoved = remover.transform(regexTokenized)

//Stage 3 : computer la partie TF
val vectorizer = new CountVectorizer()
  .setInputCol("removed")
  .setOutputCol("vector") //output is a array of a int and two vectors of index and frequency of words
  .setVocabSize(100)
  .setMinDF(2)

//val vectorized = vectorizer.fit(stopremoved).transform(stopremoved)

//Stage 4 : computer la partie IDF
val idf = new IDF()
  .setInputCol("vector")
  .setOutputCol("tfidf") //output is the tfidf of vector

//val idfModel = idf.fit(vectorized).transform(vectorized)

//Stage 5 : convertir country2 en quantités numériques
val indexercountry = new StringIndexer()
  .setInputCol("country2")
  .setOutputCol("country_indexed") //output gives country a index
  .setHandleInvalid("keep")

//val countryindexed = indexercountry.fit(idfed).transform(idfed)

//Stage 6 : convertir currency2 en quantités numériques
val indexercurrency = new StringIndexer()
  .setInputCol("currency2")
  .setOutputCol("currency_indexed") //output gives currency a index
  .setHandleInvalid("keep")

//val currencyindexed = indexercurrency.fit(countryindexed).transform(countryindexed)

//Stages 7 et 8: One-Hot encoder ces deux catégories
val encoder = new OneHotEncoderEstimator()
  .setInputCols(Array("country_indexed", "currency_indexed"))
  .setOutputCols(Array("country_onehot", "currency_onehot")) //output convert into onehout vectors

//val encoded = encoder.fit(currencyindexed).transform(currencyindexed)

//Stage 9 : assembler tous les features en un unique vecteur
val assembler = new VectorAssembler()
  .setInputCols(Array("tfidf", "days_campaign", "hours_prepa", "goal", "country_onehot", "currency_onehot"))
  .setOutputCol("features")

//Stage 10 : créer/instancier le modèle de classification
val lr = new LogisticRegression()
  .setElasticNetParam(0.0)
  .setFitIntercept(true)
  .setFeaturesCol("features")
  .setLabelCol("final_status")
  .setStandardization(true)
  .setPredictionCol("predictions")
  .setRawPredictionCol("raw_predictions")
  .setThresholds(Array(0.7, 0.3))
  .setTol(1.0e-6)
  .setMaxIter(20)

//Stage 11: création du Pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer,remover,vectorizer,idf,indexercountry,indexercurrency,encoder, assembler,lr))

//Stage 12: split des données en training et test sets
val Array(train, test) = df.randomSplit(Array(0.9, 0.1), seed = 12345)

//Stage 13: entraînement du modèle
val model = pipeline.fit(train)

//Stage 14: Test du modèle
val dfWithSimplePredictions = model.transform(test)

dfWithSimplePredictions.groupBy("final_status", "predictions").count.show()

val evalution = new MulticlassClassificationEvaluator()
  .setLabelCol("final_status")
  .setMetricName("f1")
  .setPredictionCol("predictions")

//Stage 15: réglage des hyper-paramètres (a.k.a. tuning) du modèle - Grid search
val paramGrid = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(10e-8, 10e-6, 10e-4, 10e-2))
  .addGrid(vectorizer.minDF,Array(55.0,75.0,95.0))
  .build()

val trainValidationSplit = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(evalution)
  .setEstimatorParamMaps(paramGrid)
  .setTrainRatio(0.7)
  .setParallelism(2)

val model_grid = trainValidationSplit.fit(train)

//Stage 16: réglage des hyper-paramètres (a.k.a. tuning) du modèle - Test du modèle
val dfWithGridPredictions = model_grid.transform(test)

println("Le f1 score pour le test WithSimplePredictions est: ", evalution.evaluate(dfWithSimplePredictions))
println("Le f1 score pour le test WithGridPredictions est:", evalution.evaluate(dfWithGridPredictions))
println("Nous pouvons apercevoir que le score du test WithGridPredictions est mieux que le score du test WithSimplePredictions. Maintenant on va voir si le score du test WithGridPredictions atteind sa maximum, on va ajouter plus de parametres: ")

val paramGrid2 = new ParamGridBuilder()
  .addGrid(lr.regParam, Array(10e-8, 10e-7, 10e-6, 10e-5, 10e-4, 10e-3, 10e-2))
  .addGrid(vectorizer.minDF,Array(55.0,60.0,70.0,75.0,80.0,85.0,90.0,95.0))
  .build()

val trainValidationSplit2 = new TrainValidationSplit()
  .setEstimator(pipeline)
  .setEvaluator(evalution)
  .setEstimatorParamMaps(paramGrid2)
  .setTrainRatio(0.7)
  .setParallelism(2)

val model_grid2 = trainValidationSplit2.fit(train)

val dfWithGridPredictions2 = model_grid2.transform(test)

println("Le f1 score pour le test WithSimplePredictions est: ", evalution.evaluate(dfWithSimplePredictions))
println("Le f1 score pour le test WithGridPredictions est:", evalution.evaluate(dfWithGridPredictions))
println("Nous pouvons apercevoir que le score pour le test WithGridPredictions est toujours pareil. Cela vaut dire que la modele atteind tres probablement la meilleure performance sur les paramtres précédents.")


+------------+-----------+-----+
|final_status|predictions|count|
+------------+-----------+-----+
|           1|        0.0|  792|
|           0|        1.0| 3921|
|           1|        1.0| 2559|
|           0|        0.0| 3425|
+------------+-----------+-----+

(Le f1 score pour le test WithSimplePredictions est: ,0.569911631467866)
(Le f1 score pour le test WithGridPredictions est:,0.5685901480529418)
Nous pouvons apercevoir que le score du test WithGridPredictions est mieux que le score du test WithSimplePredictions. Maintenant on va voir si le score du test WithGridPredictions atteind sa maximum, on va ajouter plus de parametres: 
(Le f1 score pour le test WithSimplePredictions est: ,0.569911631467866)
(Le f1 score pour le test WithGridPredictions est:,0.5685901480529418)
Nous pouvons apercevoir que le score pour le test WithGridPredictions est toujours pareil. Cela vaut dire que la modele atteind tres probablement la meilleure performance sur les paramtres précédents.


import org.apache.spark.SparkConf
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.RegexTokenizer
import org.apache.spark.ml.feature.StopWordsRemover
import org.apache.spark.ml.feature.{HashingTF, IDF, Tokenizer}
import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.ml.feature.StringIndexer
import org.apache.spark.ml.feature.OneHotEncoderEstimator
import org.apache.spark.ml.feature.VectorAssembler
import org.apache.spark.ml.Pipeline
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.sql.{DataFrame, SparkSession}
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@7e05f99
spark: org.apache.spa...