# Introduction 

This part demonstrates the use of Apache Spark RDDs (Resilient Distributed Datasets) to analyze NYC yellow taxi trip data from 2023. RDDs are Spark's low-level API, offering fine-grained control over distributed data processing through map-reduce style operations.
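
As a minimal illustration of the map-reduce model mentioned above, the following plain-Python sketch (no Spark involved) maps a few made-up trip records to their distances and reduces them with `add`. The sample records and variable names are hypothetical.

```python
from functools import reduce
from operator import add

# Hypothetical sample records: (pickup_zone_id, trip_distance_miles)
trips = [(132, 17.5), (237, 1.5), (132, 18.0), (161, 2.0)]

# Map step: transform each record independently (keep only the distance).
distances = list(map(lambda trip: trip[1], trips))

# Reduce step: combine the mapped values pairwise with an associative function.
total_distance = reduce(add, distances)

print(total_distance)  # 39.0
```

Spark applies the same two steps, but runs the map on each partition in parallel and merges the partial results of the reduce.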

# 1. Setup

In [1]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from operator import add # Useful for some RDD operations like reduce
import time # Necessary to measure execution time

# Initialize the SparkSession and SparkContext.
# The two memory settings below were added to avoid kernel crashes.
spark = SparkSession.builder \
    .master('local[*]') \
    .appName("Analyse Taxis NYC RDD Simple v3 - 1 Month") \
    .config("spark.driver.memory", "6g") \
    .config("spark.executor.memory", "8g") \
    .getOrCreate()

sc = spark.sparkContext

# Display configuration information
print(f"Spark Version: {spark.version}")

Spark Version: 4.0.1


## Launch Verification

In [2]:
!pip install psutil
from psutil import virtual_memory
ram_gb = virtual_memory().total / 1e9  # total physical RAM (not free RAM), in decimal gigabytes
print('Your runtime has {:.1f} gigabytes of available RAM\n'.format(ram_gb))

Your runtime has 8.2 gigabytes of available RAM



In [3]:
spark.sparkContext.defaultParallelism

16
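
The memory settings from the setup cell can be compared with the RAM reported above. Spark size suffixes are binary multiples, so `6g` means 6 GiB. The sketch below does this conversion in plain Python; `spark_size_to_bytes` is a hypothetical helper, not a Spark function, and the 8.2 GB figure is copied from the output above.

```python
# Spark size suffixes are binary multiples: "1g" means 1 GiB (1024**3 bytes).
UNITS = {"k": 1024, "m": 1024**2, "g": 1024**3, "t": 1024**4}

def spark_size_to_bytes(size: str) -> int:
    """Convert a Spark-style size string such as '6g' to bytes (hypothetical helper)."""
    size = size.strip().lower()
    return int(size[:-1]) * UNITS[size[-1]]

total_ram_bytes = 8.2e9  # value printed by the psutil check above

for name, setting in [("spark.driver.memory", "6g"), ("spark.executor.memory", "8g")]:
    requested = spark_size_to_bytes(setting)
    fits = requested <= total_ram_bytes
    print(f"{name}={setting}: {requested / 1e9:.2f} GB requested, fits in RAM: {fits}")
```

Under these assumptions the `8g` executor setting (about 8.59 GB) exceeds the reported RAM, while the `6g` driver setting (about 6.44 GB) fits. Since `local[*]` runs everything in a single JVM, the driver setting is likely the one that matters here.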

# 2. Data Loading

In [4]:
# Glob pattern matching the 2023 Parquet files
# Warning: keep the January file yellow_tripdata_2023-01.parquet out of the taxi folder, because its schema differs from the other months.
parquet_path_all = "taxi/yellow_tripdata_2023-*.parquet"

print(f"\nLoading months of data from: {parquet_path_all}")

try:
    # 1. Load as a DataFrame (Spark has a native Parquet reader)
    df_taxi = spark.read.parquet(parquet_path_all)
    print("Data loading in DataFrame completed (11 months).")

    # 2. Display schema and preview for confirmation
    print(f"\nTotal number of records (Action .count() recommended only for testing on 1 month or after a filter): {df_taxi.count():,}")
    print("\nData Schema:")
    df_taxi.printSchema()

    # 3. Convert to RDD for the RDD analysis
    taxi_rdd = df_taxi.rdd
    print("\nDataFrame converted to RDD.")

    # 4. Mark the RDD for caching (lazy: it is materialized by the first action) to speed up repeated RDD actions
    taxi_rdd.cache()
    print(f"RDD cached. Number of partitions: {taxi_rdd.getNumPartitions()}")

except Exception as e:
    print("\nERROR: Problem loading the 11 months of data.")
    print(e)
    # Stop Spark in case of a loading error
    spark.stop()
    raise SystemExit("Shutdown due to data loading error.")


Loading months of data from: taxi/yellow_tripdata_2023-*.parquet
Data loading in DataFrame completed (11 months).

Total number of records (Action .count() recommended only for testing on 1 month or after a filter): 35,243,460

Data Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: doub

# 3. Simple Analysis with RDD

In [5]:
# This first, very simple analysis checks that the data is fully loaded and that the Spark context works as expected.
if 'taxi_rdd' in locals(): # Check that the RDD has been created

    # Study 1: Count the total number of trips (Action: count)
    print("\n--- RDD Study: Total number of trips ---")
    start_time = time.time()
    try:
        # The .count() action forces evaluation of the RDD and returns the total number of elements.
        total_trajets = taxi_rdd.count()
        end_time = time.time()
        print(f"Total number of trips in the dataset (11 months): {total_trajets:,}")
        print(f"Computation time: {end_time - start_time:.2f} seconds")
    except Exception as e:
        print(f"Study 1 failed (RDD.count): {e}")


--- RDD Study: Total number of trips ---
Total number of trips in the dataset (11 months): 35,243,460
Computation time: 70.77 seconds
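
`cache()` is lazy: it only marks the RDD for caching, and the partitions are stored when the first action runs. The `count()` above is therefore the action that both reads the Parquet data and populates the cache. The toy class below is a plain-Python analogy of that behavior, not Spark code; `LazyCachedDataset` and its methods are hypothetical names.

```python
class LazyCachedDataset:
    """Toy stand-in for a cached RDD: nothing is loaded until an action runs."""

    def __init__(self, loader):
        self._loader = loader      # function that produces the records
        self._cached = None        # filled by the first action
        self.load_calls = 0        # how many times the source was actually read

    def _materialize(self):
        if self._cached is None:
            self.load_calls += 1
            self._cached = list(self._loader())
        return self._cached

    def count(self):
        return len(self._materialize())

    def map_sum(self, fn):
        return sum(fn(record) for record in self._materialize())

dataset = LazyCachedDataset(lambda: range(1_000))  # creating it reads nothing
assert dataset.load_calls == 0

first = dataset.count()                    # first action: reads the source
second = dataset.map_sum(lambda x: x * 2)  # later action: served from the cache
print(first, second, dataset.load_calls)   # 1000 999000 1
```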


# 4. Analysis of Pickup Zones

In [6]:
# We can now move on to a more complex analysis and use the map-reduce model we learned to count and rank pickup locations.
if 'taxi_rdd' in locals(): # Ensure that the RDD is available

    # --- RDD Study: Top 10 pickup zones (PULocationID) ---
    print("\n--- RDD Study: Top 10 pickup zones (PULocationID) ---")
    start_time_rdd = time.time()

    try:
        # 1. Map: (PULocationID, 1) - Extract the ID and assign a counter of 1
        # Columns are accessible by name in the Row RDD.
        rdd_counts = taxi_rdd.map(lambda row: (row['PULocationID'], 1))

        # 2. ReduceByKey: Aggregate the number of trips by PULocationID (sum the '1's)
        rdd_reduced = rdd_counts.reduceByKey(add)

        # 3. Action (takeOrdered): Get the 10 (PULocationID, count) pairs with the highest count.
        # takeOrdered returns the smallest elements under the key, so negating the count (x[1]) gives descending order.
        # The result is a Python list [ (PULocationID, count), ... ]
        top_10_locations = rdd_reduced.takeOrdered(10, key=lambda x: -x[1])

        end_time_rdd = time.time()
        time_rdd = end_time_rdd - start_time_rdd

        # Display results
        print(f"Computation time: {time_rdd:.2f} seconds")
        print("\nTop 10 pickup zones:")
        print("| Zone ID | Number of Trips |")
        print("|---|---|")
        for location_id, count in top_10_locations:
            print(f"| {location_id:^7} | {count:^15,} |")

    except Exception as e:
        print(f"Study 2 failed (RDD.map/reduceByKey): {e}")


--- RDD Study: Top 10 pickup zones (PULocationID) ---
Computation time: 36.32 seconds

Top 10 pickup zones:
| Zone ID | Number of Trips |
|---|---|
|   132   |    1,832,274    |
|   237   |    1,643,721    |
|   161   |    1,630,624    |
|   236   |    1,458,193    |
|   162   |    1,248,419    |
|   138   |    1,216,071    |
|   186   |    1,195,886    |
|   230   |    1,171,690    |
|   142   |    1,155,796    |
|   170   |    1,043,327    |
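
The map, `reduceByKey`, and `takeOrdered` steps used above can be modeled on a tiny dataset in plain Python. This is only a sketch of the logic, not Spark code: the zone IDs and the two "partitions" are made up, `Counter` stands in for the per-partition combine that `reduceByKey` performs before the shuffle, and `heapq.nsmallest` stands in for `takeOrdered`.

```python
import heapq
from collections import Counter

# Hypothetical PULocationID values, split into two "partitions".
partitions = [
    [132, 237, 132, 161, 132],
    [237, 161, 132, 236, 237],
]

# Map + local combine: each partition counts its own keys first,
# which is what reduceByKey does before shuffling data.
partial_counts = [Counter(partition) for partition in partitions]

# Merge: add up the partial counts that share the same key.
total_counts = Counter()
for partial in partial_counts:
    total_counts.update(partial)

# takeOrdered(n, key): the n smallest elements under `key`;
# negating the count turns "smallest" into "largest".
top_3 = heapq.nsmallest(3, total_counts.items(), key=lambda kv: -kv[1])
print(top_3)  # [(132, 4), (237, 3), (161, 2)]
```

Because addition is associative and commutative, the result does not depend on how the records are split across partitions.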


# 5. Analysis of Average Speed by Hour of Day

In [7]:
# No extra import is needed: subtracting two datetime values below already yields a timedelta

if 'taxi_rdd' in locals(): # Ensure that the RDD is available

    
    print("\n--- RDD Study: Average speed by hour of day ---")
    start_time_rdd_complex = time.time()

    try:
        # A helper function to extract hour and calculate duration in seconds
        def extract_hour_and_duration(row):
            pickup_time = row['tpep_pickup_datetime']
            dropoff_time = row['tpep_dropoff_datetime']

            # Ensure that values are not null and dropoff is after pickup
            if pickup_time and dropoff_time and dropoff_time > pickup_time:
                trip_duration_seconds = (dropoff_time - pickup_time).total_seconds()
                # Keep only durations between 60 seconds and 24 hours (drops very short and unrealistically long trips)
                if 60 < trip_duration_seconds < 24 * 3600:
                    # Key: Hour of pickup (0-23)
                    hour = pickup_time.hour
                    # Values: (Distance, Duration in hours)
                    trip_distance = row['trip_distance']
                    trip_duration_hours = trip_duration_seconds / 3600.0
                    return (hour, (trip_distance, trip_duration_hours))
            return None # Ignore invalid rows

        # 1 Map: (Hour, (Distance, Duration in hours)) + Filtering
        rdd_distance_duration = taxi_rdd.map(extract_hour_and_duration).filter(lambda x: x is not None)

        # 2 ReduceByKey: (Hour, (Total Distance, Total Duration))
        # Aggregation function: (dist1, dur1) + (dist2, dur2) = (dist1+dist2, dur1+dur2)
        rdd_totals = rdd_distance_duration.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1]))

        # 3 Map: (Hour, Average Speed) where Speed = TotalDistance / TotalDuration
        rdd_avg_speed = rdd_totals.map(lambda x: (x[0], x[1][0] / x[1][1])) # x[0] = Hour, x[1][0] = Total Dist, x[1][1] = Total Duration

        # 4 Action (Collect) and Sort: Retrieve result and sort by hour
        avg_speed_by_hour = sorted(rdd_avg_speed.collect(), key=lambda x: x[0])

        end_time_rdd_complex = time.time()
        time_rdd_complex = end_time_rdd_complex - start_time_rdd_complex

        # Display results
        print(f"Computation time: {time_rdd_complex:.2f} seconds")
        print("\nAverage Speed (mph) by Hour of Day:")
        print("| Hour | Average Speed (mph) |")
        print("|---|---|")
        for hour, avg_speed in avg_speed_by_hour:
            print(f"| {hour:^5} | {avg_speed:^19.2f} |")

    except Exception as e:
        print(f"Complex RDD Study failed: {e}")


--- RDD Study: Average speed by hour of day ---
Computation time: 46.37 seconds

Average Speed (mph) by Hour of Day:
| Hour | Average Speed (mph) |
|---|---|
|   0   |        15.36        |
|   1   |        14.63        |
|   2   |        19.74        |
|   3   |        20.71        |
|   4   |        41.38        |
|   5   |        41.32        |
|   6   |        32.70        |
|   7   |        22.63        |
|   8   |        15.67        |
|   9   |        15.03        |
|  10   |        12.16        |
|  11   |        12.15        |
|  12   |        11.95        |
|  13   |        12.63        |
|  14   |        12.07        |
|  15   |        12.90        |
|  16   |        11.03        |
|  17   |        12.01        |
|  18   |        11.94        |
|  19   |        13.74        |
|  20   |        13.53        |
|  21   |        14.65        |
|  22   |        15.12        |
|  23   |        15.45        |
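
The analysis above carries (distance, duration) pairs through the reduce step because averages cannot be merged directly, whereas sums can. The reported value per hour is therefore total distance divided by total duration, which differs from the mean of per-trip speeds. A small plain-Python sketch with hypothetical trips shows the difference:

```python
from functools import reduce

# Hypothetical trips for one hour: (distance in miles, duration in hours)
trips = [(1.0, 0.25), (10.0, 0.5), (3.0, 0.25)]

# Reduce (distance, duration) pairs component-wise, as in the reduceByKey step.
total_distance, total_duration = reduce(
    lambda a, b: (a[0] + b[0], a[1] + b[1]), trips
)

# Ratio of sums: the statistic computed by the analysis above.
ratio_of_sums = total_distance / total_duration

# Mean of per-trip speeds: a different statistic, shown for comparison.
mean_of_ratios = sum(d / t for d, t in trips) / len(trips)

print(f"{ratio_of_sums:.2f} mph vs {mean_of_ratios:.2f} mph")  # 14.00 mph vs 12.00 mph
```

The ratio of sums weights each trip by its duration, so long trips influence the hourly figure more than short ones.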


# Conclusion 

The average speed analysis by hour of day reveals a pattern broadly consistent with expectations for NYC taxi traffic. Average speeds are highest in the early morning, peaking at about 41 mph for pickups between 4 AM and 6 AM, when the city is quieter and congestion is low. They are lowest during the busy daytime and evening rush, staying between roughly 11 and 13 mph from 10 AM to 7 PM, with a minimum of 11.03 mph at 4 PM. This aligns with real-world expectations: taxis travel faster on less congested streets during off-peak hours. The early-morning peak should still be read with caution, because the analysis filters on trip duration only and each hourly value is total distance divided by total duration, so a few records with implausible distances could inflate it. The RDD-based implementation processed about 35.2 million trip records (11 months of 2023) using the map-reduce paradigm we saw in class.
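
The speed analysis filters on duration only, and `trip_distance` is nullable in the schema, so records with missing or implausible distances can still affect the hourly totals. A minimal plain-Python sketch of an additional per-trip plausibility check follows; the 100 mph cap and the helper name `is_plausible` are assumptions, not part of the original analysis.

```python
MAX_PLAUSIBLE_MPH = 100.0  # assumed threshold, not from the original analysis

def is_plausible(distance_miles, duration_hours, max_mph=MAX_PLAUSIBLE_MPH):
    """Return True when a trip has a usable distance and a believable speed."""
    if distance_miles is None or distance_miles <= 0 or duration_hours <= 0:
        return False
    return distance_miles / duration_hours <= max_mph

# Hypothetical trips: (distance in miles, duration in hours)
trips = [(2.0, 0.25), (5.0, 0.5), (9000.0, 0.25), (None, 0.5)]

kept = [trip for trip in trips if is_plausible(*trip)]
total_distance = sum(d for d, _ in kept)
total_duration = sum(t for _, t in kept)

print(kept)  # [(2.0, 0.25), (5.0, 0.5)]
print(f"{total_distance / total_duration:.2f} mph")
```

In the RDD version, a check of this kind could be applied inside `extract_hour_and_duration` before returning the key-value pair.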