# Big Data Management Project 2:
## DESB GRAND CHALLENGE 2015

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import unix_timestamp, regexp_extract, col, count, udf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType, DoubleType, FloatType
from pyspark.sql.window import Window

import math
import time

In [2]:
spark = SparkSession.builder \
    .appName('BDM_Project2') \
    .getOrCreate()

### Query 0
Data Cleansing and Setup

In [3]:
start_time = time.time()  

# Defining the schema for faster reading of data
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True)
])

# Creating a single dataframe of all the trip_data files
taxi_df_og = (
    spark.readStream
    .option("maxFilesPerTrigger", 1) 
    .option("header", False)
    .schema(schema)
    .csv("input/")
)

# Removing the trips with 0 passengers
# Transforming the data 
taxi_df = taxi_df_og.filter(
    (regexp_extract(col("medallion"), r"^[a-fA-F0-9]{32}$", 0) != "") &
    (regexp_extract(col("hack_license"), r"^[a-fA-F0-9]{32}$", 0) != "") &
    (col("pickup_datetime").isNotNull()) &
    (col("dropoff_datetime").isNotNull()) &               
    (col("trip_distance") > 0) &                    
    (col("fare_amount") > 0) &
    (col("tip_amount") >= 0)
)

# Convert timestamps to Unix format 
taxi_df = taxi_df.withColumn("pickup_ts", unix_timestamp("pickup_datetime")) \
    .withColumn("dropoff_ts", unix_timestamp("dropoff_datetime")) \
    .withColumn("duration", col("dropoff_ts") - col("pickup_ts")) \
    .select("*") \
    .dropna()  # Drop remaining null values

# Start the streaming query with trigger(once=True) to process data once and stop
query = (
    taxi_df.writeStream
    .outputMode("append")
    .format("parquet")
    .option("path", "output/preprocessed_data")
    .option("checkpointLocation", "output/checkpoint")
    .trigger(once=True)  
    .start()
)

query.awaitTermination()

print("Execution time", time.time() - start_time)

Execution time 2.9444541931152344


In [4]:
output_df = spark.read.parquet("output/preprocessed_data")
output_df.show(5) 

+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+----------+----------+--------+
|           medallion|        hack_license|    pickup_datetime|   dropoff_datetime|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount| pickup_ts|dropoff_ts|duration|
+--------------------+--------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+----------+----------+--------+
|64187531006B3D8B3...|871C25EC0B8BF4BC8...|2013-08-01 17:47:36|2013-08-01 19:05:40|             4683|         18.0|      -74.005295|      40.750935|      

In [5]:
sample_df = output_df.limit(2800000) # around 1gb of data

In [13]:
# Impact of transformations
original_count = output_df.count()
filtered_count = sample_df.count()
filtered_out_count = original_count - filtered_count

print(f"Original count: {original_count}") 
print(f"Filtered count: {filtered_count}")
print(f"Rows filtered out: {filtered_out_count}")

Original count: 90288423
Filtered count: 2800000
Rows filtered out: 87488423


### Grid Cells for Query 1

In [6]:
start_lat = 41.474937
start_long = -74.913585
cell_size = 0.044 # 500m to degrees for latitude (and longitude)

def grid_cells_q1(point_long, point_lat):
    
    long = math.floor((point_long - start_long) / cell_size) + 1
    lat = math.floor((start_lat - point_lat) / cell_size) + 1
    
    # Ensure the cell is within valid grid bounds (300x300)
    if not (1 <= long <= 300 and 1 <= lat <= 300):
        return None 
    
    return float(f"{long}.{lat}") # Convert to X.X format

In [7]:
get_grid = udf(grid_cells_q1, FloatType())

taxi_df_q1 = sample_df.withColumn("start_cell", get_grid(sample_df.pickup_longitude, sample_df.pickup_latitude))\
    .withColumn("end_cell", get_grid(sample_df.dropoff_longitude, sample_df.dropoff_latitude))\
    .filter(
        col("start_cell").isNotNull() & col("end_cell").isNotNull()  # Filter out trips outside of the grid
    )

taxi_df_q1.show(5, truncate=False)

+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+----------+----------+--------+----------+--------+
|medallion                       |hack_license                    |pickup_datetime    |dropoff_datetime   |trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|pickup_ts |dropoff_ts|duration|start_cell|end_cell|
+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+----------+----------+--------+----------+--------+
|64187531006B3D8B3831D

### Query 1
Frequent Routes

### Grid Cells for Query 2

