In [None]:
// Report which Spark runtime this notebook session is attached to.
Console.println(s"Current spark version is ${spark.version}")

In [None]:
import org.apache.spark.sql.types.{StructType, StringType}
import org.apache.spark.ml.{Pipeline, PipelineModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.DataFrame


In [None]:
// UDF that pulls element 1 out of an ML probability vector (returns a Double).
// It is applied to the model's `probability` column and aliased `clean_probability`;
// presumably index 1 is the "clean" class — TODO confirm against the label encoding.
val getProbability = udf { (probabilities: org.apache.spark.ml.linalg.Vector) =>
    probabilities(1)
}


In [None]:
// Directory watched for newly arriving JSON event files.
val inputStreamPath = "/home/jovyan/data/events-stream"

// Each JSON record carries a single nullable string field, `tweet`.
val dataSchema = new StructType().add("tweet", StringType)

// Streaming JSON source; at most one new file is consumed per micro-batch,
// which spreads the existing files over several triggers.
val inputDF = spark.readStream
    .format("json")
    .schema(dataSchema)
    .option("maxFilesPerTrigger", "1")
    .load(inputStreamPath)

In [None]:
// Location of a previously fitted and saved Spark ML pipeline.
val modelPath = "/home/jovyan/models/spark-ml-model"
// Load the pipeline through its MLReader.
val model: PipelineModel = PipelineModel.read.load(modelPath)


In [None]:
val predictionDF = {
    // Score every streamed tweet with the loaded pipeline; the output is assumed
    // to contain `probability` and `prediction` columns — TODO confirm for this model.
    val scored = model.transform(inputDF)
    // Keep the raw text, the single class probability, and the predicted label.
    scored.select(
        col("tweet"),
        getProbability(col("probability")).as("clean_probability"),
        col("prediction"))
}


In [None]:
val output1 = {
    // Print each micro-batch of predictions; the batch id is not needed.
    // A typed function value (not a literal) selects the Scala foreachBatch overload.
    val showMicroBatch: (DataFrame, Long) => Unit = (batch, _) => batch.show()
    predictionDF.writeStream.foreachBatch(showMicroBatch).start()
}

// Block this cell for up to 10 seconds (10000 ms); the query keeps running afterwards.
output1.awaitTermination(10000)

In [None]:
// Stop the `output1` streaming query; awaitTermination(timeout) only waits, it does not stop it.
output1.stop()

In [None]:
// Count predictions per 10-second window and print the results to the console.
// The `timestamp` column is processing time (current_timestamp() is evaluated per
// micro-batch), not an event time taken from the data.
// In append mode with a 10-second watermark, a window's count is emitted only once
// the watermark has moved past the window's end, which requires later micro-batches.
val output2 = predictionDF
    .withColumn("timestamp", current_timestamp())
    .withWatermark("timestamp", "10 seconds")
    .groupBy(window($"timestamp", "10 seconds"), $"prediction")
    .count()
    .writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", "false")
    .start()

// Block this cell for up to 100 seconds (100000 ms); the query keeps running afterwards.
output2.awaitTermination(100000)

In [None]:
// Stop the windowed-count console query (`output2`); awaitTermination(timeout) does not stop it.
output2.stop()