In [44]:
from pyspark.sql import SparkSession

# Initialize Spark session with Delta Lake support
spark = SparkSession.builder \
    .appName("NYC Taxi Data Analysis") \
    .config("spark.jars.packages", "io.delta:delta-core_2.12:2.4.0") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .getOrCreate()

In [45]:
import geopandas as gpd
from shapely.geometry import shape
import json
import matplotlib.pyplot as plt

from pyspark.sql.functions import udf
from pyspark.sql.types import StringType
from shapely.geometry import Point

from pyspark.sql.functions import unix_timestamp

from pyspark.sql.window import Window
from pyspark.sql.functions import lag, lead, col, avg, when, desc, sum as spark_sum

### Load the Data

In [46]:
# Trip data
# trip_data_df = spark.read.csv("trip_data/*.csv", header=True, inferSchema=True)
all_trip_data_df = spark.read.csv("sample.csv", header=True, inferSchema=True)

In [47]:
print(f"Total Number of Rows: {all_trip_data_df.count()}")

Total Number of Rows: 99999


In [48]:
all_trip_data_df.columns

['medallion',
 'hack_license',
 'vendor_id',
 'rate_code',
 'store_and_fwd_flag',
 'pickup_datetime',
 'dropoff_datetime',
 'passenger_count',
 'trip_time_in_secs',
 'trip_distance',
 'pickup_longitude',
 'pickup_latitude',
 'dropoff_longitude',
 'dropoff_latitude']

In [49]:
# select only the columns of interest
trip_data_df = all_trip_data_df.select(
    all_trip_data_df["medallion"],
    all_trip_data_df["hack_license"],
    all_trip_data_df["pickup_longitude"],
    all_trip_data_df["pickup_latitude"],
    all_trip_data_df["pickup_datetime"],
    all_trip_data_df["dropoff_longitude"],
    all_trip_data_df["dropoff_latitude"],
    all_trip_data_df["dropoff_datetime"]
)

In [50]:
trip_data_df.columns

['medallion',
 'hack_license',
 'pickup_longitude',
 'pickup_latitude',
 'pickup_datetime',
 'dropoff_longitude',
 'dropoff_latitude',
 'dropoff_datetime']

In [51]:
trip_data_df.show(5)

+--------------------+--------------------+----------------+---------------+-------------------+-----------------+----------------+-------------------+
|           medallion|        hack_license|pickup_longitude|pickup_latitude|    pickup_datetime|dropoff_longitude|dropoff_latitude|   dropoff_datetime|
+--------------------+--------------------+----------------+---------------+-------------------+-----------------+----------------+-------------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      -73.978165|      40.757977|2013-01-01 15:11:48|       -73.989838|       40.751171|2013-01-01 15:18:10|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      -74.006683|      40.731781|2013-01-06 00:18:35|       -73.994499|        40.75066|2013-01-06 00:22:54|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      -74.004707|       40.73777|2013-01-05 18:49:41|       -74.009834|       40.726002|2013-01-05 18:54:23|
|DFD2202EE08F7A8DC...|51EE87E3205C985EF...|      -73.974602|      40.759945|2013-01-07 2

### Data Preparation for Queries

In [52]:
# filter out rows with None values in coordinates
filtered_trip_data_df = trip_data_df.filter(
    (col("pickup_longitude").isNotNull()) & 
    (col("pickup_latitude").isNotNull()) & 
    (col("dropoff_longitude").isNotNull()) & 
    (col("dropoff_latitude").isNotNull())
)

In [53]:
# load the geojson data
with open("nyc-boroughs.geojson") as f:
    nyc_boroughs_geojson_data = json.load(f)

In [7]:
# nyc_boroughs_geojson_data

In [54]:
# Use geopandas to convert the GeoJSON to a DataFrame
boroughs_gdf = gpd.GeoDataFrame.from_features(nyc_boroughs_geojson_data["features"])

In [55]:
boroughs_gdf

