### Download data

In [None]:
! wget http://cs.stanford.edu/people/alecmgo/trainingandtestdata.zip -O ./data/sentiment.zip


In [None]:
! cd /home/jovyan/work/data && unzip sentiment.zip

### import Data

In [1]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer, StopWordsRemover, RegexTokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}
import org.apache.spark.sql.functions._

println(s"Current spark version is ${spark.version}")

Intitializing Scala interpreter ...

Spark Web UI available at http://eee0fd17218b:4040
SparkContext available as 'sc' (version = 2.4.5, master = local[*], app id = local-1586786518617)
SparkSession available as 'spark'


Current spark version is 2.4.5


import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.evaluation.MulticlassClassificationEvaluator
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer, HashingTF, Tokenizer, StopWordsRemover, RegexTokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}
import org.apache.spark.sql.functions._


In [2]:
// import data 
val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

val dataPath= "./data/training.1600000.processed.noemoticon.csv"

val data = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    //.withColumn("cleaned_tweet", regexp_replace(regexp_replace(regexp_replace(regexp_replace($"tweet", "@\\w+", ""), "http\\S+", ""), "#\\w+", ""), "[0-9]", "")) // remove mentioned users, links,  hashtags and numbers
    .selectExpr("(case when target=4 then 1 else 0 end) as label", "trim(tweet) as tweet")

// split into train/test
val Array(trainData, testData) = data.randomSplit(Array(0.7, 0.3))

val cleanedTrainData = trainData
    .withColumn("cleaned_tweet", regexp_replace(regexp_replace(regexp_replace(regexp_replace($"tweet", "@\\w+", ""), "http\\S+", ""), "#\\w+", ""), "[0-9]", "")) // remove mentioned users, links,  hashtags and numbers
    .selectExpr("label", "cleaned_tweet as tweet")

val cleanedTestData = testData
    .withColumn("cleaned_tweet", regexp_replace(regexp_replace(regexp_replace(regexp_replace($"tweet", "@\\w+", ""), "http\\S+", ""), "#\\w+", ""), "[0-9]", "")) // remove mentioned users, links,  hashtags and numbers
    .selectExpr("label", "cleaned_tweet as tweet")

// check
// cleanedTrainData.groupBy($"label").count.show
// cleanedTrainData(5, false)
// cleanedTestData.groupBy($"label").count.show
// cleanedTestData(5, false)

dataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath: String = ./data/training.1600000.processed.noemoticon.csv
data: org.apache.spark.sql.DataFrame = [label: int, tweet: string]
trainData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, tweet: string]
testData: org.apache.spark.sql.Dataset[org.apache.spark.sql.Row] = [label: int, tweet: string]
cleanedTrainData: org.apache.spark.sql.DataFrame = [label: int, tweet: string]
cleanedTestData: org.apache.spark.sql.DataFrame = [label: int, tweet: string]


### Create Pipeline

In [3]:
// A tokenizer that converts the input string to lowercase and then splits it by white spaces.
val tokenizer = new RegexTokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")
    .setPattern("\\W+")
    .setMinTokenLength(2) // remove <2 char words

// remove stop words
val filteredTokenizer = new StopWordsRemover()
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("filtered_words")

// transforms the rows for the input column into a sparse term frequency vector.
val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(filteredTokenizer.getOutputCol)
    .setOutputCol("features")

// create a RandomForest model.
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol("features")
  .setNumTrees(10)

// create pipeline
val pipeline = new Pipeline()
  .setStages(Array(tokenizer, filteredTokenizer, hashingTF, rf))

// check
//hashingTF.transform(filteredTokenizer.transform(tokenizer.transform(data))).show(5, false)

tokenizer: org.apache.spark.ml.feature.RegexTokenizer = regexTok_54848d624aaf
filteredTokenizer: org.apache.spark.ml.feature.StopWordsRemover = stopWords_b0b33133565f
hashingTF: org.apache.spark.ml.feature.HashingTF = hashingTF_ffb0f5fffbc1
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_6969fd6a7d46
pipeline: org.apache.spark.ml.Pipeline = pipeline_e1c3d5ad6049


### Train && Prediction

In [4]:
// Train model.
val model = pipeline.fit(cleanedTrainData)
model.write.overwrite().save("./models/spark-ml-model")

model: org.apache.spark.ml.PipelineModel = pipeline_e1c3d5ad6049


In [5]:
// Make predictions.
val sameModel = PipelineModel.load("./models/spark-ml-model")
val predictionsDF = sameModel.transform(cleanedTestData)
predictionsDF.show(10, false)
predictionsDF.printSchema()

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show

+-----+-----------------------------------------------------------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------------------------------------------------------------+------------------------------------------------------------------------------------------------------------------+----------------------------------------------------------------------------------------------------------------+---------------------------------------+----------------------------------------+----------+
|label|tweet                                                                                                                                    |words                                                                                                                                      |filtered_words                                                                                               

sameModel: org.apache.spark.ml.PipelineModel = pipeline_e1c3d5ad6049
predictionsDF: org.apache.spark.sql.DataFrame = [label: int, tweet: string ... 6 more fields]
getProbability: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


In [None]:
spark.stop()