In [None]:
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions.{current_timestamp, udf, when, window}
import org.apache.spark.sql.DataFrame

In [None]:
// Report which Spark version the notebook session is running on.
val sparkVersionBanner = s"Current spark version is ${spark.version}"
println(sparkVersionBanner)

In [None]:
// Directory watched for incoming JSON files; each record is read with the single-field schema below.
val inputStreamPath = "/home/jovyan/data/events-stream"
// Where the previously fitted Spark ML pipeline was saved.
val modelPath = "/home/jovyan/models/spark-ml-model"

val dataSchema = new StructType().add("tweet", StringType)

// Consume at most one new file per micro-batch, then stamp every row with the
// processing time so later cells can group rows into time windows.
val fileStream = spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .json(inputStreamPath)
val inputDF = fileStream.withColumn("load_dttm", current_timestamp())

val model = PipelineModel.load(modelPath)

// Pulls element 1 out of the classifier's probability vector.
// NOTE(review): assumes a binary classifier whose class 1 is the "negative" label — confirm.
val getProbability = udf { (probabilities: org.apache.spark.ml.linalg.Vector) =>
    probabilities(1)
}

In [None]:
// Score the streaming input with the loaded pipeline. Cells below read the
// `probability` and `prediction` columns that the model appends.
val predictionsDF: DataFrame = model.transform(inputDF)

## Negative tweet probabilities

In [None]:
// Sink for each micro-batch: print its rows to the notebook output; the batch id is not needed.
val printNegativeProbabilities: (DataFrame, Long) => Unit =
    (batch, _) => batch.show()

// Every tweet alongside the model's probability for class 1 (treated as "negative").
predictionsDF
    .select($"tweet", getProbability($"probability").as("negative_probability"))
    .writeStream
    .foreachBatch(printNegativeProbabilities)
    .start()

## Tweet prediction statistics

In [None]:
// Counts of positive/negative predictions per 10-second processing-time window.
//
// The watermark and windowed aggregation are applied to the *stream*, not inside
// foreachBatch. Spark ignores `withWatermark` on a static (per-micro-batch) DataFrame.
// Aggregating per batch would also never combine rows that arrive in different
// micro-batches but fall into the same window. Done on the stream, the counts
// accumulate across batches, and the watermark lets Spark evict state for old windows.
predictionsDF
    // only the columns the aggregation needs; `tweet` is not used here
    .select ("prediction", "load_dttm")
    .withWatermark ("load_dttm", "10 seconds")
    .groupBy (window ($"load_dttm", "10 seconds") as "time_interval", $"prediction")
    .count ()
    // label 0 is shown as "Positive", every other label as "Negative"
    .select (when ($"prediction" === 0, "Positive").otherwise ("Negative") as "tweet_tone",
        $"count",
        $"time_interval")
    .writeStream
    // "update": each trigger emits only the windows whose counts changed
    .outputMode ("update")
    .foreachBatch {
        (batchDF : DataFrame, batchId : Long) =>
            batchDF.show ()
}.start ()