# Part 3: Spark Streaming

In [1]:
from pyspark.sql import SparkSession

# One SparkSession for the whole notebook.
#  - 10 shuffle partitions keeps the per-micro-batch task count small.
#  - The console progress bar is disabled so it does not interleave with cell output.
#  - The Kafka connector jar is resolved from Maven at startup (see the Ivy log
#    below); its version should match the installed Spark -- TODO confirm on upgrade.
spark = (
    SparkSession.builder
    .appName("cs544")
    .config("spark.sql.shuffle.partitions", 10)
    .config("spark.ui.showConsoleProgress", False)
    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2')
    .getOrCreate()
)

# Streaming source over the "stations-json" topic, replayed from the earliest
# available offset so every cell below sees the full history.
df = (
    spark.readStream
    .format("kafka")
    .options(**{
        "kafka.bootstrap.servers": "kafka:9092",
        "subscribe": "stations-json",
        "startingOffsets": "earliest",
    })
    .load()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-43be028f-35ab-4eda-845e-8ebb3eb7decc;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.ap

In [2]:
from pyspark.sql.functions import col, from_json
from pyspark.sql.functions import min, max, mean, count, month, date_add

# DDL schema of the JSON document carried in each Kafka message value.
schema = "station STRING, date DATE, degrees FLOAT, raining INT"

# Decode the Kafka key/value columns to strings, parse the value JSON against
# `schema`, then flatten the parsed struct so each field becomes a column.
weather = df.select(
    col("key").cast("string"),
    from_json(col("value").cast("string"), schema).alias("value"),
).select("key", "value.*")

In [3]:
# Running per-station summary over every record seen so far. Sorting a
# streaming aggregate is only supported in "complete" output mode, which
# re-emits the whole result table on each trigger.
counts_df = (
    weather
    .groupby("station")
    .agg(
        min("date").alias("start"),
        max("date").alias("end"),
        count("*").alias("measurements"),
        mean("degrees").alias("avg"),
        max("degrees").alias("max"),
    )
    .orderBy("station")
)

s = (
    counts_df.writeStream
    .outputMode("complete")
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)
# Let the query produce a few micro-batches (~30 s), then shut it down.
s.awaitTermination(30)
s.stop()

23/04/26 03:20:07 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-35bea8d0-9f66-4621-9572-f6fe921288c3. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/26 03:20:07 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+------------+------------------+---------+
|station|     start|       end|measurements|               avg|      max|
+-------+----------+----------+------------+------------------+---------+
|      A|2000-01-01|2000-01-07|           7|  12.7029287815094|19.410671|
|      B|2000-01-01|2000-01-07|           7|44.903284890311106| 51.58526|
|      C|2000-01-01|2000-01-07|           7| 13.63097984450204|24.711754|
|      D|2000-01-01|2000-01-07|           7| 18.17362424305507|23.583237|
|      E|2000-01-01|2000-01-07|           7| 35.36222948346819| 46.30291|
|      F|2000-01-01|2000-01-07|           7| 15.10127067565918|22.011278|
|      G|2000-01-01|2000-01-07|           7|19.930963924952916|28.490503|
|      H|2000-01-01|2000-01-07|           7|27.077222279139928| 32.36006|
|      I|2000-01-01|2000-01-07|           7|26.303656714303152|39.473343|
|      J|2000-0

23/04/26 03:20:22 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 15137 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+------------+------------------+---------+
|station|     start|       end|measurements|               avg|      max|
+-------+----------+----------+------------+------------------+---------+
|      A|2000-01-01|2000-01-08|           8|11.512250810861588|19.410671|
|      B|2000-01-01|2000-01-08|           8|45.179919719696045| 51.58526|
|      C|2000-01-01|2000-01-08|           8|12.517588704824448|24.711754|
|      D|2000-01-01|2000-01-08|           8| 18.14974683523178|23.583237|
|      E|2000-01-01|2000-01-08|           8| 33.52541899681091| 46.30291|
|      F|2000-01-01|2000-01-08|           8|17.100830793380737|31.097752|
|      G|2000-01-01|2000-01-08|           8|21.092774271965027|29.225447|
|      H|2000-01-01|2000-01-08|           8|28.311657190322876|  36.9527|
|      I|2000-01-01|2000-01-08|           8| 26.99494695663452|39.473343|
|      J|2000-0

23/04/26 03:20:37 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@302d5889 is aborting.
23/04/26 03:20:37 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@302d5889 aborted.
23/04/26 03:20:37 WARN TaskSetManager: Lost task 7.0 in stage 23.0 (TID 158) (ad50f7eb2261 executor driver): TaskKilled (Stage cancelled)
23/04/26 03:20:37 WARN TaskSetManager: Lost task 8.0 in stage 23.0 (TID 159) (ad50f7eb2261 executor driver): TaskKilled (Stage cancelled)


In [12]:
def _parse_stations(source, ddl_schema):
    """Parse the JSON value of every Kafka record in `source` with `ddl_schema`.

    Returns a frame whose columns are exactly the fields of `ddl_schema`
    (the parsed struct is flattened; the Kafka key is not needed here).
    """
    return (
        source
        .select(from_json(col("value").cast("string"), ddl_schema).alias("value"))
        .select("value.*")
    )


def _lagged(source, days):
    """Observations shifted forward by `days` so a row dated D holds day D-days.

    The readings are renamed to `sub{days}degrees` / `sub{days}raining` so that
    several lags can be joined side by side on (station, date).
    """
    return (
        _parse_stations(source, "station STRING, date DATE, degrees FLOAT, raining INT")
        .withColumn("date", date_add("date", days))
        .withColumnRenamed("degrees", f"sub{days}degrees")
        .withColumnRenamed("raining", f"sub{days}raining")
    )


# Label side: whether it rained at a station on a given day.
today = _parse_stations(df, "station STRING, date DATE, raining INT")

# Feature side: the calendar month plus the previous two days' readings.
features_base = _parse_stations(df, "station STRING, date DATE").withColumn("month", month("date"))
yesterday = _lagged(df, 1)
yesterday2 = _lagged(df, 2)

# NOTE(review): these are stream-stream inner joins with no watermark, so Spark
# keeps every past row in join state indefinitely -- TODO consider a watermark
# on an event-time column for long runs. Changing the join plan may also make
# the existing checkpoint (/notebooks/check) unusable, so the join order and
# keys below deliberately match the original cell.
features = (
    features_base
    .join(yesterday, ['station', 'date'], 'inner')
    .join(yesterday2, ['station', 'date'], 'inner')
)
joined = today.join(features, ['date', 'station'], 'inner')

In [13]:
# Show the schema of the parsed "today" stream (the label side of the join).
today

DataFrame[station: string, date: date, raining: int]

In [14]:
# Show the schema of the feature stream: month plus the previous two days' readings.
features

DataFrame[station: string, date: date, month: int, sub1degrees: float, sub1raining: int, sub2degrees: float, sub2raining: int]

In [16]:
# Persist the joined training rows as Parquet. The checkpoint directory stores
# source offsets and join state so the query can resume where it stopped;
# repartition(1) yields at most one output file per micro-batch.
path = "/notebooks/parquet"
check = "/notebooks/check"

query = (
    joined
    .repartition(1)
    .writeStream
    .format("parquet")
    .option("checkpointLocation", check)
    .option("path", path)
    .trigger(processingTime='1 minute')
    .start()
)
# Run for up to 100 s (one or two 1-minute triggers), then stop the query.
query.awaitTermination(100)
query.stop()

23/04/26 03:24:30 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
23/04/26 03:26:10 WARN Shell: Interrupted while joining on: Thread[Thread-4983,5,main]
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1257)
	at java.lang.Thread.join(Thread.java:1331)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1043)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1003)
	at org.apache.hadoop.util.Shell.run(Shell.java:901)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
	at org.apache.hadoop.fs.FileUtil.readLink(FileUtil.java:211)
	at org.apache.hadoop.fs.RawLocalFileSystem.deprecatedGetFileLinkStatusInternal(RawLocalFileSystem.java:1113)
	at org.apache.hadoop.fs.RawLocalFileSystem.getFileLinkStat

