# Part 2: Streaming application using Spark Structured Streaming  
In this task, you will implement Spark Structured Streaming to consume the data from task 1 and perform prediction.  
  
Important:   
-	This task uses PySpark Structured Streaming with PySpark Dataframe APIs and PySpark ML.  
-	You also need your pipeline model from A2A to make predictions and persist the results.  

### 1. Write code to create a SparkSession, which 1) uses four cores with a proper application name; 2) use the Melbourne timezone; 3) ensure a checkpoint location has been set.


In [4]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import *
from pyspark.sql import functions as F
from pyspark.sql.types import *



spark = SparkSession \
    .builder \
    .appName("StreamingApp_A2B") \
    .master("local[4]") \
    .config("spark.sql.session.timeZone", "Australia/Melbourne") \
    .getOrCreate()


In [None]:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.types import *

spark = SparkSession \
    .builder \
    .appName("StreamingApp_A2B") \
    .master("local[4]") \
    .config("spark.sql.session.timeZone", "Australia/Melbourne") \
    .config("spark.sql.streaming.checkpointLocation", "/tmp/spark_checkpoint") \
    .getOrCreate()



### 2. Write code to define the data schema for the data files, following the data types suggested in the metadata file. Load the static datasets (e.g. restaurants, delivery_address) into data frames. (You can reuse your code from 2A.)



In [5]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, DoubleType, LongType, FloatType
# Delivery Address Schema
delivery_address_schema = StructType([
    StructField("gid", IntegerType(), True),
    StructField("street_name", StringType(), True),
    StructField("street_type", StringType(), True),
    StructField("suburb", StringType(), True),
    StructField("postcode", IntegerType(), True),
    StructField("state", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("geom", StringType(), True),
    StructField("delivery_id", IntegerType(), True)
])

# Driver Schema
driver_schema = StructType([
    StructField("driver_id", IntegerType(), True),
    StructField("age", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("year_experience", IntegerType(), True),
    StructField("vehicle_condition", StringType(), True),
    StructField("type_of_vehicle", StringType(), True)
])

# Restaurant Schema
restaurant_schema = StructType([
    StructField("row_id", IntegerType(), True),
    StructField("restaurant_code", StringType(), True),
    StructField("chain_id", StringType(), True),
    StructField("primary_cuisine", StringType(), True),
    StructField("latitude", DoubleType(), True),
    StructField("longitude", DoubleType(), True),
    StructField("geom", StringType(), True),
    StructField("restaurant_id", IntegerType(), True),
    StructField("suburb", StringType(), True),
    StructField("postcode", IntegerType(), True)
])

# New Order Schema (instead of Order Schema)
new_order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("delivery_person_id", IntegerType(), True),
    StructField("order_ts", LongType(), True),
    StructField("ready_ts", LongType(), True),
    StructField("weather_condition", StringType(), True),
    StructField("road_condition", StringType(), True),
    StructField("type_of_order", StringType(), True),
    StructField("order_total", IntegerType(), True),
    StructField("delivery_time", IntegerType(), True),
    StructField("travel_distance", FloatType(), True),
    StructField("restaurant_id", IntegerType(), True),
    StructField("delivery_id", IntegerType(), True)
])

# Load Static Datasets into DataFrames
delivery_address_df = spark.read.csv("delivery_address.csv", schema=delivery_address_schema, header=True)
driver_df = spark.read.csv("driver.csv", schema=driver_schema, header=True)
restaurant_df = spark.read.csv("restaurants.csv", schema=restaurant_schema, header=True)
new_order_df = spark.read.csv("new_order.csv", schema=new_order_schema, header=True)

# Verify Schema & Sample Data
print("Delivery Address Schema:")
delivery_address_df.printSchema()
print("Sample Data:")
delivery_address_df.show(5)

print("Driver Schema:")
driver_df.printSchema()
print("Sample Data:")
driver_df.show(5)

