## PROJECT 2: THE DEBS CHALLENGE 2015

In [1]:
!pip install kafka-python delta-spark



### Imports and Spark Session

In [2]:
import json

# PySpark imports
from pyspark.sql import SparkSession
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import col, to_json, struct, from_json, to_timestamp, year, month, dayofmonth, regexp_extract, floor, concat, expr, current_timestamp
from pyspark.sql.functions import udf, current_timestamp, expr, col, count, to_timestamp, year, month, dayofmonth, lit, max as max_
from pyspark.sql import DataFrame
from datetime import timedelta, datetime
from delta import *

import matplotlib.pyplot as plt
import seaborn as sns
import warnings
warnings.filterwarnings("ignore")

In [3]:
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("pickup_datetime", StringType(), True),
    StructField("dropoff_datetime", StringType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", StringType(), True),      
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", StringType(), True), 
    StructField("total", DoubleType(), True)
])

### Query 0: Data cleaning and setup

In [4]:
spark = SparkSession.builder \
    .appName("debs_grand_challenge") \
    .config(
        "spark.jars.packages",
        "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.1,"
        "org.apache.spark:spark-token-provider-kafka-0-10_2.12:3.5.1"
    ) \
    .getOrCreate()

In [5]:
df_spark = spark.read \
    .format("csv") \
    .option("header", "false") \
    .schema(schema) \
    .load("data/sample.csv")      

df_to_kafka = df_spark.select(to_json(struct("*")).alias("value"))

df_to_kafka.write \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("topic", "nyc-taxi-clean") \
    .save()

print(" Finished sending data to Kafka (nyc-taxi-clean)!")

 Finished sending data to Kafka (nyc-taxi-clean)!


In [6]:
kafka_df = spark.read \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "nyc-taxi-clean") \
    .load()

kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [7]:
# Convert the Kafka binary "value" to string
raw_str_df = kafka_df.selectExpr("CAST(value AS STRING) AS raw_string")

In [8]:
# Parse JSON with the same schema
parsed_df = raw_str_df.select(from_json(col("raw_string"), schema).alias("data")).select("data.*")

print("Parsed DataFrame from Kafka (preview):")
parsed_df.show(5, truncate=False)

Parsed DataFrame from Kafka (preview):
+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+-----+
|medallion                       |hack_license                    |pickup_datetime    |dropoff_datetime   |trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total|
+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+-----+
|7B75A4AB3E535F48D4C0429851C4FC0A|032A6DB9395CDD22DC6BAEDD78E9B587|2013-01-02 06:20:06|2013-01-02 06:25:44|338        

In [9]:
#Removing the null values in the data
df_clean = parsed_df.dropna()

df_clean = df_clean.filter(
    (col("trip_time_in_secs") > 0) &
    (col("trip_distance") > 0) &
    (col("pickup_longitude") != 0.0) &
    (col("pickup_latitude") != 0.0) &
    (col("dropoff_longitude") != 0.0) &
    (col("dropoff_latitude") != 0.0)
)

df_clean.show(5)

+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+-----+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+-----+
|7B75A4AB3E535F48D...|032A6DB9395CDD22D...|2013-01-02 06:20:06|2013-01-02 06:25:44|              338|          1.2|      -73.972343|      40.749863|       -73.982697|       40.760487|         CRD|        6.5|      0.0|    0.5|   

In [19]:
#df_clean.count()

975021

In [10]:
# 6. Add time columns
df_time = df_clean.withColumn(
    "pickup_ts", to_timestamp("pickup_datetime", "yyyy-MM-dd HH:mm:ss")
).withColumn(
    "dropoff_ts", to_timestamp("dropoff_datetime", "yyyy-MM-dd HH:mm:ss")
).withColumn("year", year("pickup_ts")) \
 .withColumn("month", month("pickup_ts")) \
 .withColumn("day", dayofmonth("pickup_ts"))


In [17]:
# 7. Partitioned Parquet Output
df_time.write \
    .partitionBy("year", "month") \
    .mode("overwrite") \
    .parquet("data/kafka_cleaned_partitioned")

#spark.stop()
print(" Done reading from Kafka, cleaning, and writing partitioned data!")

 Done reading from Kafka, cleaning, and writing partitioned data!


