In [1]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [2]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

    
val dataPath= "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

In [3]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, VectorAssembler}
import org.apache.spark.ml.regression.{RandomForestRegressionModel, RandomForestRegressor}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}

val seed = 5043
val Array(trainingData, testData) = raw_sentiment.randomSplit(Array(0.5, 0.5), seed)

val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

val rf = new RandomForestClassifier()
  .setMaxDepth(3)
  .setNumTrees(50)
  .setFeatureSubsetStrategy("auto")
  .setSeed(seed)

val labelConverter = new IndexToString()
  .setInputCol("prediction")
  .setOutputCol("predictedLabel")

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, rf))

seed = 5043
trainingData = [label: int, tweet: string]
testData = [label: int, tweet: string]


tokenizer: org.apache.sp...


[label: int, tweet: string]

In [None]:
val model = pipeline.fit(trainingData)

In [13]:
model.write.overwrite().save("/home/jovyan/models/spark-ml-model")
println("Model has been saved.")

Model has been saved.


In [14]:
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model")

sameModel = pipeline_678b938ed79d


pipeline_678b938ed79d

In [15]:
val predictionsDF = sameModel.transform(testData)

predictionsDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|                 ...|[, , , , , , , , ...|(1000,[107,372],[...|[24.0054049043150...|[0.48010809808630...|       1.0|
|    0|                 ...|[, , , , , , , , ...|(1000,[25,109,329...|[25.5729907350614...|[0.51145981470122...|       0.0|
|    0|               ju...|[, , , , , , , , ...|(1000,[115,307,32...|[26.6228554042316...|[0.53245710808463...|       0.0|
|    0|             i ju...|[, , , , , , , , ...|(1000,[4,307,329,...|[26.6435968852502...|[0.53287193770500...|       0.0|
|    0|           FUCK YOU!|[, , , , , , , , ...|(1000,[372,599,82...|[23.8423666437398...|[0.47684733287479...|       1.0|
|    0| 

predictionsDF = [label: int, tweet: string ... 5 more fields]


[label: int, tweet: string ... 5 more fields]

In [16]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [17]:
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show(10)

+-------------------+
|  clean_probability|
+-------------------+
| 0.5198919019136995|
|  0.488540185298772|
| 0.4675428919153667|
| 0.4671280622949945|
| 0.5231526671252036|
|0.47419280269610153|
|0.47819370484327733|
| 0.4811901755024816|
| 0.4651760771370384|
| 0.4883284909371508|
+-------------------+
only showing top 10 rows

