In [1]:
import json

from pyspark.sql import SparkSession
from pyspark.sql.functions import col, unix_timestamp, udf, lag, sum as spark_sum, when
from pyspark.sql.types import StringType, DoubleType
from pyspark.sql.window import Window
from shapely.geometry import Point, shape

# Create the Spark session for this notebook, or reuse one that already exists.
spark = SparkSession.builder.appName('NewYorkCityTaxiData').getOrCreate()

In [2]:
# Bare expression: shows the SparkSession object as this cell's output.
spark

In [15]:
# Read the sample trip file, taking the column names from its header row.
nyc_taxi_data = spark.read.csv("input/Sample NYC Data.csv", header=True)

# Print a preview of the loaded rows.
nyc_taxi_data.show()

+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N| 01-01-13 15:11|  01-01-13 15:18|              4|      -73.978165|      40.757977|       -73.989838|       40.751171|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      CMT|        1|                 N| 06-01-13 00:18|  06-01-13 00:22|              1|      -74.006683|      40.731781|       -73.994499|        40.75066|


In [5]:
# Cast the four coordinate columns to doubles, one column at a time.
for coordinate_column in (
    "pickup_longitude",
    "pickup_latitude",
    "dropoff_longitude",
    "dropoff_latitude",
):
    nyc_taxi_data = nyc_taxi_data.withColumn(
        coordinate_column, col(coordinate_column).cast(DoubleType())
    )


In [24]:
# Load the borough boundaries. GeoJSON is UTF-8 by specification (RFC 7946),
# so the encoding is stated explicitly instead of relying on the platform's
# default text encoding.
with open("input/nyc-boroughs.geojson", encoding="utf-8") as f:
    geojson_data = json.load(f)

# Broadcast the parsed GeoJSON so code running on the executors can read it.
geo_broadcast = spark.sparkContext.broadcast(geojson_data)

In [6]:
# Parse every borough geometry once on the driver and broadcast the resulting
# (borough name, polygon) pairs, so the UDF does not rebuild each polygon from
# its GeoJSON dict for every row it is called on.
borough_shapes_broadcast = spark.sparkContext.broadcast(
    [
        (feature["properties"]["borough"], shape(feature["geometry"]))
        for feature in geo_broadcast.value["features"]
    ]
)


def get_borough(lon, lat):
    """Return the name of the borough whose polygon contains the point.

    Args:
        lon: Longitude of the point, or None.
        lat: Latitude of the point, or None.

    Returns:
        None when either coordinate is None; otherwise the ``borough``
        property of the first feature (in file order) whose polygon contains
        the point, or the string "Unknown" when no polygon contains it.
    """
    if lon is None or lat is None:
        return None
    point = Point(lon, lat)
    for borough, polygon in borough_shapes_broadcast.value:
        if polygon.contains(point):
            return borough
    return "Unknown"


# Spark UDF wrapper around get_borough; the result is a string column.
borough_udf = udf(get_borough, StringType())


In [7]:
# Label both ends of every trip with the borough returned by borough_udf.
nyc_taxi_data = (
    nyc_taxi_data
    .withColumn("pickup_borough", borough_udf(col("pickup_longitude"), col("pickup_latitude")))
    .withColumn("dropoff_borough", borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))
)


In [8]:
# Parse both datetime strings into epoch seconds, then take their difference
# as the trip duration in seconds.
nyc_taxi_data = (
    nyc_taxi_data
    .withColumn("pickup_ts", unix_timestamp(col("pickup_datetime"), "dd-MM-yy HH:mm"))
    .withColumn("dropoff_ts", unix_timestamp(col("dropoff_datetime"), "dd-MM-yy HH:mm"))
    .withColumn("duration", col("dropoff_ts") - col("pickup_ts"))
)

# Keep only trips whose duration is positive and no longer than 4 hours.
trip_seconds = col("duration")
nyc_taxi_data = nyc_taxi_data.where((trip_seconds > 0) & (trip_seconds <= 4 * 3600))


In [None]:
# Order each taxi's trips by pickup time so every trip can see the dropoff
# time of the trip immediately before it.
window_spec = Window.partitionBy("medallion").orderBy("pickup_ts")

# Previous dropoff time, and the gap in seconds between it and this pickup.
# The first trip of each taxi has no previous row, so both values are null.
trips_with_idle = (
    nyc_taxi_data
    .withColumn("previous_dropoff", lag("dropoff_ts").over(window_spec))
    .withColumn("idle_time", col("pickup_ts") - col("previous_dropoff"))
)

# An idle gap is counted only when it is positive and at most 4 hours.
valid_idle = (col("idle_time") > 0) & (col("idle_time") <= 4 * 3600)

# Aggregate over ALL trips. Filtering rows on idle_time before summing would
# also discard the duration of each taxi's first trip and of every trip that
# follows an out-of-range gap, under-counting total_trip_time. Out-of-range
# or missing gaps therefore contribute 0 idle seconds instead of removing
# the trip.
utilization_df = trips_with_idle.groupBy("medallion").agg(
    spark_sum("duration").alias("total_trip_time"),
    spark_sum(when(valid_idle, col("idle_time")).otherwise(0)).alias("total_idle_time")
)

# Utilization = occupied time / (occupied time + counted idle time).
utilization_df = utilization_df.withColumn(
    "utilization",
    col("total_trip_time") / (col("total_trip_time") + col("total_idle_time"))
)

# Later cells read nyc_taxi_data with the previous_dropoff / idle_time columns
# and only the rows whose idle gap is valid, so that frame is still produced.
nyc_taxi_data = trips_with_idle.filter(valid_idle)

print("Utilization per Taxi:")
utilization_df.show(truncate=False)

Utilization per Taxi:


In [None]:
# Search time: seconds between the taxi's previous dropoff and this pickup.
nyc_taxi_data = nyc_taxi_data.withColumn("search_time", col("pickup_ts") - col("previous_dropoff"))

# Average search time per borough.
# NOTE(review): dropoff_borough is where THIS trip ends, while search_time is
# the wait that preceded this trip's pickup (i.e. it followed the previous
# trip's dropoff). Confirm this is the intended grouping key.
avg_search_df = nyc_taxi_data.groupBy("dropoff_borough").agg({"search_time": "avg"}) \
    .withColumnRenamed("avg(search_time)", "avg_search_time")

print("Average Search Time per Borough:")
# show() prints the rows and returns None, so chaining .take(10) onto it
# raised AttributeError; the 10-row limit is passed to show() instead.
avg_search_df.show(10, truncate=False)

In [None]:
# Count, per borough, the trips whose pickup and dropoff boroughs match.
starts_and_ends_together = col("pickup_borough") == col("dropoff_borough")
same_borough_df = (
    nyc_taxi_data
    .where(starts_and_ends_together)
    .groupBy("pickup_borough")
    .count()
    .withColumnRenamed("count", "same_borough_trips")
)

print("Same Borough Trips:")
same_borough_df.show(truncate=False)



In [None]:
# Count trips for every (pickup borough, dropoff borough) pair where the two
# boroughs differ.
crosses_boroughs = col("pickup_borough") != col("dropoff_borough")
diff_borough_df = (
    nyc_taxi_data
    .where(crosses_boroughs)
    .groupBy("pickup_borough", "dropoff_borough")
    .count()
    .withColumnRenamed("count", "different_borough_trips")
)

print("Different Borough Trips:")
diff_borough_df.show(truncate=False)