In [11]:
# Now create a new session
new_spark = SparkSession.builder.appName("CheckParquet").getOrCreate()

df_check = new_spark.read.parquet("data/kafka_cleaned_partitioned/")
df_check.show(10, truncate=False)


+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+-----+-------------------+-------------------+---+----+-----+
|medallion                       |hack_license                    |pickup_datetime    |dropoff_datetime   |trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total|pickup_ts          |dropoff_ts         |day|year|month|
+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+-----+-------------------+-------------------+---+----

### Query 1:  Frequent Routes

#### Part 1: Finding top 10 most frequent routes during the last 30 minutes.

In [19]:
def get_grid_cell(lat, lon):
    # This simple grid resolution multiplies coordinates by 100.
    return f"{int(lat * 100)}_{int(lon * 100)}"

get_grid_cell_udf = udf(get_grid_cell, StringType())

# Add new columns for the starting and ending grid cell IDs.
df_with_cells = df_time.withColumn("start_cell", 
                                   get_grid_cell_udf(col("pickup_latitude"), col("pickup_longitude"))
                                  ).withColumn("end_cell", 
                                   get_grid_cell_udf(col("dropoff_latitude"), col("dropoff_longitude"))
                                  )

# Determine the maximum dropoff timestamp in your data.
max_dropoff_row = df_with_cells.select(max_("dropoff_ts").alias("max_dropoff")).first()
max_dropoff_ts = max_dropoff_row["max_dropoff"]

In [12]:
# Calculate a threshold that is 30 minutes before the max dropoff time.
time_threshold = max_dropoff_ts - datetime.timedelta(minutes=30)
    
# Filter the data to include only rides completed within these 30 minutes.
df_recent = df_with_cells.filter(col("dropoff_ts") >= lit(time_threshold))

# Group by starting and ending grid cells and count the rides.
df_routes = df_recent.groupBy("start_cell", "end_cell").agg(count("*").alias("Number_of_Rides"))
    
# Select the top 10 most frequent routes.
top_routes = df_routes.orderBy(col("Number_of_Rides").desc()).limit(10)

print("Top 10 Frequent Routes in the Last 30 Minutes (relative to max dropoff time):")
top_routes.show(truncate=False)

Top 10 Frequent Routes in the Last 30 Minutes (relative to max dropoff time):
+----------+----------+---------------+
|start_cell|end_cell  |Number_of_Rides|
+----------+----------+---------------+
|4075_-7397|4074_-7398|352            |
|4075_-7397|4075_-7397|308            |
|4075_-7397|4075_-7398|300            |
|4076_-7397|4075_-7398|300            |
|4076_-7396|4075_-7397|300            |
|4076_-7397|4075_-7397|288            |
|4074_-7398|4075_-7398|276            |
|4075_-7397|4076_-7397|276            |
|4075_-7398|4075_-7398|268            |
|4074_-7398|4075_-7397|268            |
+----------+----------+---------------+



#### Part 2:  Query results must be updated whenever any of the 10 most frequent routes change.


In [11]:
kafka_df = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "kafka:9092") \
    .option("subscribe", "nyc-taxi-clean") \
    .load()

kafka_df.printSchema()

