In [1]:
println(s"Current spark version is ${spark.version}")

Intitializing Scala interpreter ...

Spark Web UI available at http://210a44f35d45:4040
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1598191506104)
SparkSession available as 'spark'


Current spark version is 3.0.0


In [2]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

val dataPath= "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

val raw_sentiment = spark.read
    .format("csv")
    .option("header",false)
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label","tweet")

raw_sentiment.groupBy($"label").count.show

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}
dataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath: String = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment: org.apache.spark.sql.DataFrame = [label: int, tweet: string]


In [3]:
//raw_sentiment.filter("label = 1").show(100, false)

In [5]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

val hashingTF = new HashingTF()
    .setNumFeatures(1000)
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")

// val lr = new LogisticRegression()
//     .setMaxIter(10)
//     .setRegParam(0.001)
val rf = new RandomForestClassifier()
  .setLabelCol("label")
  .setFeaturesCol(hashingTF.getOutputCol)
  .setNumTrees(10)

val pipeline = new Pipeline()
  .setStages(Array(tokenizer, hashingTF, rf))


import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
import org.apache.spark.ml.feature.{IndexToString, StringIndexer, VectorIndexer}
import org.apache.spark.ml.classification.{RandomForestClassificationModel, RandomForestClassifier}
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_5b3e2fd5355a
hashingTF: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_ea283f04044a, binary=false, numFeatures=1000
rf: org.apache.spark.ml.classification.RandomForestClassifier = rfc_c551102eca04
p...


In [6]:
val model = pipeline.fit(raw_sentiment)

model: org.apache.spark.ml.PipelineModel = pipeline_6781f88228f0


In [7]:
model.write.overwrite().save("/home/jovyan/models/spark-ml-model")

In [8]:
val sameModel = PipelineModel.load("/home/jovyan/models/spark-ml-model")

sameModel: org.apache.spark.ml.PipelineModel = pipeline_6781f88228f0


In [9]:
val predictionsDF = sameModel.transform(raw_sentiment)

predictionsDF.show()

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    0|@switchfoot http:...|[@switchfoot, htt...|(1000,[10,21,81,1...|[4.82853511946235...|[0.48285351194623...|       1.0|
|    0|is upset that he ...|[is, upset, that,...|(1000,[121,193,20...|[4.75893820180163...|[0.47589382018016...|       1.0|
|    0|@Kenichan I dived...|[@kenichan, i, di...|(1000,[17,185,188...|[5.12521184154203...|[0.51252118415420...|       0.0|
|    0|my whole body fee...|[my, whole, body,...|(1000,[191,330,44...|[5.08705958038058...|[0.50870595803805...|       0.0|
|    0|@nationwideclass ...|[@nationwideclass...|(1000,[32,162,166...|[5.23795069436853...|[0.52379506943685...|       0.0|
|    0|@

predictionsDF: org.apache.spark.sql.DataFrame = [label: int, tweet: string ... 5 more fields]


In [10]:
predictionsDF.filter("label = 1").show(100)

+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|label|               tweet|               words|            features|       rawPrediction|         probability|prediction|
+-----+--------------------+--------------------+--------------------+--------------------+--------------------+----------+
|    1|I LOVE @Health4Ua...|[i, love, @health...|(1000,[17,216,240...|[4.79378079709192...|[0.47937807970919...|       1.0|
|    1|im meeting up wit...|[im, meeting, up,...|(1000,[20,26,29,1...|[5.08705958038058...|[0.50870595803805...|       0.0|
|    1|@DaRealSunisaKim ...|[@darealsunisakim...|(1000,[10,17,19,2...|[4.99354710998975...|[0.49935471099897...|       1.0|
|    1|Being sick can be...|[being, sick, can...|(1000,[73,76,112,...|[5.11956656772819...|[0.51195665677281...|       0.0|
|    1|@LovesBrooklyn2 h...|[@lovesbrooklyn2,...|(1000,[66,324,559...|[4.94152248273069...|[0.49415224827306...|       1.0|
|    1|@




In [11]:
predictionsDF.filter("label = 1 and prediction = 1").count()

res5: Long = 588582


In [12]:
predictionsDF.filter("label = 1").count()

res6: Long = 800000


In [8]:
// predictionsDF.schema
// predictionsDF.describe()
// predictionsDF.stat
predictionsDF.printSchema()

root
 |-- label: integer (nullable = false)
 |-- tweet: string (nullable = true)
 |-- words: array (nullable = true)
 |    |-- element: string (containsNull = true)
 |-- features: vector (nullable = true)
 |-- rawPrediction: vector (nullable = true)
 |-- probability: vector (nullable = true)
 |-- prediction: double (nullable = false)



In [13]:
import org.apache.spark.sql.functions._

val getProbability = udf((prediction: org.apache.spark.ml.linalg.Vector) => prediction(1))

import org.apache.spark.sql.functions._
getProbability: org.apache.spark.sql.expressions.UserDefinedFunction = SparkUserDefinedFunction($Lambda$4580/0x0000000841927040@740a4059,DoubleType,List(Some(class[value[0]: vector])),None,false,true)


In [15]:
predictionsDF.select(getProbability($"probability").alias("clean_probability")).show

+-------------------+
|  clean_probability|
+-------------------+
| 0.8467464788845905|
| 0.2719947263171818|
| 0.5705188849085713|
|0.46118183287283765|
|0.05123227139180558|
|  0.366597769438562|
| 0.4095537993787451|
| 0.9015464258815036|
|0.39991867201538905|
| 0.4152371193384714|
| 0.6408024160475768|
| 0.5167122483759568|
|   0.53312180624097|
|  0.355500164186055|
|0.05360343692502386|
| 0.1041160270943646|
| 0.3476012052729942|
| 0.4233147373497938|
| 0.6205675922280108|
| 0.6259620014110904|
+-------------------+
only showing top 20 rows

