## Analysis of New York City Taxi Data Report


 **Written by:**
*   FAIZA ROUIBAH  
*   IKRAM MAGOUSSI

**Table of Contents**

Introduction

Project Steps

*   Initialize PySpark Session
*   Data Loading
*   Data Cleaning
*   Duration Calculation
*   Data Enrichment
*   Query 1: Utilization
*   Query 2: Average Time to Next Fare
*   Query 3: Intra-Borough Trips
*   Query 4: Inter-Borough Trips

Conclusion

**Introduction**

Analyzing transportation data is crucial for understanding urban mobility, and the New York City taxi dataset serves as a valuable resource for this study. This project focuses on four main objectives:


1.   **Utilization of taxis**: Assessing the fraction of time a taxi is on the road and occupied by passengers.
2.   **Average time to next fare**: Calculating the waiting time between dropping off a passenger and picking up the next one.
3.   **Intra-borough trips**: Identifying the number of trips that start and end within the same borough.
4.   **Inter-borough trips**: Counting trips that begin in one borough and end in another.

We use PySpark to process the large volume of trip records and Shapely to map each trip's coordinates to a borough. The analysis aims to provide meaningful insights into transportation dynamics in New York City.


In [None]:
!pip install pyspark shapely



**Initializing Spark and Loading Data**
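In this section, we begin by initializing a Spark session, which gives us access to Apache Spark's distributed data processing capabilities. Next, we load the taxi trip data from a CSV file. The header=True option tells Spark that the first row contains column names, and inferSchema=True lets Spark detect each column's data type automatically. Timestamps written as dd-MM-yy HH:mm are not recognized by schema inference, so pickup_datetime and dropoff_datetime are loaded as strings and parsed explicitly later.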



In [None]:
from pyspark.sql import SparkSession

# Initialize Spark session
spark = SparkSession.builder.appName("NYC Taxi Data Analysis").getOrCreate()
# Load taxi data CSV file
taxi_df = spark.read.csv('/content/sample_data/Sample NYC Data.csv', header=True, inferSchema=True)
# Display first few rows to check the data
taxi_df.show(5)

+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N| 01-01-13 15:11|  01-01-13 15:18|              4|      -73.978165|      40.757977|       -73.989838|       40.751171|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      CMT|        1|                 N| 06-01-13 00:18|  06-01-13 00:22|              1|      -74.006683|      40.731781|       -73.994499|        40.75066|


 **Data Cleaning: Filtering Invalid and Inconsistent Entries**
In this section, we clean the taxi data by applying two filters:




1.   **Filter out invalid drop-off locations:** We remove rows where both dropoff_longitude and dropoff_latitude are set to 0, which usually indicates missing or incorrect data.
2.   **Filter out inconsistent time entries:** We keep only trips whose pickup_datetime occurs before their dropoff_datetime, which removes records with incorrect timestamps.
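Note that the raw timestamps are strings in dd-MM-yy HH:mm form, so the ordering check is only reliable on parsed values: compared as plain strings, the day is compared first, and a valid trip that spans a month boundary looks inconsistent. The following is a minimal pure-Python sketch of the two cleaning rules applied to one record; the function name `is_valid_trip` and the sample values are hypothetical, and in the notebook the same rules run as Spark column filters.

```python
from datetime import datetime

FMT = "%d-%m-%y %H:%M"  # Python strptime equivalent of Spark's "dd-MM-yy HH:mm"

def is_valid_trip(pickup_raw: str, dropoff_raw: str,
                  dropoff_lon: float, dropoff_lat: float) -> bool:
    """Apply the two cleaning rules to a single (hypothetical) trip record."""
    # Rule 1: reject trips whose drop-off location is exactly (0, 0)
    if dropoff_lon == 0 and dropoff_lat == 0:
        return False
    # Rule 2: pickup must precede drop-off, compared as parsed datetimes
    return datetime.strptime(pickup_raw, FMT) < datetime.strptime(dropoff_raw, FMT)

# Hypothetical trip crossing the January/February boundary
pickup_raw, dropoff_raw = "31-01-13 23:50", "01-02-13 00:10"

print(pickup_raw < dropoff_raw)  # False: string order compares the day first
print(is_valid_trip(pickup_raw, dropoff_raw, -73.99, 40.75))  # True
print(is_valid_trip(pickup_raw, dropoff_raw, 0.0, 0.0))  # False
```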





In [None]:
from pyspark.sql import functions as F
from pyspark.sql.functions import unix_timestamp
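# Keep rows unless BOTH dropoff_longitude and dropoff_latitude are 0 (missing location)
taxi_data_cleaned = taxi_df.filter(~((F.col('dropoff_longitude') == 0) & (F.col('dropoff_latitude') == 0)))
# Keep rows where pickup_datetime is before dropoff_datetime.
# The raw values are "dd-MM-yy HH:mm" strings, so parse them before comparing;
# a plain string comparison orders by day first and misjudges trips spanning two months.
taxi_data_cleaned = taxi_data_cleaned.filter(
    F.to_timestamp("pickup_datetime", "dd-MM-yy HH:mm") < F.to_timestamp("dropoff_datetime", "dd-MM-yy HH:mm")
)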
# Show the cleaned DataFrame
taxi_data_cleaned.show(5)

+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|           medallion|        hack_license|vendor_id|rate_code|store_and_fwd_flag|pickup_datetime|dropoff_datetime|passenger_count|pickup_longitude|pickup_latitude|dropoff_longitude|dropoff_latitude|
+--------------------+--------------------+---------+---------+------------------+---------------+----------------+---------------+----------------+---------------+-----------------+----------------+
|89D227B655E5C82AE...|BA96DE419E711691B...|      CMT|        1|                 N| 01-01-13 15:11|  01-01-13 15:18|              4|      -73.978165|      40.757977|       -73.989838|       40.751171|
|0BD7C8F5BA12B88E0...|9FD8F69F0804BDB55...|      CMT|        1|                 N| 06-01-13 00:18|  06-01-13 00:22|              1|      -74.006683|      40.731781|       -73.994499|        40.75066|


**Duration Calculation**
In this section, we perform the following steps:



1.   **Date column conversion:** We parse the pickup_datetime and dropoff_datetime strings, which use the dd-MM-yy HH:mm format, into timestamps with to_timestamp. This ensures that the date and time values are interpreted correctly (for example, 01-01-13 15:11 becomes 2013-01-01 15:11:00).
2.   **Trip duration calculation:** We add a Duration column that represents the trip duration in seconds, calculated as the difference between dropoff_datetime and pickup_datetime. Then, we add a Duration_Minutes column to express this duration in minutes.
3.   **Filtering valid records:** We keep only trips with durations between 0 and 14,400 seconds (4 hours) to eliminate outliers.


Finally, we display the first 10 rows with the new Duration and Duration_Minutes columns to verify the result.
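For a single record, the same three steps (parse, compute the duration, apply the 4-hour cut-off) can be sketched in plain Python. This is an illustration only and the function name `trip_duration` is hypothetical; the two sample trips are the first rows of the dataset, and the results match their Duration and Duration_Minutes values in the output.

```python
from datetime import datetime

FMT = "%d-%m-%y %H:%M"   # Python equivalent of Spark's "dd-MM-yy HH:mm"
MAX_DURATION_S = 14_400  # 4 hours, the same cut-off as the Spark filter

def trip_duration(pickup_raw: str, dropoff_raw: str):
    """Return (seconds, minutes), or None if the duration is outside [0, 4 h]."""
    pickup = datetime.strptime(pickup_raw, FMT)
    dropoff = datetime.strptime(dropoff_raw, FMT)
    seconds = int((dropoff - pickup).total_seconds())
    if not 0 <= seconds <= MAX_DURATION_S:
        return None
    return seconds, seconds / 60

print(trip_duration("01-01-13 15:11", "01-01-13 15:18"))  # (420, 7.0)
print(trip_duration("06-01-13 00:18", "06-01-13 00:22"))  # (240, 4.0)
```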





In [None]:
from pyspark.sql import functions as F

#Convert date columns with the correct format
taxi_data_cleaned = taxi_data_cleaned.withColumn(
    "pickup_datetime",
    F.to_timestamp(F.col("pickup_datetime"), "dd-MM-yy HH:mm")
).withColumn(
    "dropoff_datetime",
    F.to_timestamp(F.col("dropoff_datetime"), "dd-MM-yy HH:mm")
)
# calculate the duration in seconds
taxi_data_cleaned = taxi_data_cleaned.withColumn(
    "Duration",
    F.unix_timestamp("dropoff_datetime") - F.unix_timestamp("pickup_datetime")
).withColumn(
    "Duration_Minutes",
    F.col("Duration") / 60  # Convert duration to minutes
)
# Filter valid records
taxi_data_cleaned = taxi_data_cleaned.filter(
    (F.col("Duration") >= 0) & (F.col("Duration") <= 14400)  # 14400 seconds = 4 hours
)
# Show final results
taxi_data_cleaned.select("pickup_datetime", "dropoff_datetime", "Duration", "Duration_Minutes").show(10)


+-------------------+-------------------+--------+----------------+
|    pickup_datetime|   dropoff_datetime|Duration|Duration_Minutes|
+-------------------+-------------------+--------+----------------+
|2013-01-01 15:11:00|2013-01-01 15:18:00|     420|             7.0|
|2013-01-06 00:18:00|2013-01-06 00:22:00|     240|             4.0|
|2013-01-05 18:49:00|2013-01-05 18:54:00|     300|             5.0|
|2013-01-07 23:54:00|2013-01-07 23:58:00|     240|             4.0|
|2013-01-07 23:25:00|2013-01-07 23:34:00|     540|             9.0|
|2013-01-07 15:27:00|2013-01-07 15:38:00|     660|            11.0|
|2013-01-08 11:01:00|2013-01-08 11:08:00|     420|             7.0|
|2013-01-07 12:39:00|2013-01-07 13:10:00|    1860|            31.0|
|2013-01-07 18:15:00|2013-01-07 18:20:00|     300|             5.0|
|2013-01-07 15:33:00|2013-01-07 15:49:00|     960|            16.0|
+-------------------+-------------------+--------+----------------+
only showing top 10 rows



**Query 1: Taxi Utilization Analysis**

In this section, we measure how much of each taxi's time is spent carrying passengers. For each taxi, identified by its medallion, we order the trips by pickup time and compute the idle time between one trip's drop-off and the next trip's pickup, keeping only gaps of at most 4 hours. Summing trip durations and idle times per taxi gives the utilization rate, defined as total trip duration / (total trip duration + total idle time). The results, reported per taxi as total trip duration, total idle time, and utilization, show how efficiently each taxi is used and where there is room for optimization.
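The utilization metric reduces to a short calculation per taxi. Below is a minimal pure-Python sketch of it on hypothetical trips, mirroring the Spark steps: order each taxi's trips by pickup, compute the idle time to the next pickup, drop gaps over 240 minutes, then aggregate. The function name and the data are illustrative assumptions, not part of the notebook.

```python
from collections import defaultdict
from datetime import datetime

MAX_IDLE_MIN = 240  # same 4-hour cut-off as the Spark filter

def utilization_by_taxi(trips):
    """trips: iterable of (medallion, pickup, dropoff) with datetime values.

    Returns {medallion: (total_trip_min, total_idle_min, utilization)}.
    """
    by_taxi = defaultdict(list)
    for medallion, pickup, dropoff in trips:
        by_taxi[medallion].append((pickup, dropoff))

    results = {}
    for medallion, rows in by_taxi.items():
        rows.sort()  # order by pickup time, like the Spark window
        trip_min = idle_min = 0.0
        kept = 0
        # Pair each trip with the next one; the last trip has no successor
        # (a null idle time in Spark), so it is not counted.
        for (pickup, dropoff), (next_pickup, _) in zip(rows, rows[1:]):
            idle = (next_pickup - dropoff).total_seconds() / 60
            if idle > MAX_IDLE_MIN:
                continue
            trip_min += (dropoff - pickup).total_seconds() / 60
            idle_min += idle
            kept += 1
        if kept:
            total = trip_min + idle_min
            results[medallion] = (trip_min, idle_min,
                                  trip_min / total if total > 0 else 0.0)
    return results

def t(hour, minute):
    return datetime(2013, 1, 1, hour, minute)

# Hypothetical trips for two taxis
trips = [
    ("A", t(8, 0), t(8, 10)),
    ("A", t(8, 20), t(8, 35)),
    ("A", t(9, 0), t(9, 5)),
    ("B", t(10, 0), t(10, 30)),
    ("B", t(16, 0), t(16, 20)),
]
print(utilization_by_taxi(trips))
```

Taxi A spends 25 minutes carrying passengers and 35 minutes idle, giving a utilization of 25/60 ≈ 0.417. Taxi B does not appear in the result: its only gap (5.5 hours) exceeds the cut-off and its last trip has no successor, so none of its trips is counted.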

In [None]:
from pyspark.sql import Window
from pyspark.sql import functions as F
from pyspark.sql.functions import col, unix_timestamp, sum as spark_sum

# pickup_datetime and dropoff_datetime were already parsed to timestamps in the previous cell

# Window partitioned by taxi ('medallion') and ordered by pickup time
windowSpec = Window.partitionBy("medallion").orderBy("pickup_datetime")

# Idle time (minutes) between the dropoff of one trip and the pickup of the taxi's next trip
taxi_data_cleaned = taxi_data_cleaned.withColumn(
    "next_pickup_datetime",
    F.lead("pickup_datetime").over(windowSpec)
).withColumn(
    "idle_time",
    (F.unix_timestamp("next_pickup_datetime") - F.unix_timestamp("dropoff_datetime")) / 60
)
# Keep idle times of at most 4 hours (240 minutes); longer gaps are not counted as waiting time.
# Note: this also drops each taxi's last trip (its idle_time is null), and because the result is
# assigned back to taxi_data_cleaned, those rows are excluded from the later queries as well.
taxi_data_cleaned = taxi_data_cleaned.filter(col("idle_time") <= 240)

# Trip duration in minutes
taxi_data_cleaned = taxi_data_cleaned.withColumn(
    "trip_duration",
    (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")) / 60
)

# Calculate total travel time and idle time per taxi
utilization_df = taxi_data_cleaned.groupBy("medallion") \
    .agg(
        spark_sum("trip_duration").alias("total_trip_duration"),
        spark_sum("idle_time").alias("total_idle_time")
    ).withColumn(
        "utilization",
        F.when((col("total_trip_duration") + col("total_idle_time")) > 0,
                col("total_trip_duration") / (col("total_trip_duration") + col("total_idle_time"))
        ).otherwise(0)
    )

#Show results
utilization_df.select("medallion", "total_trip_duration", "total_idle_time", "utilization").show()


+--------------------+-------------------+---------------+-------------------+
|           medallion|total_trip_duration|total_idle_time|        utilization|
+--------------------+-------------------+---------------+-------------------+
|000318C2E3E638158...|              107.0|          120.0| 0.4713656387665198|
|002B4CFC5B8920A87...|              163.0|          295.0| 0.3558951965065502|
|002E3B405B6ABEA23...|              161.0|          269.0| 0.3744186046511628|
|0030AD2648D81EE87...|                9.0|           12.0|0.42857142857142855|
|0035520A854E4F276...|              131.0|          248.0|0.34564643799472294|
|0036961468659D0BF...|              177.0|          329.0|0.34980237154150196|
|003889E315BFDD985...|               72.0|          158.0| 0.3130434782608696|
|0038EF45118925A51...|              166.0|          252.0|0.39712918660287083|
|003D87DB553C6F00F...|              194.0|          203.0|0.48866498740554154|
|003EEA559FA618008...|              235.0|          

**Data Enrichment**

In this data enrichment phase, we load the geographic boundaries of New York's boroughs from a GeoJSON file. The file contains polygons describing each borough's boundary, which lets us determine the borough that contains each trip's pickup and drop-off coordinates. After loading the data, we broadcast it to all Spark workers so that every executor has a local, read-only copy for fast lookups during the calculations.
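The borough lookup is a point-in-polygon test: a trip's coordinates are checked against each borough polygon until one contains them. The notebook delegates this to Shapely (`shape(...)` and `polygon.contains(point)`). As a dependency-free illustration only, the sketch below swaps in the classic even-odd ray-casting test on two hypothetical square "boroughs". It ignores holes and multi-part polygons, which real borough boundaries have and Shapely handles, and its treatment of points lying exactly on an edge may differ from Shapely's `contains`.

```python
def point_in_polygon(x, y, ring):
    """Even-odd ray casting: True if (x, y) lies inside the closed ring of vertices."""
    inside = False
    n = len(ring)
    for i in range(n):
        x1, y1 = ring[i]
        x2, y2 = ring[(i + 1) % n]
        # Does a horizontal ray going right from (x, y) cross this edge?
        if (y1 > y) != (y2 > y):
            x_cross = x1 + (y - y1) * (x2 - x1) / (y2 - y1)
            if x < x_cross:
                inside = not inside
    return inside

def lookup_borough(lon, lat, polygons):
    """Return the name of the first polygon containing the point, else None."""
    for name, ring in polygons:
        if point_in_polygon(lon, lat, ring):
            return name
    return None

# Hypothetical unit-square "boroughs", NOT real NYC boundaries
polygons = [
    ("West", [(0, 0), (1, 0), (1, 1), (0, 1)]),
    ("East", [(1, 0), (2, 0), (2, 1), (1, 1)]),
]
print(lookup_borough(0.5, 0.5, polygons))  # West
print(lookup_borough(1.5, 0.2, polygons))  # East
print(lookup_borough(5.0, 5.0, polygons))  # None
```

Returning None when no polygon matches mirrors `get_borough`, which is why some trips end up with a null borough.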

In [None]:
# Loading the geojson file
import json
with open('/content/sample_data/nyc-boroughs.geojson', 'r') as file:
    geojson_data = json.load(file)

In [None]:
# Broadcasting the geojson data to the workers
broadcast_geojson = spark.sparkContext.broadcast(geojson_data)

# Verify that the broadcast has been done
assert broadcast_geojson.value == geojson_data, "Broadcasting failed!"

In [None]:
from shapely.geometry import Point, shape
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

def preprocess_geojson(geojson_boroughs: dict) -> list:
    """
    Preprocesses the geojson data to create a sorted list of polygons.

    Args:
    - geojson_boroughs (dict): The geojson data containing polygons and their properties.

    Returns:
    - list: A sorted list of tuples where each tuple contains a polygon and its associated properties.
    """

    polygons_with_props = [
        (shape(feature['geometry']), feature['properties'])
        for feature in geojson_boroughs['features']
    ]
    return sorted(polygons_with_props, key=lambda x: (x[1]['boroughCode'], -x[0].area))

def get_borough(longitude: float, latitude: float, sorted_polygons: list) -> str:
    """
    Return the borough name for a given longitude and latitude.

    Args:
    - longitude (float): Longitude of the location.
    - latitude (float): Latitude of the location.
    - sorted_polygons (list): List of preprocessed polygons for lookup.

    Returns:
    - str: Name of the borough if the point falls within a polygon, otherwise None.
    """
    point = Point(longitude, latitude)
    for polygon, properties in sorted_polygons:
        if polygon.contains(point):
            return properties['borough']
    return None
# Preprocess polygons and then broadcast
sorted_polygons = preprocess_geojson(broadcast_geojson.value)
sorted_polygons_broadcast = spark.sparkContext.broadcast(sorted_polygons)

# Convert the get_borough function to a UDF using lambda to pass the broadcasted sorted_polygons
borough_udf = udf(lambda lon, lat: get_borough(lon, lat, sorted_polygons_broadcast.value), StringType())

In [None]:
# Use withColumn for pickup_borough
taxi_data_cleaned = (taxi_data_cleaned
                      .withColumn("pickup_borough",
                                  borough_udf(col("pickup_longitude"), col("pickup_latitude"))))

# Use withColumn again for dropoff_borough
taxi_data_cleaned = (taxi_data_cleaned
                      .withColumn("dropoff_borough",
                                  borough_udf(col("dropoff_longitude"), col("dropoff_latitude"))))

# Show results
taxi_data_cleaned.limit(5).select("pickup_borough", "dropoff_borough").show(truncate=False)

+--------------+---------------+
|pickup_borough|dropoff_borough|
+--------------+---------------+
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
|Manhattan     |Manhattan      |
+--------------+---------------+



In [None]:
columns_to_drop = ['pickup_longitude', 'pickup_latitude', 'dropoff_longitude', 'dropoff_latitude']
taxi_data_cleaned = taxi_data_cleaned.drop(*columns_to_drop)

In [None]:
# Display distinct pickup boroughs
taxi_data_cleaned.select("pickup_borough").distinct().show()

# Display distinct dropoff boroughs
taxi_data_cleaned.select("dropoff_borough").distinct().show()

+--------------+
|pickup_borough|
+--------------+
|        Queens|
|      Brooklyn|
| Staten Island|
|     Manhattan|
|         Bronx|
|          NULL|
+--------------+

+---------------+
|dropoff_borough|
+---------------+
|         Queens|
|       Brooklyn|
|  Staten Island|
|      Manhattan|
|          Bronx|
|           NULL|
+---------------+



In [None]:
# Count rows where pickup_borough is null (no borough found for the pickup point)
pickup_null_count = taxi_data_cleaned.filter(taxi_data_cleaned["pickup_borough"].isNull()).count()

# Count rows where dropoff_borough is null (no borough found for the dropoff point)
dropoff_null_count = taxi_data_cleaned.filter(taxi_data_cleaned["dropoff_borough"].isNull()).count()

print(f"Number of rows with 'null' pickup_borough: {pickup_null_count}")
print(f"Number of rows with 'null' dropoff_borough: {dropoff_null_count}")


Number of rows with 'null' pickup_borough: 163
Number of rows with 'null' dropoff_borough: 369


In [None]:
taxi_data_cleaned = taxi_data_cleaned.filter(
    (taxi_data_cleaned["pickup_borough"].isNotNull()) &
    (taxi_data_cleaned["dropoff_borough"].isNotNull())
)

In [None]:
# Display distinct pickup boroughs
taxi_data_cleaned.select("pickup_borough").distinct().show()

# Display distinct dropoff boroughs
taxi_data_cleaned.select("dropoff_borough").distinct().show()

+--------------+
|pickup_borough|
+--------------+
|        Queens|
|      Brooklyn|
| Staten Island|
|     Manhattan|
|         Bronx|
+--------------+

+---------------+
|dropoff_borough|
+---------------+
|         Queens|
|       Brooklyn|
|  Staten Island|
|      Manhattan|
|          Bronx|
+---------------+



In [None]:
# Number of rows after cleaning
print(taxi_data_cleaned.count())

88466


**Query 2: Average Time to Find the Next Fare**

In this section, we calculate the average time a driver takes to find the next fare, broken down by the borough where the previous trip ended. For each driver, identified by hack_license (the driver's license; the taxi itself is identified by medallion), we use the lead window function to get the pickup time of the driver's next trip. We then measure the waiting time, in seconds, between the drop-off of the current trip and that next pickup. Finally, the data is grouped by hack_license and drop-off borough to compute the average waiting time per driver and borough.

**The objective** of this analysis is to understand the operational efficiency of taxis by identifying the average waiting time for a new fare in each borough, which can help optimize deployment strategies and improve the customer experience.
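The lead-and-average logic can be sketched in plain Python for hypothetical trips of one driver. Each waiting time is attributed to the borough where the earlier trip ended, and, like Spark's `avg`, the sketch ignores missing values, so the driver's last trip (which has no next pickup) contributes nothing. The function name, the license "D1", and the trips are illustrative assumptions.

```python
from collections import defaultdict
from datetime import datetime

def avg_wait_by_driver_borough(trips):
    """trips: iterable of (hack_license, pickup, dropoff, dropoff_borough).

    Returns {(hack_license, dropoff_borough): average wait in seconds or None}.
    """
    by_driver = defaultdict(list)
    for hack_license, pickup, dropoff, borough in trips:
        by_driver[hack_license].append((pickup, dropoff, borough))

    waits = defaultdict(list)
    for hack_license, rows in by_driver.items():
        rows.sort(key=lambda r: r[0])  # order by pickup_datetime
        for i, (pickup, dropoff, borough) in enumerate(rows):
            # lead(): the next trip's pickup, or None for the driver's last trip
            nxt = rows[i + 1][0] if i + 1 < len(rows) else None
            wait = (nxt - dropoff).total_seconds() if nxt is not None else None
            waits[(hack_license, borough)].append(wait)

    result = {}
    for key, values in sorted(waits.items()):
        non_null = [v for v in values if v is not None]
        # Like Spark's avg(), nulls are ignored; an all-null group averages to None
        result[key] = sum(non_null) / len(non_null) if non_null else None
    return result

def t(hour, minute):
    return datetime(2013, 1, 1, hour, minute)

# Hypothetical trips of a single driver
trips = [
    ("D1", t(8, 0), t(8, 10), "Manhattan"),
    ("D1", t(8, 20), t(8, 30), "Brooklyn"),
    ("D1", t(8, 40), t(8, 50), "Manhattan"),
    ("D1", t(9, 30), t(9, 45), "Queens"),
]
print(avg_wait_by_driver_borough(trips))
```

Here the Manhattan average is (600 + 2400) / 2 = 1500 seconds and Brooklyn is 600 seconds, while Queens, where only the driver's last trip ended, has no waiting time, just as Spark returns null for such a group.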

In [None]:
from pyspark.sql import Window
from pyspark.sql.functions import lead, col, avg, when

# Window per driver (hack_license), ordered by pickup time
windowSpec = Window.partitionBy("hack_license").orderBy("pickup_datetime")

# Get the pickup timestamp of the driver's next trip for each row
taxi_data_cleaned = taxi_data_cleaned.withColumn("next_pickup_timestamp",
                                                  lead("pickup_datetime").over(windowSpec))

# Compute the waiting time in seconds between this dropoff and the next pickup
# (null for the driver's last trip)
taxi_data_cleaned = taxi_data_cleaned.withColumn("waiting_time",
           when(col("next_pickup_timestamp").isNotNull(),
                (col("next_pickup_timestamp").cast("long") - col("dropoff_datetime").cast("long")))
           .otherwise(None))

# Compute the average waiting time for each driver in each dropoff borough
average_waiting_time_per_taxi_per_borough = (taxi_data_cleaned.groupBy("hack_license", "dropoff_borough")
                                .agg(avg("waiting_time").alias("avg_waiting_time_seconds"))
                                .orderBy("hack_license", "dropoff_borough"))

# Drop the helper columns that are no longer needed
taxi_data_cleaned = taxi_data_cleaned.drop("next_pickup_timestamp", "waiting_time")

# Show the results
average_waiting_time_per_taxi_per_borough.show()


+--------------------+---------------+------------------------+
|        hack_license|dropoff_borough|avg_waiting_time_seconds|
+--------------------+---------------+------------------------+
|001C8AAB90AEE49F3...|       Brooklyn|                  5940.0|
|001C8AAB90AEE49F3...|      Manhattan|                  3510.0|
|0025133AD810DBE80...|      Manhattan|                  2400.0|
|002C093A2CB9FD40C...|       Brooklyn|                   450.0|
|002C093A2CB9FD40C...|      Manhattan|                  1105.0|
|002C093A2CB9FD40C...|         Queens|                  1020.0|
|00447A6197DBB329F...|       Brooklyn|                  2940.0|
|00447A6197DBB329F...|      Manhattan|                  5460.0|
|00447A6197DBB329F...|         Queens|                  4800.0|
|0046F1E91AA13DEDE...|      Manhattan|       582.3529411764706|
|00567B1CBFD51DDFA...|      Manhattan|                   504.0|
|006114F940CB87B3A...|      Manhattan|                   603.0|
|006114F940CB87B3A...|         Queens|  

**Query 3: Intra-Borough Trips**

In this section, we determine the number of taxi trips that started and ended in the same borough, by filtering the dataset to keep only the trips whose pickup borough matches their drop-off borough.
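The query below reports a single total. The same filter also supports a per-borough breakdown, which in Spark would be a `groupBy("pickup_borough")` before counting. The pure-Python sketch below shows both on hypothetical (pickup_borough, dropoff_borough) pairs.

```python
from collections import Counter

# Hypothetical (pickup_borough, dropoff_borough) pairs after cleaning
trips = [
    ("Manhattan", "Manhattan"),
    ("Manhattan", "Brooklyn"),
    ("Queens", "Queens"),
    ("Manhattan", "Manhattan"),
    ("Brooklyn", "Manhattan"),
]

# Intra-borough trips: pickup and dropoff borough are the same
intra_by_borough = Counter(p for p, d in trips if p == d)
intra_total = sum(intra_by_borough.values())

print(intra_total)             # 3
print(dict(intra_by_borough))  # {'Manhattan': 2, 'Queens': 1}
```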

In [None]:
from pyspark.sql.functions import count, col

same_borough_trips = taxi_data_cleaned.filter(col("pickup_borough") == col("dropoff_borough"))

# Count these trips (a single total over all boroughs)
count_same_borough_trips = same_borough_trips.agg(count("*").alias("number_of_trips"))

count_same_borough_trips.show()

+---------------+
|number_of_trips|
+---------------+
|          78976|
+---------------+



**Query 4: Inter-Borough Trips**

In this section, we analyze inter-borough trips by filtering the data to identify rides where the pickup borough differs from the drop-off borough. This allows us to count the number of trips made between different boroughs.

 **The objective of this analysis** is to identify trends in inter-borough travel in order to optimize taxi services by adjusting resources to meet passenger needs and improve transportation efficiency.
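Because every cleaned trip has a non-null pickup and drop-off borough, each trip is either intra- or inter-borough, so the two counts must add up to the total row count; in the results here, 78,976 + 9,490 = 88,466, matching the row count after cleaning. The pure-Python sketch below, on hypothetical data, counts inter-borough trips per direction (the Spark equivalent would group by pickup_borough and dropoff_borough) and checks that partition.

```python
from collections import Counter

# Hypothetical (pickup_borough, dropoff_borough) pairs after cleaning
trips = [
    ("Manhattan", "Manhattan"),
    ("Manhattan", "Brooklyn"),
    ("Queens", "Queens"),
    ("Manhattan", "Manhattan"),
    ("Brooklyn", "Manhattan"),
]

# Inter-borough trips, counted per (pickup, dropoff) direction
inter_by_pair = Counter((p, d) for p, d in trips if p != d)
inter_total = sum(inter_by_pair.values())
intra_total = sum(1 for p, d in trips if p == d)

print(inter_total)                               # 2
print(inter_by_pair[("Manhattan", "Brooklyn")])  # 1

# Every trip is either intra- or inter-borough, so the counts add up
assert intra_total + inter_total == len(trips)
```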

In [None]:
from pyspark.sql.functions import count, col

# Filter rows where pickup_borough is not equal to dropoff_borough
inter_borough_trips = taxi_data_cleaned.filter(col("pickup_borough") != col("dropoff_borough"))

# Count these trips (a single total over all borough pairs)
count_inter_borough_trips = inter_borough_trips.agg(count("*").alias("number_of_trips"))

count_inter_borough_trips.show()

+---------------+
|number_of_trips|
+---------------+
|           9490|
+---------------+



**Conclusion**

This project highlights key patterns in New York City taxi mobility. In the cleaned sample of 88,466 trips, 78,976 (about 89%) started and ended in the same borough, while 9,490 (about 11%) crossed a borough boundary. Together with the per-taxi utilization rates and per-driver waiting times, these results provide a foundation for optimizing resource allocation, better meeting passenger needs, and improving the customer experience. Future analyses could integrate additional data, such as peak-hour patterns and changing mobility habits, to support more precise and tailored planning.