root
 |-- key: binary (nullable = true)
 |-- value: binary (nullable = true)
 |-- topic: string (nullable = true)
 |-- partition: integer (nullable = true)
 |-- offset: long (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- timestampType: integer (nullable = true)



In [12]:
# Convert the binary 'value' from Kafka to a string and parse the JSON payload.
streaming_df = kafka_df.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

# Add an 'ingest_time' column to mark when the event was read.
streaming_df = streaming_df.withColumn("ingest_time", current_timestamp())

# Optional: Print the schema to verify
streaming_df.printSchema()

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total: double (nullable = true)
 |-- ingest_time: timestamp (nullable = false)



In [15]:
# Define your grid constants for the 500m x 500m grid
grid_origin_lat = 41.474937
grid_origin_lon = -74.913585
delta_lat = 0.0045   # Approximate degrees for 500m in latitude
delta_lon = 0.0060   # Approximate degrees for 500m in longitude

# This is your foreachBatch function to process each micro-batch
def process_batch(batch_df, batch_id):
    # Skip empty batches
    if batch_df.rdd.isEmpty():
        return

    # PART A: Compute the 30-minute window based on the batch’s max dropoff
    max_dropoff = batch_df.agg({"dropoff_datetime": "max"}).collect()[0][0]
    if max_dropoff is None:
        return
    ref_time = max_dropoff - timedelta(minutes=30)

    # PART B: Compute grid cell IDs for pickup and dropoff using the 500m grid
    batch_df = batch_df.withColumn(
        "pickup_cell_east", floor((col("pickup_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
    ).withColumn(
        "pickup_cell_south", floor((lit(grid_origin_lat) - col("pickup_latitude")) / lit(delta_lat)) + 1
    ).withColumn(
        "start_cell", concat(col("pickup_cell_east").cast("int"), lit("."), col("pickup_cell_south").cast("int"))
    )
    batch_df = batch_df.withColumn(
        "dropoff_cell_east", floor((col("dropoff_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
    ).withColumn(
        "dropoff_cell_south", floor((lit(grid_origin_lat) - col("dropoff_latitude")) / lit(delta_lat)) + 1
    ).withColumn(
        "end_cell", concat(col("dropoff_cell_east").cast("int"), lit("."), col("dropoff_cell_south").cast("int"))
    )

    # Filter out trips that are out-of-bounds (only consider cells 1 to 300)
    batch_df = batch_df.filter(
        (col("pickup_cell_east").between(1, 300)) &
        (col("pickup_cell_south").between(1, 300)) &
        (col("dropoff_cell_east").between(1, 300)) &
        (col("dropoff_cell_south").between(1, 300))
    )

    # PART C: Filter for trips with dropoff_datetime >= ref_time (last 30 minutes)
    df_last30 = batch_df.filter(col("dropoff_datetime") >= F.lit(ref_time))
    print(f"Window filter: dropoff_datetime >= {ref_time}")
    print("df_last30 count =", df_last30.count())
    df_last30.show(5)


    # PART D: Aggregate routes and get top 10 most frequent
    df_frequent_routes = df_last30.groupBy("start_cell", "end_cell") \
        .count() \
        .withColumnRenamed("count", "Number_of_Rides")
    top10_routes = df_frequent_routes.orderBy(col("Number_of_Rides").desc()).limit(10)
    top10_list = top10_routes.collect()

    # PART E: Determine a triggering event and compute delay
    # Choose the event with the maximum dropoff_datetime as the trigger
    trigger_row = batch_df.orderBy(col("dropoff_datetime").desc()).limit(1).collect()[0]
    trigger_pickup = trigger_row["pickup_datetime"]
    trigger_dropoff = trigger_row["dropoff_datetime"]
    ingest_time = trigger_row["ingest_time"]
    processing_time = datetime.now()
    delay = (processing_time - ingest_time).total_seconds()

    # PART F: Build the output row
    output_row = {
        "pickup_datetime": trigger_pickup,
        "dropoff_datetime": trigger_dropoff,
        "delay": delay
    }
    for i in range(10):
        if i < len(top10_list):
            route = top10_list[i]
            output_row[f"start_cell_id_{i+1}"] = route["start_cell"]
            output_row[f"end_cell_id_{i+1}"] = route["end_cell"]
        else:
            output_row[f"start_cell_id_{i+1}"] = None
            output_row[f"end_cell_id_{i+1}"] = None

    # For demonstration, print the output update
    print(f"Update for batch {batch_id} :", output_row)
    
    # Define the output schema explicitly
    output_schema = StructType([
        StructField("pickup_datetime", TimestampType(), True),
        StructField("dropoff_datetime", TimestampType(), True),
        StructField("start_cell_id_1", StringType(), True),
        StructField("end_cell_id_1", StringType(), True),
        StructField("start_cell_id_2", StringType(), True),
        StructField("end_cell_id_2", StringType(), True),
        StructField("start_cell_id_3", StringType(), True),
        StructField("end_cell_id_3", StringType(), True),
        StructField("start_cell_id_4", StringType(), True),
        StructField("end_cell_id_4", StringType(), True),
        StructField("start_cell_id_5", StringType(), True),
        StructField("end_cell_id_5", StringType(), True),
        StructField("start_cell_id_6", StringType(), True),
        StructField("end_cell_id_6", StringType(), True),
        StructField("start_cell_id_7", StringType(), True),
        StructField("end_cell_id_7", StringType(), True),
        StructField("start_cell_id_8", StringType(), True),
        StructField("end_cell_id_8", StringType(), True),
        StructField("start_cell_id_9", StringType(), True),
        StructField("end_cell_id_9", StringType(), True),
        StructField("start_cell_id_10", StringType(), True),
        StructField("end_cell_id_10", StringType(), True),
        StructField("delay", DoubleType(), True)
    ])
    
    # Create the DataFrame using the explicit schema
    result_df = spark.createDataFrame([output_row], schema=output_schema)
    # Write the result_df as a table (it will create the table if it doesn't exist)
    result_df.write.mode("append").saveAsTable("top_ten_routes_output")
    result_df.show(truncate=False)

# If your taxi data doesn’t already have proper types, ensure you convert the datetime columns
streaming_df = streaming_df.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))
streaming_df = streaming_df.withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))
streaming_df = streaming_df.withColumn("ingest_time", current_timestamp())

# Use trigger(once=True) to process existing data exactly one time
query = (
    streaming_df.writeStream
    .trigger(once=True)                # <--- This forces the query to run just once
    .foreachBatch(process_batch)
    .outputMode("append")
    .start()
)
query.awaitTermination()

In [20]:
from datetime import datetime, timedelta
from pyspark.sql.functions import col, floor, concat, to_timestamp, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, TimestampType, DoubleType

# Ensure Kafka Stream is working
assert kafka_df.isStreaming, "Kafka DataFrame is not streaming!"

# Convert Kafka binary data into JSON
streaming_df = kafka_df.selectExpr("CAST(value AS STRING) as json_str") \
    .select(from_json(col("json_str"), schema).alias("data")) \
    .select("data.*")

# Add an ingestion timestamp
streaming_df = streaming_df.withColumn("ingest_time", current_timestamp())

# Convert datetime fields
streaming_df = streaming_df.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))
streaming_df = streaming_df.withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))