print("Restaurant Schema:")
restaurant_df.printSchema()
print("Sample Data:")
restaurant_df.show(5)

print("New Order Schema:")
new_order_df.printSchema()
print("Sample Data:")
new_order_df.show(5)

Delivery Address Schema:
root
 |-- gid: integer (nullable = true)
 |-- street_name: string (nullable = true)
 |-- street_type: string (nullable = true)
 |-- suburb: string (nullable = true)
 |-- postcode: integer (nullable = true)
 |-- state: string (nullable = true)
 |-- latitude: double (nullable = true)
 |-- longitude: double (nullable = true)
 |-- geom: string (nullable = true)
 |-- delivery_id: integer (nullable = true)

Sample Data:
+-----+-----------+-----------+---------------+--------+-----+------------+------------+--------------------+-----------+
|  gid|street_name|street_type|         suburb|postcode|state|    latitude|   longitude|                geom|delivery_id|
+-----+-----------+-----------+---------------+--------+-----+------------+------------+--------------------+-----------+
|17985| KENSINGTON|       ROAD|     KENSINGTON|    3031|  VIC|-37.79431483|144.92699295|POINT (144.926992...|          1|
|28383|    RAILWAY|      PLACE| WEST MELBOURNE|    3003|  VIC| -37.80

### 3. Using the Kafka topic (orders) from the producer in Task 1, ingest the streaming data into Spark Streaming, assuming all data comes in the String format. Except for the 'order_ts' column, you shall receive it as an Int type.


In [15]:
from pyspark.sql.functions import col, from_json, floor, rand
from pyspark.sql.types import StructType, StructField, StringType, IntegerType

# Define Schema
order_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("delivery_person_id", StringType(), True),
    StructField("order_ts", IntegerType(), True),
    StructField("ready_ts", StringType(), True),
    StructField("weather_condition", StringType(), True),
    StructField("road_condition", StringType(), True),
    StructField("type_of_order", StringType(), True),
    StructField("order_total", StringType(), True),
    StructField("delivery_time", StringType(), True),
    StructField("travel_distance", StringType(), True),
    StructField("restaurant_id", StringType(), True),
    StructField("delivery_id", StringType(), True)
])

# Reduce Spark logging
spark.sparkContext.setLogLevel("WARN")

kafka_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "new_orders") \
    .load()

# Convert Kafka value to string
json_stream = kafka_stream.selectExpr("CAST(value AS STRING)")

# Parse JSON into DataFrame
orders_df = json_stream.select(from_json(col("value"), order_schema).alias("data")).select("data.*")

In [31]:
query_orders = orders_df.writeStream \
    .outputMode("append") \
    .format("console") \
    .option("truncate", "false") \
    .queryName("ordes_df") \
    .start()

In [32]:
query_orders.stop()

### 4. Then, the streaming data frames (orders and drivers) should be transformed into the proper formats following the metadata file schema, similar to assignment 2A.

In [33]:

from pyspark.sql.functions import col, from_json
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, LongType, FloatType
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType

driver_schema = StructType([
    StructField("driver_id", StringType(), True),  
    StructField("age", StringType(), True),  
    StructField("rating", StringType(), True),
    StructField("year_experience",StringType(), True),  
    StructField("vehicle_condition", StringType(), True),
    StructField("type_of_vehicle", StringType(), True)
])


# Read drivers
kafka_drivers = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "available_drivers") \
    .option("startingOffsets", "earliest") \
    .option("failOnDataLoss", "false") \
    .load()

kafka_drivers.printSchema()

# 4. Parse JSON into Structured DataFrames
driver_df = kafka_drivers.select(from_json(col("value").cast("string"), driver_schema).alias("data")).select("data.*")

from pyspark.sql.functions import when

