# Environment prep

In [1]:
!pip install pyspark



In [2]:
!pip install kafka-python



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, window, udf, count, rank, lit, when, struct, collect_list, median, current_timestamp
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, IntegerType, TimestampType
import math
import time

# Initialize Spark Session
spark = SparkSession.builder \
    .appName("BDM_project_2") \
    .getOrCreate()

print("Spark version:", spark.version)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/03/30 21:37:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Spark version: 3.5.4


In [4]:
# Define schema for taxi data
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True)
])

# Data cleaning (Query 0)

In [5]:
data_path = "./sorted_data_1gb"
df = spark.read.csv(data_path, schema = schema, header = True)

cleaned_df = df.filter(
    (col("pickup_longitude").isNotNull()) &
    (col("pickup_latitude").isNotNull()) &
    (col("dropoff_longitude").isNotNull()) &
    (col("dropoff_latitude").isNotNull()) &
    (col("pickup_longitude") != 0) &
    (col("pickup_latitude") != 0) &
    (col("dropoff_longitude") != 0) &
    (col("dropoff_latitude") != 0) &
    (col("medallion").isNotNull()) &
    (col("hack_license").isNotNull()) &
    (col("trip_time_in_secs") > 0) &
    (col("trip_distance") > 0) &
    (col("fare_amount") > 0)
)

print("Query 0 - Sample of Cleaned Data:")
cleaned_df.show(5)

Query 0 - Sample of Cleaned Data:
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+
|FF0622D5D4D01EB3C...|E1E2DD8B97AB23CC6...|2013-11-15 21:39:56|2013-11-15 21:45:19|              322|          1.1|        -73.9776|      40.786705|       -73.981133|       40.774364|         CRD|        6.0|     

In [6]:
def get_grid_cell(lat, lon, cell_size_m):
    if lat is None or lon is None:
        return None
    origin_lat, origin_lon = 41.474937, -74.913585
    lat_per_cell = cell_size_m / 111000.0
    lon_per_cell = cell_size_m / (111000.0 * math.cos(math.radians(origin_lat)))
    lat_offset = math.floor((lat - origin_lat) / lat_per_cell) + 1
    lon_offset = math.floor((lon - origin_lon) / lon_per_cell) + 1
    return f"{lat_offset}.{lon_offset}"

grid_cell_500_udf = udf(lambda lat, lon: get_grid_cell(lat, lon, 500), StringType())
grid_cell_250_udf = udf(lambda lat, lon: get_grid_cell(lat, lon, 250), StringType())