Unnamed: 0,geometry,boroughCode,borough,@id
0,"POLYGON ((-74.05051 40.56642, -74.04998 40.566...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
1,"POLYGON ((-74.05314 40.57770, -74.05406 40.577...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
2,"POLYGON ((-74.15946 40.64145, -74.15998 40.641...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
3,"POLYGON ((-74.08221 40.64828, -74.08142 40.648...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
4,"POLYGON ((-73.83668 40.59495, -73.83671 40.594...",4,Queens,http://nyc.pediacities.com/Resource/Borough/Qu...
...,...,...,...,...
99,"POLYGON ((-73.78103 40.87648, -73.78121 40.876...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx
100,"POLYGON ((-73.78651 40.88094, -73.78582 40.880...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx
101,"POLYGON ((-73.87295 40.90444, -73.85947 40.900...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx
102,"POLYGON ((-73.80518 40.81527, -73.80508 40.815...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx


In [56]:
boroughs_gdf["geometry"] = boroughs_gdf["geometry"].apply(shape)
boroughs_gdf

Unnamed: 0,geometry,boroughCode,borough,@id
0,"POLYGON ((-74.05051 40.56642, -74.04998 40.566...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
1,"POLYGON ((-74.05314 40.57770, -74.05406 40.577...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
2,"POLYGON ((-74.15946 40.64145, -74.15998 40.641...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
3,"POLYGON ((-74.08221 40.64828, -74.08142 40.648...",5,Staten Island,http://nyc.pediacities.com/Resource/Borough/St...
4,"POLYGON ((-73.83668 40.59495, -73.83671 40.594...",4,Queens,http://nyc.pediacities.com/Resource/Borough/Qu...
...,...,...,...,...
99,"POLYGON ((-73.78103 40.87648, -73.78121 40.876...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx
100,"POLYGON ((-73.78651 40.88094, -73.78582 40.880...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx
101,"POLYGON ((-73.87295 40.90444, -73.85947 40.900...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx
102,"POLYGON ((-73.80518 40.81527, -73.80508 40.815...",2,Bronx,http://nyc.pediacities.com/Resource/Borough/Bronx


In [57]:
boroughs_broadcast = spark.sparkContext.broadcast(boroughs_gdf)

In [58]:
# create udf
def get_borough(longitude, latitude):
    if longitude is None or latitude is None:
        return 'Unknown'
    point = Point(longitude, latitude)
    for _, row in boroughs_broadcast.value.iterrows():
        if row["geometry"].contains(point):
            return row["borough"]
    return 'Unknown'

In [59]:
get_borough_udf = udf(get_borough, StringType())

In [60]:
# apply UDF to trip data for both pickup_borough and dropoff borough
filtered_trip_data_df = filtered_trip_data_df.withColumn("pickup_borough", get_borough_udf(col("pickup_longitude"), col("pickup_latitude")))
filtered_trip_data_df = filtered_trip_data_df.withColumn("dropoff_borough", get_borough_udf(col("dropoff_longitude"), col("dropoff_latitude")))

In [21]:
# trip_data_df = trip_data_df.withColumn("pickup_borough", get_borough_udf(trip_data_df["pickup_longitude"], trip_data_df["pickup_latitude"]))
# trip_data_df = trip_data_df.withColumn("dropoff_borough", get_borough_udf(trip_data_df["dropoff_longitude"], trip_data_df["dropoff_latitude"]))

In [61]:
filtered_trip_data_df.show(5)

+--------------------+--------------------+----------------+---------------+-------------------+-----------------+----------------+-------------------+--------------+---------------+
|           medallion|        hack_license|pickup_longitude|pickup_latitude|    pickup_datetime|dropoff_longitude|dropoff_latitude|   dropoff_datetime|pickup_borough|dropoff_borough|
+--------------------+--------------------+----------------+---------------+-------------------+-----------------+----------------+-------------------+--------------+---------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      -73.978165|      40.757977|2013-01-01 15:11:48|       -73.989838|       40.751171|2013-01-01 15:18:10|     Manhattan|      Manhattan|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      -74.006683|      40.731781|2013-01-06 00:18:35|       -73.994499|        40.75066|2013-01-06 00:22:54|     Manhattan|      Manhattan|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      -74.004707|       40.73777|2013-01-0