In [6]:
start_lat = 41.474937
start_long = -74.913585
cell_size = 0.022 # 250m to degrees for latitude (and longitude)

def grid_cells_q2(point_long, point_lat):
    
    long = math.floor((point_long - start_long) / cell_size) + 1
    lat = math.floor((start_lat - point_lat) / cell_size) + 1
    
    # Ensure the cell is within valid grid bounds (600x600)
    if not (1 <= long <= 600 and 1 <= lat <= 600):
        return None 
    
    return float(f"{long}.{lat}") # Convert to X.X format

In [7]:
get_grid2 = udf(grid_cells_q2, FloatType())

taxi_df_q2 = sample_df.withColumn("start_cell", get_grid2(sample_df.pickup_longitude, sample_df.pickup_latitude))\
    .withColumn("end_cell", get_grid2(sample_df.dropoff_longitude, sample_df.dropoff_latitude))\
    .filter(
        col("start_cell").isNotNull() & col("end_cell").isNotNull()  # Filter out trips outside of the grid
    )

taxi_df_q2.show(5, truncate=False)

+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+----------+----------+--------+----------+--------+
|medallion                       |hack_license                    |pickup_datetime    |dropoff_datetime   |trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|payment_type|fare_amount|surcharge|mta_tax|tip_amount|tolls_amount|pickup_ts |dropoff_ts|duration|start_cell|end_cell|
+--------------------------------+--------------------------------+-------------------+-------------------+-----------------+-------------+----------------+---------------+-----------------+----------------+------------+-----------+---------+-------+----------+------------+----------+----------+--------+----------+--------+
|64187531006B3D8B3831D

### Query 2
Profitable Areas

In [10]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, window, count, sum, expr, lit, udf
from pyspark.sql.types import StringType, DoubleType, IntegerType, StructType, StructField

# Initialize Spark Session
spark = SparkSession.builder.appName("DEBS_Taxi_Batch").getOrCreate()

# Define Schema for the CSV
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True)
])

# Read the CSV file
taxi_df = spark.read.csv("input/sorted_data_sample.csv", header=True, schema=schema)


taxi_df = taxi_df.filter(
    (regexp_extract(col("medallion"), r"^[a-fA-F0-9]{32}$", 0) != "") &
    (regexp_extract(col("hack_license"), r"^[a-fA-F0-9]{32}$", 0) != "") &
    (col("pickup_datetime").isNotNull()) &
    (col("dropoff_datetime").isNotNull()) &               
    (col("trip_distance") > 0) &                    
    (col("fare_amount") > 0) &
    (col("tip_amount") >= 0)
)

# Convert timestamps to Unix format 
taxi_df = taxi_df.withColumn("pickup_ts", unix_timestamp("pickup_datetime")) \
    .withColumn("dropoff_ts", unix_timestamp("dropoff_datetime")) \
    .withColumn("duration", col("dropoff_ts") - col("pickup_ts")) \
    .select("*") \
    .dropna()  # Drop remaining null values

# ------------------------------------ QUERY 0: Data Cleansing ------------------------------------ #
taxi_df = taxi_df.filter((col("pickup_latitude").between(40.5, 41.9)) &
                         (col("pickup_longitude").between(-74.3, -73.7)) &
                         (col("dropoff_latitude").between(40.5, 41.9)) &
                         (col("dropoff_longitude").between(-74.3, -73.7)) &
                         (col("fare_amount") > 0))

# ------------------------------------ QUERY 1: Frequent Routes ------------------------------------ #
route_counts = taxi_df.groupBy(
    window(col("pickup_datetime"), "30 minutes"),
    "pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"
).agg(count("*").alias("trip_count"))


# Get Top 10 Routes
top_routes = route_counts.orderBy(col("trip_count").desc()).limit(10)

# Show Query 1 results
top_routes.show()