# Part 4: Spark ML

In [17]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [18]:
# Load the rows written by the streaming Parquet sink above. Read the sink
# directory itself rather than a "/*" glob: for a directory produced by a
# streaming file sink, Spark then consults its `_spark_metadata` commit log and
# returns only files from committed micro-batches. A glob bypasses that log and
# can pick up files left by a batch interrupted when the query was stopped.
raw_data = spark.read.parquet("/notebooks/parquet")

# Model inputs, in vector order: calendar month, then the previous two days'
# temperature and rain indicator.
x = ["month", "sub1degrees", "sub1raining", "sub2degrees", "sub2raining"]
assembler = VectorAssembler(inputCols=x, outputCol="features")

# Pack the feature columns into the single vector column Spark ML estimators read.
data = assembler.transform(raw_data)

In [19]:
# Reproducible 80/20 train/test split (fixed seed).
train_data, test_data = data.randomSplit(weights=[0.8, 0.2], seed=123)

In [20]:
# Decision tree predicting the binary `raining` label from the assembled vector.
dt_classifier = DecisionTreeClassifier(featuresCol="features", labelCol="raining")
dt_model = dt_classifier.fit(train_data)

In [21]:
# Dump the learned splits. "feature i" indexes the assembler's input columns `x`:
# 0 = month, 1 = sub1degrees, 2 = sub1raining, 3 = sub2degrees, 4 = sub2raining.
print(dt_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_c6968cf17132, depth=5, numNodes=15, numClasses=2, numFeatures=5
  If (feature 2 <= 0.5)
   If (feature 1 <= 32.92364501953125)
    Predict: 0.0
   Else (feature 1 > 32.92364501953125)
    If (feature 3 <= 46.83807373046875)
     If (feature 1 <= 39.43841552734375)
      Predict: 0.0
     Else (feature 1 > 39.43841552734375)
      If (feature 3 <= 30.99543285369873)
       Predict: 1.0
      Else (feature 3 > 30.99543285369873)
       Predict: 0.0
    Else (feature 3 > 46.83807373046875)
     Predict: 0.0
  Else (feature 2 > 0.5)
   If (feature 3 <= 46.83807373046875)
    If (feature 1 <= 42.481191635131836)
     Predict: 1.0
    Else (feature 1 > 42.481191635131836)
     Predict: 0.0
   Else (feature 3 > 46.83807373046875)
    Predict: 1.0



In [22]:
# Accuracy = fraction of held-out rows whose predicted class equals `raining`.
evaluator = MulticlassClassificationEvaluator(
    metricName="accuracy",
    labelCol="raining",
    predictionCol="prediction",
)
predictions = dt_model.transform(test_data)
accuracy = evaluator.evaluate(predictions)

In [23]:
# Positive-class rate in the test set next to model accuracy. With a 0/1 label,
# always predicting 0 scores 1 - avg(raining); in the recorded run that baseline
# (~0.973) is above the model's 0.96, so compare against it before trusting the tree.
rain_rate = predictions.agg({'raining': 'mean'}).collect()[0][0]
print("avg(raining): ", rain_rate)
print("avg(correct): ", accuracy)

avg(raining):  0.02666666666666667
avg(correct):  0.96


In [26]:
# Apply the trained tree to the live feature stream (no label column needed)
# and print (station, date, prediction) rows per micro-batch, up to the
# console sink's default row limit.
final_df = dt_model.transform(assembler.transform(features))

s = (
    final_df
    .select('station', 'date', 'prediction')
    .writeStream
    .outputMode("append")
    .format("console")
    .trigger(processingTime="5 seconds")
    .start()
)
s.awaitTermination(30)
s.stop()

23/04/26 03:29:24 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-4f549e54-eec4-480a-894f-32a0f95a2c26. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/26 03:29:24 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      J|2000-01-13|       0.0|
|      I|2000-01-16|       0.0|
|      J|2000-01-18|       0.0|
|      J|2000-01-19|       0.0|
|      I|2000-01-20|       0.0|
|      J|2000-01-31|       0.0|
|      I|2000-02-03|       0.0|
|      J|2000-02-06|       0.0|
|      N|2000-01-03|       0.0|
|      N|2000-01-21|       1.0|
|      N|2000-01-30|       0.0|
|      K|2000-01-03|       0.0|
|      B|2000-01-04|       0.0|
|      B|2000-01-06|       0.0|
|      C|2000-01-06|       0.0|
|      C|2000-01-08|       0.0|
|      K|2000-01-09|       0.0|
|      A|2000-01-10|       0.0|
|      B|2000-01-12|       0.0|
|      L|2000-01-13|       0.0|
+-------+----------+----------+
only showing top 20 rows



23/04/26 03:29:37 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 12679 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      J|2000-02-11|       0.0|
|      M|2000-02-11|       0.0|
|      L|2000-02-11|       0.0|
|      A|2000-02-12|       0.0|
|      O|2000-02-11|       0.0|
|      B|2000-02-12|       0.0|
|      N|2000-02-11|       0.0|
|      C|2000-02-12|       0.0|
|      K|2000-02-11|       0.0|
+-------+----------+----------+



23/04/26 03:29:43 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 6317 milliseconds


-------------------------------------------
Batch: 2
-------------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      F|2000-02-12|       0.0|
|      E|2000-02-12|       0.0|
|      D|2000-02-12|       0.0|
|      G|2000-02-12|       0.0|
|      H|2000-02-12|       0.0|
|      I|2000-02-12|       1.0|
+-------+----------+----------+



23/04/26 03:29:49 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 6311 milliseconds
23/04/26 03:29:54 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@20caafcc is aborting.
23/04/26 03:29:54 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@20caafcc aborted.
23/04/26 03:29:54 WARN Shell: Interrupted while joining on: Thread[Thread-7867,5,main]
java.lang.InterruptedException
	at java.lang.Object.wait(Native Method)
	at java.lang.Thread.join(Thread.java:1257)
	at java.lang.Thread.join(Thread.java:1331)
	at org.apache.hadoop.util.Shell.joinThread(Shell.java:1043)
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1003)
	at org.apache.hadoop.util.Shell.run(Shell.java:901)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
	at org.apache.hadoop.util.S