In [14]:
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._

import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._


In [15]:
// Directory the streaming reader picks JSON files up from.
val inputStreamPath = "./data/events-stream"

// Every incoming record is read as one string column named "tweet".
val dataSchema = new StructType().add("tweet", StringType)

// Streaming JSON source over inputStreamPath using the schema above.
// maxFilesPerTrigger = 1 caps each trigger at one new file.
val inputDF = spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .format("json")
    .load(inputStreamPath)

inputStreamPath: String = ./data/events-stream
dataSchema: org.apache.spark.sql.types.StructType = StructType(StructField(tweet,StringType,true))
inputDF: org.apache.spark.sql.DataFrame = [tweet: string]


In [16]:
// Previously fitted ML pipeline, loaded from local disk.
val trainedModel = PipelineModel.load("./models/spark-ml-model-rf")

// UDFs that each pull a single component out of an ML vector column
// (applied to the "probability" column in a later cell).
// NOTE(review): "negative" reads index 1 and "positive" reads index 0. Presumably this matches
// how the labels were indexed when the model was trained - confirm against the training code.
val getProbabilityNegativeTweet = udf(
    (probabilities: org.apache.spark.ml.linalg.Vector) => probabilities(1)
)
val getProbabilityPositiveTweet = udf(
    (probabilities: org.apache.spark.ml.linalg.Vector) => probabilities(0)
)

+------+-----------+-----+
|window|is_positive|count|
+------+-----------+-----+
+------+-----------+-----+

+------+-----------+-----+
|window|is_positive|count|
+------+-----------+-----+
+------+-----------+-----+



trainedModel: org.apache.spark.ml.PipelineModel = pipeline_17a6fd2fa90f
getProbabilityNegativeTweet: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))
getProbabilityPositiveTweet: org.apache.spark.sql.expressions.UserDefinedFunction = UserDefinedFunction(<function1>,DoubleType,Some(List(org.apache.spark.ml.linalg.VectorUDT@3bfc3ba7)))


In [17]:
// Run the loaded pipeline over the streaming input. The result keeps "tweet" and adds the
// pipeline's output columns, including the "probability" column read by a later cell.
val predictionDf: DataFrame = trainedModel.transform(inputDF)

predictionDf: org.apache.spark.sql.DataFrame = [tweet: string, words: array<string> ... 6 more fields]


In [34]:
// Split the model's "probability" vector into two scalar columns and derive a 0/1 flag
// that is 1 when probability_positive exceeds 0.5.
val markedPositiveDf = {
    val probability = col("probability")
    val isPositive = when(col("probability_positive") > 0.5, 1).otherwise(0)

    predictionDf
        .withColumn("probability_negative", getProbabilityNegativeTweet(probability))
        .withColumn("probability_positive", getProbabilityPositiveTweet(probability))
        .withColumn("is_positive", isPositive)
}

// Attach a synthetic event time: current epoch seconds minus 60, plus rand() * 100 seconds
// of jitter, cast to a timestamp.
// NOTE(review): with up to 100 s of jitter on a base of "now - 60 s", values can land after
// "now" despite the column name - confirm the spread is intended.
val timedDf = {
    val syntheticEventTime = (unix_timestamp() - 60 + rand() * 100).cast("timestamp")

    markedPositiveDf.withColumn("last_minute_timestamp", syntheticEventTime)
}

// Count rows per (is_positive, 10-second window) on the synthetic event time.
// NOTE(review): the watermark delay is 1 second while event times are scattered randomly
// over a much wider range - verify that the resulting late-data handling is what is wanted.
val countIsPositivePer10Sec = {
    val eventTimeColumn = "last_minute_timestamp"
    val tenSeconds = "10 seconds"
    // Window length equals slide length, so windows do not overlap.
    val eventWindow = window(col(eventTimeColumn), tenSeconds, tenSeconds)

    timedDf
        .withWatermark(eventTimeColumn, "1 seconds")
        .groupBy(col("is_positive"), eventWindow)
        .count()
}

markedPositiveDf: org.apache.spark.sql.DataFrame = [tweet: string, words: array<string> ... 9 more fields]
timedDf: org.apache.spark.sql.DataFrame = [tweet: string, words: array<string> ... 10 more fields]
countIsPositivePer10Sec: org.apache.spark.sql.DataFrame = [is_positive: int, window: struct<start: timestamp, end: timestamp> ... 1 more field]


In [35]:
// Start the streaming query: every micro-batch of windowed counts is printed to the
// notebook output without truncating cell contents. The expression evaluates to the
// StreamingQuery handle returned by start().
countIsPositivePer10Sec
    .writeStream
    .foreachBatch { (batchDF: DataFrame, batchId: Long) =>
        // batchId is not used; only the batch contents are displayed.
        batchDF.show(truncate = false)
    }
    .start()

res14: org.apache.spark.sql.streaming.StreamingQuery = org.apache.spark.sql.execution.streaming.StreamingQueryWrapper@5cccb9d5


+-----------+------+-----+
|is_positive|window|count|
+-----------+------+-----+
+-----------+------+-----+

+-----------+------------------------------------------+-----+
|is_positive|window                                    |count|
+-----------+------------------------------------------+-----+
|1          |[2020-02-24 16:58:50, 2020-02-24 16:59:00]|1    |
|0          |[2020-02-24 16:57:40, 2020-02-24 16:57:50]|1    |
|0          |[2020-02-24 16:58:40, 2020-02-24 16:58:50]|1    |
|1          |[2020-02-24 16:58:40, 2020-02-24 16:58:50]|1    |
|0          |[2020-02-24 16:58:30, 2020-02-24 16:58:40]|3    |
|1          |[2020-02-24 16:58:20, 2020-02-24 16:58:30]|1    |
|0          |[2020-02-24 16:58:10, 2020-02-24 16:58:20]|2    |
|0          |[2020-02-24 16:59:00, 2020-02-24 16:59:10]|1    |
|0          |[2020-02-24 16:57:50, 2020-02-24 16:58:00]|1    |
|1          |[2020-02-24 16:58:00, 2020-02-24 16:58:10]|2    |
+-----------+------------------------------------------+-----+

+-------