In [1]:
println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.5


In [2]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

val dataPath= "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



dataSchema = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment = [label: int, tweet: string]


[label: int, tweet: string]

In [4]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.RandomForestClassifier
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

val lr = new RandomForestClassifier()
    .setNumTrees(50)
    .setMaxDepth(5)

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, lr))


tokenizer = tok_3174d00ccda8
hashingTF = hashingTF_c2fa39d5a8ca
lr = rfc_54d93d43b418
pipeline = pipeline_48664a2a5b21


pipeline_48664a2a5b21

In [13]:
val model = pipeline.fit(raw_sentiment)

model = pipeline_af9a25246ddb


pipeline_af9a25246ddb

In [14]:
model.write.overwrite().save("/home/jovyan/models/spark-ml-model")

In [5]:
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model")

sameModel = pipeline_af9a25246ddb


pipeline_af9a25246ddb

In [6]:
val predictionsDF = sameModel.transform(raw_sentiment)

predictionsDF.select('tweet, 'label, 'prediction, 'probability).show()

+--------------------+-----+----------+--------------------+
|               tweet|label|prediction|         probability|
+--------------------+-----+----------+--------------------+
|@switchfoot http:...|    0|       1.0|[0.45106954307907...|
|is upset that he ...|    0|       0.0|[0.50429915856565...|
|@Kenichan I dived...|    0|       0.0|[0.54912760289376...|
|my whole body fee...|    0|       0.0|[0.50898221416942...|
|@nationwideclass ...|    0|       0.0|[0.53941852087146...|
|@Kwesidei not the...|    0|       0.0|[0.50533888223422...|
|         Need a hug |    0|       1.0|[0.48850973486230...|
|@LOLTrish hey  lo...|    0|       1.0|[0.44696064103529...|
|@Tatiana_K nope t...|    0|       1.0|[0.49295063562509...|
|@twittera que me ...|    0|       1.0|[0.48484592711371...|
|spring break in p...|    0|       1.0|[0.48191057084606...|
|I just re-pierced...|    0|       0.0|[0.53300137589424...|
|@caregiving I cou...|    0|       0.0|[0.54481750937258...|
|@octolinz16 It it...|  

predictionsDF = [label: int, tweet: string ... 5 more fields]


[label: int, tweet: string ... 5 more fields]

In [10]:
// predictionsDF.schema
// predictionsDF.describe()
// predictionsDF.stat
predictionsDF.printSchema()

root
 |-- label: integer (nullable = false)
 |-- tweet: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [7]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [8]:
predictionsDF
    .withColumn("match", 'label === 'prediction)
    .select('tweet, 'label, 'prediction, 'match,
            getProbability($"probability").alias("1probability"))
    .groupBy('match)
    .count
    .show

+-----+-------+
|match|  count|
+-----+-------+
| true|1034786|
|false| 565214|
+-----+-------+



In [9]:
spark.stop()