# New York City Taxi Data Analysis


## Table of Contents
1. [Project Overview](#project-overview)
2. [Data Source Description](#data-source-description)
3. [Project Steps](#project-steps)
    - [Initialize PySpark Session](#initialize-pyspark-session)
    - [Data Loading](#data-loading)
    - [Data Cleaning](#data-cleaning)
    - [Data Enrichment](#data-enrichment)
    - [Query 1: Utilization](#query1-utilization)
    - [Query 2: Average Time to Next Fare](#query2-average-time)
    - [Query 3: Intra-Borough Trips](#query3-intra-borough)
    - [Query 4: Inter-Borough Trips](#query4-inter-borough)
4. [Conclusion](#conclusion)


## Project Overview
This project analyzes a dataset of New York City taxi rides. The goals are to compute the utilization of each taxi, to measure how long a taxi takes to find its next passenger depending on the drop-off borough, and to count intra-borough and inter-borough trips.



## Data Source Description

Each row in the dataset represents a single taxi ride in CSV format. The dataset provides:

* Unique ID for the taxi (license)
* Pick-up location (latitude and longitude)
* Pick-up time
* Drop-off location (latitude and longitude)
* Drop-off time

Additionally, the dataset includes a `.geojson` file with the geographical boundaries of the NYC boroughs, which is used to associate each ride with a specific borough.



## Project Steps


### Initialize PySpark Session


In [51]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("Taxi Ride Data Analysis") \
    .master("local[*]") \
    .config("spark.dynamicAllocation.enabled", "true") \
    .config("spark.shuffle.service.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.sql.shuffle.partitions", "100") \
    .config("spark.ui.retainedStages", 100) \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")


### Data Loading


In [63]:
from pyspark.sql.functions import col

# Read the CSV file with schema inference and immediately select the necessary columns
taxi_data_final = (spark.read.csv("Sample_NYC_Data.csv", header=True, inferSchema=True)
            .select("hack_license", 
                    "pickup_datetime", 
                    "pickup_longitude", 
                    "pickup_latitude", 
                    "dropoff_datetime", 
                    "dropoff_longitude", 
                    "dropoff_latitude"))

# Cache the data
taxi_data_final.cache()


# Show the filtered data
taxi_data_final.limit(5).show(truncate=False)


23/10/21 23:28:28 WARN CacheManager: Asked to cache already cached data.


+--------------------------------+---------------+----------------+---------------+----------------+-----------------+----------------+
|hack_license                    |pickup_datetime|pickup_longitude|pickup_latitude|dropoff_datetime|dropoff_longitude|dropoff_latitude|
+--------------------------------+---------------+----------------+---------------+----------------+-----------------+----------------+
|BA96DE419E711691B9445D6A6307C170|01-01-13 15:11 |-73.978165      |40.757977      |01-01-13 15:18  |-73.989838       |40.751171       |
|9FD8F69F0804BDB5549F40E9DA1BE472|06-01-13 0:18  |-74.006683      |40.731781      |06-01-13 0:22   |-73.994499       |40.75066        |
|9FD8F69F0804BDB5549F40E9DA1BE472|05-01-13 18:49 |-74.004707      |40.73777       |05-01-13 18:54  |-74.009834       |40.726002       |
|51EE87E3205C985EF8431D850C786310|07-01-13 23:54 |-73.974602      |40.759945      |07-01-13 23:58  |-73.984734       |40.759388       |
|51EE87E3205C985EF8431D850C786310|07-01-13 23:25

### Data Cleaning

In [7]:
from pyspark.sql import functions as F
from pyspark.sql.functions import from_unixtime, unix_timestamp

# Shape before cleaning
before_rows = taxi_data_final.count()
before_cols = len(taxi_data_final.columns)
print(f"Before cleaning: {before_rows} rows, {before_cols} columns")

Before cleaning: 99999 rows, 7 columns


In [64]:
from pyspark.sql.functions import unix_timestamp, col

# Parse the date strings (pattern "dd-MM-yy H:mm") into timestamp columns
taxi_data_final = taxi_data_final.withColumn("pickup_timestamp", unix_timestamp("pickup_datetime", "dd-MM-yy H:mm").cast("timestamp"))
taxi_data_final = taxi_data_final.withColumn("dropoff_timestamp", unix_timestamp("dropoff_datetime", "dd-MM-yy H:mm").cast("timestamp"))

# Calculate the duration in minutes between pickup and dropoff
taxi_data_final = taxi_data_final.withColumn("Duration", (col("dropoff_timestamp").cast("long") - col("pickup_timestamp").cast("long")) / 60)

# Remove the old date columns
taxi_data_final = taxi_data_final.drop("pickup_datetime", "dropoff_datetime")

# Show the schema and the results
taxi_data_final.printSchema()
taxi_data_final.limit(5).show(truncate=False)


root
 |-- hack_license: string (nullable = true)
 |-- pickup_longitude: double (nullable = true)
 |-- pickup_latitude: double (nullable = true)
 |-- dropoff_longitude: double (nullable = true)
 |-- dropoff_latitude: double (nullable = true)
 |-- pickup_timestamp: timestamp (nullable = true)
 |-- dropoff_timestamp: timestamp (nullable = true)
 |-- Duration: double (nullable = true)

+--------------------------------+----------------+---------------+-----------------+----------------+-------------------+-------------------+--------+
|hack_license                    |pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|pickup_timestamp   |dropoff_timestamp  |Duration|
+--------------------------------+----------------+---------------+-----------------+----------------+-------------------+-------------------+--------+
|BA96DE419E711691B9445D6A6307C170|-73.978165      |40.757977      |-73.989838       |40.751171       |2013-01-01 15:11:00|2013-01-01 15:18:00|7.0     |
|9FD8F6

In [65]:
for col_name in taxi_data_final.columns:
    null_count = taxi_data_final.filter(taxi_data_final[col_name].isNull()).count()
    print(f"Number of null values in {col_name}: {null_count}")


Number of null values in hack_license: 0
Number of null values in pickup_longitude: 0
Number of null values in pickup_latitude: 0
Number of null values in dropoff_longitude: 0
Number of null values in dropoff_latitude: 0
Number of null values in pickup_timestamp: 0
Number of null values in dropoff_timestamp: 0
Number of null values in Duration: 0


In [66]:
from pyspark.sql.functions import col

taxi_data_final = taxi_data_final.filter(
    # NYC bounds
    (col("pickup_latitude") >= 40.4774) & (col("pickup_latitude") <= 40.9176) &
    (col("pickup_longitude") >= -74.2591) & (col("pickup_longitude") <= -73.7004) &
    (col("dropoff_latitude") >= 40.4774) & (col("dropoff_latitude") <= 40.9176) &
    (col("dropoff_longitude") >= -74.2591) & (col("dropoff_longitude") <= -73.7004) &

    # Keep rides with a positive duration of at most four hours (Duration is in minutes)
    (col("Duration") > 0) & (col("Duration") <= 4*60) &

    # Filter out zeros for latitudes and longitudes
    (col("pickup_longitude") != 0) & 
    (col("pickup_latitude") != 0) & 
    (col("dropoff_longitude") != 0) & 
    (col("dropoff_latitude") != 0)
)
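
The two cleaning steps above (timestamp parsing and the validity filter) can be sketched for a single row in plain Python. This is a minimal illustration using the standard library instead of Spark; the helper names (`parse_ts`, `duration_minutes`, `in_bounds`, `is_valid_ride`) are made up for this example, while the bounds and the four-hour cap are copied from the filter above.

```python
from datetime import datetime

# Bounding box and duration cap copied from the Spark filter above
LAT_MIN, LAT_MAX = 40.4774, 40.9176
LON_MIN, LON_MAX = -74.2591, -73.7004
MAX_DURATION_MIN = 4 * 60


def parse_ts(text: str) -> datetime:
    """Parse a 'dd-MM-yy H:mm' string (hypothetical helper)."""
    return datetime.strptime(text, "%d-%m-%y %H:%M")


def duration_minutes(pickup: str, dropoff: str) -> float:
    """Trip duration in minutes between two date strings."""
    return (parse_ts(dropoff) - parse_ts(pickup)).total_seconds() / 60


def in_bounds(lat: float, lon: float) -> bool:
    """True if the coordinate lies inside the bounding box."""
    return LAT_MIN <= lat <= LAT_MAX and LON_MIN <= lon <= LON_MAX


def is_valid_ride(ride: dict) -> bool:
    """Apply the same checks as the filter: bounds and plausible duration."""
    minutes = duration_minutes(ride["pickup_datetime"], ride["dropoff_datetime"])
    return (
        in_bounds(ride["pickup_latitude"], ride["pickup_longitude"])
        and in_bounds(ride["dropoff_latitude"], ride["dropoff_longitude"])
        and 0 < minutes <= MAX_DURATION_MIN
    )


# First row of the sample shown in the Data Loading output
ride = {
    "pickup_datetime": "01-01-13 15:11",
    "pickup_longitude": -73.978165,
    "pickup_latitude": 40.757977,
    "dropoff_datetime": "01-01-13 15:18",
    "dropoff_longitude": -73.989838,
    "dropoff_latitude": 40.751171,
}

print(duration_minutes(ride["pickup_datetime"], ride["dropoff_datetime"]))  # 7.0
print(is_valid_ride(ride))  # True
```

A coordinate of 0 already falls outside the bounding box, so the sketch does not need a separate zero check.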


### Data Enrichment


In [67]:
# Loading the geojson file
import json
with open('nyc-boroughs.geojson', 'r') as file:
    geojson_data = json.load(file)

In [None]:
# Broadcasting the geojson data to the workers
broadcast_geojson = spark.sparkContext.broadcast(geojson_data)

# Verify that the broadcast has been done
assert broadcast_geojson.value == geojson_data, "Broadcasting failed!"


In [71]:
from shapely.geometry import Point, shape
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def preprocess_geojson(geojson_boroughs: dict) -> list:
    """
    Preprocesses the geojson data to create a sorted list of polygons.
    
    Args:
    - geojson_boroughs (dict): The geojson data containing polygons and their properties.
    
    Returns:
    - list: A sorted list of tuples where each tuple contains a polygon and its associated properties.
    """
    
    polygons_with_props = [
        (shape(feature['geometry']), feature['properties'])
        for feature in geojson_boroughs['features']
    ]
    return sorted(polygons_with_props, key=lambda x: (x[1]['boroughCode'], -x[0].area))

def get_borough(longitude: float, latitude: float, sorted_polygons: list) -> str:
    """
    Return the borough name for a given longitude and latitude.
    
    Args:
    - longitude (float): Longitude of the location.
    - latitude (float): Latitude of the location.
    - sorted_polygons (list): List of preprocessed polygons for lookup.
    
    Returns:
    - str: Name of the borough if the point falls within a polygon, otherwise None.
    """
    point = Point(longitude, latitude)
    for polygon, properties in sorted_polygons:
        if polygon.contains(point):
            return properties['borough']
    return None

# Preprocess polygons and then broadcast
sorted_polygons = preprocess_geojson(broadcast_geojson.value)
sorted_polygons_broadcast = spark.sparkContext.broadcast(sorted_polygons)

# Convert the get_borough function to a UDF using lambda to pass the broadcasted sorted_polygons
borough_udf = udf(lambda lon, lat: get_borough(lon, lat, sorted_polygons_broadcast.value), StringType())
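
To show what the lookup does for a single coordinate, here is a small self-contained sketch. It is not the notebook's implementation: `polygon.contains` from shapely is replaced by a simple ray-casting test, and the two rectangular "boroughs" are invented shapes, not real boundaries. The names `point_in_polygon`, `toy_boroughs` and `toy_get_borough` are hypothetical.

```python
def point_in_polygon(lon, lat, ring):
    """Ray casting: count how many edges a ray going east from the point crosses."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Only edges that straddle the point's latitude can be crossed
        if (y1 > lat) != (y2 > lat):
            x_cross = x1 + (lat - y1) * (x2 - x1) / (y2 - y1)
            if lon < x_cross:
                inside = not inside
    return inside


# Invented rectangular regions as (name, ring) pairs, for illustration only
toy_boroughs = [
    ("West", [(0.0, 0.0), (2.0, 0.0), (2.0, 2.0), (0.0, 2.0)]),
    ("East", [(2.0, 0.0), (3.0, 0.0), (3.0, 2.0), (2.0, 2.0)]),
]


def toy_get_borough(lon, lat, boroughs):
    """Return the first region containing the point, or None if there is none."""
    for name, ring in boroughs:
        if point_in_polygon(lon, lat, ring):
            return name
    return None


print(toy_get_borough(1.0, 1.0, toy_boroughs))  # West
print(toy_get_borough(2.5, 1.0, toy_boroughs))  # East
print(toy_get_borough(5.0, 1.0, toy_boroughs))  # None
```

As in `get_borough`, a point that lies in no polygon yields `None`. This is presumably how rides inside the bounding box but outside every borough polygon (over water or in New Jersey, for example) end up with a null borough.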


In [73]:
# Use withColumn for pickup_borough
taxi_data_enriched = (taxi_data_final
                      .withColumn("pickup_borough", 
                                  borough_udf(col("pickup_longitude"), col("pickup_latitude"))))

# Use withColumn again for dropoff_borough
taxi_data_enriched = (taxi_data_enriched
                      .withColumn("dropoff_borough", 
                                  borough_udf(col("dropoff_longitude"), col("dropoff_latitude"))))

# Show results
taxi_data_enriched.limit(5).select("pickup_borough", "dropoff_borough").show(truncate=False)



+--------------+---------------+
|pickup_borough|dropoff_borough|
+--------------+---------------+
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
+--------------+---------------+



                                                                                

In [76]:
columns_to_drop = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
taxi_data_enriched = taxi_data_enriched.drop(*columns_to_drop)

In [75]:
# Display distinct pickup boroughs
taxi_data_enriched.select("pickup_borough").distinct().show()

# Display distinct dropoff boroughs
taxi_data_enriched.select("dropoff_borough").distinct().show()


                                                                                

+--------------+
|pickup_borough|
+--------------+
|        Queens|
|      Brooklyn|
| Staten Island|
|          null|
|     Manhattan|
|         Bronx|
+--------------+





+---------------+
|dropoff_borough|
+---------------+
|         Queens|
|       Brooklyn|
|  Staten Island|
|           null|
|      Manhattan|
|          Bronx|
+---------------+



                                                                                

In [77]:
# Count rows where pickup_borough is "null"
pickup_null_count = taxi_data_enriched.filter(taxi_data_enriched["pickup_borough"].isNull()).count()

# Count rows where dropoff_borough is "null"
dropoff_null_count = taxi_data_enriched.filter(taxi_data_enriched["dropoff_borough"].isNull()).count()

print(f"Number of rows with 'null' pickup_borough: {pickup_null_count}")
print(f"Number of rows with 'null' dropoff_borough: {dropoff_null_count}")




Number of rows with 'null' pickup_borough: 119
Number of rows with 'null' dropoff_borough: 373


                                                                                

In [78]:
taxi_data_enriched = taxi_data_enriched.filter(
    (taxi_data_enriched["pickup_borough"].isNotNull()) & 
    (taxi_data_enriched["dropoff_borough"].isNotNull())
)


In [79]:
# Display distinct pickup boroughs
taxi_data_enriched.select("pickup_borough").distinct().show()

# Display distinct dropoff boroughs
taxi_data_enriched.select("dropoff_borough").distinct().show()


                                                                                

+--------------+
|pickup_borough|
+--------------+
|        Queens|
|      Brooklyn|
| Staten Island|
|     Manhattan|
|         Bronx|
+--------------+





+---------------+
|dropoff_borough|
+---------------+
|         Queens|
|       Brooklyn|
|  Staten Island|
|      Manhattan|
|          Bronx|
+---------------+



                                                                                

In [80]:
# Shape after cleaning
print(taxi_data_enriched.count())



97375


                                                                                

### Query 2: Average Time to Next Fare

In [24]:
from pyspark.sql import Window
from pyspark.sql.functions import lead, col, avg, when

# Define a window specification
windowSpec = Window.partitionBy("hack_license").orderBy("pickup_timestamp")

# Get the pickup timestamp of the next trip for each taxi's row
taxi_data_enriched = taxi_data_enriched.withColumn("next_pickup_timestamp", 
                                                  lead("pickup_timestamp").over(windowSpec))

# Compute the time difference in seconds between dropoff and the next pickup
taxi_data_enriched = taxi_data_enriched.withColumn("waiting_time", 
           when(col("next_pickup_timestamp").isNotNull(), 
                (col("next_pickup_timestamp").cast("long") - col("dropoff_timestamp").cast("long")))
           .otherwise(None))

# Compute the average waiting time for each taxi in each dropoff borough
average_waiting_time_per_taxi_per_borough = (taxi_data_enriched.groupBy("hack_license", "dropoff_borough")
                                .agg(avg("waiting_time").alias("avg_waiting_time_seconds"))
                                .orderBy("hack_license", "dropoff_borough"))

# Drop the columns that are no longer needed
taxi_data_enriched = taxi_data_enriched.drop("next_pickup_timestamp", "waiting_time")

# Show the results
average_waiting_time_per_taxi_per_borough.show()




+--------------------+---------------+------------------------+
|        hack_license|dropoff_borough|avg_waiting_time_seconds|
+--------------------+---------------+------------------------+
|001C8AAB90AEE49F3...|       Brooklyn|                  5940.0|
|001C8AAB90AEE49F3...|      Manhattan|                  3510.0|
|0025133AD810DBE80...|      Manhattan|                  2400.0|
|002C093A2CB9FD40C...|       Brooklyn|                   450.0|
|002C093A2CB9FD40C...|      Manhattan|      1029.2307692307693|
|002C093A2CB9FD40C...|         Queens|                  1020.0|
|00374328FBA75FBFC...|         Queens|                    null|
|00447A6197DBB329F...|       Brooklyn|                  2940.0|
|00447A6197DBB329F...|      Manhattan|                  2850.0|
|00447A6197DBB329F...|         Queens|                  4800.0|
|0046F1E91AA13DEDE...|      Manhattan|       553.3333333333334|
|00567B1CBFD51DDFA...|      Manhattan|                   504.0|
|0057CCB5BA8D29E34...|      Manhattan|  
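
The `lead` window logic used in this query can be illustrated in plain Python. The sketch below is only an approximation of the Spark code, run on made-up trips: it sorts each taxi's trips by pickup time, pairs every trip with the next one, and averages the gap between drop-off and next pickup per taxi and drop-off borough. The function name `average_waiting_times` and the sample data are assumptions for illustration.

```python
from collections import defaultdict
from datetime import datetime

# Made-up trips: (taxi, pickup, dropoff, dropoff_borough)
trips = [
    ("A", datetime(2013, 1, 1, 10, 0), datetime(2013, 1, 1, 10, 10), "Manhattan"),
    ("A", datetime(2013, 1, 1, 10, 20), datetime(2013, 1, 1, 10, 40), "Brooklyn"),
    ("A", datetime(2013, 1, 1, 11, 0), datetime(2013, 1, 1, 11, 5), "Manhattan"),
    ("B", datetime(2013, 1, 1, 9, 0), datetime(2013, 1, 1, 9, 30), "Queens"),
]


def average_waiting_times(trips):
    """Average seconds between a drop-off and the same taxi's next pickup."""
    by_taxi = defaultdict(list)
    for trip in trips:
        by_taxi[trip[0]].append(trip)

    waits = defaultdict(list)  # (taxi, dropoff_borough) -> waiting times in seconds
    for taxi, taxi_trips in by_taxi.items():
        taxi_trips.sort(key=lambda t: t[1])  # order by pickup time, like the window
        # Pair each trip with its successor; the last trip has no next pickup
        for current, nxt in zip(taxi_trips, taxi_trips[1:]):
            wait = (nxt[1] - current[2]).total_seconds()
            waits[(taxi, current[3])].append(wait)

    return {key: sum(values) / len(values) for key, values in waits.items()}


print(average_waiting_times(trips))
# {('A', 'Manhattan'): 600.0, ('A', 'Brooklyn'): 1200.0}
```

Groups that contain only a taxi's last trip have no waiting time. The Spark result shows them with a `null` average, whereas this sketch simply omits them.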

                                                                                

### Query 3: Intra-Borough Trips

In [81]:
from pyspark.sql.functions import count, col

same_borough_trips = taxi_data_enriched.filter(col("pickup_borough") == col("dropoff_borough"))

# Count the total number of intra-borough trips (a single row, so no ordering is needed)
count_same_borough_trips = same_borough_trips.agg(count("*").alias("number_of_trips"))

count_same_borough_trips.show()




+---------------+
|number_of_trips|
+---------------+
|          85944|
+---------------+



                                                                                

### Query 4: Inter-Borough Trips

In [82]:
from pyspark.sql.functions import count, col

# Filter rows where pickup_borough is not equal to dropoff_borough
inter_borough_trips = taxi_data_enriched.filter(col("pickup_borough") != col("dropoff_borough"))

# Count the total number of inter-borough trips (a single row, so no ordering is needed)
count_inter_borough_trips = inter_borough_trips.agg(count("*").alias("number_of_trips"))

count_inter_borough_trips.show()




+---------------+
|number_of_trips|
+---------------+
|          11431|
+---------------+
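
The two totals are complementary: 85944 intra-borough plus 11431 inter-borough trips equals the 97375 rows left after cleaning. The plain-Python sketch below shows the same split on made-up borough pairs and, as one possible extension, a per-pair breakdown. The sample data and variable names are assumptions for illustration.

```python
from collections import Counter

# Made-up (pickup_borough, dropoff_borough) pairs, for illustration only
trip_boroughs = [
    ("Manhattan", "Manhattan"),
    ("Manhattan", "Brooklyn"),
    ("Queens", "Manhattan"),
    ("Manhattan", "Manhattan"),
    ("Brooklyn", "Brooklyn"),
]

# Count every (pickup, dropoff) combination once
pair_counts = Counter(trip_boroughs)

# Intra-borough: same pickup and drop-off borough; inter-borough: different ones
intra = sum(n for (pickup, dropoff), n in pair_counts.items() if pickup == dropoff)
inter = sum(n for (pickup, dropoff), n in pair_counts.items() if pickup != dropoff)

print(intra, inter)  # 3 2
print(pair_counts.most_common(1))  # [(('Manhattan', 'Manhattan'), 2)]
```

Every trip falls into exactly one of the two groups, so `intra + inter` always equals the number of trips.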



                                                                                

### Query 1: Utilization

Utilization is computed per taxi/driver, by first computing the idle time of each taxi.

In [83]:
from pyspark.sql import Window
from pyspark.sql.functions import lag, col, when, unix_timestamp, sum

# Define a window specification
windowSpec = Window.partitionBy("hack_license").orderBy("pickup_timestamp")

# Calculate previous_dropoff_timestamp
taxi_data_enriched = taxi_data_enriched.withColumn(
    "previous_dropoff_timestamp", lag(col("dropoff_timestamp")).over(windowSpec)
)

# Calculate idle_time_seconds
taxi_data_enriched = taxi_data_enriched.withColumn(
    "idle_time_seconds",
    when(col("previous_dropoff_timestamp").isNull(), 0)
    .otherwise(
        (unix_timestamp("pickup_timestamp") - unix_timestamp("previous_dropoff_timestamp"))
    )
).withColumn(
    "idle_time_seconds",
    when(col("idle_time_seconds") > 14400, 0)
    .otherwise(col("idle_time_seconds"))
)

# Calculate the duration of each trip in seconds
taxi_data_enriched = taxi_data_enriched.withColumn(
    "trip_duration_seconds", 
    unix_timestamp("dropoff_timestamp") - unix_timestamp("pickup_timestamp")
)

# Drop the intermediate column to save memory
taxi_data_enriched = taxi_data_enriched.drop("previous_dropoff_timestamp")

# Group by taxi and borough, then compute total trip duration and total idle time
taxi_borough_utilization = taxi_data_enriched.groupBy("hack_license", "dropoff_borough").agg(
    sum("trip_duration_seconds").alias("total_trip_duration"),
    sum("idle_time_seconds").alias("total_idle_time")
)

# Calculate the utilization for each taxi within each borough
taxi_borough_utilization = taxi_borough_utilization.withColumn(
    "utilization",
    col("total_trip_duration") / (col("total_trip_duration") + col("total_idle_time"))
)

# Show a sample of the result
taxi_borough_utilization.select("hack_license", "dropoff_borough", "utilization").limit(5).show()




+--------------------+---------------+-------------------+
|        hack_license|dropoff_borough|        utilization|
+--------------------+---------------+-------------------+
|00BFF9F028A7A9365...|         Queens|                1.0|
|00BFF9F028A7A9365...|       Brooklyn| 0.2962962962962963|
|00BFF9F028A7A9365...|      Manhattan| 0.3504273504273504|
|02856AFC22881ABCA...|      Manhattan|0.41333333333333333|
|03A2D28F831C5C3E5...|      Manhattan|0.23809523809523808|
+--------------------+---------------+-------------------+



                                                                                

In [84]:
from pyspark.sql.functions import first, round

# Pivot the DataFrame
pivot_df = taxi_borough_utilization.groupBy("hack_license").pivot("dropoff_borough").agg(round(first("utilization") * 100, 2))

# Show the reshaped DataFrame
pivot_df.limit(10).show()




+--------------------+-----+--------+---------+------+-------------+
|        hack_license|Bronx|Brooklyn|Manhattan|Queens|Staten Island|
+--------------------+-----+--------+---------+------+-------------+
|00BFF9F028A7A9365...| null|   29.63|    35.04| 100.0|         null|
|02856AFC22881ABCA...| null|    null|    41.33|  null|         null|
|03A2D28F831C5C3E5...| null|    null|    23.81| 31.45|         null|
|05679B05C691B1A10...| null|    null|    56.25|  null|         null|
|069B5562096AF7684...| null|    null|    100.0|  null|         null|
|083D8CEB07A9C923F...| null|    null|    37.66| 87.69|         null|
|0FBF11956EE14B253...| null|    null|    24.38| 19.47|         null|
|1186CE6BC2838695A...| null|   48.15|    27.43|  null|         null|
|130328475AD7427AF...| null|    null|     null| 100.0|         null|
|138B0A7B7D3B898E4...| null|    null|     null| 100.0|         null|
+--------------------+-----+--------+---------+------+-------------+



                                                                                

### Analysis of Taxi Utilization
 


When analyzing the utilization of taxis, we employ the following formula:

$$
\text{Utilization} = \frac{\text{Total Trip Duration}}{\text{Total Trip Duration} + \text{Idle Time}}
$$

Given this formula, if a taxi has an **Idle Time** of \(0\) (indicating it has not been idle), the equation simplifies to:

$$
\text{Utilization} = \frac{\text{Total Trip Duration}}{\text{Total Trip Duration}} = 1
$$

Expressed as a percentage, this equates to \(100\%\).

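This means that a taxi with no recorded idle time counts as utilized for the entire period considered, giving a utilization rate of \(100\%\). Such values need careful interpretation. In the Query 1 computation, the idle time is also set to \(0\) for a taxi's first trip and for any gap longer than four hours (14,400 seconds), so a taxi with a single trip in a borough, or with only long gaps between trips, also shows \(100\%\). Maximum utilization may therefore reflect continuous service without downtime or simply sparse data, and it warrants further inspection.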
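
The formula can be checked on a few hand-made cases. The sketch below is a simplified plain-Python version that works per taxi only (the Spark query also groups by drop-off borough). It applies the same two rules as Query 1: a taxi's first trip contributes no idle time, and gaps longer than four hours are ignored. The function name `utilization` and the sample trips are assumptions for illustration.

```python
from datetime import datetime, timedelta

MAX_GAP = timedelta(hours=4)  # longer gaps are treated as a break, as in Query 1


def utilization(trips):
    """trips: list of (pickup, dropoff) datetimes for one taxi."""
    trips = sorted(trips)
    busy = sum((dropoff - pickup).total_seconds() for pickup, dropoff in trips)
    idle = 0.0
    # Compare each trip's pickup with the previous trip's drop-off
    for (_, prev_dropoff), (pickup, _) in zip(trips, trips[1:]):
        gap = pickup - prev_dropoff
        if gap <= MAX_GAP:
            idle += gap.total_seconds()
    return busy / (busy + idle)


day = datetime(2013, 1, 13)
single_trip = [(day.replace(hour=10), day.replace(hour=10, minute=15))]
back_to_back = [
    (day.replace(hour=3, minute=26), day.replace(hour=3, minute=37)),
    (day.replace(hour=3, minute=37), day.replace(hour=3, minute=41)),
]
with_gap = [
    (day.replace(hour=10), day.replace(hour=10, minute=10)),
    (day.replace(hour=10, minute=30), day.replace(hour=10, minute=40)),
]
long_break = [
    (day.replace(hour=8), day.replace(hour=8, minute=30)),
    (day.replace(hour=14), day.replace(hour=14, minute=30)),
]

print(utilization(single_trip))   # 1.0
print(utilization(back_to_back))  # 1.0
print(utilization(with_gap))      # 0.5
print(utilization(long_break))    # 1.0
```

Three of the four cases reach 1.0 for different reasons: a single trip, two trips with no gap at minute resolution, and a gap long enough to be discarded. Only the third case contains idle time that the formula actually counts.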


In [35]:
from pyspark.sql.functions import count

# Extract all hack_license(s) with utilization of 1 (or 100% when expressed as a percentage)
licenses_with_100_utilization = pivot_df.filter(col("Manhattan") == 100).select("hack_license").rdd.flatMap(lambda x: x).collect()  
# Adjust the borough name as necessary if you are looking at a borough other than Manhattan

# Filter the taxi_data_enriched DataFrame for hack_license values with 100% utilization
filtered_data = taxi_data_enriched.filter(col("hack_license").isin(licenses_with_100_utilization))

# Group by hack_license and count entries
license_counts = filtered_data.groupBy("hack_license").agg(count("*").alias("num_entries"))

# Filter licenses with more than one entry
licenses_with_multiple_entries = license_counts.filter(col("num_entries") > 1)

# Show the hack_license values with more than one entry
licenses_with_multiple_entries.limit(5).show()




+--------------------+-----------+
|        hack_license|num_entries|
+--------------------+-----------+
|28A7C858D9231A3EC...|          2|
|CF1BF49F7229C883D...|          2|
|A3E12DBA6882D47DB...|          2|
|B3C8637D063490373...|          2|
|108AAECE2F72BB70F...|          2|
+--------------------+-----------+



                                                                                

In [43]:
# Pick the second hack_license value from the DataFrame
selected_license = licenses_with_multiple_entries.select("hack_license").collect()[1]["hack_license"]

# Filter the taxi_data_enriched DataFrame for the selected hack_license
selected_license_data = taxi_data_enriched.filter(col("hack_license") == selected_license)

# Show the details for the selected hack_license
selected_license_data.show()


                                                                                

+--------------------+-------------------+-------------------+--------+--------------+---------------+---------------------+------------+-----------------+---------------------+--------------------------+
|        hack_license|   pickup_timestamp|  dropoff_timestamp|Duration|pickup_borough|dropoff_borough|next_pickup_timestamp|waiting_time|idle_time_seconds|trip_duration_seconds|previous_dropoff_timestamp|
+--------------------+-------------------+-------------------+--------+--------------+---------------+---------------------+------------+-----------------+---------------------+--------------------------+
|CF1BF49F7229C883D...|2013-01-13 03:26:00|2013-01-13 03:37:00|    11.0|     Manhattan|      Manhattan|  2013-01-13 03:37:00|           0|                0|                  660|                      null|
|CF1BF49F7229C883D...|2013-01-13 03:37:00|2013-01-13 03:41:00|     4.0|     Manhattan|      Manhattan|                 null|        null|                0|                  240|   

> The pickup time is immediately after the dropoff time of the previous ride, indicating that there was no idle time between the two rides.