# Part 3: Spark Streaming

In [2]:
from pyspark.sql import SparkSession

# Kafka source connector for Structured Streaming. Its Spark/Scala version
# (3.2.2 / 2.12) presumably has to match the installed pyspark -- TODO confirm on upgrade.
KAFKA_CONNECTOR = "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2"

spark = (
    SparkSession.builder
    .appName("cs544")
    .config("spark.sql.shuffle.partitions", 10)
    .config("spark.ui.showConsoleProgress", False)
    .config("spark.jars.packages", KAFKA_CONNECTOR)
    .getOrCreate()
)

# Streaming source: every message on the "stations-json" topic, replayed from
# the earliest retained offset.
kafka_source_options = {
    "kafka.bootstrap.servers": "kafka:9092",
    "subscribe": "stations-json",
    "startingOffsets": "earliest",
}
df = spark.readStream.format("kafka").options(**kafka_source_options).load()

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-69cb86fe-edf7-4995-aa70-542ec8e0bc7a;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.ap

In [3]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, DateType, DoubleType, IntegerType

# Shape of the JSON document carried in each Kafka message value.
schema = StructType([
    StructField(field_name, field_type)
    for field_name, field_type in [
        ("station", StringType()),
        ("date", DateType()),
        ("degrees", DoubleType()),
        ("raining", IntegerType()),
    ]
])

# Decode key and value to strings, parse the value with `schema`, then flatten
# the parsed struct so station/date/degrees/raining become top-level columns.
decoded_key = col("key").cast("string")
parsed_value = from_json(col("value").cast("string"), schema).alias("value")
reports = df.select(decoded_key, parsed_value).select("key", "value.*")

In [4]:
# Namespaced import: a bare `from ... import min, max` would shadow Python's
# builtin min/max for every later cell in this kernel.
from pyspark.sql import functions as F

# Running per-station summary over every report consumed from Kafka so far.
counts_df = (
    reports.groupBy("station")
    .agg(
        F.min("date").alias("start"),
        F.max("date").alias("end"),
        F.count("degrees").alias("measurements"),  # counts non-null readings only
        F.avg("degrees").alias("avg"),
        F.max("degrees").alias("max"),
    )
    .orderBy(F.asc("station"))  # sorting is allowed here because output mode is "complete"
)

# Print the full result table every 5 seconds for ~30 seconds, then shut the query down.
s = (
    counts_df.writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
    .start()
)
try:
    s.awaitTermination(30)
finally:
    # Always stop, even if the wait is interrupted or the query fails,
    # so no orphaned query keeps running in the background.
    s.stop()

23/04/28 22:14:36 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-57c89ebd-d15c-4e3e-a666-ee1a835d5bab. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/28 22:14:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+------------+------------------+-----------------+
|station|     start|       end|measurements|               avg|              max|
+-------+----------+----------+------------+------------------+-----------------+
|      A|2000-01-01|2000-05-11|         132| 44.37531906496356|83.14623638884372|
|      B|2000-01-01|2000-05-11|         132| 41.82788631855551|77.93431499850729|
|      C|2000-01-01|2000-05-11|         132| 42.13010361404884| 69.7491512914404|
|      D|2000-01-01|2000-05-11|         132|   39.349177322068|77.82482218878721|
|      E|2000-01-01|2000-05-11|         132| 51.06049152420281|98.05591787335769|
|      F|2000-01-01|2000-05-11|         132|  49.3181409588246|84.62408916989543|
|      G|2000-01-01|2000-05-11|         132| 39.15908788362907|75.99755768600086|
|      H|2000-01-01|2000-05-11|         132|38.142825655778246|83.25866153092528|
|

23/04/28 22:14:53 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 16915 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+------------+------------------+-----------------+
|station|     start|       end|measurements|               avg|              max|
+-------+----------+----------+------------+------------------+-----------------+
|      A|2000-01-01|2000-05-22|         143|46.750392265033014|85.93474454206387|
|      B|2000-01-01|2000-05-22|         143| 44.16263853766018|79.53021122933745|
|      C|2000-01-01|2000-05-22|         143| 43.57648005980836|70.84355761504722|
|      D|2000-01-01|2000-05-22|         143| 42.00611481234511|84.58195456530389|
|      E|2000-01-01|2000-05-22|         143|53.738681913803006|98.05591787335769|
|      F|2000-01-01|2000-05-22|         143| 51.32604380969941|84.62408916989543|
|      G|2000-01-01|2000-05-22|         143|  41.4940250697281|76.50130524519331|
|      H|2000-01-01|2000-05-22|         143| 39.89146115520232|83.25866153092528|
|

23/04/28 22:15:07 WARN TaskSetManager: Lost task 4.0 in stage 20.0 (TID 148) (9b4885b177fc executor driver): TaskKilled (Stage cancelled)
23/04/28 22:15:07 WARN TaskSetManager: Lost task 5.0 in stage 20.0 (TID 149) (9b4885b177fc executor driver): TaskKilled (Stage cancelled)