In [23]:
# Convert time to unix timestamp and compute duration of trip
filtered_trip_data_df = filtered_trip_data_df.withColumn("pickup_time", unix_timestamp(filtered_trip_data_df["pickup_datetime"]))
filtered_trip_data_df = filtered_trip_data_df.withColumn("dropoff_time", unix_timestamp(filtered_trip_data_df["dropoff_datetime"]))
filtered_trip_data_df = filtered_trip_data_df.withColumn("duration", filtered_trip_data_df["dropoff_time"] - filtered_trip_data_df["pickup_time"])

In [24]:
# remove trips that are outliers and have a negative duration [CLEAN DATA]
clean_trip_data_df = filtered_trip_data_df.filter((filtered_trip_data_df["duration"] > 0) & (filtered_trip_data_df["duration"] <= 4 * 3600))

In [25]:
clean_trip_data_df.show(5)

+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+--------------+---------------+-----------+------------+--------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|    pickup_datetime|   dropoff_datetime|passenger_count|trip_time_in_secs|trip_distance|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_borough|dropoff_borough|pickup_time|dropoff_time|duration|
+--------------------+--------------------+---------+---------+------------------+-------------------+-------------------+---------------+-----------------+-------------+----------------+---------------+-----------------+----------------+--------------+---------------+-----------+------------+--------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N|2013-0

### Query 1: Utilization

In [62]:
# create window partitioning by pickup time
window_spec = Window.partitionBy("hack_license").orderBy("pickup_time")
calc_trip_data_df = clean_trip_data_df.withColumn("previous_dropoff", lag("dropoff_time").over(window_spec))

In [63]:
# compute the idle time: pickup - last dropoff
calc_trip_data_df = calc_trip_data_df.withColumn("idle_time", calc_trip_data_df["pickup_time"] - calc_trip_data_df["previous_dropoff"])

In [64]:
#remove trips where the difference between subsequent rides are huge.
calc_trip_data_df = calc_trip_data_df.filter((calc_trip_data_df["idle_time"] > 0) & (calc_trip_data_df["idle_time"] <= 4 * 3600))

In [65]:
#total idle time per taxi indicated by hack license
utilization_df = calc_trip_data_df.groupBy("hack_license").agg(spark_sum("idle_time").alias("total_idle_time")).orderBy(desc("total_idle_time"))

In [66]:
utilization_df.show(10)

+--------------------+---------------+
|        hack_license|total_idle_time|
+--------------------+---------------+
|9D1B49F1300FE0067...|          48120|
|FFEE627F76A3A498D...|          45780|
|4EB96EC9F3A42794D...|          43860|
|B7661DF207AD66261...|          43020|
|14FAF868BAA670F06...|          41580|
|6B4F56A1A0C191C4D...|          40920|
|5F7F4EE09860E7732...|          40560|
|F13CAF0F4613574F4...|          40560|
|5305DE3681B59EC2C...|          40140|
|018ACEDAC372E95A9...|          39600|
+--------------------+---------------+
only showing top 10 rows



In [68]:
#last 10
utilization_df.orderBy("total_idle_time").show(10)

+--------------------+---------------+
|        hack_license|total_idle_time|
+--------------------+---------------+
|23A1E6B3FF2D80309...|             60|
|31A1F192A01B30B2C...|             60|
|1603132156F27D303...|             60|
|E3199D5923AEBE5B4...|             60|
|1735076F27B86A649...|             60|
|3836463623075CCF1...|             60|
|0772E85CB3C3FB46F...|             60|
|06150B4FF9CD737D0...|             60|
|751EE304AD669A5C9...|             60|
|49365436007E31EE7...|             60|
+--------------------+---------------+
only showing top 10 rows



In [69]:
# Compute the average of the total_idle_time column
average_idle_time_df = utilization_df.agg(avg("total_idle_time").alias("average_total_idle_time"))
average_idle_time_df.show()

