# Pipeline 2: Stream Order Monitoring


In [None]:
from pyspark.sql import SparkSession
from pyspark import SparkConf
from pyspark.sql.functions import *
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
from time import sleep

# --- Spark session -----------------------------------------------------------
# Connect to the standalone cluster master with a small, fixed resource footprint.
sparkConf = (
    SparkConf()
    .setMaster("spark://spark-master:7077")
    .setAppName("FastFood_Stream_Monitoring")
    .setAll([
        ("spark.driver.memory", "2g"),
        ("spark.executor.cores", "1"),
        ("spark.driver.cores", "1"),
    ])
)

spark = SparkSession.builder.config(conf=sparkConf).getOrCreate()

# --- Input schema ------------------------------------------------------------
# Explicit column layout of the order CSV files. Every field is nullable;
# order_time arrives as text and is parsed into a timestamp further down.
_order_columns = [
    ("order_id", StringType()),
    ("order_time", StringType()),
    ("city", StringType()),
    ("cuisine_type", StringType()),
    ("order_value", DoubleType()),
    ("delivery_time_minutes", IntegerType()),
    ("payment_method", StringType()),
    ("items_count", IntegerType()),
]
dataSchema = StructType(
    [StructField(column_name, column_type, True) for column_name, column_type in _order_columns]
)

# --- Streaming source --------------------------------------------------------
# Read the directory as a stream, at most one new file per micro-batch,
# without prioritising the most recently added files (latestFirst disabled).
sdf_raw = (
    spark.readStream
    .schema(dataSchema)
    .option("maxFilesPerTrigger", 1)
    .option("latestFirst", "false")
    .csv("/home/jovyan/data/fast_food_ordering_dataset_stream")
)

# Replace the textual order_time with a parsed timestamp column.
ORDER_TIME_FORMAT = "yyyy-MM-dd HH:mm:ss"
sdf = sdf_raw.withColumn("order_time", to_timestamp(col("order_time"), ORDER_TIME_FORMAT))

sdf.printSchema()


### Streaming Transformation 1: Real-time Orders by City

Maintain a running count of orders per city, refreshed every 2 seconds as new files arrive in the stream directory.


In [None]:
# Stop any previous run of this query (e.g. when the cell is re-executed)
# before starting a new one under the same name.
for running in [q for q in spark.streams.active if q.name == "city_order_counts"]:
    running.stop()

# Running number of orders per city.
city_counts = sdf.groupBy("city").count()

# Publish the full per-city table to memory every 2 seconds; the query name is
# also the name of the in-memory table queried by the monitoring cell.
city_query = (
    city_counts.writeStream
    .queryName("city_order_counts")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime='2 seconds')
    .start()
)


### Streaming Transformation 2: Windowed Average Delivery Time by City

Calculate average delivery time by city in 30-day time windows.


In [None]:
# Copy order_time into an event_time column used for windowing/watermarking.
# This DataFrame is also reused by the "monthly_delivery_time" cell.
with_event_time = sdf.select("*", col("order_time").alias("event_time"))

# Stop any previous run of this query (e.g. when the cell is re-executed).
for stale in filter(lambda q: q.name == "delivery_time_window", spark.streams.active):
    stale.stop()

# Average delivery time and order count per city over tumbling 30-day windows.
# NOTE(review): with outputMode("complete") Spark retains all aggregation state,
# so this 60-day watermark does not evict old windows — confirm this is intended.
thirty_day_window = window(col("event_time"), "30 days")
delivery_window = (
    with_event_time
    .withWatermark("event_time", "60 days")
    .groupBy(thirty_day_window, "city")
    .agg(
        avg("delivery_time_minutes").alias("avg_delivery_time"),
        count("*").alias("orders_in_window"),
    )
)

# Publish the full windowed table to the in-memory table "delivery_time_window".
delivery_query = (
    delivery_window.writeStream
    .queryName("delivery_time_window")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime='2 seconds')
    .start()
)


### Streaming Transformation 3: Monthly Average Delivery Time by City

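Calculate average delivery time and order count by city in fixed 30-day windows (an approximation of months, not calendar months). This uses the same aggregation as Transformation 2; only the count column is named differently (`monthly_orders`).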


In [None]:
# Stop any previous run of this query (e.g. when the cell is re-executed).
for stale in [q for q in spark.streams.active if q.name == "monthly_delivery_time"]:
    stale.stop()

# "Monthly" here means fixed 30-day tumbling windows, not calendar months.
# NOTE(review): this is the same aggregation as the "delivery_time_window" query
# (only the count column is named differently), so the state is computed twice.
monthly_delivery_time = (
    with_event_time
    .withWatermark("event_time", "60 days")
    .groupBy(window(col("event_time"), "30 days"), "city")
    .agg(
        avg("delivery_time_minutes").alias("avg_delivery_time"),
        count("*").alias("monthly_orders"),
    )
)

# Publish the full windowed table to the in-memory table "monthly_delivery_time".
monthly_delivery_time_query = (
    monthly_delivery_time.writeStream
    .queryName("monthly_delivery_time")
    .format("memory")
    .outputMode("complete")
    .trigger(processingTime='2 seconds')
    .start()
)


### Monitor Streaming Queries

Print a snapshot of every in-memory result table 20 times, 10 seconds apart. Between snapshots the streaming queries keep updating the tables as new data arrives.


In [None]:
def _show_window_report(table, count_column):
    """Print the 10 latest (window, city) rows of a windowed in-memory table.

    Alongside each row it shows, per city, a running mean of avg_delivery_time
    over all windows up to and including this one (ordered by window start),
    and the row's difference from that running mean.

    NOTE(review): the running mean is unweighted — every window counts equally
    regardless of `count_column`. Weight by the order count if a true
    per-order overall average is intended.

    Args:
        table: name of the in-memory table written by a streaming query.
        count_column: name of that table's per-window order-count column.
    """
    running_avg = (
        "AVG(avg_delivery_time) OVER (PARTITION BY city ORDER BY window_start "
        "ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)"
    )
    spark.sql(f"""
        WITH ordered_windows AS (
            SELECT
                window,
                city,
                avg_delivery_time,
                {count_column},
                window.start as window_start
            FROM {table}
        )
        SELECT
            window,
            city,
            avg_delivery_time,
            {count_column},
            {running_avg} as overall_avg_delivery_time,
            avg_delivery_time - {running_avg} as diff_from_overall_avg
        FROM ordered_windows
        ORDER BY window DESC
        LIMIT 10
    """).show(truncate=False)


def _stop_monitoring():
    """Stop the three streaming queries started above, then the Spark session.

    spark.stop() runs even if stopping one of the queries raises.
    """
    try:
        for streaming_query in (city_query, delivery_query, monthly_delivery_time_query):
            streaming_query.stop()
    finally:
        spark.stop()


# Print 20 snapshots of the in-memory result tables, 10 seconds apart.
# After a normal finish the queries keep running; the last cell stops Spark.
try:
    for iteration in range(20):
        print(f"\n=== Iteration {iteration+1} ===")

        print("\n--- City Order Counts ---")
        spark.sql("SELECT * FROM city_order_counts ORDER BY count DESC").show()

        print("\n--- Delivery Time by City (30-day windows) ---")
        _show_window_report("delivery_time_window", "orders_in_window")

        print("\n--- Monthly Average Delivery Time by City (30-day windows) ---")
        _show_window_report("monthly_delivery_time", "monthly_orders")

        sleep(10)
except KeyboardInterrupt:
    # Interrupting the cell tears everything down silently.
    _stop_monitoring()
except Exception as e:
    # Report first so the message is shown even if cleanup itself fails.
    print(f"Error: {e}")
    _stop_monitoring()




In [None]:
# Shut down: stop any streaming queries still running (they are left running
# when the monitoring loop completes normally), then stop the Spark session
# and its underlying SparkContext.
for remaining_query in spark.streams.active:
    remaining_query.stop()
spark.stop()