## Rain Forecast Dataset

In [5]:
# Label side of the training join: the observed rain flag for each station on each date.
today = reports.select("station", "date", "raining")

In [6]:
# Explicit imports instead of `import *`, which would silently shadow Python
# builtins such as min, max, sum, round and filter.
from pyspark.sql.functions import col, date_add, month


def lagged_reports(days):
    """Return `reports` shifted forward by `days` so each row lines up with a later date.

    Args:
        days: how many days earlier the observation was taken relative to the
            date it is aligned with (1 = yesterday, 2 = two days ago).

    Returns:
        A streaming DataFrame with columns station, date (the shifted date),
        month (month of the shifted date), sub{days}degrees and sub{days}raining.
    """
    return (
        # withColumn replaces "date" first, so both the output `date` and
        # `month` below refer to the shifted (target) date.
        reports.withColumn("date", date_add("date", days))
        .select(
            col("station"),
            col("date"),
            month(col("date")).alias("month"),
            col("degrees").alias(f"sub{days}degrees"),
            col("raining").alias(f"sub{days}raining"),
        )
    )


yesterday = lagged_reports(1)
two_days_ago = lagged_reports(2)

# NOTE(review): this is a stream-stream inner join with no watermark on either
# side. Spark therefore keeps all past rows of both inputs in state, and that
# state grows without bound while the query runs.
features = yesterday.join(two_days_ago, on=["station", "date", "month"])

In [7]:
# Pair each day's observed rain flag with the features from the two preceding days.
today_features = today.join(features, on=["date", "station"])

import os
import shutil

REPORTS_DIR = "reports"
CHECKPOINT_DIR = "checkpoint"

# Start from a clean slate, removing each directory independently. A leftover
# checkpoint alone would resume from old Kafka offsets. A leftover output dir
# alone still holds the sink's _spark_metadata log, so a fresh query starting
# again at batch 0 would have its "already committed" batches skipped.
for leftover_dir in (REPORTS_DIR, CHECKPOINT_DIR):
    if os.path.exists(leftover_dir):
        shutil.rmtree(leftover_dir)

stream = (
    today_features.repartition(1)  # single partition -> at most one parquet file per micro-batch
    .writeStream
    .format("parquet")
    .option("path", REPORTS_DIR)
    .option("checkpointLocation", CHECKPOINT_DIR)
    .trigger(processingTime="1 minute")
    .start()
)

23/04/28 22:15:40 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


# Part 4: Spark ML

In [8]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [9]:
# Pack the five predictors into one vector column, as Spark ML estimators expect.
feature_columns = ["month", "sub1degrees", "sub1raining", "sub2degrees", "sub2raining"]
va = VectorAssembler(inputCols=feature_columns, outputCol="features")

# Batch snapshot of what the still-running parquet stream has written so far.
# The row count depends on how many 1-minute micro-batches have completed by now.
reports_snapshot = spark.read.parquet("reports")
data = va.transform(reports_snapshot)

In [10]:
# 80/20 train/test split. The fixed seed keeps it repeatable for the same input data.
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=555)

In [11]:
# Single decision tree that predicts today's `raining` flag from the assembled vector.
dt_classifier = (
    DecisionTreeClassifier()
    .setFeaturesCol("features")
    .setLabelCol("raining")
)
dt_model = dt_classifier.fit(train_data)

# Text dump of the learned splits. "feature i" is position i of the assembler's inputCols.
tree_description = dt_model.toDebugString
print(tree_description)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_d8273381379e, depth=5, numNodes=21, numClasses=2, numFeatures=5
  If (feature 2 <= 0.5)
   Predict: 0.0
  Else (feature 2 > 0.5)
   If (feature 1 <= 39.65653895139251)
    If (feature 0 <= 3.5)
     If (feature 1 <= 35.37284361547289)
      Predict: 0.0
     Else (feature 1 > 35.37284361547289)
      If (feature 1 <= 37.631436006253544)
       Predict: 1.0
      Else (feature 1 > 37.631436006253544)
       Predict: 0.0
    Else (feature 0 > 3.5)
     If (feature 3 <= 41.764969818280626)
      Predict: 1.0
     Else (feature 3 > 41.764969818280626)
      If (feature 3 <= 44.136453106919475)
       Predict: 0.0
      Else (feature 3 > 44.136453106919475)
       Predict: 1.0
   Else (feature 1 > 39.65653895139251)
    If (feature 1 <= 81.25537642647976)
     If (feature 3 <= 46.37543614039134)
      Predict: 1.0
     Else (feature 3 > 46.37543614039134)
      If (feature 0 <= 2.5)
       Predict: 0.0
      Else (feature 0 > 2.5)