driver_transformed = driver_df \
    .withColumn("driver_id", when(col("driver_id").isNotNull(), col("driver_id").cast("int"))) \
    .withColumn("age", when(col("age").isNotNull(), col("age").cast("int"))) \
    .withColumn("rating", when(col("rating").isNotNull(), col("rating").cast("float"))) \
    .withColumn("year_experience", when(col("year_experience").isNotNull(), col("year_experience").cast("int"))) \
    .withColumn("vehicle_condition", col("vehicle_condition").cast("string")) \
    .withColumn("type_of_vehicle", col("type_of_vehicle").cast("string"))


query_drivers1 = driver_transformed.writeStream \
    .outputMode("append") \
    .format("console") \
    .queryName("driverQuery") \
    .option("truncate", "false") \
    .start()


root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [34]:
query_drivers1.stop()

In [None]:
query_orders.stop()
query_drivers1.stop()

In [35]:
from pyspark.sql.functions import col, to_timestamp

# Convert `ready_ts` and `order_ts` to TIMESTAMP format
orders_df = orders_df \
    .withColumn("ready_ts", to_timestamp(col("ready_ts"))) \
    .withColumn("order_ts", to_timestamp(col("order_ts")))

driver_transformed = driver_transformed.withColumn("timestamp", current_timestamp())

# Apply Watermarking
orders_limited = orders_df.withWatermark("ready_ts", "60 minutes").limit(50)
drivers_limited = driver_transformed.withWatermark("timestamp", "60 minutes").limit(20)
        
# Write into parquet files the unsuccessful requests partitioned by status code
query_orders = orders_limited.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/orders_df")\
        .option("checkpointLocation", "parquet/orders_df/checkpoint")\
        .queryName("query_oders")\
        .start()

# Write into parquet files the unsuccessful requests partitioned by status code
query_drivers = drivers_limited.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquet/drivers_df")\
        .option("checkpointLocation", "parquet/drivers_df/checkpoint")\
        .queryName("query_drivers")\
        .start()

In [28]:
query_orders.stop()
query_drivers.stop()

In [36]:
#orders = spark.read.parquet("parquet/orders_df")
orders = spark.read.parquet("parquet/orders_df")
orders.printSchema()
orders.show()

