In [1]:
import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{RegexTokenizer,StopWordsRemover,
                                    StringIndexer,CountVectorizer,
                                    CountVectorizerModel,VectorAssembler,
                                   IDF,OneHotEncoderEstimator}

import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}

// import org.apache.spark.mllib.evaluation.BinaryClassificationMetrics
// import org.apache.spark.ml.evaluation
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator

import org.apache.spark.ml.param.ParamMap


val conf = new SparkConf().setAll(Map(
      "spark.scheduler.mode" -> "FIFO",
      "spark.speculation" -> "false",
      "spark.reducer.maxSizeInFlight" -> "48m",
      "spark.serializer" -> "org.apache.spark.serializer.KryoSerializer",
      "spark.kryoserializer.buffer.max" -> "1g",
      "spark.shuffle.file.buffer" -> "32k",
      "spark.default.parallelism" -> "12",
      "spark.sql.shuffle.partitions" -> "12",
      "spark.driver.maxResultSize" -> "2g"
    ))

val spark = SparkSession
  .builder
  .config(conf)
  .appName("TP Spark : Twitter")
  .getOrCreate()


/*******************************************************************************
  *
  *       TP 3
  *
  *       - lire le fichier sauvegarder précédemment
  *       - construire les Stages du pipeline, puis les assembler
  *       - trouver les meilleurs hyperparamètres pour l'entraînement du pipeline avec une grid-search
  *       - Sauvegarder le pipeline entraîné
  *
  *       if problems with unimported modules => sbt plugins update
  *
  ********************************************************************************/

println("hello world ! from Trainer")

Intitializing Scala interpreter ...

Spark Web UI available at http://jorge-UX510UWK.attlocal.net:4040
SparkContext available as 'sc' (version = 2.4.4, master = local[*], app id = local-1577819658322)
SparkSession available as 'spark'


hello world ! from Trainer


import org.apache.spark.SparkConf
import org.apache.spark.sql.{DataFrame, SparkSession}
import org.apache.spark.sql.functions._
import org.apache.spark.SparkConf
import org.apache.spark.ml.feature.{RegexTokenizer, StopWordsRemover, StringIndexer, CountVectorizer, CountVectorizerModel, VectorAssembler, IDF, OneHotEncoderEstimator}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.tuning.{ParamGridBuilder, TrainValidationSplit}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.param.ParamMap
conf: org.apache.spark.SparkConf = org.apache.spark.SparkConf@1f9f9a4
spark: org.apache.spark.sql.SparkSession = org.apache.spark.sql.SparkSession@359847a9


In [2]:
val df:DataFrame = spark
        .read
        .option("header", true)
        .option("separator",",")
        .option("inferSchema", "true")
        .parquet("data.parquet")
        

// df.select("project_id", "name", "desc", "goal").show(5)
// df.select("keywords", "final_status", "country2", "currency2").show(5)
// df.select("deadline2", "created_at2", "launched_at2", "days_campaign").show(5)
// df.select("hours_prepa", "text")
// df.select("label").show()
val cdf = df.drop("__index_level_0__")

df: org.apache.spark.sql.DataFrame = [text: string, words: array<string> ... 2 more fields]
cdf: org.apache.spark.sql.DataFrame = [text: string, words: array<string> ... 1 more field]


In [17]:
val tokenizer = new RegexTokenizer()
  .setPattern("\\W+")
  .setGaps(true)
  .setInputCol("text")
  .setOutputCol("tokens")
// val dfTokenized = tokenizer.transform(cdf)

val remover = new StopWordsRemover()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("filtered")

// val dfsw = remover.transform(dfTokenized)

val cvModel: CountVectorizer = new CountVectorizer()
    .setInputCol(remover.getOutputCol)
    .setOutputCol("vect")
    .setMinDF(50)

// val dfv = cvModel.fit(dfsw).transform(dfsw)

val idf = new IDF()
    .setInputCol(cvModel.getOutputCol)
    .setOutputCol("tfidf")

// val dfidf:DataFrame = idf.fit(dfv).transform(dfv)

val assembler = new VectorAssembler()
  .setInputCols(Array("tfidf"))
  .setOutputCol("features")

// val df_f = assembler.transform(dfidf).drop("text","words","tokens","vect","tfidf","filtered")

val lr = new LogisticRegression()
  .setElasticNetParam(0.0)
  .setFitIntercept(true)
  .setFeaturesCol("features")
  .setLabelCol("label")
  .setStandardization(true)
  .setPredictionCol("predictions")
  .setRawPredictionCol("raw_predictions")
  .setThresholds(Array(0.7, 0.3))
  .setTol(1.0e-6)
  .setMaxIter(20)



tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_2876c84fc117
remover: org.apache.spark.ml.feature.StopWordsRemover = stopWords_0512eca57223
cvModel: org.apache.spark.ml.feature.CountVectorizer = cntVec_389e1c7061f2
idf: org.apache.spark.ml.feature.IDF = idf_c232dcaeb869
assembler: org.apache.spark.ml.feature.VectorAssembler = vecAssembler_47e38170de6c
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_6058d0ef4f32


In [18]:
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, remover,cvModel,idf, assembler,lr ))

pipeline: org.apache.spark.ml.Pipeline = pipeline_a168d63875b8


In [22]:
val model = pipeline.fit(cdf)

model: org.apache.spark.ml.PipelineModel = pipeline_a168d63875b8


In [46]:
val Array(train,test) = df.randomSplit(Array[Double](0.8, 0.2))
val size = (train.count,test.count)

val predictions = model.transform(test)
predictions.select("label","predictions","probability").show(100)

val evaluator = new MulticlassClassificationEvaluator()
      .setLabelCol("label")
      .setPredictionCol("predictions")
      .setMetricName("f1")

val f1 = evaluator.evaluate(predictions)
println("Test set accuracy = " + f1)

+-----+-----------+--------------------+
|label|predictions|         probability|
+-----+-----------+--------------------+
|    0|        0.0|[0.97928474532398...|
|    0|        0.0|[0.99956950473540...|
|    1|        1.0|[2.43898442362542...|
|    0|        0.0|[0.99961480568612...|
|    1|        1.0|[8.54685767194050...|
|    1|        1.0|[1.93431769717673...|
|    1|        1.0|[2.41317265429764...|
|    0|        0.0|[0.84248355487316...|
|    1|        0.0|[0.73792895281215...|
|    0|        0.0|[0.99999606299803...|
|    0|        0.0|[0.98488989028026...|
|    0|        0.0|[0.99970820005681...|
|    1|        1.0|[6.07360426761614...|
|    1|        1.0|[1.51391632357596...|
|    1|        1.0|[2.34438019719837...|
|    1|        1.0|[0.03215887160417...|
|    0|        0.0|[0.92983114913638...|
|    0|        0.0|[0.93549382588334...|
|    0|        0.0|[0.99980005710904...|
|    0|        0.0|[0.86232663151075...|
|    0|        0.0|[0.99968103804289...|
|    0|        0

train: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string, words: array<string> ... 2 more fields]
test: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [text: string, words: array<string> ... 2 more fields]
size: (Long, Long) = (2490,620)
predictions: org.apache.spark.sql.DataFrame = [text: string, words: array<string> ... 10 more fields]
evaluator: org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator = mcEval_458e8f61e5ff
f1: Double = 0.9691296428994499


In [None]:
val a = "Maldito evo morales"
model.predict('')

In [94]:
val df_test:DataFrame = spark
        .read
        .option("header", true)
        .option("inferSchema", "true")
        .parquet("test.parquet")

val a = model.transform(df_test)

a.select("text","predictions","probability").show(100)
// #.show(100,truncate=false)



+--------------------+-----------+--------------------+
|                text|predictions|         probability|
+--------------------+-----------+--------------------+
|ÚLTIMA HORA | Can...|        1.0|[0.46320228452856...|
|Lo de los trolls ...|        1.0|[0.04020302526845...|
|@Marvazquez92 @Se...|        1.0|[0.17429679658708...|
|Michelle Bachelet...|        1.0|[0.09837693466784...|
|El embajador aseg...|        1.0|[0.23637814709972...|
|#NotiMippCI 📰🗞|...|        1.0|[0.35945668593017...|
|#Bolivia : En men...|        0.0|[0.97557900070806...|
|La policía de Bol...|        1.0|[0.00376085749873...|
|Ante la complicid...|        1.0|[0.00970450781144...|
|Así actúan las di...|        1.0|[0.05964818388108...|
|#ENMundo Bolivia ...|        1.0|[0.36970742007131...|
|MÁS CLARO AGUA. C...|        1.0|[0.08982487716934...|
|Denuncian cruelda...|        1.0|[0.12665341113813...|
|Uno de los objeti...|        1.0|[0.38564724971603...|
|La minoría fascis...|        1.0|[0.1480012600210

df_test: org.apache.spark.sql.DataFrame = [created_at: string, text: string ... 9 more fields]
a: org.apache.spark.sql.DataFrame = [created_at: string, text: string ... 17 more fields]