+-----------------------+
|average_total_idle_time|
+-----------------------+
|     10904.484590478474|
+-----------------------+



In [60]:
# # Store Utilization Results
# utilization_df.write.mode("overwrite").parquet("delta/utilization_parquet")
# # utilization_df.write.mode("overwrite").csv("delta/utilization_csv", header=True)

# calc_trip_data_df.write.mode("overwrite").parquet("delta/calc_trip_data_df_parquet")
# # calc_trip_data_df.write.mode("overwrite").csv("delta/calc_trip_data_df_csv", header=True)

In [70]:
print(f"Total Number of Rows in Utilization_df: {utilization_df.count()}")
# Indicates the unique number of cars/drivers

Total Number of Rows in Utilization_df: 8339


In [71]:
# Save the DataFrame as a CSV file
utilization_df.write.csv("output/utilization", header=True, mode="overwrite")

### Query 2: Average Time to Find Next Fare Per Destination Borough

In [72]:
calc_trip_data_df.filter(calc_trip_data_df["pickup_time"].isNull()).count()

0

In [None]:
# calc_trip_data_df.filter(calc_trip_data_df["dropoff_time"].isNull()).count()

In [None]:
# print(f"{calc_trip_data_df.count()}")

In [73]:
# sliding window of pickup time to the next immediate pickup
calc_trip_data_df = calc_trip_data_df.withColumn("next_pickup", lead("pickup_time").over(window_spec))
calc_trip_data_df = calc_trip_data_df.withColumn("time_to_next_fare", when(col("next_pickup").isNotNull(), col("next_pickup") - col("dropoff_time")).otherwise(None))

In [74]:
calc_trip_data_df = calc_trip_data_df.filter(col("time_to_next_fare").isNotNull())

In [75]:
average_time_df = calc_trip_data_df.groupBy("dropoff_borough").agg(avg("time_to_next_fare").alias("average_time_to_next_fare")).orderBy(desc("average_time_to_next_fare"))

In [76]:
average_time_df.show(10)

+---------------+-------------------------+
|dropoff_borough|average_time_to_next_fare|
+---------------+-------------------------+
|  Staten Island|                   4125.0|
|       Brooklyn|        3727.846865364851|
|         Queens|       3379.2986818454165|
|          Bronx|       3128.3783783783783|
|           NULL|       2038.0407754010696|
|      Manhattan|       1046.8417208814271|
+---------------+-------------------------+



In [37]:
# print(f"Total Number of Rows in average_time_df: {average_time_df.count()}")
# Indicates the unique number of cars/drivers. Should be the same as above.

Total Number of Rows in average_time_df: 6


### Query 3: Number of Trips Within the Same Borough

In [77]:
# total trips that start in one and end within a borough
same_borough_df = calc_trip_data_df.filter(calc_trip_data_df["pickup_borough"] == calc_trip_data_df["dropoff_borough"]).groupBy("pickup_borough").count().orderBy(desc("count"))

In [78]:
same_borough_df.show(10)

+--------------+-----+
|pickup_borough|count|
+--------------+-----+
|     Manhattan|68789|
|        Queens|  736|
|      Brooklyn|  534|
|         Bronx|   32|
+--------------+-----+



In [40]:
# print(f"Total Number of Rows in same_borough_df: {same_borough_df.count()}")
# Indicates the number of boroughs we have for this.

Total Number of Rows in same_borough_df: 4


### Query 4: Number of Trips Between Different Boroughs

In [79]:
# total trips that start in one borough and end in the order
different_borough_df = calc_trip_data_df.filter(calc_trip_data_df["pickup_borough"] != calc_trip_data_df["dropoff_borough"]).groupBy("pickup_borough", "dropoff_borough").count().orderBy(desc("count"))

In [None]:
different_borough_df.show(20)

In [43]:
print(f"Total Number of Rows in different_borough_df: {different_borough_df.count()}")
# Indicates the number of unique A-B borough endings 

Total Number of Rows in different_borough_df: 11