root
 |-- order_id: string (nullable = true)
 |-- delivery_person_id: string (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- ready_ts: timestamp (nullable = true)
 |-- weather_condition: string (nullable = true)
 |-- road_condition: string (nullable = true)
 |-- type_of_order: string (nullable = true)
 |-- order_total: string (nullable = true)
 |-- delivery_time: string (nullable = true)
 |-- travel_distance: string (nullable = true)
 |-- restaurant_id: string (nullable = true)
 |-- delivery_id: string (nullable = true)

+--------------------+------------------+-------------------+--------+-----------------+--------------+-------------+-----------+-------------+---------------+-------------+-----------+
|            order_id|delivery_person_id|           order_ts|ready_ts|weather_condition|road_condition|type_of_order|order_total|delivery_time|travel_distance|restaurant_id|delivery_id|
+--------------------+------------------+-------------------+--------+-------------

In [37]:
driverss = spark.read.parquet("parquet/drivers_df")
driverss.printSchema()
driverss.show()

root
 |-- driver_id: integer (nullable = true)
 |-- age: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- year_experience: integer (nullable = true)
 |-- vehicle_condition: string (nullable = true)
 |-- type_of_vehicle: string (nullable = true)
 |-- timestamp: timestamp (nullable = false)

+---------+---+------+---------------+-----------------+---------------+--------------------+
|driver_id|age|rating|year_experience|vehicle_condition|type_of_vehicle|           timestamp|
+---------+---+------+---------------+-----------------+---------------+--------------------+
|     1164| 18|   3.2|              1|             Poor|        Scooter|2025-02-07 11:56:...|
|     1575| 35|   3.2|              3|             Poor|          eBike|2025-02-07 11:56:...|
|     1158| 29|   3.7|              2|        Excellent|      eSchooter|2025-02-07 11:56:...|
|     1556| 56|   3.7|              3|        Excellent|           Bike|2025-02-07 11:56:...|
|     1551| 50|   4.1|          

In [None]:
query_orders.stop()
#query_drivers.stop()
#query_final.stop()

In [38]:
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType, TimestampType

# Define schema explicitly
cross_join_schema = StructType([
    StructField("order_id", StringType(), True),
    StructField("delivery_person_id", StringType(), True),
    StructField("order_ts", TimestampType(), True),
    StructField("ready_ts", TimestampType(), True),
    StructField("weather_condition", StringType(), True),
    StructField("road_condition", StringType(), True),
    StructField("type_of_order", StringType(), True),
    StructField("order_total", IntegerType(), True),  # Ensure it's explicitly read as INTEGER
    StructField("delivery_time", IntegerType(), True),
    StructField("travel_distance", FloatType(), True),
    StructField("restaurant_id", IntegerType(), True),
    StructField("delivery_id", StringType(), True),
    StructField("driver_id", StringType(), True),
    StructField("age", IntegerType(), True),
    StructField("rating", FloatType(), True),
    StructField("year_experience", IntegerType(), True),
    StructField("vehicle_condition", StringType(), True),
    StructField("type_of_vehicle", StringType(), True)
])

# # Read Parquet with defined schema
# cross_join_df = spark.read.schema(cross_join_schema).parquet("parquet/orders_drivers_joined")

# # Verify schema
# cross_join_df.printSchema()


In [39]:
# Below cross join is working but very very slowly.

# Step 1: Read Orders and Drivers from Parquet (Batch Mode)
orders_batch = spark.read.parquet("parquet/orders_df")
drivers_batch = spark.read.parquet("parquet/drivers_df")

# Step 2: Perform Cross Join
cross_join_df = orders_batch.crossJoin(drivers_batch)
# Step 3: Explicitly cast order_total to IntegerType (to prevent issues)

from pyspark.sql.functions import col
cross_join_df = cross_join_df.withColumn("order_total", col("order_total").cast("int"))

# Step 3: Write the Cross Join Result to a New Parquet File
cross_join_df.write.mode("append").parquet("parquet/orders_drivers_joined")

# Step 4: Display Output
#cross_join_df.show(truncate=False)

# Load the Cross Joined Data
cross_join_df = spark.read.schema(cross_join_schema).parquet("parquet/orders_drivers_joined")
cross_join_df.printSchema()
cross_join_df.show()

root
 |-- order_id: string (nullable = true)
 |-- delivery_person_id: string (nullable = true)
 |-- order_ts: timestamp (nullable = true)
 |-- ready_ts: timestamp (nullable = true)
 |-- weather_condition: string (nullable = true)
 |-- road_condition: string (nullable = true)
 |-- type_of_order: string (nullable = true)
 |-- order_total: integer (nullable = true)
 |-- delivery_time: integer (nullable = true)
 |-- travel_distance: float (nullable = true)
 |-- restaurant_id: integer (nullable = true)
 |-- delivery_id: string (nullable = true)
 |-- driver_id: string (nullable = true)
 |-- age: integer (nullable = true)
 |-- rating: float (nullable = true)
 |-- year_experience: integer (nullable = true)
 |-- vehicle_condition: string (nullable = true)
 |-- type_of_vehicle: string (nullable = true)



Py4JJavaError: An error occurred while calling o605.showString.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 2805.0 failed 1 times, most recent failure: Lost task 0.0 in stage 2805.0 (TID 4207) (d3e6dbbe977f executor driver): org.apache.spark.SparkException: Parquet column cannot be converted in file file:///home/student/parquet/orders_drivers_joined/part-00000-f7f0db1b-0380-4089-a825-e96c70f2dc5d-c000.snappy.parquet. Column: [delivery_time], Expected: int, Found: BINARY.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:854)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:287)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [delivery_time], physicalType: BINARY, logicalType: int
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1127)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:189)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:342)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:233)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
	... 23 more

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:2844)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:2780)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:2779)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:2779)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:1242)
	at scala.Option.foreach(Option.scala:407)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:1242)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:3048)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2982)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2971)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:984)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2398)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2419)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2438)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:530)
	at org.apache.spark.sql.execution.SparkPlan.executeTake(SparkPlan.scala:483)
	at org.apache.spark.sql.execution.CollectLimitExec.executeCollect(limit.scala:61)
	at org.apache.spark.sql.Dataset.collectFromPlan(Dataset.scala:4344)
	at org.apache.spark.sql.Dataset.$anonfun$head$1(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$2(Dataset.scala:4334)
	at org.apache.spark.sql.execution.QueryExecution$.withInternalError(QueryExecution.scala:546)
	at org.apache.spark.sql.Dataset.$anonfun$withAction$1(Dataset.scala:4332)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$6(SQLExecution.scala:125)
	at org.apache.spark.sql.execution.SQLExecution$.withSQLConfPropagated(SQLExecution.scala:201)
	at org.apache.spark.sql.execution.SQLExecution$.$anonfun$withNewExecutionId$1(SQLExecution.scala:108)
	at org.apache.spark.sql.SparkSession.withActive(SparkSession.scala:900)
	at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:66)
	at org.apache.spark.sql.Dataset.withAction(Dataset.scala:4332)
	at org.apache.spark.sql.Dataset.head(Dataset.scala:3326)
	at org.apache.spark.sql.Dataset.take(Dataset.scala:3549)
	at org.apache.spark.sql.Dataset.getRows(Dataset.scala:280)
	at org.apache.spark.sql.Dataset.showString(Dataset.scala:315)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:77)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:568)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:374)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.ClientServerConnection.waitForCommands(ClientServerConnection.java:182)
	at py4j.ClientServerConnection.run(ClientServerConnection.java:106)
	at java.base/java.lang.Thread.run(Thread.java:833)
