In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, udf, unix_timestamp, lag, sum as spark_sum, avg, count
from pyspark.sql.window import Window
from shapely.geometry import Point, shape
import json

# Initialize Spark Session
spark = SparkSession.builder.appName("NYC Taxi Utilization").getOrCreate()

# Load Taxi Ride Dataset
taxi_df = spark.read.csv("input/Sample NYC Data.csv", header=True, inferSchema=True)

# Load GeoJSON Data
with open("input/nyc-boroughs.geojson") as f:
    geojson = json.load(f)

# Create Shapely Polygons from GeoJSON Data
boroughs = []
for feature in geojson['features']:
    polygon = shape(feature['geometry'])
    borough = feature['properties']['borough']
    boroughs.append((borough, polygon))

# Sort boroughs by size (descending)
boroughs = sorted(boroughs, key=lambda x: x[1].area, reverse=True)

# UDF to Map Coordinates to Borough Name
def find_borough(lon, lat):
    point = Point(lon, lat)
    for borough, polygon in boroughs:
        if polygon.contains(point):
            return borough
    return "Unknown"

find_borough_udf = udf(find_borough)

# Enrich Dataset with Borough Names
taxi_df = taxi_df.withColumn("pickup_borough", find_borough_udf(col("pickup_longitude"), col("pickup_latitude")))
taxi_df = taxi_df.withColumn("dropoff_borough", find_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

# Compute Trip Duration
taxi_df = taxi_df.withColumn("pickup_ts", unix_timestamp("pickup_datetime", "dd-MM-yy HH:mm"))
taxi_df = taxi_df.withColumn("dropoff_ts", unix_timestamp("dropoff_datetime", "dd-MM-yy HH:mm"))
taxi_df = taxi_df.withColumn("duration", col("dropoff_ts") - col("pickup_ts"))

# Filter Outliers
taxi_df = taxi_df.filter((col("duration") > 0) & (col("duration") <= 4 * 3600))

# Window Specification for Idle Time Calculation
window_spec = Window.partitionBy("medallion").orderBy("pickup_ts")

# Lag Function to Get Previous Dropoff Time
taxi_df = taxi_df.withColumn("prev_dropoff_ts", lag("dropoff_ts").over(window_spec))

# Calculate Idle Time
taxi_df = taxi_df.withColumn("idle_time", (col("pickup_ts") - col("prev_dropoff_ts")).cast("long"))

# Filter Out Sessions Where Idle Time is Greater than 4 Hours
taxi_df = taxi_df.filter((col("idle_time") >= 0) & (col("idle_time") <= 4 * 3600) | col("idle_time").isNull())

# Group by Taxi and Compute Utilization
utilization_df = taxi_df.groupBy("medallion").agg(
    spark_sum("duration").alias("total_time"),
    spark_sum("idle_time").alias("total_idle_time")
)

utilization_df = utilization_df.withColumn("utilization", col("total_time") / (col("total_time") + col("total_idle_time")))

print("Utilization per Taxi:")
utilization_df.show()

Utilization per Taxi:
+--------------------+----------+---------------+-------------------+
|           medallion|total_time|total_idle_time|        utilization|
+--------------------+----------+---------------+-------------------+
|000318C2E3E638158...|     13800|          17400| 0.4423076923076923|
|002E3B405B6ABEA23...|      8400|          16140| 0.3422982885085575|
|0030AD2648D81EE87...|      1980|            720| 0.7333333333333333|
|0036961468659D0BF...|     10980|          19740|        0.357421875|
|0038EF45118925A51...|     10620|          15120| 0.4125874125874126|
|0053334C798EC6C8E...|      7920|          22440| 0.2608695652173913|
|005DED7D6E6C45441...|     11460|          11760| 0.4935400516795866|
|005F00B38F46E2100...|     18600|          42180| 0.3060217176702863|
|00790C7BAD30B7A9E...|     11580|          25320|0.31382113821138213|
|0094A03FFE6BAFBE0...|     10200|           5400| 0.6538461538461539|
|009D3CCA83486B03F...|     19620|          40920| 0.3240832507433102

In [2]:
# Calculate Average Time to Find Next Fare per Destination Borough
avg_fare_time_df = taxi_df.filter(col("idle_time").isNotNull()).groupBy("dropoff_borough").agg(avg("idle_time").alias("avg_fare_time"))

avg_fare_time_df.show()

+---------------+------------------+
|dropoff_borough|     avg_fare_time|
+---------------+------------------+
|         Queens|2040.1597869507323|
|        Unknown|1594.4157303370787|
|       Brooklyn| 1910.373382624769|
|  Staten Island|            1050.0|
|      Manhattan|1116.6821560192775|
|          Bronx|2201.6326530612246|
+---------------+------------------+



In [3]:
# Count Trips That Started and Ended Within the Same Borough
same_borough_trips_df = taxi_df.filter(col("pickup_borough") == col("dropoff_borough"))
trip_counts_df = same_borough_trips_df.groupBy("pickup_borough").agg(count("medallion").alias("same_borough_trip_count"))

trip_counts_df.show()

+--------------+-----------------------+
|pickup_borough|same_borough_trip_count|
+--------------+-----------------------+
|        Queens|                   1236|
|       Unknown|                   1462|
|      Brooklyn|                    971|
|     Manhattan|                  81847|
|         Bronx|                     44|
| Staten Island|                      1|
+--------------+-----------------------+



In [6]:
# Count Trips That Started in One Borough and Ended in Another
different_borough_trips_df = taxi_df.filter(col("pickup_borough") != col("dropoff_borough"))
different_trip_counts_df = different_borough_trips_df.groupBy("pickup_borough").agg(count("medallion").alias("different_borough_trip_count"))

different_trip_counts_df.show()

+--------------+----------------------------+
|pickup_borough|different_borough_trip_count|
+--------------+----------------------------+
|        Queens|                        4019|
|       Unknown|                         122|
|      Brooklyn|                         791|
| Staten Island|                           1|
|     Manhattan|                        6334|
|         Bronx|                          27|
+--------------+----------------------------+