In [12]:
from pyspark.sql import functions as F

# Accuracy of the tree on the held-out split.
predictions = dt_model.transform(test_data)

evaluator = MulticlassClassificationEvaluator(predictionCol="prediction",
                                              labelCol="raining", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

# Fraction of test rows with raining == 1, used as a baseline: always predicting
# "no rain" would score 1 - avg_raining. This is computed in one pass instead of
# two count() jobs. An empty test split yields None (null) rather than
# ZeroDivisionError. Rows whose label is not 1 (including nulls) still count as
# "not raining", as before.
avg_raining = test_data.agg(
    F.avg(F.when(F.col("raining") == 1, 1.0).otherwise(0.0))
).first()[0]

predict_schema = StructType([
    StructField("avg_correct", DoubleType()),
    StructField("avg_raining", DoubleType()),
])

predict_analysis = spark.createDataFrame([(accuracy, avg_raining)], schema=predict_schema)
predict_analysis.show()

+------------------+-------------------+
|       avg_correct|        avg_raining|
+------------------+-------------------+
|0.8185745140388769|0.30309575233981284|
+------------------+-------------------+



## Model Deployment

In [13]:
from pyspark.ml.pipeline import Pipeline, PipelineModel

In [14]:
# Feature stage for the deployable pipeline. It reuses the exact input columns
# of the assembler evaluated above (`va`) so the two cannot drift apart. The
# output column gets a new name because `train_data` already carries the
# "features" column produced by `va`, and VectorAssembler will not overwrite an
# existing column.
va_model = VectorAssembler(inputCols=va.getInputCols(), outputCol="predict_features")
dt_classifier = DecisionTreeClassifier(featuresCol="predict_features", labelCol="raining")

In [15]:
# Chain assembler and tree so raw feature columns go in and predictions come out.
pipe = Pipeline(stages=[va_model, dt_classifier])
pipe_model = pipe.fit(train_data)

# Score the live feature stream. `features` carries no `raining` column, and none
# is needed to predict.
predictions = pipe_model.transform(features)

# Continuously print new station-A predictions to the console.
station_a_predictions = (
    predictions.where(col("station") == "A")
    .select("station", "date", "prediction")
)
last_part = (
    station_a_predictions.writeStream
    .format("console")
    .outputMode("append")
    .start()
)

23/04/28 22:21:10 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-a68f67e4-5871-4fb3-ad62-74e00866d07d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/28 22:21:10 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      A|2000-02-15|       0.0|
|      A|2000-02-17|       0.0|
|      A|2000-02-21|       0.0|
|      A|2000-02-24|       0.0|
|      A|2000-03-15|       1.0|
|      A|2000-04-01|       0.0|
|      A|2000-04-02|       0.0|
|      A|2000-04-14|       1.0|
|      A|2000-04-19|       1.0|
|      A|2000-05-23|       1.0|
|      A|2000-05-24|       1.0|
|      A|2000-06-14|       0.0|
|      A|2000-06-15|       0.0|
|      A|2000-06-26|       1.0|
|      A|2000-07-11|       0.0|
|      A|2000-07-27|       1.0|
|      A|2000-08-07|       0.0|
|      A|2000-08-09|       0.0|
|      A|2000-08-16|       0.0|
|      A|2000-09-03|       1.0|
+-------+----------+----------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+---

In [16]:
# Stop the station-A console query started in the previous cell. The abort/revert
# ERROR lines that follow appear to come from interrupting an in-flight
# micro-batch -- NOTE(review): confirm they are benign.
last_part.stop()

23/04/28 22:21:36 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@30d2bd61 is aborting.
23/04/28 22:21:36 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@30d2bd61 aborted.
23/04/28 22:21:36 ERROR DiskBlockObjectWriter: Exception occurred while reverting partial writes to file /tmp/blockmgr-75436352-b253-4cb7-a721-dc1cc0ab9264/09/temp_shuffle_d51569e6-b846-4701-8ee9-46774a464b38, null
23/04/28 22:21:36 ERROR DiskBlockObjectWriter: Exception occurred while reverting partial writes to file /tmp/blockmgr-75436352-b253-4cb7-a721-dc1cc0ab9264/22/temp_shuffle_b4f8ca16-e046-4081-9585-00e482b9094f, null
23/04/28 22:21:36 ERROR DiskBlockObjectWriter: Exception occurred while reverting partial writes to file /tmp/blockmgr-75436352-b253-4cb7-a721-dc1cc0ab9264/03/temp_shuffle_c3bea394-d252-458b-8fbe-9b2a9e1a506d, null
23/04/28 22:21:36 ERROR DiskBlockOb

In [17]:
# Stop the parquet-writing query started in the "Rain Forecast Dataset" section,
# so it stops appending to the "reports" output directory.
stream.stop()