Caused by: org.apache.spark.SparkException: Parquet column cannot be converted in file file:///home/student/parquet/orders_drivers_joined/part-00000-f7f0db1b-0380-4089-a825-e96c70f2dc5d-c000.snappy.parquet. Column: [delivery_time], Expected: int, Found: BINARY.
	at org.apache.spark.sql.errors.QueryExecutionErrors$.unsupportedSchemaColumnConvertError(QueryExecutionErrors.scala:854)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:287)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.FileSourceScanExec$$anon$1.hasNext(DataSourceScanExec.scala:593)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.columnartorow_nextBatch_0$(Unknown Source)
	at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
	at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
	at org.apache.spark.sql.execution.WholeStageCodegenEvaluatorFactory$WholeStageCodegenPartitionEvaluator$$anon$1.hasNext(WholeStageCodegenEvaluatorFactory.scala:43)
	at org.apache.spark.sql.execution.SparkPlan.$anonfun$getByteArrayRdd$1(SparkPlan.scala:388)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2(RDD.scala:890)
	at org.apache.spark.rdd.RDD.$anonfun$mapPartitionsInternal$2$adapted(RDD.scala:890)
	at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
	at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:364)
	at org.apache.spark.rdd.RDD.iterator(RDD.scala:328)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:93)
	at org.apache.spark.TaskContext.runTaskWithListeners(TaskContext.scala:161)
	at org.apache.spark.scheduler.Task.run(Task.scala:141)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$4(Executor.scala:620)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally(SparkErrorUtils.scala:64)
	at org.apache.spark.util.SparkErrorUtils.tryWithSafeFinally$(SparkErrorUtils.scala:61)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:94)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:623)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1136)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:635)
	... 1 more
Caused by: org.apache.spark.sql.execution.datasources.SchemaColumnConvertNotSupportedException: column: [delivery_time], physicalType: BINARY, logicalType: int
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.constructConvertNotSupportedException(ParquetVectorUpdaterFactory.java:1127)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetVectorUpdaterFactory.getUpdater(ParquetVectorUpdaterFactory.java:189)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedColumnReader.readBatch(VectorizedColumnReader.java:175)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextBatch(VectorizedParquetRecordReader.java:342)
	at org.apache.spark.sql.execution.datasources.parquet.VectorizedParquetRecordReader.nextKeyValue(VectorizedParquetRecordReader.java:233)
	at org.apache.spark.sql.execution.datasources.RecordReaderIterator.hasNext(RecordReaderIterator.scala:39)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.hasNext(FileScanRDD.scala:129)
	at org.apache.spark.sql.execution.datasources.FileScanRDD$$anon$1.nextIterator(FileScanRDD.scala:283)
	... 23 more


### 5.	From each order, a) select 5 random drivers; b) use your ML model to predict their delivery time; c) select the fastest driver (i.e. the shortest delivery time), assign the driver to the order and then update delivery_time with your prediction.  
•	Note 1: You may need to join other data frames like restaurant and delivery_address if you used them in your model.  
•	Note 2: Assume one driver can only carry one order at a time within a batch. Your “random 5” selection should exclude those drivers who have already been assigned to an order.  

In [None]:
# a)
from pyspark.sql.functions import rand, collect_list, explode, slice

# Select 5 random drivers per order
random_drivers = cross_join_df.withColumn("rand", rand()) \
    .orderBy("order_id", "rand") \
    .groupby("order_id") \
    .agg(collect_list("driver_id").alias("drivers")) \
    .withColumn("drivers", slice(col("drivers"), 1, 5))

# Explode to get one row per driver
random_drivers = random_drivers.select("order_id", explode("drivers").alias("driver_id"))


In [None]:
# b)
from pyspark.sql.functions import col, rand, collect_list, explode, slice
from pyspark.ml import PipelineModel
from pyspark.sql.functions import monotonically_increasing_id

# Load the pretrained model
model = PipelineModel.load("best_model")

# Load required data
orders_df = spark.read.parquet("parquet/orders_df")
drivers_df = spark.read.parquet("parquet/drivers_df")

#delivery_address_df = spark.read.parquet("delivery_address.parquet")
#estaurants_df = spark.read.parquet("restaurants.parquet")

# Load the Cross Joined Data
cross_join_df = spark.read.schema(cross_join_schema).parquet("parquet/orders_drivers_joined")

# Select 5 random drivers per order
random_drivers = cross_join_df.withColumn("rand", rand()) \
    .orderBy("order_id", "rand") \
    .groupby("order_id") \
    .agg(collect_list("driver_id").alias("drivers")) \
    .withColumn("drivers", slice(col("drivers"), 1, 5))

# Explode to get one row per driver
random_drivers = random_drivers.select("order_id", explode("drivers").alias("driver_id"))

# Predict delivery time for selected drivers
predictions = model.transform(random_drivers)


### 6.	Perform the following aggregations:  
a)	Every 15 seconds, show the total number of revenue (sum of order_total) for each type of order (drinks, meals, snacks, etc.).   
b)	Every 30 seconds, for each suburb of restaurants, count the number of orders with predicted delivery time <=15 minutes and > 15 minutes.  

In [40]:
# 6a)
from pyspark.sql.functions import window, sum, col

# Define the path to the incoming Parquet files
parquet_path = "parquet/orders_drivers_joined"

# Read Parquet files as a Streaming DataFrame
cross_join_df = spark.readStream \
    .schema(cross_join_df.schema) \
    .format("parquet") \
    .option("path", parquet_path) \
    .option("maxFilesPerTrigger", 1) \
    .load()

In [41]:
# Ensure order_total is an integer
cross_join_df = cross_join_df.withColumn("order_total", col("order_total").cast("int"))

In [42]:
# Add watermark for handling late data
cross_join_df = cross_join_df.withWatermark("order_ts", "15 minutes")

