# Part 3: Spark Streaming

In [60]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .config("spark.sql.shuffle.partitions", 10)
         .config("spark.ui.showConsoleProgress", False)
         .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2')
         .getOrCreate())

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "stations-json")
    .option("startingOffsets", "earliest")
    .load()
)

In [61]:
from pyspark.sql.functions import col, from_json

schema = "station STRING, date DATE, degrees DOUBLE, raining INT"
stations = (df.select(col("key").cast("string"),
          from_json(col("value").cast("string"), schema).alias("value"))
    .select("key", "value.*"))

## Stats

In [3]:
import pyspark.sql.functions as functions

counts_df = stations.groupBy("station")\
                .agg(functions.min("date").alias("start"), \
                     functions.max("date").alias("end"), \
                     functions.count("*").alias("measurements"), \
                     functions.avg("degrees").alias("avg"), \
                     functions.max("degrees").alias("max")) \
                .sort("station")
counts_df

DataFrame[station: string, start: date, end: date, measurements: bigint, avg: double, max: double]

In [4]:
s = counts_df.writeStream.format("console").trigger(processingTime="5 seconds").outputMode("complete").start()
s.awaitTermination(30)
s.stop()

23/04/27 18:14:55 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-c3bf2db6-b62b-4358-99bc-f91c80640a0f. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/27 18:14:55 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+------------+------------------+-----------------+
|station|     start|       end|measurements|               avg|              max|
+-------+----------+----------+------------+------------------+-----------------+
|      A|2000-01-01|2000-04-01|          92|27.484061964548992|52.50510850992144|
|      B|2000-01-01|2000-04-01|          92| 42.45665501320163|80.26884277730134|
|      C|2000-01-01|2000-04-01|          92|41.240703885123274|73.87817340852196|
|      D|2000-01-01|2000-04-01|          92|28.181801983563115|52.18858728283854|
|      E|2000-01-01|2000-04-01|          92|27.556855284817047|56.43119385063466|
|      F|2000-01-01|2000-04-01|          92| 49.18569199349005|68.60424517740242|
|      G|2000-01-01|2000-04-01|          92|26.041105858477586|51.30926520080542|
|      H|2000-01-01|2000-04-01|          92| 36.02207278907561|59.77849175827691|
|

23/04/27 18:15:12 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 15882 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+------------+------------------+------------------+
|station|     start|       end|measurements|               avg|               max|
+-------+----------+----------+------------+------------------+------------------+
|      A|2000-01-01|2000-04-12|         103|29.254329731599395| 53.07958331419695|
|      B|2000-01-01|2000-04-12|         103|45.215158346343884|   82.503942209696|
|      C|2000-01-01|2000-04-12|         103|41.983238439087245| 73.87817340852196|
|      D|2000-01-01|2000-04-12|         103| 31.56362212585022| 70.84373980312505|
|      E|2000-01-01|2000-04-12|         103| 30.14528824664039| 60.00186780638552|
|      F|2000-01-01|2000-04-12|         103| 49.25002579602891| 68.60424517740242|
|      G|2000-01-01|2000-04-12|         103|28.965797894056774|60.076972744479775|
|      H|2000-01-01|2000-04-12|         103| 36.73443103348826| 59.778491

23/04/27 18:15:26 WARN TaskSetManager: Lost task 4.0 in stage 20.0 (TID 148) (a370f3c803f3 executor driver): TaskKilled (Stage cancelled)
23/04/27 18:15:26 WARN TaskSetManager: Lost task 5.0 in stage 20.0 (TID 149) (a370f3c803f3 executor driver): TaskKilled (Stage cancelled)


## Rain Forecast Dataset

In [69]:
# Creating today DataFrame
today = stations.select("station", "date", "raining")

# t = today.writeStream.format("console").trigger(processingTime="5 seconds").outputMode("Append").start()
# t.awaitTermination(10)
# t.stop()

In [82]:
# Creating features DataFrame
df_yesterday = stations.select("station", 
                               col("degrees").alias("sub1degrees"), \
                               col("raining").alias("sub1raining"), \
                               functions.date_add("date", 1).alias("date"))

df_two_days_ago = stations.select("station", 
                                  col("degrees").alias("sub2degrees"), \
                                  col("raining").alias("sub2raining"), \
                                  functions.date_add("date", 2).alias("date"))
features = df_yesterday.join(df_two_days_ago, \
                             (df_yesterday.date == df_two_days_ago.date) & (df_yesterday.station == df_two_days_ago.station)) \
                    .select(df_yesterday.station, df_yesterday.date, functions.month(df_yesterday.date).alias("month"), "sub1degrees", "sub1raining", "sub2degrees", "sub2raining")

In [88]:
new_df = features.join(today) 
#new_df = features.join(today, (features.date == today.date) & (features.station == today.station)) 
s1 = (
    new_df
    .repartition(1)
    .writeStream.format("parquet")
    .outputMode("Append")
    .option("checkpointLocation", "/notebooks/checkpoint")
    .option("path", "/notebooks/station-files")
    .trigger(processingTime="60 seconds")
    .start()
)
s1.awaitTermination(10)
s1.stop()

23/04/27 22:18:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/04/27 22:18:13 ERROR MicroBatchExecution: Query [id = 3bc31b0f-50c0-4933-aa8f-4cf157624417, runId = 53093f13-94c7-43bb-adfa-073ad17d9a78] terminated with error
org.apache.spark.sql.AnalysisException: Stream-stream join without equality predicate is not supported;
Join Inner
:- Project [station#5520, date#7126, month(date#7126) AS month#7158, sub1degrees#7124, sub1raining#7125, sub2degrees#7131, sub2raining#7132]
:  +- Join Inner, ((date#7126 = date#7133) AND (station#5520 = station#7138))
:     :- Project [from_json(StructField(station,StringType,true), cast(value#5503 as string), Some(GMT)).station AS station#5520, from_json(StructField(degrees,DoubleType,true), cast(value#5503 as string), Some(GMT)).degrees AS sub1degrees#7124, from_json(StructField(raining,IntegerType,true), cast(value#5503 as string), Some(GMT)).raining AS sub1raining#71

In [89]:
# s = features.writeStream.format("console").trigger(processingTime="5 seconds").outputMode("Append").start()
# s.awaitTermination(30)
# s.stop()

# Part 4: Spark ML

In [None]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator