In [1]:
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.streaming._

In [2]:
// Record which Spark runtime the notebook is executing against.
println("Current spark version is %s".format(spark.version))

Current spark version is 2.4.4


In [3]:
// Directory that the streaming source polls for JSON files.
val inputStreamPath = "/home/jovyan/data/events-stream"

// Explicit schema for the incoming records: a single nullable string column "tweet".
val dataSchema = new StructType().add("tweet", StringType)

// Streaming JSON reader; at most one new file is consumed per micro-batch trigger.
val inputDF = spark.readStream
  .format("json")
  .schema(dataSchema)
  .options(Map("maxFilesPerTrigger" -> "1"))
  .load(inputStreamPath)

inputStreamPath = /home/jovyan/data/events-stream
dataSchema = StructType(StructField(tweet,StringType,true))
inputDF = [tweet: string]


[tweet: string]

In [4]:
// Location of a previously saved Spark ML pipeline.
val modelPath = "/home/jovyan/models/spark-ml-model"

// Deserialize the fitted pipeline through its MLReader.
val model: PipelineModel = PipelineModel.read.load(modelPath)

modelPath = /home/jovyan/models/spark-ml-model
model = pipeline_9dba2f215e93


pipeline_9dba2f215e93

In [5]:
// UDF that takes the classifier's probability vector and returns its element at index 1,
// i.e. the probability assigned to class label 1.0.
// NOTE(review): the name implies label 1.0 encodes negative sentiment — confirm against the
// training labels. Assumes a binary classifier (vector of length >= 2).
val getNegativeProbability = udf { probabilities: org.apache.spark.ml.linalg.Vector =>
  probabilities.apply(1)
}

getNegativeProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [6]:
// Score the streaming tweets with the loaded pipeline. Keep the tweet text and the predicted
// label, and add the class-1 probability as "neg_probability".
val predictionsDF = model
  .transform(inputDF)
  .select(
    col("tweet"),
    col("prediction"),
    getNegativeProbability(col("probability")).alias("neg_probability")
  )

predictionsDF = [tweet: string, prediction: double ... 1 more field]


[tweet: string, prediction: double ... 1 more field]

In [7]:
// Start a streaming query that prints every micro-batch of predictions with DataFrame.show().
// The handler is passed as a typed function value rather than an inline lambda.
// NOTE(review): on Scala 2.12 builds an inline lambda can be ambiguous between the Scala and
// Java foreachBatch overloads; a function value avoids that.
val predictions_stream = {
  val showBatch: (DataFrame, Long) => Unit = (batch, _) => batch.show()
  predictionsDF.writeStream.foreachBatch(showBatch).start()
}

predictions_stream = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6829cc79


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@6829cc79

+--------------------+----------+-------------------+
|               tweet|prediction|    neg_probability|
+--------------------+----------+-------------------+
|@Spitphyre ohhhh ...|       1.0| 0.5229983027094914|
|aww no pubefros? ...|       0.0|0.42704340603968527|
|does NOT want to ...|       0.0| 0.4951221335857083|
|have to go to wor...|       0.0|0.48464601632498255|
|is going to be - ...|       0.0| 0.4471516640537588|
|@barbiesdead i wa...|       0.0| 0.4763973438000497|
|@DanielKGlenn Ok ...|       0.0|0.36091964624114514|
|@Zzazz  my weeken...|       0.0|0.49735163144942296|
|I'm up, but I wen...|       0.0|0.46006371760813325|
|@jenswitzig &quot...|       0.0| 0.4702047324259545|
|watching funny mo...|       1.0| 0.5377644764806815|
|@betaboy Arrrrrgh...|       1.0| 0.5259401762003582|
|@Ilkee yeah, too ...|       0.0| 0.4900060643943062|
|Oh i love that mo...|       0.0| 0.4846421793506976|
|@rainbowbtrfly If...|       1.0| 0.5356092962905977|
|hmmmmm. really li...|      

+--------------------+----------+-------------------+
|               tweet|prediction|    neg_probability|
+--------------------+----------+-------------------+
|ok, considering i...|       0.0|0.48074564695038413|
|my procrastinatio...|       0.0| 0.4730724834964919|
|My sunburn huuuuu...|       1.0| 0.5217622596942644|
|Missed the bus......|       1.0| 0.5217622596942644|
|At work. Unfortun...|       0.0| 0.4837707816928815|
|blimey it is prop...|       0.0| 0.4496439525208043|
|@BronteFan2 What ...|       1.0| 0.5387450342438135|
|@rvailleux : woot...|       0.0| 0.4898415707664526|
|@officialTila Hea...|       1.0| 0.5217622596942644|
|is watching GG.. ...|       1.0| 0.5217622596942644|
|is thinking &quot...|       1.0| 0.5217622596942644|
|Taking a break fr...|       1.0| 0.5176311116042892|
|is waiting for @a...|       0.0| 0.4987018737006327|
|Just got the new ...|       1.0| 0.5217622596942644|
|@ash_786 I like y...|       0.0| 0.4920798462889506|
+--------------------+------

