In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("cs544")
         .config("spark.sql.shuffle.partitions", 10)
         .config("spark.ui.showConsoleProgress", False)
         .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.12:3.2.2')
         .getOrCreate())

df = (
    spark.readStream.format("kafka")
    .option("kafka.bootstrap.servers", "kafka:9092")
    .option("subscribe", "stations-json")
    .option("startingOffsets", "earliest")
    .load()
)

:: loading settings :: url = jar:file:/usr/local/lib/python3.10/dist-packages/pyspark/jars/ivy-2.5.0.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /root/.ivy2/cache
The jars for the packages stored in: /root/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-a2a313bc-0911-4981-a77c-4e824354b6cf;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.2.2 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.2.2 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.7.1 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.30 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.1 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.1 in central
	found org.apache.htrace#htrace-core4;4.1.0-incubating in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.ap

In [2]:
from pyspark.sql.functions import col, expr, from_json, date_add, month, avg

### Part 3: Spark Streaming

In [3]:
df = (spark.read.format("kafka")
 .option("kafka.bootstrap.servers", "kafka:9092")
 .option("subscribe", "stations-json").load())

In [4]:
df.isStreaming

False

In [5]:
df.count()

7435

In [6]:
df.dtypes

[('key', 'binary'),
 ('value', 'binary'),
 ('topic', 'string'),
 ('partition', 'int'),
 ('offset', 'bigint'),
 ('timestamp', 'timestamp'),
 ('timestampType', 'int')]

In [7]:
df.show()