# Aggregate revenue every 15 seconds for each type of order
time_windowed_revenue = cross_join_df.withColumn("time_window", window(col("order_ts"), "15 seconds")) \
    .groupby("time_window", "type_of_order") \
    .agg(sum("order_total").alias("total_revenue"))

# Output the results to the console in Update Mode (required for windowed aggregations)
query = time_windowed_revenue.writeStream \
    .outputMode("update") \
    .format("console") \
    .trigger(processingTime="5 seconds") \
    .queryName("cross_join")\
    .option("truncate", "false") \
    .start()

# This query takes little long to run

In [29]:
query.stop()

In [None]:
# 6b
from pyspark.sql.functions import count, when
from pyspark.sql.functions import col, from_unixtime

cross_join_df = cross_join_df.withColumn("ready_ts", from_unixtime(col("ready_ts") / 1000).cast("timestamp"))

#  Perform the aggregation every 30 seconds
delivery_time_aggregation = cross_join_df \
    .groupBy(
        window(col("ready_ts"), "30 seconds"),  # 30-second time window
        col("restaurant_suburb")  # Assuming restaurant data has 'restaurant_suburb'
    ) \
    .agg(
        count(when(col("predicted_delivery_time") <= 15, True)).alias("count_under_15"),
        count(when(col("predicted_delivery_time") > 15, True)).alias("count_over_15")
    ) \
    .select("window", "restaurant_suburb", "count_under_15", "count_over_15")



### 7.	Save the data from 6a and 6b to a Parquet file as streams. (Hint: Parquet files support streaming writing/reading. The file keeps updating while new batches arrive.) 

In [43]:
# 7a(save 6a)
# Streaming Write for Task 6a (Revenue per Order Type)
query_revenue = time_windowed_revenue.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("checkpointLocation", "checkpoints/revenue") \
    .option("path", "parquet/time_windowed_revenue") \
    .start()



In [30]:
query_revenue.stop()

In [None]:
# 7b(save 6b)
# Task 6b: Compute Delivery Time Categories per Restaurant Suburb Every 30 Seconds
# Write into parquet files the unsuccessful requests partitioned by status code
delivery_agg_query = delivery_time_aggregation.writeStream.format("parquet")\
        .outputMode("append")\
        .option("path", "parquetdelivery_time_aggregation")\
        .option("checkpointLocation", "parquet/delivery_time_aggregation/checkpoint")\
        .queryName("query_aggre")\
        .start()

In [None]:
delivery_agg_query.stop()

### 8. Read the two parquet files from task 7 as data streams and send them to Kafka topics with appropriate names.  
(Note: You shall read the parquet files as a streaming data frame and send messages to the Kafka topic when new data appears in the parquet file.)

In [44]:
kafka_bootstrap_servers = "kafka:9092"  
kafka_topic_6a = "time_windowed_revenue"  
kafka_topic_6b = "driver_performance"     

schema_6a = StructType([
    StructField("time_window", StructType([  # Nested structure for window
        StructField("start", TimestampType(), True),  # Window start time
        StructField("end", TimestampType(), True)  # Window end time
    ]), True),
    StructField("type_of_order", StringType(), True),  # Order category
    StructField("total_revenue", IntegerType(), True)  # Sum of revenue in the window
])

# Read the two parquet files as streaming data frames
time_windowed_revenue_stream = spark.readStream \
    .format("parquet") \
    .schema(schema_6a) \
    .option("path", "parquet/time_windowed_revenue") \
    .load()


# Send the data to Kafka
time_windowed_revenue_stream.selectExpr("to_json(struct(*)) AS value") \
    .writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "time_windowed_revenue_topic") \
    .option("checkpointLocation", "checkpoint/time_windowed_revenue_kafka") \
    .start()


<pyspark.sql.streaming.query.StreamingQuery at 0x7fa8d0d7a290>