# Grid transformation
df_grid = cleaned_df \
    .withColumn("pickup_grid_500", grid_cell_500_udf(col("pickup_latitude"), col("pickup_longitude"))) \
    .withColumn("dropoff_grid_500", grid_cell_500_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
    .withColumn("pickup_grid_250", grid_cell_250_udf(col("pickup_latitude"), col("pickup_longitude"))) \
    .withColumn("dropoff_grid_250", grid_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
    .filter(col("pickup_grid_500").isNotNull() & col("dropoff_grid_500").isNotNull() &
            col("pickup_grid_250").isNotNull() & col("dropoff_grid_250").isNotNull())

print("Grid Cell Sample:")
df_grid.select("pickup_grid_500", "dropoff_grid_500", "pickup_grid_250", "dropoff_grid_250").show(5)

Grid Cell Sample:
+---------------+----------------+---------------+----------------+
|pickup_grid_500|dropoff_grid_500|pickup_grid_250|dropoff_grid_250|
+---------------+----------------+---------------+----------------+
|       -152.156|        -155.156|       -305.312|        -311.311|
|       -155.160|        -162.154|       -310.320|        -324.308|
|       -156.159|        -168.159|       -313.318|        -337.318|
|       -169.162|        -168.158|       -338.323|        -337.316|
|       -154.162|        -155.160|       -308.323|        -310.319|
+---------------+----------------+---------------+----------------+
only showing top 5 rows



# Query 1

Part 1

In [12]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Define a fixed reference timestamp
reference_time = df_grid.agg(F.max("pickup_datetime")).collect()[0][0]

# Filter the data for trips that occurred in the last 30 minutes relative to the reference time
time_limit = F.lit(reference_time) - F.expr("INTERVAL 30 MINUTES")

# Filter the DataFrame to include only trips completed in the last 30 minutes from the reference time
df_recent = df_grid.filter(F.col("pickup_datetime") >= time_limit)

# Perform the groupBy aggregation to find frequent routes within the last 30 minutes
route_counts = df_recent \
    .groupBy("pickup_grid_500", "dropoff_grid_500") \
    .agg(F.count("*").alias("ride_count"))

# Rank routes by their frequency
window_spec = Window.orderBy(F.col("ride_count").desc())

# Apply the window function to rank the routes
top_10_routes_static = route_counts \
    .withColumn("rank", F.rank().over(window_spec)) \
    .filter(F.col("rank") <= 10)

# Select only the relevant columns for output: start cell, end cell, and number of rides
top_10_routes_static = top_10_routes_static.select(
    F.col("pickup_grid_500").alias("start_cell"),
    F.col("dropoff_grid_500").alias("end_cell"),
    "ride_count"
)

# Show the top 10 frequent routes (for verification)
top_10_routes_static.show(10)

25/03/30 21:57:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 21:57:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 21:57:14 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.

+----------+--------+----------+
|start_cell|end_cell|ride_count|
+----------+--------+----------+
|  -154.156|-152.156|         2|
|  -164.155|-162.156|         2|
|  -159.157|-162.152|         1|
|  -167.154|-165.155|         1|
|  -168.155|-157.154|         1|
|  -160.176|-161.174|         1|
|  -158.158|-160.152|         1|
|  -160.154|-158.154|         1|
|  -159.157|-162.155|         1|
|  -158.159|-162.168|         1|
+----------+--------+----------+
only showing top 10 rows



25/03/30 21:57:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 21:57:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 21:57:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
25/03/30 21:57:16 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

Part 2

In [10]:
windowed_routes = df_grid \
    .withWatermark("dropoff_datetime", "30 minutes") \
    .groupBy(window("dropoff_datetime", "30 minutes"), "pickup_grid_500", "dropoff_grid_500") \
    .agg(count("*").alias("ride_count"))

# Rank and filter top 10
from pyspark.sql import Window

window_spec = Window.partitionBy("window").orderBy(col("ride_count").desc())

top_routes = windowed_routes \
    .withColumn("rank", rank().over(window_spec)) \
    .filter(col("rank") <= 10)

# Add processing delay
top_routes = top_routes.withColumn("processing_delay", (current_timestamp() - col("window.start")).cast("long"))

top_routes.show(10)



+--------------------+---------------+----------------+----------+----+----------------+
|              window|pickup_grid_500|dropoff_grid_500|ride_count|rank|processing_delay|
+--------------------+---------------+----------------+----------+----+----------------+
|{2013-01-01 04:00...|       -162.154|        -167.153|         2|   1|       386358070|
|{2013-01-01 04:00...|       -163.155|        -169.151|         2|   1|       386358070|
|{2013-01-01 04:00...|       -158.154|        -155.160|         2|   1|       386358070|
|{2013-01-01 04:00...|       -150.161|        -149.161|         1|   4|       386358070|
|{2013-01-01 04:00...|       -152.162|        -152.160|         1|   4|       386358070|
|{2013-01-01 04:00...|       -153.156|        -159.154|         1|   4|       386358070|
|{2013-01-01 04:00...|       -155.161|        -154.162|         1|   4|       386358070|
|{2013-01-01 04:00...|       -156.153|        -167.153|         1|   4|       386358070|
|{2013-01-01 04:00...

                                                                                

In [11]:
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# Assuming route_counts is the static dataset that contains necessary columns (pickup_datetime, dropoff_datetime, etc.)

# Define window specification for ranking by ride_count
window_spec = Window.orderBy(F.col("ride_count").desc())

# Define the current timestamp
current_time = F.current_timestamp()

# Filter the data for the last 30 minutes
filtered_data = route_counts.filter(
    (F.col("pickup_datetime") >= (current_time - F.expr("INTERVAL 30 MINUTE"))) &
    (F.col("pickup_datetime") <= current_time)
)

# Select relevant columns and compute the rank based on ride_count
top_10_routes = filtered_data.select(
    "pickup_datetime", 
    "dropoff_datetime", 
    "pickup_grid_500", 
    "dropoff_grid_500", 
    "ride_count"
).withColumn("rank", F.rank().over(window_spec)) \
    .filter(F.col("rank") <= 10)

# Add processing delay (current time - pickup time)
top_10_routes = top_10_routes.withColumn("processing_delay", 
    (current_time - F.col("pickup_datetime")).cast("long"))

# Add NULL values for missing routes if fewer than 10 routes are found
top_10_routes_with_nulls = top_10_routes.select(
    "pickup_datetime", 
    "dropoff_datetime", 
    F.coalesce(F.col("pickup_grid_500"), F.lit(None)).alias("start_cell_id"),
    F.coalesce(F.col("dropoff_grid_500"), F.lit(None)).alias("end_cell_id"),
    F.coalesce(F.col("processing_delay"), F.lit(None)).alias("delay")
)

# Handle the case of fewer than 10 routes by adding NULL rows for missing data
null_routes = spark.createDataFrame(
    [(None, None, None, None, None) for _ in range(10 - top_10_routes_with_nulls.count())],
    ["pickup_datetime", "dropoff_datetime", "start_cell_id", "end_cell_id", "delay"]
)

# Union the results with the NULL rows to ensure 10 rows
top_10_routes_with_nulls = top_10_routes_with_nulls.unionByName(null_routes)

# To make sure only the last 10 rows are returned (in case there are more than 10)
top_10_routes_with_nulls = top_10_routes_with_nulls.limit(10)

# Add the delay capture logic (this will be the time difference between reading input and producing output)
top_10_routes_with_nulls = top_10_routes_with_nulls.withColumn("output_produced_time", current_time)

# Show the result (or write to a file/database)
top_10_routes_with_nulls.show()

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `pickup_datetime` cannot be resolved. Did you mean one of the following? [`pickup_grid_500`, `ride_count`, `dropoff_grid_500`].;
'Filter (('pickup_datetime >= cast(current_timestamp() - INTERVAL '30' MINUTE as timestamp)) AND ('pickup_datetime <= current_timestamp()))
+- Aggregate [pickup_grid_500#115, dropoff_grid_500#134], [pickup_grid_500#115, dropoff_grid_500#134, count(1) AS ride_count#296L]
   +- Filter (pickup_datetime#2 >= cast(2013-12-31 23:59:55 - INTERVAL '30' MINUTE as timestamp))
      +- Filter (((isnotnull(pickup_grid_500#115) AND isnotnull(dropoff_grid_500#134)) AND isnotnull(pickup_grid_250#154)) AND isnotnull(dropoff_grid_250#175))
         +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_grid_500#115, dropoff_grid_500#134, pickup_grid_250#154, <lambda>(dropoff_latitude#9, dropoff_longitude#8)#174 AS dropoff_grid_250#175]
            +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_grid_500#115, dropoff_grid_500#134, <lambda>(pickup_latitude#7, pickup_longitude#6)#153 AS pickup_grid_250#154]
               +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_grid_500#115, <lambda>(dropoff_latitude#9, dropoff_longitude#8)#133 AS dropoff_grid_500#134]
                  +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, <lambda>(pickup_latitude#7, pickup_longitude#6)#114 AS pickup_grid_500#115]
                     +- Filter ((((((((((((isnotnull(pickup_longitude#6) AND isnotnull(pickup_latitude#7)) AND isnotnull(dropoff_longitude#8)) AND isnotnull(dropoff_latitude#9)) AND NOT (pickup_longitude#6 = cast(0 as double))) AND NOT (pickup_latitude#7 = cast(0 as double))) AND NOT (dropoff_longitude#8 = cast(0 as double))) AND NOT (dropoff_latitude#9 = cast(0 as double))) AND isnotnull(medallion#0)) AND isnotnull(hack_license#1)) AND (trip_time_in_secs#4 > 0)) AND (trip_distance#5 > cast(0 as double))) AND (fare_amount#11 > cast(0 as double)))
                        +- Relation [medallion#0,hack_license#1,pickup_datetime#2,dropoff_datetime#3,trip_time_in_secs#4,trip_distance#5,pickup_longitude#6,pickup_latitude#7,dropoff_longitude#8,dropoff_latitude#9,payment_type#10,fare_amount#11,surcharge#12,mta_tax#13,tip_amount#14,tolls_amount#15] csv


# Query 2

Part 1

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import col, median, count, when, rank

# Add profit column and 250m grid cells
profit_df = cleaned_df \
    .withColumn("profit", col("fare_amount") + col("tip_amount")) \
    .withColumn("pickup_grid_250", grid_cell_250_udf(col("pickup_latitude"), col("pickup_longitude"))) \
    .withColumn("dropoff_grid_250", grid_cell_250_udf(col("dropoff_latitude"), col("dropoff_longitude"))) \
    .filter(col("pickup_grid_250").isNotNull() & col("dropoff_grid_250").isNotNull())

# Step 1: Calculate median profit per pickup cell
median_profit = profit_df \
    .groupBy("pickup_grid_250") \
    .agg(median("profit").alias("median_profit"))

# Step 2: Estimate empty taxis (simplified for static data)
# For each medallion, count dropoffs; assume a taxi is "empty" if it has no subsequent pickup within 30 minutes
# In static data, we'll approximate by counting unique dropoffs per cell without time tracking
empty_taxis = profit_df \
    .groupBy("medallion", "dropoff_grid_250", "dropoff_datetime") \
    .agg(count("*").alias("trip_count")) \
    .groupBy("dropoff_grid_250") \
    .agg(count("medallion").alias("empty_taxis"))  # Simplified: counts unique medallions per dropoff cell

# Step 3: Compute profitability
profitability = median_profit \
    .join(empty_taxis, median_profit.pickup_grid_250 == empty_taxis.dropoff_grid_250, "left_outer") \
    .na.fill({"empty_taxis": 0}) \
    .withColumn("profitability",
                when(col("empty_taxis") > 0, col("median_profit") / col("empty_taxis"))
                .otherwise(col("median_profit"))) \
    .select(col("pickup_grid_250").alias("cell_id"), "empty_taxis", "median_profit", "profitability")

# Step 4: Rank and get top 10
window_spec_profit = Window.orderBy(col("profitability").desc())
top_10_profit_static = profitability \
    .withColumn("rank", rank().over(window_spec_profit)) \
    .filter(col("rank") <= 10) \
    .select("cell_id", "empty_taxis", "median_profit", "profitability", "rank")

# Show results
print("Query 2 Part 1 - Top 10 Profitable Areas (Static):")
top_10_profit_static.show(10)