+----+--------------------+-------------+---------+------+--------------------+-------------+
| key|               value|        topic|partition|offset|           timestamp|timestampType|
+----+--------------------+-------------+---------+------+--------------------+-------------+
|[45]|[7B 22 64 61 74 6...|stations-json|        1|     0|2023-04-30 00:56:...|            0|
|[4F]|[7B 22 64 61 74 6...|stations-json|        1|     1|2023-04-30 00:56:...|            0|
|[45]|[7B 22 64 61 74 6...|stations-json|        1|     2|2023-04-30 00:56:...|            0|
|[4F]|[7B 22 64 61 74 6...|stations-json|        1|     3|2023-04-30 00:56:...|            0|
|[45]|[7B 22 64 61 74 6...|stations-json|        1|     4|2023-04-30 00:56:...|            0|
|[4F]|[7B 22 64 61 74 6...|stations-json|        1|     5|2023-04-30 00:56:...|            0|
|[45]|[7B 22 64 61 74 6...|stations-json|        1|     6|2023-04-30 00:56:...|            0|
|[4F]|[7B 22 64 61 74 6...|stations-json|        1|     7|20

In [8]:
schema = "date date, station string, degrees float, raining integer"
(df.select(col("key").cast("string"),
          from_json(col("value").cast("string"), schema).alias("value"))
         .select("key", "value.*")).show()

+---+----------+-------+----------+-------+
|key|      date|station|   degrees|raining|
+---+----------+-------+----------+-------+
|  E|2000-01-01|      E|  28.45602|      0|
|  O|2000-01-01|      O|  27.26475|      0|
|  E|2000-01-02|      E|  20.86555|      0|
|  O|2000-01-02|      O|  20.83462|      0|
|  E|2000-01-03|      E| 29.622593|      0|
|  O|2000-01-03|      O|15.8072195|      0|
|  E|2000-01-04|      E|  37.41072|      0|
|  O|2000-01-04|      O| 28.667984|      0|
|  E|2000-01-05|      E| 44.478657|      0|
|  O|2000-01-05|      O| 21.241236|      0|
|  E|2000-01-06|      E|  32.05573|      0|
|  O|2000-01-06|      O| 14.483433|      0|
|  E|2000-01-07|      E| 23.983406|      0|
|  O|2000-01-07|      O| 25.098196|      0|
|  E|2000-01-08|      E| 30.396551|      0|
|  O|2000-01-08|      O| 20.354261|      0|
|  E|2000-01-09|      E| 29.803612|      0|
|  O|2000-01-09|      O| 28.349361|      0|
|  E|2000-01-10|      E| 20.564102|      0|
|  O|2000-01-10|      O| 23.3843

In [9]:
df = (spark.readStream.format("kafka")
 .option("kafka.bootstrap.servers", "kafka:9092")
 .option("subscribe", "stations-json")
 .option("startingOffsets", "earliest").load())

In [10]:
station= (df.select(col("key").cast("string"),
          from_json(col("value").cast("string"), schema).alias("value"))
         .select("key", "value.*"))

In [11]:
station.isStreaming

True

In [12]:
counts_df = (station.select(station.date, station.station, station.degrees)
             .groupby("station")
             .agg(
                 expr("MIN(date)").alias("start"),
                 expr("MAX(date)").alias("end"),
                 expr("COUNT(degrees)").alias("measurements"),
                 expr("AVG(degrees)").alias("avg"),
                 expr("MAX(degrees)").alias("max")
                     )
            )

s = counts_df.writeStream.format("console").trigger(processingTime="5 seconds").outputMode("complete").start()
s.awaitTermination(30)
s.stop()

23/04/30 01:05:04 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-08a89f13-e9df-41b6-bc2a-3fea3a2bd55d. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/30 01:05:04 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+------------+------------------+----------+
|station|     start|       end|measurements|               avg|       max|
+-------+----------+----------+------------+------------------+----------+
|      F|2000-01-01|2001-05-23|         509|45.224777637624086|  89.89394|
|      K|2000-01-01|2001-05-23|         509|48.656605058194145|  91.99202|
|      I|2000-01-01|2001-05-23|         509| 67.94582215929313| 114.58096|
|      N|2000-01-01|2001-05-23|         509| 51.14132382897825| 97.028625|
|      E|2000-01-01|2001-05-23|         509| 52.79992546501234| 100.50049|
|      J|2000-01-01|2001-05-23|         509| 44.07271267224857|  88.68911|
|      A|2000-01-01|2001-05-23|         509| 60.11602631439629| 114.12946|
|      H|2000-01-01|2001-05-23|         509| 52.08816163619515|106.363304|
|      G|2000-01-01|2001-05-23|         509| 38.81074190242234|  89.80698|
|  

23/04/30 01:05:14 WARN ProcessingTimeExecutor: Current batch is falling behind. The trigger interval is 5000 milliseconds, but spent 9404 milliseconds


-------------------------------------------
Batch: 1
-------------------------------------------
+-------+----------+----------+------------+------------------+----------+
|station|     start|       end|measurements|               avg|       max|
+-------+----------+----------+------------+------------------+----------+
|      K|2000-01-01|2001-05-30|         516|48.999159538468646|  91.99202|
|      F|2000-01-01|2001-05-30|         516| 45.42966654503992|  89.89394|
|      I|2000-01-01|2001-05-30|         516| 68.28605990816456| 114.58096|
|      N|2000-01-01|2001-05-30|         516| 51.25425077524296| 97.028625|
|      E|2000-01-01|2001-05-30|         516| 52.90850942818693| 100.50049|
|      J|2000-01-01|2001-05-30|         516|44.293935769057086|  88.68911|
|      A|2000-01-01|2001-05-30|         516| 60.48156513169754| 114.12946|
|      H|2000-01-01|2001-05-30|         516|52.407432743745254|106.363304|
|      G|2000-01-01|2001-05-30|         516|38.998446699347376|  89.80698|
|  

23/04/30 01:05:35 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3f4ca92b is aborting.
23/04/30 01:05:35 ERROR WriteToDataSourceV2Exec: Data source write support org.apache.spark.sql.execution.streaming.sources.MicroBatchWrite@3f4ca92b aborted.


### Rain Forecast Dataset

In [13]:
today = (station.select(station.station, station.date, station.raining))
today

23/04/30 01:05:35 WARN TaskSetManager: Lost task 0.0 in stage 17.0 (TID 105) (81bf1d9b86f1 executor driver): TaskKilled (Stage cancelled)
23/04/30 01:05:35 WARN TaskSetManager: Lost task 1.0 in stage 17.0 (TID 106) (81bf1d9b86f1 executor driver): TaskKilled (Stage cancelled)


DataFrame[station: string, date: date, raining: int]

In [14]:
!rm -rf _spark_metadata/
!rm -rf checkpoint/

In [15]:
features = station.select(station.station, station.date, month(station.date).alias("month"))
features

onePrior = (station.select(station.station, date_add(station.date, 1).alias("date"),
                           station.degrees.alias("sub1degrees"), station.raining.alias("sub1raining")))

twoPrior = (station.select(station.station, date_add(station.date, 2).alias("date"),
                           station.degrees.alias("sub2degrees"), station.raining.alias("sub2raining")))
# join the prior dataframes, then join to features  
combined = onePrior.join(twoPrior,["station","date"])

features = features.join(combined, ["station", "date"])

# finally join features and today
forecast = today.join(features, ["station", "date"])

m = forecast.repartition(1).writeStream.format("parquet").option("path", "/notebooks/parquet").trigger(processingTime="1 minute").option("checkpointLocation", "/notebooks/parquet").start()

23/04/30 01:05:36 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


### Part 4

In [16]:
from pyspark.ml.feature import VectorAssembler
from pyspark.ml.classification import DecisionTreeClassifier
from pyspark.ml.evaluation import MulticlassClassificationEvaluator

In [17]:
# Read in Parquet Files
data = spark.read.parquet("/notebooks/parquet/part-00000-a2d1eaee-7e44-45f2-9557-cfadb5371aff-c000.snappy.parquet")

In [18]:
va = VectorAssembler(inputCols=["month", "sub1degrees", "sub2degrees", "sub1raining", "sub2raining"], outputCol="features")
data = va.transform(data)

In [19]:
data = data.select(["raining", "features"])

In [20]:
train_data, test_data = data.randomSplit([0.8, 0.2], seed=42)

In [28]:
train_data.count(), test_data.count()

(642, 123)

In [22]:
dt_classifier = DecisionTreeClassifier(featuresCol="features", labelCol="raining")
dt_model = dt_classifier.fit(train_data)

In [23]:
print(dt_model.toDebugString)

DecisionTreeClassificationModel: uid=DecisionTreeClassifier_78b0560859be, depth=5, numNodes=35, numClasses=2, numFeatures=5
  If (feature 3 <= 0.5)
   If (feature 1 <= 31.847373008728027)
    If (feature 4 <= 0.5)
     Predict: 0.0
    Else (feature 4 > 0.5)
     If (feature 1 <= 26.69634437561035)
      If (feature 1 <= 24.737836837768555)
       Predict: 0.0
      Else (feature 1 > 24.737836837768555)
       Predict: 1.0
     Else (feature 1 > 26.69634437561035)
      Predict: 0.0
   Else (feature 1 > 31.847373008728027)
    If (feature 2 <= 30.248023986816406)
     Predict: 0.0
    Else (feature 2 > 30.248023986816406)
     If (feature 2 <= 34.400983810424805)
      If (feature 1 <= 33.586740493774414)
       Predict: 1.0
      Else (feature 1 > 33.586740493774414)
       Predict: 0.0
     Else (feature 2 > 34.400983810424805)
      Predict: 0.0
  Else (feature 3 > 0.5)
   If (feature 4 <= 0.5)
    If (feature 2 <= 45.63200759887695)
     If (feature 2 <= 35.56413459777832)
      If

In [24]:
predictions = dt_model.transform(test_data)
evaluator = MulticlassClassificationEvaluator(labelCol="raining", predictionCol="prediction")

accuracy = evaluator.evaluate(predictions)

print("Prediction Accuracy: ", accuracy)

data.agg(avg(col("raining"))).show()

Prediction Accuracy:  0.9225494438605919
+-------------------+
|       avg(raining)|
+-------------------+
|0.09411764705882353|
+-------------------+



In [25]:
filtered_data = forecast.filter(col("station") == "A") # gets us only station A

In [26]:
va2 = VectorAssembler(inputCols=["month", "sub1degrees", "sub2degrees", "sub1raining", "sub2raining"], outputCol = "features")
va_data = va2.transform(filtered_data).select("station", "date", "features")

In [27]:
final = dt_model.transform(va_data).select("station", "date", "prediction").writeStream.format("console").outputMode("append").start()

final.awaitTermination(60)
final.stop()

23/04/30 01:05:47 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-cee4da1d-6fb7-4b42-ad7b-784ede8777ac. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
23/04/30 01:05:47 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.


-------------------------------------------
Batch: 0
-------------------------------------------
+-------+----------+----------+
|station|      date|prediction|
+-------+----------+----------+
|      A|2000-01-10|       0.0|
|      A|2000-01-23|       0.0|
|      A|2000-01-26|       0.0|
|      A|2000-02-29|       1.0|
|      A|2000-03-12|       0.0|
|      A|2000-03-18|       0.0|
|      A|2000-03-21|       1.0|
|      A|2000-05-15|       1.0|
|      A|2000-05-16|       1.0|
|      A|2000-05-20|       1.0|
|      A|2000-05-22|       0.0|
|      A|2000-05-29|       0.0|
|      A|2000-06-16|       0.0|
|      A|2000-06-24|       1.0|
|      A|2000-06-27|       1.0|
|      A|2000-07-09|       0.0|
|      A|2000-07-25|       1.0|
|      A|2000-08-03|       0.0|
|      A|2000-08-05|       0.0|
|      A|2000-09-11|       1.0|
+-------+----------+----------+
only showing top 20 rows

-------------------------------------------
Batch: 1
-------------------------------------------
+-------+---

23/04/30 01:06:48 ERROR TorrentBroadcast: Store broadcast broadcast_497 fail, remove all pieces of the broadcast
