In [1]:
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame

In [2]:
// Report the version of the Spark session this notebook kernel is attached to.
Console.println(s"Current spark version is ${spark.version}")

Current spark version is 2.4.4


In [13]:
// Directory the streaming file source reads JSON input from.
val inputStreamPath = "/home/jovyan/data/events-stream"

// Explicit schema for the stream: a single string column named "tweet".
val dataSchema = new StructType().add("tweet", StringType)

// Streaming DataFrame over the JSON files in inputStreamPath.
// maxFilesPerTrigger = 1 caps each trigger at one newly discovered file.
val inputDF = spark.readStream
    .format("json")
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .load(inputStreamPath)

inputStreamPath = /home/jovyan/data/events-stream
dataSchema = StructType(StructField(tweet,StringType,true))
inputDF = [tweet: string]


[tweet: string]

In [14]:
// Location of the previously saved Spark ML pipeline.
val modelPath: String = "/home/jovyan/models/spark-ml-model"

// Load the fitted pipeline from disk so it can be applied to the streaming input.
val model: PipelineModel = PipelineModel.read.load(modelPath)

modelPath = /home/jovyan/models/spark-ml-model
model = pipeline_c89aba4b99ea


pipeline_c89aba4b99ea

In [15]:
// UDF that reduces an ML vector column to a single Double: the element at index 1.
// NOTE(review): presumably index 1 is the probability of the 1.0 class; confirm against the model.
val getProbability = udf { (classProbabilities: org.apache.spark.ml.linalg.Vector) =>
    classProbabilities.apply(1)
}

getProbability = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))

In [23]:
// Run the loaded pipeline over the streaming tweets and keep two columns:
// the tweet text and the scalar extracted from the model's "probability" vector
// (the column keeps its name but becomes a double).
val predictionsDF = model
    .transform(inputDF)
    .select(
        $"tweet",
        getProbability($"probability").as("probability"))

predictionsDF = [tweet: string, probability: double]


[tweet: string, probability: double]

In [24]:
// Start a streaming query that prints every micro-batch of predictions via DataFrame.show().
// The batch id is not used. The StreamingQuery returned by start() is this cell's result
// and is not bound to a name.
predictionsDF.writeStream
    .foreachBatch { (microBatch: DataFrame, _: Long) =>
        microBatch.show()
    }
    .start()

+--------------------+------------------+
|               tweet|       probability|
+--------------------+------------------+
|This is not your ...|0.4918569254262565|
|Awesome time with...|0.5077273477098998|
|@puddingface Did ...|0.5000010676040774|
|@_nicolereyes so ...|0.4735470304108918|
|http://twitpic.co...|0.5014425616389502|
|Pressed the Lemon...|0.5341815688852201|
|couldn't get what...|0.4243608792208298|
|@GabrieleMilan Oh...|0.5022482287512597|
|@quisutdeus1984 g...|0.5382443038091115|
|@BPorky thy was m...|0.5206943677343097|
|is watching a movie | 0.524290099381844|
|@bobvolanti I'll ...|0.4915510921355743|
|GOT MY CAST OFF !...|0.4958528001907238|
|@AndyRobertsBHAM ...|0.5168572797762165|
|Landed in sioux f...|0.5298242998006263|
|www.argentinepost...|0.5464786575228862|
+--------------------+------------------+



org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@7e3ee70

+--------------------+-------------------+
|               tweet|        probability|
+--------------------+-------------------+
|@cinnamonclouds  ...|  0.524290099381844|
|@jennnamarie_ dyi...|0.48164851381071055|
|    Mary just left! |  0.524290099381844|
|@kokas26 i like t...| 0.5016033008669695|
|@LaurenConrad jus...| 0.4161748837277689|
|@mileycyrus miley...| 0.4915510921355743|
|@musicologist012 ...| 0.5151601673155273|
|Just watched the ...| 0.5151601673155273|
|watching grey gar...|  0.524290099381844|
|@sorskoot Install...|  0.524290099381844|
|@PaiL20 oh i did ...| 0.5393103331719392|
|@Little_Ren Hoora...| 0.5151601673155273|
|  No more exams!yay |  0.524290099381844|
|At the beach  so ...|  0.496879523837263|
|Thats the end of ...|  0.524290099381844|
|And that was my 5...|0.47419636748920696|
|@mikeyway cool ba...|  0.524290099381844|
+--------------------+-------------------+

+--------------------+-------------------+
|               tweet|        probability|
+---------

In [31]:
// Count predictions per class over sliding time windows.
// The "timestamp" column comes from current_timestamp(), i.e. it is assigned while the
// query runs rather than read from the input data. Windows are 10 seconds long and
// slide every second; the watermark on "timestamp" is 10 seconds.
val windowedPredictionsCounts = model
    .transform(inputDF)
    .select($"tweet", $"prediction", current_timestamp().as("timestamp"))
    .withWatermark("timestamp", "10 seconds")
    .groupBy($"prediction", window($"timestamp", "10 seconds", "1 seconds"))
    .count()

windowedPredictionsCounts = [prediction: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]


[prediction: double, window: struct<start: timestamp, end: timestamp> ... 1 more field]

In [32]:
// Start a streaming query that writes the windowed counts to the console sink in
// append output mode. The StreamingQuery returned by start() is this cell's result.
windowedPredictionsCounts.writeStream
    .format("console")
    .outputMode("append")
    .start()

org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@3beed974

-------------------------------------------
Batch: 0
-------------------------------------------
+----------+------+-----+
|prediction|window|count|
+----------+------+-----+
+----------+------+-----+

-------------------------------------------
Batch: 1
-------------------------------------------
+----------+------+-----+
|prediction|window|count|
+----------+------+-----+
+----------+------+-----+

-------------------------------------------
Batch: 2
-------------------------------------------
+----------+--------------------+-----+
|prediction|              window|count|
+----------+--------------------+-----+
|       1.0|[2020-01-20 12:46...|   11|
|       0.0|[2020-01-20 12:46...|    5|
+----------+--------------------+-----+

-------------------------------------------
Batch: 3
-------------------------------------------
+----------+--------------------+-----+
|prediction|              window|count|
+----------+--------------------+-----+
|       0.0|[2020-01-20 12:46...|    5|
|