In [1]:
// Print the version reported by the active `spark` session.
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.0


In [10]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

// Column layout applied to the headerless CSV, listed in file order.
// Every field is nullable (the StructField default).
val dataSchema = StructType(Seq(
  StructField("target", IntegerType),
  StructField("id", LongType),
  StructField("raw_timestamp", StringType),
  StructField("query_status", StringType),
  StructField("author", StringType),
  StructField("tweet", StringType)
))

val dataPath = "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

// Load the CSV with the explicit schema and reduce it to two columns:
// a binary label (target 4 becomes 1, any other target becomes 0) and the tweet text.
val raw_sentiment = spark.read
  .format("csv")
  .schema(dataSchema)
  .option("header", "false")
  .load(dataPath)
  .selectExpr("(case when target=4 then 1 else 0 end) as label", "tweet")

// The train/test split below is disabled; the whole data set is used for training.
//val Array(trainingData, testData) = raw_sentiment.randomSplit(Array(1, 0.3))

val trainingData = raw_sentiment

// Show the number of rows per label.
trainingData.groupBy("label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]
trainingData = [label: int, tweet: string]


[label: int, tweet: string]

In [11]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}

// Index the label column for the classifier.
// The ordering is pinned to alphabetAsc so that "0" always maps to index 0.0 and
// "1" to index 1.0. Later cells read slot 0 of the probability vector as the
// negative class and slot 1 as the positive class, so the mapping must not
// depend on StringIndexer's default frequency-based ordering; both classes have
// exactly 800000 rows here, so frequency gives no stable way to order them.
val labelIndexer = new StringIndexer()
  .setInputCol("label")
  .setOutputCol("indexedLabel")
  .setStringOrderType("alphabetAsc")
  .fit(raw_sentiment)

// Split each tweet into a "words" array.
val tokenizer = new Tokenizer()
  .setInputCol("tweet")
  .setOutputCol("words")

// Hash the words into a fixed-size (1000 entries) term-frequency vector.
val hashingTF = new HashingTF()
  .setInputCol(tokenizer.getOutputCol)
  .setOutputCol("features")
  .setNumFeatures(1000)

// Convert indexed labels back to original labels.
val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")
  .setLabels(labelIndexer.labels)

// Random forest of 10 trees trained on the indexed label and hashed features.
val rf = new RandomForestClassifier()
  .setLabelCol("indexedLabel")
  .setFeaturesCol("features")
  .setNumTrees(10)

// Stage order matters: index label -> tokenize -> hash -> classify -> decode prediction.
val pipeline = new Pipeline()
  .setStages(Array(labelIndexer, tokenizer, hashingTF, rf, labelConverter))


labelIndexer = strIdx_b7cb0c790bb2
tokenizer = tok_bb1ed8d27b86
hashingTF = hashingTF_dde35dadafbc
labelConverter = idxToStr_0baac0076352
rf = ...


...

In [12]:
// Fit every pipeline stage on the training DataFrame.
val model = pipeline.fit(trainingData)

// Persist the fitted pipeline, replacing any model already stored at this path.
model
  .write
  .overwrite()
  .save("/home/jovyan/models/spark-ml-model-rf2")



model = pipeline_b1b68980d298
sameModel = pipeline_b1b68980d298


pipeline_b1b68980d298

In [None]:
// Reload the persisted pipeline model from disk.
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model-rf2")

// Score every row of raw_sentiment and store the result as Parquet,
// replacing any previous output at the target path.
sameModel
  .transform(raw_sentiment)
  .write
  .mode("overwrite")
  .parquet("/home/jovyan/data/spark-ml-model-rf")

In [6]:
import org.apache.spark.sql.functions._
// UDF that returns the element at index `pos` of an ML vector.
val getProbability = udf { (prediction: org.apache.spark.ml.linalg.Vector, pos: Integer) =>
  prediction(pos)
}

