In [1]:
// Report which Spark build the notebook kernel is attached to, so results can be
// compared across environments.
println(f"Current spark version is ${spark.version}%s")

Intitializing Scala interpreter ...

Spark Web UI available at http://dd486c79897e:4041
SparkContext available as 'sc' (version = 3.0.0, master = local[*], app id = local-1598872629226)
SparkSession available as 'spark'


Current spark version is 3.0.0


In [2]:
import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}

// Column layout of the raw tweet CSV (the file has no header row, so the schema
// is supplied explicitly).
val dataSchema = new StructType()
    .add("target", IntegerType)
    .add("id", LongType)
    .add("raw_timestamp", StringType)
    .add("query_status", StringType)
    .add("author", StringType)
    .add("tweet", StringType)

val dataPath = "/home/jovyan/data/training.1600000.processed.noemoticon.csv"

// Load the raw tweets and keep only a binary label plus the tweet text.
//  * target = 4 becomes label 1; every other value (including a null target)
//    becomes label 0, per the CASE expression below.
//  * NOTE: this file appears to be the Sentiment140 export, which is commonly
//    distributed Latin-1 (ISO-8859-1) encoded rather than UTF-8. Spark's CSV
//    reader decodes as UTF-8 by default and would replace undecodable bytes with
//    U+FFFD, corrupting tweet text before tokenization, so the encoding is set
//    explicitly. Confirm against your copy of the file.
val raw_sentiment = spark.read
    .format("csv")
    .option("header", false)
    .option("encoding", "ISO-8859-1")
    .schema(dataSchema)
    .load(dataPath)
    .selectExpr("(case when target=4 then 1 else 0 end) as label", "tweet")

// Sanity check: number of rows per label (class balance).
raw_sentiment.groupBy($"label").count().show()

+-----+------+
|label| count|
+-----+------+
|    1|800000|
|    0|800000|
+-----+------+



import org.apache.spark.sql.types.{StructType, StructField, IntegerType, LongType, StringType}
dataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(target,IntegerType,true), StructField(id,LongType,true), StructField(raw_timestamp,StringType,true), StructField(query_status,StringType,true), StructField(author,StringType,true), StructField(tweet,StringType,true))
dataPath: String = /home/jovyan/data/training.1600000.processed.noemoticon.csv
raw_sentiment: org.apache.spark.sql.DataFrame = [label: int, tweet: string]


In [3]:
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row

// Stage 1: Tokenizer lower-cases each tweet and splits it on whitespace into "words".
val tokenizer = new Tokenizer()
    .setInputCol("tweet")
    .setOutputCol("words")

// Stage 2: hash each token into a fixed-size term-frequency vector in "features"
// (the column name LogisticRegression reads by default).
// NOTE(review): with only 1000 hash buckets many distinct tokens will share a
// bucket; consider a larger value if model quality matters.
val hashingTF = new HashingTF()
    .setInputCol(tokenizer.getOutputCol)
    .setOutputCol("features")
    .setNumFeatures(1000)

// Stage 3: logistic regression on "features" against the default "label" column,
// limited to 10 iterations with a small regularisation weight.
val lr = new LogisticRegression()
    .setRegParam(0.001)
    .setMaxIter(10)

// Chain the three stages; fitting runs them in order on the input DataFrame.
val pipeline = new Pipeline().setStages(Array(tokenizer, hashingTF, lr))


import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.ml.classification.LogisticRegression
import org.apache.spark.ml.feature.{HashingTF, Tokenizer}
import org.apache.spark.ml.linalg.Vector
import org.apache.spark.sql.Row
tokenizer: org.apache.spark.ml.feature.Tokenizer = tok_e71f2a5c741d
hashingTF: org.apache.spark.ml.feature.HashingTF = HashingTF: uid=hashingTF_870f83a7e62b, binary=false, numFeatures=1000
lr: org.apache.spark.ml.classification.LogisticRegression = logreg_d1726e1bc19c
pipeline: org.apache.spark.ml.Pipeline = pipeline_76a1089f972d


In [None]:
// Fit tokenizer -> hashingTF -> logistic regression on the full labelled dataset.
// NOTE(review): no train/test split is made, so later predictions are on training data.
val model: PipelineModel = pipeline.fit(raw_sentiment)

In [None]:
// Persist the fitted pipeline, replacing anything previously saved at this path.
model.write
    .overwrite()
    .save("/home/jovyan/models/spark-ml-model")

In [None]:
// Reload the pipeline written above to confirm it round-trips through disk.
val sameModel: PipelineModel = PipelineModel.read.load("/home/jovyan/models/spark-ml-model")

In [None]:
// Score every row with the reloaded pipeline and preview the first 20 rows
// (truncated), as DataFrame.show() does by default.
// NOTE(review): this scores the same data the model was fitted on.
val predictionsDF = sameModel.transform(raw_sentiment)
predictionsDF.show(20)

In [None]:
// Print the scored DataFrame's schema: the original label/tweet columns plus the
// columns appended by each pipeline stage.
// Other ways to inspect it: predictionsDF.schema (the StructType value),
// predictionsDF.describe() (summary statistics), predictionsDF.stat.
predictionsDF.printSchema()

In [None]:
import org.apache.spark.sql.functions._

import org.apache.spark.ml.functions.vector_to_array

// Extract element 1 of an ML probability vector column, i.e. the predicted
// probability of label 1 in this binary model.
// Uses the built-in vector_to_array (Spark >= 3.0) instead of a Scala UDF, so:
// Catalyst can optimise the expression, a null vector yields null instead of
// throwing inside the UDF, and the parameter is named for what it actually holds.
// Call sites are unchanged: getProbability($"probability") still returns a Column.
val getProbability = (probability: org.apache.spark.sql.Column) =>
    vector_to_array(probability).getItem(1)

In [None]:
// Preview the positive-class probability for the first 20 scored rows.
// show() is side-effecting, so it is called with explicit parentheses
// (auto-application is deprecated in Scala 2.13 and removed in Scala 3).
predictionsDF
    .select(getProbability($"probability").alias("clean_probability"))
    .show()