# Define grid constants
grid_origin_lat = 41.474937
grid_origin_lon = -74.913585
delta_lat = 0.0045  
delta_lon = 0.0060  

# Process micro-batches
def process_batch(batch_df, batch_id):
    if batch_df.rdd.isEmpty():
        print(f"No data in batch {batch_id}, skipping...")
        return

    max_dropoff = batch_df.agg({"dropoff_datetime": "max"}).collect()[0][0]
    if max_dropoff is None:
        return
    ref_time = max_dropoff - timedelta(minutes=30)

    # Compute grid cell IDs
    batch_df = batch_df.withColumn(
        "pickup_cell_east", floor((col("pickup_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
    ).withColumn(
        "pickup_cell_south", floor((lit(grid_origin_lat) - col("pickup_latitude")) / lit(delta_lat)) + 1
    ).withColumn(
        "start_cell", concat(col("pickup_cell_east").cast("int"), lit("."), col("pickup_cell_south").cast("int"))
    ).withColumn(
        "dropoff_cell_east", floor((col("dropoff_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
    ).withColumn(
        "dropoff_cell_south", floor((lit(grid_origin_lat) - col("dropoff_latitude")) / lit(delta_lat)) + 1
    ).withColumn(
        "end_cell", concat(col("dropoff_cell_east").cast("int"), lit("."), col("dropoff_cell_south").cast("int"))
    )

    # Keep trips in the valid grid range
    batch_df = batch_df.filter(
        (col("pickup_cell_east").between(1, 300)) &
        (col("pickup_cell_south").between(1, 300)) &
        (col("dropoff_cell_east").between(1, 300)) &
        (col("dropoff_cell_south").between(1, 300))
    )

    # Filter last 30 minutes of trips
    df_last30 = batch_df.filter(col("dropoff_datetime") >= lit(ref_time))
    df_last30.show(5)

    # Aggregate most frequent routes
    df_frequent_routes = df_last30.groupBy("start_cell", "end_cell") \
        .count().withColumnRenamed("count", "Number_of_Rides")

    # Get top 10 routes
    top10_routes = df_frequent_routes.orderBy(col("Number_of_Rides").desc()).limit(10)
    top10_list = top10_routes.collect()

    # Get max dropoff event
    trigger_rows = batch_df.orderBy(col("dropoff_datetime").desc()).limit(1).collect()
    if len(trigger_rows) == 0:
        return
    trigger_row = trigger_rows[0]

    processing_time = datetime.utcnow()
    delay = (processing_time - trigger_row["ingest_time"]).total_seconds()

    # Create output row
    output_row = {"pickup_datetime": trigger_row["pickup_datetime"], "dropoff_datetime": trigger_row["dropoff_datetime"], "delay": delay}
    for i in range(10):
        if i < len(top10_list):
            output_row[f"start_cell_id_{i+1}"] = top10_list[i]["start_cell"]
            output_row[f"end_cell_id_{i+1}"] = top10_list[i]["end_cell"]
        else:
            output_row[f"start_cell_id_{i+1}"] = None
            output_row[f"end_cell_id_{i+1}"] = None

    result_df = spark.createDataFrame([output_row])
    result_df.write.mode("append").saveAsTable("top_ten_routes_output")

# Run streaming query
query = streaming_df.writeStream.trigger(once=True).foreachBatch(process_batch).outputMode("append").start()
query.awaitTermination()


In [21]:
spark.sql("SELECT * FROM top_ten_routes_output").show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `top_ten_routes_output` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 1 pos 14;
'Project [*]
+- 'UnresolvedRelation [top_ten_routes_output], [], false


### Query 2: Profitable Areas

#### Part 1: Report only the 10 most profitable areas

In [19]:
import math
from datetime import timedelta, datetime
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, floor, concat, lit, current_timestamp, to_timestamp, expr
)
from pyspark.sql import functions as F
from pyspark.sql.types import (
    StructType, StructField, TimestampType, DoubleType, StringType
)
from pyspark.sql.window import Window

# Grid constants for the 500m x 500m grid
grid_origin_lat = 41.474937
grid_origin_lon = -74.913585
delta_lat = 0.0045   # Approximate degrees for 500m in latitude
delta_lon = 0.0060   # Approximate degrees for 500m in longitude

def process_batch_query2(batch_df, batch_id):
    # Skip empty batches
    if batch_df.rdd.isEmpty():
        print("Empty batches") #for debugging 
        return

    # PART A: Compute reference times based on the batch’s maximum dropoff_datetime
    max_dropoff = batch_df.agg({"dropoff_datetime": "max"}).collect()[0][0]
    if max_dropoff is None:
        print("Error At Maxdropoff") # for debugging
        return
    # For profit: consider trips ending in the last 15 minutes
    ref_time_profit = max_dropoff - timedelta(minutes=15)
    # For empty taxis: consider taxis whose last dropoff was within the last 30 minutes
    ref_time_empty = max_dropoff - timedelta(minutes=30)

    # PART B: Compute profit aggregate per area (using pickup location)
    # Only consider trips that ended in the last 15 minutes
    profit_df = batch_df.filter(col("dropoff_datetime") >= F.lit(ref_time_profit)) \
        .withColumn("profit", col("fare_amount") + col("tip_amount")) \
        .withColumn(
            "pickup_cell_east",
            floor((col("pickup_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
        ).withColumn(
            "pickup_cell_south",
            floor((lit(grid_origin_lat) - col("pickup_latitude")) / lit(delta_lat)) + 1
        ).withColumn(
            "pickup_cell",
            concat(col("pickup_cell_east").cast("int"), lit("."), col("pickup_cell_south").cast("int"))
        )
    profit_agg = profit_df.groupBy("pickup_cell") \
        .agg(F.expr("approx_percentile(profit, 0.5) as median_profit"))
    
    # PART C: Compute empty taxi aggregate per area (using dropoff location)
    # For each taxi, take the latest dropoff event and if it occurred in the last 30 minutes, consider it empty.
    w = Window.partitionBy("medallion").orderBy(col("dropoff_datetime").desc())
    last_dropoff_df = batch_df.withColumn("rn", F.row_number().over(w)) \
        .filter(col("rn") == 1)
    empty_df = last_dropoff_df.filter(col("dropoff_datetime") >= F.lit(ref_time_empty)) \
        .withColumn(
            "dropoff_cell_east",
            floor((col("dropoff_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
        ).withColumn(
            "dropoff_cell_south",
            floor((lit(grid_origin_lat) - col("dropoff_latitude")) / lit(delta_lat)) + 1
        ).withColumn(
            "dropoff_cell",
            concat(col("dropoff_cell_east").cast("int"), lit("."), col("dropoff_cell_south").cast("int"))
        )
    empty_agg = empty_df.groupBy("dropoff_cell") \
        .agg(F.countDistinct("medallion").alias("empty_taxis"))
    
    # PART D: Join the two aggregates on the cell identifier.
    # (Here we assume the area is defined by the same grid cell for pickup and dropoff.)
    area_df = profit_agg.join(empty_agg, profit_agg.pickup_cell == empty_agg.dropoff_cell, "inner") \
        .select(profit_agg.pickup_cell.alias("cell_id"), "median_profit", "empty_taxis") \
        .filter(col("empty_taxis") > 0) \
        .withColumn("profitability", col("median_profit") / col("empty_taxis"))
    
    # Get the top 10 areas by profitability
    top10_areas = area_df.orderBy(col("profitability").desc()).limit(10)
    top10_list = top10_areas.collect()
    
    # PART E: Determine a triggering event and compute processing delay.
    # Choose the event with the maximum dropoff_datetime as the trigger.
    trigger_row = batch_df.orderBy(col("dropoff_datetime").desc()).limit(1).collect()[0]
    trigger_pickup = trigger_row["pickup_datetime"]
    trigger_dropoff = trigger_row["dropoff_datetime"]
    ingest_time = trigger_row["ingest_time"]
    processing_time = datetime.now()
    delay = (processing_time - ingest_time).total_seconds()
    
    # PART F: Build the output row.
    # The output format is:
    # pickup_datetime, dropoff_datetime, and for each of the top 10 areas:
    # profitable_cell_id_i, empty_taxies_in_cell_id_i, median_profit_in_cell_id_i, profitability_of_cell_i, then delay.
    output_row = {
        "pickup_datetime": trigger_pickup,
        "dropoff_datetime": trigger_dropoff,
        "delay": delay
    }
    # For each of the top 10 areas, add four columns.
    for i in range(10):
        if i < len(top10_list):
            area = top10_list[i]
            output_row[f"profitable_cell_id_{i+1}"] = area["cell_id"]
            # Convert the empty taxi count to string (or leave as numeric) as desired.
            output_row[f"empty_taxies_in_cell_id_{i+1}"] = str(area["empty_taxis"])
            output_row[f"median_profit_in_cell_id_{i+1}"] = area["median_profit"]
            output_row[f"profitability_of_cell_{i+1}"] = area["profitability"]
        else:
            output_row[f"profitable_cell_id_{i+1}"] = None
            output_row[f"empty_taxies_in_cell_id_{i+1}"] = None
            output_row[f"median_profit_in_cell_id_{i+1}"] = None
            output_row[f"profitability_of_cell_{i+1}"] = None

    print(f"Update for batch {batch_id}:", output_row)
    
    # Define the output schema explicitly.
    fields = [
        StructField("pickup_datetime", TimestampType(), True),
        StructField("dropoff_datetime", TimestampType(), True)
    ]
    for i in range(10):
        fields.extend([
            StructField(f"profitable_cell_id_{i+1}", StringType(), True),
            StructField(f"empty_taxies_in_cell_id_{i+1}", StringType(), True),
            StructField(f"median_profit_in_cell_id_{i+1}", DoubleType(), True),
            StructField(f"profitability_of_cell_{i+1}", DoubleType(), True)
        ])
    fields.append(StructField("delay", DoubleType(), True))
    output_schema = StructType(fields)
    
    result_df = spark.createDataFrame([output_row], schema=output_schema)
    result_df.show(truncate=False)


In [20]:
# Example: set up the streaming query (using trigger(once=True) for testing)
query2 = (
    streaming_df.writeStream
    .trigger(once=True)  # For testing; in production you might use a continuous or timed trigger.
    .foreachBatch(process_batch_query2)
    .outputMode("append")
    .start()
)

query2.awaitTermination()