// Read the stored predictions and expose the two entries of the "probability"
// vector as plain double columns: slot 0 as negative_probability and
// slot 1 as positive_probability.
val predictionsDF = spark.read
  .parquet("/home/jovyan/data/spark-ml-model-rf")
  .withColumn("negative_probability", getProbability(col("probability"), lit(0)))
  .withColumn("positive_probability", getProbability(col("probability"), lit(1)))

predictionsDF.printSchema()
predictionsDF.limit(10).show()

root
 |-- label: integer (nullable = true)
 |-- tweet: string (nullable = true)
 |-- indexedLabel: double (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = true)
 |-- predictedLabel: string (nullable = true)
 |-- negative_probability: double (nullable = false)
 |-- positive_probability: double (nullable = false)

+-----+--------------------+------------+--------------------+--------------------+--------------------+--------------------+----------+--------------+--------------------+--------------------+
|label|               tweet|indexedLabel|               words|            features|       rawPrediction|         probability|prediction|predictedLabel|negative_probability|positive_probability|
+-----+--------------------+------------+--------------------+-------------------

getProbability = UserDefinedFunction(<function2>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7, IntegerType)))
predictionsDF = [label: int, tweet: string ... 9 more fields]


[label: int, tweet: string ... 9 more fields]

In [34]:
// Positive labels: the ten rows predicted as 1 with the highest
// positive_probability, printed without truncating the tweet text.
predictionsDF
  .where("predictedLabel=1")
  .select(
    "predictedLabel",
    "label",
    "tweet",
    "prediction",
    "positive_probability",
    "negative_probability"
  )
  .sort($"positive_probability".desc)
  .limit(10)
  .show(truncate = false)

+--------------+-----+------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------+--------------------+
|predictedLabel|label|tweet                                                                                                                                     |prediction|positive_probability|negative_probability|
+--------------+-----+------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------+--------------------+
|1             |1    |@OrtalS thank you babe &lt;3 and thanks for the candies! you want me to get fat, huh? just kidding. it's very sweet                       |1.0       |0.6464646760062969  |0.35353532399370313 |
|1             |1    |@denvercheetoh I thought I blew it with you when I endorsed Dunkin Donuts (kicks ass baby!). Thanks friend.           

In [32]:
// Negative labels: the ten rows predicted as 0 with the highest
// negative_probability, printed without truncating the tweet text.
predictionsDF
  .where("predictedLabel=0")
  .select(
    "predictedLabel",
    "label",
    "tweet",
    "prediction",
    "positive_probability",
    "negative_probability"
  )
  .sort($"negative_probability".desc)
  .limit(10)
  .show(truncate = false)

+--------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------+--------------------+
|predictedLabel|label|tweet                                                                                                                                             |prediction|positive_probability|negative_probability|
+--------------+-----+--------------------------------------------------------------------------------------------------------------------------------------------------+----------+--------------------+--------------------+
|0             |0    |I got stuck @ work doing overtime and I'm not happy. I miss my baby like crazy . I need to be right next to you, lying in your arms.              |0.0       |0.295349669140342   |0.7046503308596579  |
|0             |0    |this feeling like I'm gonna puke is called 'sadness' right? I didn't know how bad I'd 

In [33]:
// Forecast accuracy.
// NOTE: predictionsDF holds scores for raw_sentiment, the same rows the pipeline
// was fitted on, so the percentage below is training-set accuracy.

// Count the rows once; the total is needed both for display and as the denominator.
val totalRows = predictionsDF.count
val mismatchedRows = predictionsDF.filter("predictedLabel<>label").count
// Matches are counted separately rather than derived as total - mismatches,
// because rows where the comparison yields null fall into neither filter.
val matchedRows = predictionsDF.filter("predictedLabel=label").count

println(mismatchedRows)
println(totalRows)
// Percentage of rows whose predicted label equals the true label.
println(100.0 * matchedRows / totalRows)

615966
1600000
61.502125
