In [1]:
!pip install shapely

[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m23.2.1[0m[39;49m -> [0m[32;49m24.3.1[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython3 -m pip install --upgrade pip[0m


In [3]:
#Import libraries
from pyspark.sql import SparkSession, Window
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import lead, col, avg, when, count, unix_timestamp, lag, sum, round
import json
from shapely.geometry import Point, Polygon


In [4]:
# Initialize the Spark session
spark = SparkSession.builder.appName("TaxiDataAnalysis").getOrCreate()


24/11/04 16:59:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [5]:
# Read CSV and select required columns
df = (spark.read
      .option("header", True)
      .option("inferSchema", True)
      .csv("data/Sample NYC Data.csv")
      .select(
          "hack_license",
          "pickup_datetime",
          "dropoff_datetime",
          "pickup_longitude",
          "pickup_latitude",
          "dropoff_longitude",
          "dropoff_latitude"
      ))

# Display initial data
print("Initial DataFrame Schema:")
df.printSchema()
print("\nSample Data:")
df.show(5)

                                                                                

Initial DataFrame Schema:
root
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)


Sample Data:


                                                                                

+--------------------+---------------+----------------+----------------+---------------+-----------------+----------------+
|        hack_license|pickup_datetime|dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+---------------+----------------+----------------+---------------+-----------------+----------------+
|BA96DE419E711691B...| 01-01-13 15:11|  01-01-13 15:18|      -73.978165|      40.757977|       -73.989838|       40.751171|
|9FD8F69F0804BDB55...| 06-01-13 00:18|  06-01-13 00:22|      -74.006683|      40.731781|       -73.994499|        40.75066|
|9FD8F69F0804BDB55...| 05-01-13 18:49|  05-01-13 18:54|      -74.004707|       40.73777|       -74.009834|       40.726002|
|51EE87E3205C985EF...| 07-01-13 23:54|  07-01-13 23:58|      -73.974602|      40.759945|       -73.984734|       40.759388|
|51EE87E3205C985EF...| 07-01-13 23:25|  07-01-13 23:34|       -73.97625|      40.748528|       -74.002586|       40.747868|
+-------

# Data Cleaning and Duration Calculation


In [6]:
# Clean data and add duration column
df_cleaned = df \
    .withColumn("pickup_datetime", F.to_timestamp("pickup_datetime", "dd-MM-yy HH:mm")) \
    .withColumn("dropoff_datetime", F.to_timestamp("dropoff_datetime", "dd-MM-yy HH:mm")) \
    .withColumn("duration",
        F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")) \
    .filter(
        # Remove null values
        F.col("pickup_datetime").isNotNull() &
        F.col("dropoff_datetime").isNotNull() &
        F.col("pickup_longitude").isNotNull() &
        F.col("pickup_latitude").isNotNull() &
        F.col("dropoff_longitude").isNotNull() &
        F.col("dropoff_latitude").isNotNull() &
        # Remove invalid times (negative duration or > 4 hours)
        (F.col("duration") > 0) &
        (F.col("duration") <= 14400)
    )

# Display cleaned data
print("Cleaned DataFrame Schema:")
df_cleaned.printSchema()
print("\nSample Cleaned Data:")
df_cleaned.show(5)

# Display summary statistics
print("\nRecord count before cleaning:", df.count())
print("Record count after cleaning:", df_cleaned.count())

Cleaned DataFrame Schema:
root
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- duration: long (nullable = true)


Sample Cleaned Data:


                                                                                

+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+
|        hack_license|    pickup_datetime|   dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|duration|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+
|BA96DE419E711691B...|2013-01-01 15:11:00|2013-01-01 15:18:00|      -73.978165|      40.757977|       -73.989838|       40.751171|     420|
|9FD8F69F0804BDB55...|2013-01-06 00:18:00|2013-01-06 00:22:00|      -74.006683|      40.731781|       -73.994499|        40.75066|     240|
|9FD8F69F0804BDB55...|2013-01-05 18:49:00|2013-01-05 18:54:00|      -74.004707|       40.73777|       -74.009834|       40.726002|     300|
|51EE87E3205C985EF...|2013-01-07 23:54:00|2013-01-07 23:58:00|      -73.974602|      40.759945|       -73.984734|       40.759388|     240|
|51EE87E3205C985EF..

                                                                                


Record count before cleaning: 99999


[Stage 7:>                                                          (0 + 1) / 1]

Record count after cleaning: 99549


                                                                                

# Borough Boundary Processing

In [8]:
def load_borough_polygons(geojson_path):
    """Load and process borough boundaries from GeoJSON file"""
    with open(geojson_path) as f:
        gj = json.load(f)

    borough_polygons = {}
    # Sort features by borough code and polygon size
    for feature in sorted(gj['features'],
                         key=lambda x: (x['properties']['boroughCode'],
                                      -Polygon(x['geometry']['coordinates'][0]).area)):
        borough = feature['properties']['borough']
        polygon = Polygon(feature['geometry']['coordinates'][0])
        # Keep only the first (largest) polygon for each borough
        if borough not in borough_polygons:
            borough_polygons[borough] = polygon
    return borough_polygons

# Load borough polygons
borough_polygons = load_borough_polygons("data/nyc-boroughs.geojson")
broadcast_polygons = spark.sparkContext.broadcast(borough_polygons)

print("Loaded Boroughs:", list(borough_polygons.keys()))

Loaded Boroughs: ['Manhattan', 'Bronx', 'Brooklyn', 'Queens', 'Staten Island']


# Creating Spatial Matching Functions

In this section, we implement functions to match geographical coordinates to their corresponding NYC boroughs. We'll create and register a User-Defined Function (UDF) in Spark to enable efficient spatial queries across our dataset.

## Steps :
1. Create point-in-polygon matching function
2. Register the function as a Spark UDF
3. Prepare for parallel processing with broadcast variables

In [9]:
def get_borough(lon, lat, polygons):
    """Determine borough for a given coordinate pair"""
    if lon is None or lat is None:
        return None
    point = Point(lon, lat)
    for borough, polygon in polygons.items():
        if polygon.contains(point):
            return borough
    return None

# Register UDF
borough_udf = F.udf(
    lambda lon, lat: get_borough(lon, lat, broadcast_polygons.value),
    StringType()
)

# Data Enrichment with Borough Information



In [10]:
# Add borough information to cleaned dataset
df_final = df_cleaned \
    .withColumn("pickup_borough",
                borough_udf(F.col("pickup_longitude"), F.col("pickup_latitude"))) \
    .withColumn("dropoff_borough",
                borough_udf(F.col("dropoff_longitude"), F.col("dropoff_latitude")))

# Cache the final dataframe
df_final.cache()

# Display results
print("Final DataFrame Schema:")
df_final.printSchema()
print("\nSample Final Data:")
df_final.show(5)
print("\nTrips by Pickup Borough:")
df_final.groupBy("pickup_borough").count().show()
print("\nTrips by Dropoff Borough:")
df_final.groupBy("dropoff_borough").count().show()

Final DataFrame Schema:
root
 |-- hack_license: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- duration: long (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)


Sample Final Data:


                                                                                

+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+--------------+---------------+
|        hack_license|    pickup_datetime|   dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|duration|pickup_borough|dropoff_borough|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+--------------+---------------+
|BA96DE419E711691B...|2013-01-01 15:11:00|2013-01-01 15:18:00|      -73.978165|      40.757977|       -73.989838|       40.751171|     420|     Manhattan|      Manhattan|
|9FD8F69F0804BDB55...|2013-01-06 00:18:00|2013-01-06 00:22:00|      -74.006683|      40.731781|       -73.994499|        40.75066|     240|     Manhattan|      Manhattan|
|9FD8F69F0804BDB55...|2013-01-05 18:49:00|2013-01-05 18:54:00|      -74.004707|       40.73777|       -74.009834|       40.726002|     300|     M

                                                                                

+--------------+-----+
|pickup_borough|count|
+--------------+-----+
|        Queens| 5871|
|          null| 1681|
|      Brooklyn| 1960|
| Staten Island|    2|
|     Manhattan|89956|
|         Bronx|   79|
+--------------+-----+


Trips by Dropoff Borough:


                                                                                

+---------------+-----+
|dropoff_borough|count|
+---------------+-----+
|         Queens| 5437|
|           null| 2107|
|       Brooklyn| 3591|
|  Staten Island|   13|
|      Manhattan|88008|
|          Bronx|  393|
+---------------+-----+



In [11]:
# Count rows where pickup_borough is "null"
pickup_null_count = df_final.filter(df_final["pickup_borough"].isNull()).count()

# Count rows where dropoff_borough is "null"
dropoff_null_count = df_final.filter(df_final["dropoff_borough"].isNull()).count()

print(f"Number of rows with 'null' pickup_borough: {pickup_null_count}")
print(f"Number of rows with 'null' dropoff_borough: {dropoff_null_count}")



                                                                                

Number of rows with 'null' pickup_borough: 1681
Number of rows with 'null' dropoff_borough: 2107


In [12]:
df_final = df_final.filter(
    (df_final["pickup_borough"].isNotNull()) &
    (df_final["dropoff_borough"].isNotNull())
)


# Query1 : Utilization  - Understanding Taxi Utilization

In [13]:
#  1 --> Create window specification for ordering trips by taxi and pickup time
window_spec = Window.partitionBy("hack_license").orderBy("pickup_datetime")


In [14]:
# 2 --> Calculating idle times
df_with_idle = df_final.withColumn(
    "prev_dropoff_ts",
    lag("dropoff_datetime").over(window_spec)
).withColumn(
    "idle_time_seconds",
    when(
        col("prev_dropoff_ts").isNotNull(),
        unix_timestamp(col("pickup_datetime")) - unix_timestamp(col("prev_dropoff_ts"))
    ).otherwise(0)
)

In [15]:
# 3 --> Identify New Sessions and Filter Idle Time
df_with_sessions = df_with_idle.withColumn(
    "is_new_session",
    when(col("idle_time_seconds") > 14400, True).otherwise(False)
).withColumn(
    "valid_idle_time",
    when(
        (col("idle_time_seconds") <= 14400) & (col("idle_time_seconds") > 0),
        col("idle_time_seconds")
    ).otherwise(0)
)


In [16]:

# 4--> Calculate session numbers
df_with_session_nums = df_with_sessions.withColumn(
    "session_id",
    sum(col("is_new_session").cast("int")).over(window_spec)
)


In [17]:
# 5--> Calculate utilization metrics
print("Computing utilization rates...")
utilization = df_with_session_nums.groupBy("hack_license", "session_id").agg(
    sum("duration").alias("total_occupied_time"),
    sum("valid_idle_time").alias("total_idle_time"),
    count("*").alias("trips_count")
).withColumn(
    "total_time",
    col("total_occupied_time") + col("total_idle_time")
).withColumn(
    "utilization_rate",
    round((col("total_occupied_time") / col("total_time") * 100), 2)
)


Computing utilization rates...


In [18]:
# Cache the results for multiple analyses
utilization.cache()

print("Utilization calculation completed!")

Utilization calculation completed!


In [20]:
utilization.show(5)

[Stage 36:>                                                         (0 + 1) / 1]

+--------------------+----------+-------------------+---------------+-----------+----------+----------------+
|        hack_license|session_id|total_occupied_time|total_idle_time|trips_count|total_time|utilization_rate|
+--------------------+----------+-------------------+---------------+-----------+----------+----------------+
|02856AFC22881ABCA...|         0|               1860|           2640|          3|      4500|           41.33|
|03A2D28F831C5C3E5...|         0|               6900|          19020|          8|     25920|           26.62|
|069B5562096AF7684...|         0|                600|              0|          1|       600|           100.0|
|0FBF11956EE14B253...|         0|               7800|          25560|         10|     33360|           23.38|
|130328475AD7427AF...|         0|                720|              0|          1|       720|           100.0|
+--------------------+----------+-------------------+---------------+-----------+----------+----------------+
only showi

                                                                                

## Query 2 : The average time it takes for a taxi to find its next fare(trip) per destination borough. ##

In [23]:
# Get the pickup timestamp of the next trip for each taxi's row
df_final_with_next_pickup_timestamp = df_final.withColumn("next_pickup_timestamp",
                                                  lead("pickup_datetime").over(window_spec))

In [24]:
# Compute the time difference in seconds between dropoff and the next pickup
df_with_with_waiting_time = df_final_with_next_pickup_timestamp.withColumn(
    "waiting_time",
    F.when(
        F.col("next_pickup_timestamp").isNotNull(),
        (F.unix_timestamp("next_pickup_timestamp") - F.unix_timestamp("dropoff_datetime"))
    ).otherwise(None)
)

df_with_with_waiting_time.show()

                                                                                

+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+--------------+---------------+---------------------+------------+
|        hack_license|    pickup_datetime|   dropoff_datetime|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|duration|pickup_borough|dropoff_borough|next_pickup_timestamp|waiting_time|
+--------------------+-------------------+-------------------+----------------+---------------+-----------------+----------------+--------+--------------+---------------+---------------------+------------+
|02856AFC22881ABCA...|2013-01-13 02:42:00|2013-01-13 02:51:00|      -73.956192|      40.772175|       -73.993713|       40.761349|     540|     Manhattan|      Manhattan|  2013-01-13 03:32:00|        2460|
|02856AFC22881ABCA...|2013-01-13 03:32:00|2013-01-13 03:41:00|      -74.008347|      40.725906|       -73.993164|       40.722485|     540|     Manhattan|      Manhattan|  2013

In [25]:
# Compute the average waiting time for each taxi in each dropoff borough
avg_wait_time_per_taxi_per_borough = (df_with_with_waiting_time.groupBy("hack_license", "dropoff_borough")
                                .agg(avg("waiting_time").alias("avg_waiting_time_seconds"))
                                .orderBy("hack_license", "dropoff_borough"))

In [26]:
#Droping the no needed any longer column
avg_wait_time_per_taxi_per_borough = avg_wait_time_per_taxi_per_borough.drop("next_pickup_timestamp", "waiting_time")


In [27]:
# Show the results
avg_wait_time_per_taxi_per_borough.show()


                                                                                

+--------------------+---------------+------------------------+
|        hack_license|dropoff_borough|avg_waiting_time_seconds|
+--------------------+---------------+------------------------+
|001C8AAB90AEE49F3...|       Brooklyn|                  5940.0|
|001C8AAB90AEE49F3...|      Manhattan|                  3510.0|
|0025133AD810DBE80...|      Manhattan|                  2400.0|
|002C093A2CB9FD40C...|       Brooklyn|                   450.0|
|002C093A2CB9FD40C...|      Manhattan|      1029.2307692307693|
|002C093A2CB9FD40C...|         Queens|                  1020.0|
|00374328FBA75FBFC...|         Queens|                    null|
|00447A6197DBB329F...|       Brooklyn|                  2940.0|
|00447A6197DBB329F...|      Manhattan|                  2850.0|
|00447A6197DBB329F...|         Queens|                  4800.0|
|0046F1E91AA13DEDE...|      Manhattan|       553.3333333333334|
|00567B1CBFD51DDFA...|      Manhattan|                   504.0|
|0057CCB5BA8D29E34...|      Manhattan|  




## Computing the number of trips that started and ended within the same borough##


In [None]:
same_borough_trips = df_final.filter(col("pickup_borough") == col("dropoff_borough"))

# Count the number of these trips grouped by borough
count_same_borough_trips = same_borough_trips.agg(count("*").alias("number_of_trips"))


count_same_borough_trips.show()

## The number of trips that started in one borough and ended in another one


In [70]:
# Filter rows where pickup_borough is not equal to dropoff_borough
inter_borough_trips = df_final.filter(col("pickup_borough") != col("dropoff_borough"))

# Count the number of these trips grouped by pickup and dropoff boroughs
count_inter_borough_trips = inter_borough_trips.agg(count("*").alias("number_of_trips"))

count_inter_borough_trips.show()

+---------------+
|number_of_trips|
+---------------+
|          11403|
+---------------+

