### Install dependencies

In [1]:
!pip install delta-spark



In [2]:
from delta import configure_spark_with_delta_pip
from pyspark.sql import SparkSession


builder = SparkSession.builder.appName("project2") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog")

spark = configure_spark_with_delta_pip(builder).getOrCreate()

## Load the data

In [3]:
# Define the expected column names based on the CSV's field count
columns = [
    "medallion", "hack_license", "pickup_datetime", "dropoff_datetime",
    "trip_time_in_secs", "trip_distance", "pickup_longitude", "pickup_latitude",
    "dropoff_longitude", "dropoff_latitude", "payment_type", "fare_amount",
    "surcharge", "mta_tax", "tip_amount", "tolls_amount", "total_amount"
]

df = spark.read.option("header", "false").csv("input/sorted_data.csv")
df = df.toDF(*columns)
df.printSchema()

fraction = 1.0 / 12  # Approximately 0.03
df_sample = df.sample(withReplacement=False, fraction=fraction)

root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- trip_time_in_secs: string (nullable = true)
 |-- trip_distance: string (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- surcharge: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- total_amount: string (nullable = true)



## Data cleansing

In [4]:
from pyspark.sql.functions import col, to_timestamp

# Convert pickup and dropoff datetimes to timestamps
df_sample = df_sample.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss")) \
                     .withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))

clean_df = df_sample.filter(
    (col("pickup_datetime").isNotNull()) &
    (col("dropoff_datetime").isNotNull()) &
    (col("medallion").isNotNull()) & (col("medallion") != "") &
    (col("hack_license").isNotNull()) & (col("hack_license") != "") &
    (col("trip_distance").cast("float") > 0) &
    (col("fare_amount").cast("float") > 0)
)

clean_df = clean_df.withColumn("trip_time_in_secs", col("trip_time_in_secs").cast("int")) \
                   .withColumn("trip_distance", col("trip_distance").cast("float")) \
                   .withColumn("fare_amount", col("fare_amount").cast("float")) \
                   .withColumn("surcharge", col("surcharge").cast("float")) \
                   .withColumn("mta_tax", col("mta_tax").cast("float")) \
                   .withColumn("tip_amount", col("tip_amount").cast("float")) \
                   .withColumn("tolls_amount", col("tolls_amount").cast("float"))
clean_df.count()
clean_df.show(20)

+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+
|1390FB380189DF6BB...|BE317B986700F63C4...|2013-01-01 00:01:00|2013-01-01 00:03:00|              120|         0.48|      -74.004173|      40.720947|       -74.003838|       40.726189|         CSH|        4.0|

## Query 1 - Part 1

In [22]:
from pyspark.sql.functions import col, floor, concat, lit, current_timestamp, expr
from datetime import timedelta

max_dropoff = clean_df.agg({"dropoff_datetime": "max"}).collect()[0][0]
print("Max dropoff datetime:", max_dropoff)

ref_time = max_dropoff - timedelta(minutes=30)

# Create grid cell identifiers for pickup (start_cell) and drop-off (end_cell)
cell_size = 0.01

df_routes = clean_df.withColumn(
    "start_cell",
    concat(
        floor(col("pickup_latitude") / cell_size),
        lit("_"),
        floor(col("pickup_longitude") / cell_size)
    )
).withColumn(
    "end_cell",
    concat(
        floor(col("dropoff_latitude") / cell_size),
        lit("_"),
        floor(col("dropoff_longitude") / cell_size)
    )
)

df_last30 = df_routes.filter(col("dropoff_datetime") >= lit(ref_time))


# Group by start and end cell and count the rides
df_frequent_routes = df_last30.groupBy("start_cell", "end_cell").count() \
    .withColumnRenamed("count", "Number_of_Rides")

top10_routes = df_frequent_routes.orderBy(col("Number_of_Rides").desc()).limit(10)
top10_routes.show(truncate=False)


Max dropoff datetime: 2014-01-01 00:37:00
+----------+----------+---------------+
|start_cell|end_cell  |Number_of_Rides|
+----------+----------+---------------+
|4073_-7399|4075_-7400|2              |
|4076_-7399|4075_-7400|2              |
|4076_-7399|4072_-7399|2              |
|4071_-7401|4069_-7386|1              |
|4075_-7397|4079_-7398|1              |
|4067_-7396|4072_-7395|1              |
|4081_-7392|4083_-7385|1              |
|4076_-7398|4075_-7400|1              |
|4064_-7378|4061_-7392|1              |
|4069_-7394|4074_-7387|1              |
+----------+----------+---------------+



## Query 1 - Part 2

In [5]:
# Write the cleansed data to a Delta table in a writable directory.
clean_df.write.format("delta").mode("overwrite").save("/tmp/delta/taxi_data")
streaming_df = spark.readStream.format("delta").load("/tmp/delta/taxi_data")

In [17]:
# 1) Read the Delta table in batch mode
check_df = spark.read.format("delta").load("/tmp/delta/taxi_data")

# 2) See the schema
check_df.printSchema()

# 3) Inspect some rows
check_df.show(5, truncate=False)

# 4) Count the rows
print("Row count:", check_df.count())


root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: float (nullable = true)
 |-- pickup_longitude: string (nullable = true)
 |-- pickup_latitude: string (nullable = true)
 |-- dropoff_longitude: string (nullable = true)
 |-- dropoff_latitude: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: float (nullable = true)
 |-- surcharge: float (nullable = true)
 |-- mta_tax: float (nullable = true)
 |-- tip_amount: float (nullable = true)
 |-- tolls_amount: float (nullable = true)
 |-- total_amount: string (nullable = true)

+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+----------

In [6]:
from pyspark.sql.functions import (
    col, floor, concat, lit, current_timestamp, to_timestamp, expr
)
from pyspark.sql import functions as F
from datetime import timedelta, datetime
from pyspark.sql.types import StructType, StructField, TimestampType, DoubleType, StringType

# Define your grid constants for the 500m x 500m grid
grid_origin_lat = 41.474937
grid_origin_lon = -74.913585
delta_lat = 0.0045   # Approximate degrees for 500m in latitude
delta_lon = 0.0060   # Approximate degrees for 500m in longitude

# This is your foreachBatch function to process each micro-batch
def process_batch(batch_df, batch_id):
    # Skip empty batches
    if batch_df.rdd.isEmpty():
        return

    # PART A: Compute the 30-minute window based on the batch’s max dropoff
    max_dropoff = batch_df.agg({"dropoff_datetime": "max"}).collect()[0][0]
    if max_dropoff is None:
        return
    ref_time = max_dropoff - timedelta(minutes=30)

    # PART B: Compute grid cell IDs for pickup and dropoff using the 500m grid
    batch_df = batch_df.withColumn(
        "pickup_cell_east", floor((col("pickup_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
    ).withColumn(
        "pickup_cell_south", floor((lit(grid_origin_lat) - col("pickup_latitude")) / lit(delta_lat)) + 1
    ).withColumn(
        "start_cell", concat(col("pickup_cell_east").cast("int"), lit("."), col("pickup_cell_south").cast("int"))
    )
    batch_df = batch_df.withColumn(
        "dropoff_cell_east", floor((col("dropoff_longitude") - lit(grid_origin_lon)) / lit(delta_lon)) + 1
    ).withColumn(
        "dropoff_cell_south", floor((lit(grid_origin_lat) - col("dropoff_latitude")) / lit(delta_lat)) + 1
    ).withColumn(
        "end_cell", concat(col("dropoff_cell_east").cast("int"), lit("."), col("dropoff_cell_south").cast("int"))
    )

    # Filter out trips that are out-of-bounds (only consider cells 1 to 300)
    batch_df = batch_df.filter(
        (col("pickup_cell_east").between(1, 300)) &
        (col("pickup_cell_south").between(1, 300)) &
        (col("dropoff_cell_east").between(1, 300)) &
        (col("dropoff_cell_south").between(1, 300))
    )

    # PART C: Filter for trips with dropoff_datetime >= ref_time (last 30 minutes)
    df_last30 = batch_df.filter(col("dropoff_datetime") >= F.lit(ref_time))
    print(f"Window filter: dropoff_datetime >= {ref_time}")
    print("df_last30 count =", df_last30.count())
    df_last30.show(5)


    # PART D: Aggregate routes and get top 10 most frequent
    df_frequent_routes = df_last30.groupBy("start_cell", "end_cell") \
        .count() \
        .withColumnRenamed("count", "Number_of_Rides")
    top10_routes = df_frequent_routes.orderBy(col("Number_of_Rides").desc()).limit(10)
    top10_list = top10_routes.collect()

    # PART E: Determine a triggering event and compute delay
    # Choose the event with the maximum dropoff_datetime as the trigger
    trigger_row = batch_df.orderBy(col("dropoff_datetime").desc()).limit(1).collect()[0]
    trigger_pickup = trigger_row["pickup_datetime"]
    trigger_dropoff = trigger_row["dropoff_datetime"]
    ingest_time = trigger_row["ingest_time"]
    processing_time = datetime.now()
    delay = (processing_time - ingest_time).total_seconds()

    # PART F: Build the output row
    output_row = {
        "pickup_datetime": trigger_pickup,
        "dropoff_datetime": trigger_dropoff,
        "delay": delay
    }
    for i in range(10):
        if i < len(top10_list):
            route = top10_list[i]
            output_row[f"start_cell_id_{i+1}"] = route["start_cell"]
            output_row[f"end_cell_id_{i+1}"] = route["end_cell"]
        else:
            output_row[f"start_cell_id_{i+1}"] = None
            output_row[f"end_cell_id_{i+1}"] = None

    # For demonstration, print the output update
    print(f"Update for batch {batch_id} :", output_row)
    
    # Define the output schema explicitly
    output_schema = StructType([
        StructField("pickup_datetime", TimestampType(), True),
        StructField("dropoff_datetime", TimestampType(), True),
        StructField("start_cell_id_1", StringType(), True),
        StructField("end_cell_id_1", StringType(), True),
        StructField("start_cell_id_2", StringType(), True),
        StructField("end_cell_id_2", StringType(), True),
        StructField("start_cell_id_3", StringType(), True),
        StructField("end_cell_id_3", StringType(), True),
        StructField("start_cell_id_4", StringType(), True),
        StructField("end_cell_id_4", StringType(), True),
        StructField("start_cell_id_5", StringType(), True),
        StructField("end_cell_id_5", StringType(), True),
        StructField("start_cell_id_6", StringType(), True),
        StructField("end_cell_id_6", StringType(), True),
        StructField("start_cell_id_7", StringType(), True),
        StructField("end_cell_id_7", StringType(), True),
        StructField("start_cell_id_8", StringType(), True),
        StructField("end_cell_id_8", StringType(), True),
        StructField("start_cell_id_9", StringType(), True),
        StructField("end_cell_id_9", StringType(), True),
        StructField("start_cell_id_10", StringType(), True),
        StructField("end_cell_id_10", StringType(), True),
        StructField("delay", DoubleType(), True)
    ])
    
    # Create the DataFrame using the explicit schema
    result_df = spark.createDataFrame([output_row], schema=output_schema)
    result_df.show(truncate=False)

# If your taxi data doesn’t already have proper types, ensure you convert the datetime columns
streaming_df = streaming_df.withColumn("pickup_datetime", to_timestamp(col("pickup_datetime"), "yyyy-MM-dd HH:mm:ss"))
streaming_df = streaming_df.withColumn("dropoff_datetime", to_timestamp(col("dropoff_datetime"), "yyyy-MM-dd HH:mm:ss"))
streaming_df = streaming_df.withColumn("ingest_time", current_timestamp())

# Use trigger(once=True) to process existing data exactly one time
query = (
    streaming_df.writeStream
    .trigger(once=True)                # <--- This forces the query to run just once
    .foreachBatch(process_batch)
    .outputMode("append")
    .start()
)

query.awaitTermination()


Window filter: dropoff_datetime >= 2014-01-01 00:32:18
df_last30 count = 3
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+------------+--------------------+----------------+-----------------+----------+-----------------+------------------+--------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|total_amount|         ingest_time|pickup_cell_east|pickup_cell_south|start_cell|dropoff_cell_east|dropoff_cell_south|end_cell|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+-----