+--------------------+---------------+----------------+----------------+-----------------+----------+
|              window|pickup_latitude|pickup_longitude|dropoff_latitude|dropoff_longitude|trip_count|
+--------------------+---------------+----------------+----------------+-----------------+----------+
|{2013-01-03 09:30...|      40.752472|      -73.938568|       40.752472|       -73.938568|         5|
|{2013-01-01 13:00...|      40.744915|      -73.949043|       40.744915|       -73.949043|         5|
|{2013-01-08 19:30...|      40.684944|       -73.98587|       40.684944|        -73.98587|         5|
|{2013-01-10 19:00...|      40.752098|      -73.982376|       40.752098|       -73.982376|         5|
|{2013-01-03 11:30...|      40.769669|       -73.95285|       40.769669|        -73.95285|         4|
|{2013-01-01 18:00...|      40.755325|       -73.99646|       40.755325|        -73.99646|         4|
|{2013-01-09 08:30...|      40.744915|      -73.949043|       40.744915|       -73

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, window, count, expr
from pyspark.sql.types import StringType, DoubleType, IntegerType, StructType, StructField, TimestampType

# Initialize Spark Session
spark = SparkSession.builder.appName("DEBS_Taxi_Stream").getOrCreate()

# Define Schema for the CSV
schema = StructType([
    StructField("medallion", StringType(), True),
    StructField("hack_license", StringType(), True),
    StructField("pickup_datetime", TimestampType(), True),
    StructField("dropoff_datetime", TimestampType(), True),
    StructField("trip_time_in_secs", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("pickup_longitude", DoubleType(), True),
    StructField("pickup_latitude", DoubleType(), True),
    StructField("dropoff_longitude", DoubleType(), True),
    StructField("dropoff_latitude", DoubleType(), True),
    StructField("payment_type", StringType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("surcharge", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True)
])

# Read from CSV directory as a stream (replace "input/directory" with your path)
taxi_df_stream = spark.readStream.option("header", "true").schema(schema).csv("input/")

# Cleanse the data
taxi_df_stream_cleaned = taxi_df_stream.filter(
    (expr("regexp_extract(medallion, '^[a-fA-F0-9]{32}$', 0) != ''")) &
    (expr("regexp_extract(hack_license, '^[a-fA-F0-9]{32}$', 0) != ''")) &
    (col("pickup_datetime").isNotNull()) &
    (col("dropoff_datetime").isNotNull()) &               
    (col("trip_distance") > 0) &                    
    (col("fare_amount") > 0) &
    (col("tip_amount") >= 0)
)

# Convert timestamps to Unix format 
taxi_df_stream_cleaned = taxi_df_stream_cleaned.withColumn("pickup_ts", unix_timestamp("pickup_datetime")) \
    .withColumn("dropoff_ts", unix_timestamp("dropoff_datetime")) \
    .withColumn("duration", col("dropoff_ts") - col("pickup_ts")) \
    .dropna()  # Drop remaining null values

# ------------------------------------ QUERY 0: Data Cleansing ------------------------------------ #
taxi_df_stream_cleaned = taxi_df_stream_cleaned.filter(
    (col("pickup_latitude").between(40.5, 41.9)) &
    (col("pickup_longitude").between(-74.3, -73.7)) &
    (col("dropoff_latitude").between(40.5, 41.9)) &
    (col("dropoff_longitude").between(-74.3, -73.7)) &
    (col("fare_amount") > 0)
)

# ------------------------------------ QUERY 1: Frequent Routes ------------------------------------ #
# Define a watermark and perform aggregation over the window
taxi_df_stream_cleaned_with_watermark = taxi_df_stream_cleaned.withWatermark("pickup_datetime", "1 hour")

# Group by a 30-minute window and calculate trip counts
route_counts_stream = taxi_df_stream_cleaned_with_watermark.groupBy(
    window(col("pickup_datetime"), "30 minutes"),
    "pickup_latitude", "pickup_longitude", "dropoff_latitude", "dropoff_longitude"
).agg(count("*").alias("trip_count"))

# Aggregate by the window and the route
route_counts_stream = route_counts_stream.groupBy("pickup_latitude", "pickup_longitude").count()

# Now, you can sort since it's an aggregated stream
top_routes_stream = route_counts_stream.orderBy(col("count").desc()).limit(10)

# Set the configuration to disable the correctness check
spark.conf.set("spark.sql.streaming.statefulOperator.checkCorrectness.enabled", "false")

# Check the schema of the cleaned data (just to ensure it's being processed correctly)
taxi_df_stream_cleaned.printSchema()

# Test if data is available in the stream by showing a small sample
# taxi_df_stream_cleaned.show(10)  # Uncomment to show some rows from the cleaned stream

# Set the output mode to "append" for real-time updates and trigger processing every 10 seconds
query = top_routes_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='10 seconds') \
    .start()

# Wait for the termination of the query
query.awaitTermination()


root
 |-- medallion: string (nullable = true)
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- trip_time_in_secs: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- pickup_ts: long (nullable = true)
 |-- dropoff_ts: long (nullable = true)
 |-- duration: long (nullable = true)



AnalysisException: Append output mode not supported when there are streaming aggregations on streaming DataFrames/DataSets without watermark;
GlobalLimit 10
+- LocalLimit 10
   +- Sort [count#145L DESC NULLS LAST], true
      +- Aggregate [pickup_latitude#7, pickup_longitude#6], [pickup_latitude#7, pickup_longitude#6, count(1) AS count#145L]
         +- Aggregate [window#131-T3600000ms, pickup_latitude#7, pickup_longitude#6, dropoff_latitude#9, dropoff_longitude#8], [window#131-T3600000ms AS window#109-T3600000ms, pickup_latitude#7, pickup_longitude#6, dropoff_latitude#9, dropoff_longitude#8, count(1) AS trip_count#130L]
            +- Project [named_struct(start, knownnullable(precisetimestampconversion(((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - 0) % 1800000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - 0) % 1800000000) + 1800000000) ELSE ((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - 0) % 1800000000) END) - 0), LongType, TimestampType)), end, knownnullable(precisetimestampconversion((((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - CASE WHEN (((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - 0) % 1800000000) < cast(0 as bigint)) THEN (((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - 0) % 1800000000) + 1800000000) ELSE ((precisetimestampconversion(pickup_datetime#2-T3600000ms, TimestampType, LongType) - 0) % 1800000000) END) - 0) + 1800000000), LongType, TimestampType))) AS window#131-T3600000ms, medallion#0, hack_license#1, pickup_datetime#2-T3600000ms, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_ts#33L, dropoff_ts#51L, duration#70L]
               +- Filter isnotnull(pickup_datetime#2-T3600000ms)
                  +- EventTimeWatermark pickup_datetime#2: timestamp, 1 hours
                     +- Filter ((((((pickup_latitude#7 >= 40.5) AND (pickup_latitude#7 <= 41.9)) AND ((pickup_longitude#6 >= -74.3) AND (pickup_longitude#6 <= -73.7))) AND ((dropoff_latitude#9 >= 40.5) AND (dropoff_latitude#9 <= 41.9))) AND ((dropoff_longitude#8 >= -74.3) AND (dropoff_longitude#8 <= -73.7))) AND (fare_amount#11 > cast(0 as double)))
                        +- Filter atleastnnonnulls(19, medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_ts#33L, dropoff_ts#51L, duration#70L)
                           +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_ts#33L, dropoff_ts#51L, (dropoff_ts#51L - pickup_ts#33L) AS duration#70L]
                              +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, pickup_ts#33L, unix_timestamp(dropoff_datetime#3, yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false) AS dropoff_ts#51L]
                                 +- Project [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15, unix_timestamp(pickup_datetime#2, yyyy-MM-dd HH:mm:ss, Some(Etc/UTC), false) AS pickup_ts#33L]
                                    +- Filter ((((((NOT (regexp_extract(medallion#0, ^[a-fA-F0-9]{32}$, 0) = ) AND NOT (regexp_extract(hack_license#1, ^[a-fA-F0-9]{32}$, 0) = )) AND isnotnull(pickup_datetime#2)) AND isnotnull(dropoff_datetime#3)) AND (trip_distance#5 > cast(0 as double))) AND (fare_amount#11 > cast(0 as double))) AND (tip_amount#14 >= cast(0 as double)))
                                       +- StreamingRelation DataSource(org.apache.spark.sql.SparkSession@d663220,csv,List(),Some(StructType(StructField(medallion,StringType,true),StructField(hack_license,StringType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(trip_time_in_secs,IntegerType,true),StructField(trip_distance,DoubleType,true),StructField(pickup_longitude,DoubleType,true),StructField(pickup_latitude,DoubleType,true),StructField(dropoff_longitude,DoubleType,true),StructField(dropoff_latitude,DoubleType,true),StructField(payment_type,StringType,true),StructField(fare_amount,DoubleType,true),StructField(surcharge,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true))),List(),None,Map(header -> true, path -> input/),None), FileSource[input/], [medallion#0, hack_license#1, pickup_datetime#2, dropoff_datetime#3, trip_time_in_secs#4, trip_distance#5, pickup_longitude#6, pickup_latitude#7, dropoff_longitude#8, dropoff_latitude#9, payment_type#10, fare_amount#11, surcharge#12, mta_tax#13, tip_amount#14, tolls_amount#15]


In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import window

# Initialize Spark session
spark = SparkSession.builder \
    .appName("TaxiTripsStream") \
    .getOrCreate()

# Define the path to the input data (replace with your actual file path)
input_path = "input/"  # Modify this to point to your actual file or directory

# Read the streaming data (assuming the data is in CSV format)
taxi_df_stream = spark.readStream \
    .option("header", "true") \
    .schema("medallion STRING, hack_license STRING, pickup_datetime TIMESTAMP, dropoff_datetime TIMESTAMP, "
            "trip_time_in_secs INT, trip_distance DOUBLE, pickup_longitude DOUBLE, pickup_latitude DOUBLE, "
            "dropoff_longitude DOUBLE, dropoff_latitude DOUBLE, payment_type STRING, fare_amount DOUBLE, "
            "surcharge DOUBLE, mta_tax DOUBLE, tip_amount DOUBLE, tolls_amount DOUBLE, pickup_ts LONG, "
            "dropoff_ts LONG, duration LONG") \
    .csv(input_path)

# Clean and filter the streaming DataFrame as needed
taxi_df_stream_cleaned = taxi_df_stream.filter(
    "pickup_datetime IS NOT NULL AND dropoff_datetime IS NOT NULL AND trip_distance > 0 AND fare_amount > 0 AND tip_amount >= 0"
)

# Apply watermark on the 'pickup_datetime' column with a 1-hour delay
taxi_df_stream_cleaned_with_watermark = taxi_df_stream_cleaned \
    .withWatermark("pickup_datetime", "1 hour")

# Perform aggregation with windowing to group trips by pickup location and 1-hour time window
top_routes_stream = taxi_df_stream_cleaned_with_watermark \
    .groupBy(
        window("pickup_datetime", "1 hour"),
        "pickup_latitude",
        "pickup_longitude"
    ) \
    .count()

# Output the result to the console (for demonstration purposes)
query = top_routes_stream.writeStream \
    .outputMode("append") \
    .format("console") \
    .trigger(processingTime='10 seconds') \
    .start()

# Wait for the termination of the query
query.awaitTermination()


ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 718, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


In [11]:
# ------------------------------------ QUERY 2: Profitable Areas ------------------------------------ #
# UDF to compute grid cell
def compute_grid_cell(latitude, longitude, lat_start=41.474937, lon_start=-74.913585, cell_size=0.002247):
    if latitude is None or longitude is None:
        return None
    cell_x = 1 + int((longitude - lon_start) / 0.00294)
    cell_y = 1 + int((latitude - lat_start) / 0.002247)
    return f"{cell_x}.{cell_y}"

grid_udf = udf(compute_grid_cell, StringType())

# Assign grid cells
taxi_df = taxi_df.withColumn("pickup_cell", grid_udf(col("pickup_latitude"), col("pickup_longitude"))) \
                 .withColumn("dropoff_cell", grid_udf(col("dropoff_latitude"), col("dropoff_longitude")))


# Compute Median Profit Per Cell
profit_df = taxi_df.groupBy("pickup_cell").agg(expr("percentile_approx(fare_amount + tip_amount, 0.5)").alias("median_profit"))

# Count Empty Taxis Per Cell
empty_taxis_df = taxi_df.groupBy("dropoff_cell").agg(count("*").alias("empty_taxis"))

# Compute Profitability and Get Top 10 Profitable Cells
profitability_df = profit_df.join(empty_taxis_df, profit_df.pickup_cell == empty_taxis_df.dropoff_cell, "left_outer") \
    .withColumn("profitability", col("median_profit") / (col("empty_taxis") + lit(1)))

top_profitable_cells = profitability_df.orderBy(col("profitability").desc()).limit(10)

# Show Query 2 results
top_profitable_cells.selectExpr(
    "pickup_cell as profitable_cell_id",
    "empty_taxis as empty_taxis_in_cell_id",
    "median_profit as median_profit_in_cell_id",
    "profitability as profitability_of_cell"
).show()


+------------------+----------------------+------------------------+---------------------+
|profitable_cell_id|empty_taxis_in_cell_id|median_profit_in_cell_id|profitability_of_cell|
+------------------+----------------------+------------------------+---------------------+
|           274.-57|                     1|                   200.0|                100.0|
|          290.-286|                     1|                   170.0|                 85.0|
|          241.-294|                     1|                   163.2|                 81.6|
|          400.-229|                     1|                   154.0|                 77.0|
|          272.-328|                     1|                   152.5|                76.25|
|          352.-372|                     1|                   145.0|                 72.5|
|          410.-158|                     1|                   140.0|                 70.0|
|          356.-140|                     1|                   138.0|                 69.0|