# Part 3: Spark Streaming


In [1]:
from pyspark.sql import SparkSession

# Session used by both the streaming (Part 3) and ML (Part 4) cells.
# The Kafka connector is resolved from Maven through spark.jars.packages; the
# artifact targets Scala 2.12 / Spark 3.2.2 and presumably must match the
# installed PySpark build -- confirm when upgrading either side.
spark = (
    SparkSession.builder
    .appName("cs544")
    .config("spark.sql.shuffle.partitions", 10)
    .config("spark.ui.showConsoleProgress", False)
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2")
    .getOrCreate()
)

# Streaming DataFrame over the "stations-json" topic, replayed from the
# earliest available offset. Kafka's `value` column arrives as bytes and is
# cast to string and parsed as JSON downstream.
df = (
    spark.readStream
    .format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("startingOffsets", "earliest")
    .option("subscribe", "stations-json")
    .load()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-42d7d2f8-2015-47e2-83a8-1e430a67a5f4;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.ap

In [2]:
from pyspark.sql.types import StructType, StructField, StringType, DateType, FloatType, IntegerType
from pyspark.sql.functions import col, from_json, date_add, expr, month, when

# Layout of one JSON message on the topic (all fields nullable). `raining` is
# an integer flag that Part 4 uses as the classification label.
schema = StructType([
    StructField(field_name, field_type, True)
    for field_name, field_type in (
        ("station", StringType()),
        ("date", DateType()),
        ("degrees", FloatType()),
        ("raining", IntegerType()),
    )
])


In [3]:
# Decode Kafka's binary `value`, parse it with `schema`, and flatten the
# resulting struct so station/date/degrees/raining become top-level columns.
data = (
    df
    .select(from_json(col("value").cast("string"), schema).alias("value"))
    .select("value.*")
)

# Running per-station summary. Sorting a streaming DataFrame is only allowed
# after an aggregation and in "complete" output mode, which is how this
# frame is written out in the next cell.
stats = (
    data
    .groupBy("station")
    .agg(*map(expr, [
        "min(date) as start",
        "max(date) as end",
        "count(*) as measurements",
        "avg(degrees) as avg",
        "max(degrees) as max",
    ]))
    .orderBy("station")
)

In [4]:
# Print the full per-station summary to the console every 5 seconds for about
# 30 seconds, then shut the query down. The try/finally ensures the query is
# stopped even if the wait is interrupted (e.g. the cell is cancelled) or
# raises, so it does not keep running in the background.
s = (
    stats.writeStream
    .format("console")
    .trigger(processingTime="5 seconds")
    .outputMode("complete")
    .start()
)
try:
    s.awaitTermination(30)
finally:
    s.stop()

23/04/27 22:21:02 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-0835e34d-a776-4715-8dc7-c4eeeb1faec8. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/27 22:21:02 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+------------+------------------+----------+
|station|     start|       end|measurements|               avg|       max|
+-------+----------+----------+------------+------------------+----------+
|      A|2000-01-01|2025-05-19|        9271| 58.54733821597638| 112.31312|
|      B|2000-01-01|2025-05-19|        9271| 64.24334486571604| 114.48652|
|      C|2000-01-01|2025-05-19|        9271| 60.15539362681052| 114.12806|
|      D|2000-01-01|2025-05-19|        9271|53.667564692419525| 104.69655|
|      E|2000-01-01|2025-05-19|        9271| 48.02053792793322|102.036255|
|      F|2000-01-01|2025-05-19|        9271| 55.63299546107005| 109.81366|
|      G|2000-01-01|2025-05-19|        9271|43.290733723741454|  99.80991|
|      H|2000-01-01|2025-05-19|        9271|55.166817879843336|108.422554|
|      I|2000-01-01|2025-05-19|        9271| 70.98507145104085| 123.86119|
|  

23/04/27 22:21:21 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 18826 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+------------+------------------+----------+
|station|     start|       end|measurements|               avg|       max|
+-------+----------+----------+------------+------------------+----------+
|      A|2000-01-01|2025-06-03|        9286|58.577163748435105| 112.31312|
|      B|2000-01-01|2025-06-03|        9286| 64.26074389222109| 114.48652|
|      C|2000-01-01|2025-06-03|        9286| 60.18989694795037| 114.12806|
|      D|2000-01-01|2025-06-03|        9286| 53.70581512151384| 104.69655|
|      E|2000-01-01|2025-06-03|        9286| 48.02167179496791|102.036255|
|      F|2000-01-01|2025-06-03|        9286| 55.66624028670385| 109.81366|
|      G|2000-01-01|2025-06-03|        9286|43.322915709199656|  99.80991|
|      H|2000-01-01|2025-06-03|        9286|55.190007411952855|108.422554|
|      I|2000-01-01|2025-06-03|        9286| 71.01117011829616| 123.86119|
|  

23/04/27 22:21:32 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3cc4cd53 is aborting.
23/04/27 22:21:32 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3cc4cd53 aborted.


In [5]:
def lagged_measurements(frame, days_back):
    """Re-date each reading `days_back` days into the future.

    A row measured on date d is emitted with date d + days_back. Joining the
    result with same-day rows on ("station", "date") therefore pairs each day
    with the reading taken `days_back` days earlier. The degrees/raining
    columns are renamed to sub{N}degrees / sub{N}raining so several lags can
    be joined together without name clashes.

    Args:
        frame: DataFrame with station, date, degrees and raining columns.
        days_back: number of days to look back (e.g. 1 for "yesterday").

    Returns:
        DataFrame with columns station, date, sub{N}degrees, sub{N}raining.
    """
    prefix = f"sub{days_back}"
    return frame.select(
        "station",
        date_add("date", days_back).alias("date"),
        col("degrees").alias(f"{prefix}degrees"),
        col("raining").alias(f"{prefix}raining"),
    )


# Same-day side keeps only the `raining` label; today's degrees are dropped.
today = data.select("station", "date", "raining")
yesterday = lagged_measurements(data, 1)
two_days_ago = lagged_measurements(data, 2)

In [6]:
import os

checkpoint_path = os.path.join(os.getcwd(), "checkpoint")
output_path = os.path.join(os.getcwd(), "output")

# One row per (station, date) that has readings for that day and for each of
# the two previous days: `raining` is the label; `month` and the sub1*/sub2*
# columns are the features.
# NOTE(review): this is a stream-stream inner join without a watermark, so
# Spark retains all input rows in join state indefinitely. Acceptable for a
# bounded demo topic; a long-running job would need an event-time watermark.
features = (
    today.join(yesterday, ["station", "date"])
    .join(two_days_ago, ["station", "date"])
    .withColumn("month", month("date"))
    .select("station", "date", "month", "raining", "sub1degrees", "sub1raining", "sub2degrees", "sub2raining")
)

# A new query cannot start on a checkpoint that an active query still holds,
# so stop any previous run of this cell before starting again (makes the cell
# safe to re-run).
for active_query in spark.streams.active:
    if active_query.name == "features":
        active_query.stop()

stream_query = (
    features.repartition(1)  # single partition -> one parquet file per micro-batch
    .writeStream.format("parquet")
    .queryName("features")
    .option("checkpointLocation", checkpoint_path)
    .option("path", output_path)
    .trigger(processingTime="1 minutes")
    .start()
)
# Give the first micro-batch up to 30 s to land in output_path. The query is
# deliberately left running afterwards; the call returns False on timeout.
stream_query.awaitTermination(30)

23/04/27 22:21:33 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


False

# Part 4: Spark ML


In [7]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

# Batch snapshot of whatever the features stream has written to output_path
# so far. Kept under its own name instead of rebinding `data` (the parsed
# Kafka stream), so re-running the streaming cells later still builds on the
# stream rather than on this static frame.
training_data = spark.read.parquet(output_path)

# Order matters: "feature i" in the tree dump below refers to FEATURE_COLS[i].
FEATURE_COLS = ["month", "sub1degrees", "sub1raining", "sub2degrees", "sub2raining"]
assembler = VectorAssembler(inputCols=FEATURE_COLS, outputCol="features")
data_features = assembler.transform(training_data)
train, test = data_features.randomSplit([0.8, 0.2], seed=20)

# Predict the `raining` label from the month and the two previous days.
classifier = DecisionTreeClassifier(labelCol="raining", featuresCol="features")
model = classifier.fit(train)
print(model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_6c49595bb873, depth=5, numNodes=23, numClasses=2, numFeatures=5
  If (feature 2 <= 0.5)
   Predict: 0.0
  Else (feature 2 > 0.5)
   If (feature 1 <= 37.52163887023926)
    If (feature 0 <= 2.5)
     If (feature 3 <= 32.25822067260742)
      If (feature 3 <= 21.409591674804688)
       Predict: 1.0
      Else (feature 3 > 21.409591674804688)
       Predict: 0.0
     Else (feature 3 > 32.25822067260742)
      If (feature 3 <= 47.68816947937012)
       Predict: 0.0
      Else (feature 3 > 47.68816947937012)
       Predict: 1.0
    Else (feature 0 > 2.5)
     If (feature 0 <= 11.5)
      If (feature 1 <= 35.317331314086914)
       Predict: 0.0
      Else (feature 1 > 35.317331314086914)
       Predict: 1.0
     Else (feature 0 > 11.5)
      If (feature 3 <= 45.004194259643555)
       Predict: 0.0
      Else (feature 3 > 45.004194259643555)
       Predict: 1.0
   Else (feature 1 > 37.52163887023926)
    If (feature 1 <= 39.879570007

In [16]:
predictions = model.transform(test)

# Size and class balance of the test split, computed in a single pass. The
# raining share is a useful baseline: always predicting "no rain" would score
# 100 - percentage_raining.
counts = predictions.agg(
    expr("count(*) AS total"),
    expr("sum(CASE WHEN raining = 1 THEN 1 ELSE 0 END) AS raining_records"),
).first()
total = counts["total"]
if total == 0:
    # Otherwise the percentage below divides by zero.
    raise ValueError("test split is empty: the features stream has not written any rows to output_path yet")
raining_records = counts["raining_records"]
percentage_raining = (raining_records / total) * 100

evaluator = MulticlassClassificationEvaluator(labelCol="raining", predictionCol="prediction", metricName="accuracy")
accuracy = evaluator.evaluate(predictions)

print(f"Percentage accuracy: {accuracy * 100}%")
print(f"Percentage raining: {percentage_raining}%")

Percentage accuracy: 79.09810870429972%
Percentage raining: 33.707011897443316%


In [9]:
# Score the live feature stream with the trained tree and print
# (station, date, prediction) rows to the console for about 30 seconds.
# The try/finally stops the query even if the wait is interrupted or raises,
# so it does not keep running in the background.
prediction_query = (
    model.transform(assembler.transform(features))
    .select("station", "date", "prediction")
    .writeStream.format("console")
    .trigger(processingTime="1 minutes")
    .start()
)

try:
    prediction_query.awaitTermination(30)
finally:
    prediction_query.stop()

23/04/27 22:22:22 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-44e30da0-10bc-4856-b512-aab43bc4ca35. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/27 22:22:22 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      J|2000-01-13|       1.0|
|      I|2000-01-16|       0.0|
|      J|2000-01-18|       0.0|
|      J|2000-01-19|       0.0|
|      I|2000-01-20|       0.0|
|      J|2000-01-31|       0.0|
|      I|2000-02-03|       0.0|
|      J|2000-02-06|       1.0|
|      J|2000-02-11|       0.0|
|      I|2000-02-13|       0.0|
|      J|2000-02-13|       0.0|
|      F|2000-02-14|       0.0|
|      J|2000-02-15|       0.0|
|      F|2000-02-18|       0.0|
|      J|2000-02-22|       0.0|
|      I|2000-03-02|       1.0|
|      I|2000-03-07|       1.0|
|      J|2000-03-08|       1.0|
|      F|2000-03-15|       1.0|
|      I|2000-03-16|       0.0|
+-------+----------+----------+
only showing top 20 rows