+--------------------+----------+-------------------+
|               tweet|prediction|    neg_probability|
+--------------------+----------+-------------------+
|@drtiet I am in s...|       0.0| 0.4962577627950444|
|        @whitespats |       1.0| 0.5217622596942644|
|Mafia Wars is exp...|       0.0| 0.4547069085821989|
|I really do not k...|       0.0| 0.4668140696203028|
| leaving home again |       1.0| 0.5217622596942644|
|watching SP1 inst...|       1.0| 0.5217622596942644|
|It's soooo hot. I...|       0.0| 0.4797476381178886|
|Wohoo! Going to s...|       1.0| 0.5060155281599423|
|happy mothers day...|       1.0|  0.511342546438851|
|@DcTannerHo i lov...|       0.0|0.46097340648721596|
|Wanna follow more...|       1.0| 0.5000718244764134|
|lovin my time in ...|       1.0| 0.5217622596942644|
|Study..study..stu...|       1.0| 0.5346279686983664|
|Thought I'd mix u...|       1.0| 0.5176311116042892|
|@ZnaTrainer My pl...|       1.0| 0.5217622596942644|
+--------------------+------

+--------------------+----------+-------------------+
|               tweet|prediction|    neg_probability|
+--------------------+----------+-------------------+
|mothers day sucks...|       1.0| 0.5094300515232023|
|Def cant b home f...|       0.0|0.47090663712636494|
|@NinaChantele Hop...|       0.0|  0.495651777362445|
|@Palchan Rest its...|       1.0| 0.5217622596942644|
|SOB! I hate going...|       0.0| 0.4654730959947928|
|Relaxing...not go...|       1.0| 0.5346279686983664|
|I'm not feeling h...|       1.0| 0.5210191607616257|
|@intelligensia I'...|       1.0| 0.5134531950981953|
|Noooooo! They are...|       0.0| 0.4906558198925297|
|I'm going to watc...|       1.0|  0.512030045103656|
|I shall stop twee...|       0.0|  0.477328524891388|
|@GingerCM @autism...|       0.0| 0.4925441128903264|
|I've put a new ar...|       1.0| 0.5164395583079437|
+--------------------+----------+-------------------+

+--------------------+----------+-------------------+
|               tweet|predi

In [8]:
// Count predictions per label over sliding 10-second windows that advance every second.
// The windows are keyed on processing time: the timestamp is taken when each row is processed.
// The 10-second watermark bounds the aggregation state and allows append-mode output.
val predictions_stats = {
  val stamped = predictionsDF.withColumn("timestamp", current_timestamp())
  stamped
    .withWatermark("timestamp", "10 seconds")
    .groupBy(col("prediction"), window(col("timestamp"), "10 seconds", "1 second"))
    .count()
}

predictions_stats = [prediction: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]


[prediction: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]

In [9]:
// Print the windowed counts to the console sink. In append mode a window's row is emitted
// only after the watermark has passed, so early batches can be empty.
val console_stream = predictions_stats.writeStream
  .format("console")
  .outputMode(org.apache.spark.sql.streaming.OutputMode.Append())
  .start()

console_stream = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@77d9e94


org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@77d9e94

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+------+-----+
|prediction|window|count|
+----------+------+-----+
+----------+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+------+-----+
|prediction|window|count|
+----------+------+-----+
+----------+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+--------------------+-----+
|prediction|              window|count|
+----------+--------------------+-----+
|       1.0|[2019-10-16 12:27...|    4|
|       0.0|[2019-10-16 12:27...|   12|
|       1.0|[2019-10-16 12:27...|    4|
|       1.0|[2019-10-16 12:27...|    4|
|       0.0|[2019-10-16 12:27...|   12|
|       0.0|[2019-10-16 12:27...|   12|
|       1.0|[2019-10-16 12:27...|    4|
|       0.0|[2019-10-16 12:27...|   12|
|       0.0|[2019-10-16 12:27...|   12|
|       0.0|[2019-1