# Big Data Project Notebook

## Dataset

Name: NYC Taxi & Limousine Commission (TLC) Trip Record Data
Source link: [NYC TLC Trip Record Data](https://www.nyc.gov/site/tlc/about/tlc-trip-record-data.page)

## Main Job

**Objective**: Analyze how tip generosity varies by pickup location and time of day, and identify the top pickup zones with the most generous passengers.

### Plan

#### Setup

- clean up and select relevant columns
- create derived columns (tip percentage, hour of day)

#### Shuffles

- join pickup zone IDs with the zone lookup table
- aggregate per pickup zone and hour
- aggregate per pickup zone (average tip percentage across hours)
- order by average tip percentage
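
The plan above can be sketched on a handful of records in plain Python before involving Spark. The trips and the zone lookup below are hypothetical toy data, and the dictionaries stand in for the shuffles; this is an illustration of the data flow, not the notebook's implementation.

```python
from collections import defaultdict
from datetime import datetime

# Hypothetical toy trips: (pickup zone id, pickup time, fare, tip).
trips = [
    (1, datetime(2022, 1, 1, 8, 15), 10.0, 2.0),   # 20% tip
    (1, datetime(2022, 1, 1, 8, 40), 20.0, 2.0),   # 10% tip
    (1, datetime(2022, 1, 1, 18, 5), 10.0, 3.0),   # 30% tip
    (2, datetime(2022, 1, 1, 8, 30), 40.0, 4.0),   # 10% tip
]
# Hypothetical zone lookup: id -> (borough, zone).
zones = {1: ("Manhattan", "Zone A"), 2: ("Queens", "Zone B")}

# Setup: derive hour of day and tip percentage for every trip.
derived = [(zone_id, ts.hour, tip / fare * 100.0 if fare > 0 else 0.0)
           for zone_id, ts, fare, tip in trips]

# Join with the lookup and aggregate per (zone, hour) as [sum, count].
per_zone_hour = defaultdict(lambda: [0.0, 0])
for zone_id, hour, pct in derived:
    borough, zone = zones.get(zone_id, ("Unknown", "Unknown"))
    acc = per_zone_hour[(zone_id, borough, zone, hour)]
    acc[0] += pct
    acc[1] += 1

# Roll the hourly [sum, count] pairs up to one pair per zone.
per_zone = defaultdict(lambda: [0.0, 0])
for (zone_id, borough, zone, _hour), (total, count) in per_zone_hour.items():
    acc = per_zone[(zone_id, borough, zone)]
    acc[0] += total
    acc[1] += count

# Order zones by average tip percentage, highest first.
ranking = sorted(
    ((zone_id, borough, zone, total / count, count)
     for (zone_id, borough, zone), (total, count) in per_zone.items()),
    key=lambda row: row[3],
    reverse=True,
)
```

Each row of `ranking` has the same shape as the final result of the job: zone id, borough, zone name, average tip percentage, and trip count.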


# Setup

- Import libraries
- Set up Spark
- Define the trip and zone DataFrames (Spark reads them lazily)
- Set up helpers

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.types import LongType
from pyspark.sql.functions import col
import matplotlib.pyplot as plt
import pandas as pd
from datetime import datetime

# Initialize Spark
spark = SparkSession.builder \
    .appName("NYC Taxi Tip Analysis - Python Version") \
    .config("spark.sql.session.timeZone", "UTC") \
    .getOrCreate()

print("✅ Spark session created.")

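# Helpers

# Extract the hour (0-23) from a "YYYY-MM-DD HH:MM:SS" string or a datetime; -1 if unavailable.
def safe_hour(value):
    try:
        if isinstance(value, str) and len(value) >= 13:
            return int(value[11:13])
        elif isinstance(value, datetime):
            return value.hour
        else:
            return -1
    except Exception:
        return -1

# Convert to int; -1 if the value cannot be converted.
def safe_long(value):
    try:
        return int(value)
    except Exception:
        return -1

# Tip as a percentage of the fare; 0.0 for zero or negative fares.
def tip_pct(fare, tip):
    return (tip / fare) * 100.0 if fare > 0 else 0.0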

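# Input and output paths (the reads below are lazy; no data is loaded yet)
trips_path = "sample-data/yellow_tripdata_2022-01.parquet"
zones_path = "sample-data/taxi_zone_lookup.csv"
output_path = "output/results"

# Read the trip Parquet file, keeping only the columns the job needs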
df_trips = spark.read.parquet(trips_path).select(
    col("PULocationID").cast(LongType()).alias("PULocationID"),
    col("tpep_pickup_datetime"),
    col("fare_amount"),
    col("tip_amount")
)

# Read zone lookup
df_zones = spark.read.option("header", True).csv(zones_path) \
    .select(
        col("LocationID").cast(LongType()).alias("LocationID"),
        col("Borough"),
        col("Zone")
    )

print("✅ Startup completed.")


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
25/11/11 19:55:19 WARN Utils: Your hostname, rioly, resolves to a loopback address: 127.0.1.1; using 192.168.1.7 instead (on interface wlan0)
25/11/11 19:55:19 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/11 19:55:20 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


✅ Spark session created.
✅ Startup completed.
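
The helpers fall back to sentinel values rather than raising on malformed input. A small check of those edge cases is sketched below; the `safe_hour` and `tip_pct` definitions are copied from the setup cell so the snippet runs without a Spark session, and the sample inputs are hypothetical.

```python
from datetime import datetime

# Copied from the setup cell so this check runs on its own.
def safe_hour(value):
    try:
        if isinstance(value, str) and len(value) >= 13:
            return int(value[11:13])
        elif isinstance(value, datetime):
            return value.hour
        else:
            return -1
    except Exception:
        return -1

def tip_pct(fare, tip):
    return (tip / fare) * 100.0 if fare > 0 else 0.0

checks = {
    "datetime": safe_hour(datetime(2022, 1, 1, 19, 55)),  # hour read from the object
    "full string": safe_hour("2022-01-01 07:30:00"),      # characters 11-12 parsed as int
    "short string": safe_hour("2022-01-01"),              # too short, falls back to -1
    "none": safe_hour(None),                              # unsupported type, falls back to -1
    "zero fare": tip_pct(0.0, 5.0),                       # division is guarded
    "normal fare": tip_pct(20.0, 5.0),                    # 5 / 20 * 100
}
```

Rows that hit a fallback are not dropped: an hour of `-1` becomes its own bucket and a zero-fare trip contributes a 0.0 tip percentage to its zone's average. Filtering such rows first is an option if they should not influence the ranking.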


# Main Job

## Non-optimized

In [2]:
import time

t_start = time.time()

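# Collect the small zone lookup to the driver as {LocationID: (Borough, Zone)}.
# The dict is captured by the lambda below and serialized with each task.
zones_list = [(r["LocationID"], (r["Borough"], r["Zone"])) for r in df_zones.collect()]
zones_map = dict(zones_list)

# Key each trip by pickup zone: (PULocationID, (pickup_datetime, fare, tip)).
rdd_trips = df_trips.rdd.map(lambda r: (
    safe_long(r["PULocationID"]),
    (r["tpep_pickup_datetime"], float(r["fare_amount"] or 0.0), float(r["tip_amount"] or 0.0))
))

# Map-side join with the lookup: key = (zone id, borough, zone, hour), value = (tip %, 1).
rdd_joined = rdd_trips.map(lambda x: (
    (x[0], *zones_map.get(x[0], ("Unknown", "Unknown")), safe_hour(x[1][0])),
    (tip_pct(x[1][1], x[1][2]), 1)
))

# Shuffle 1: sum and count per (zone, hour), then convert to (average, count).
rdd_hour_agg = rdd_joined.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                         .mapValues(lambda x: (x[0] / x[1], x[1]))

# Shuffle 2: rebuild the sums (average * count) and roll up to a trip-weighted average per zone.
rdd_zone_agg = rdd_hour_agg.map(lambda x: ((x[0][0], x[0][1], x[0][2]), (x[1][0] * x[1][1], x[1][1]))) \
                           .reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                           .map(lambda x: (x[0][0], x[0][1], x[0][2], x[1][0] / x[1][1], x[1][1]))

# Shuffle 3: sort by average tip percentage and keep the top 20 zones.
top_zones_nonopt = rdd_zone_agg.sortBy(lambda x: x[3], ascending=False).take(20)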

df_result_nonopt = spark.createDataFrame(top_zones_nonopt, ["PULocationID", "Borough", "Zone", "avg_tip_pct", "count"])
df_result_nonopt.coalesce(1).write.mode("overwrite").json(output_path + "_safe_non_optimized.json")

t_end = time.time()
t_nonopt = t_end - t_start
print(f"⏱️ Non-optimized pipeline completed in {t_nonopt:.2f}s")


⏱️ Non-optimized pipeline completed in 16.82s
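
The `reduceByKey` calls in this cell pass `(sum, count)` pairs rather than averages because pairs can be merged in any order and divided once at the end. The sketch below shows that idea in plain Python with hypothetical tip percentages for a single zone. It also shows that averaging the hourly averages directly gives a different number, since every hour then gets equal weight regardless of how many trips it contains.

```python
from functools import reduce

def merge(a, b):
    """Combine two (sum, count) pairs, as the reduceByKey lambda does."""
    return (a[0] + b[0], a[1] + b[1])

def summarize(values):
    """Fold a list of tip percentages into one (sum, count) pair."""
    return reduce(merge, ((v, 1) for v in values))

# Hypothetical tip percentages for one zone, split into two hour buckets.
hour_8 = [10.0, 20.0, 30.0]
hour_18 = [50.0]

sum_8, n_8 = summarize(hour_8)       # (60.0, 3)
sum_18, n_18 = summarize(hour_18)    # (50.0, 1)

# Trip-weighted zone average: merge the pairs, divide once at the end.
total, n = merge((sum_8, n_8), (sum_18, n_18))
weighted_avg = total / n             # 110.0 / 4 = 27.5

# Unweighted mean of the hourly averages: each hour counts equally.
mean_of_means = (sum_8 / n_8 + sum_18 / n_18) / 2   # (20.0 + 50.0) / 2 = 35.0
```

Multiplying an hourly average by its count, as the zone-level step in this cell does, recovers the hourly sum and therefore yields the trip-weighted figure.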



## Optimized

In [3]:
t_start = time.time()

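# Collect the small zone lookup to the driver and broadcast it, so each executor
# receives one read-only copy instead of one copy per task closure.
zones_map = {r["LocationID"]: (r["Borough"], r["Zone"]) for r in df_zones.collect()}
b_zones = spark.sparkContext.broadcast(zones_map)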

rdd_trips = df_trips.rdd.map(lambda r: (
    safe_long(r["PULocationID"]),
    (r["tpep_pickup_datetime"], float(r["fare_amount"] or 0.0), float(r["tip_amount"] or 0.0))
))

rdd_enriched = rdd_trips.map(lambda x: (
    (x[0], *b_zones.value.get(x[0], ("Unknown", "Unknown")), safe_hour(x[1][0])),
    (tip_pct(x[1][1], x[1][2]), 1)
))

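# Shuffle 1: sum and count per (zone, hour), then re-key by zone.
# The (sum, count) pair is kept so the zone-level average stays trip-weighted,
# matching the non-optimized pipeline.
rdd_zone_hour_agg = rdd_enriched.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                               .map(lambda x: ((x[0][0], x[0][1], x[0][2]), x[1]))

# Pre-partition by zone key so the next reduceByKey can reuse the partitioner.
rdd_partitioned = rdd_zone_hour_agg.partitionBy(8).persist()

# Shuffle 2: roll the hourly (sum, count) pairs up to one average per zone.
rdd_zone_agg = rdd_partitioned.reduceByKey(lambda a, b: (a[0] + b[0], a[1] + b[1])) \
                              .map(lambda x: (x[0][0], x[0][1], x[0][2], x[1][0] / x[1][1], x[1][1]))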

top_zones_opt = rdd_zone_agg.sortBy(lambda x: x[3], ascending=False).take(20)

df_result_opt = spark.createDataFrame(top_zones_opt, ["PULocationID", "Borough", "Zone", "avg_tip_pct", "count"])
df_result_opt.coalesce(1).write.mode("overwrite").json(output_path + "_optimized.json")

t_end = time.time()
t_opt = t_end - t_start
print(f"✅ Optimized pipeline completed in {t_opt:.2f}s")



✅ Optimized pipeline completed in 15.26s
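
The `partitionBy(8)` step assigns every key to one of eight partitions by hashing it. The sketch below imitates that assignment in plain Python. It uses the built-in `hash` as a stand-in for PySpark's default `portable_hash` partition function, and the records are hypothetical, so the partition numbers will not match what Spark produces.

```python
from collections import defaultdict

NUM_PARTITIONS = 8  # same partition count as partitionBy(8)

def partition_of(key, num_partitions=NUM_PARTITIONS):
    # Stand-in for the partition function: hash the key, wrap into range.
    return hash(key) % num_partitions

# Hypothetical (zone id, borough, zone) keys, one record per hour bucket.
records = [
    ((1, "Manhattan", "Zone A"), (60.0, 3)),
    ((2, "Queens", "Zone B"), (10.0, 1)),
    ((1, "Manhattan", "Zone A"), (50.0, 1)),
]

# Route each record to its partition.
partitions = defaultdict(list)
for key, value in records:
    partitions[partition_of(key)].append((key, value))

# Collect the set of partitions each key ended up in.
homes = defaultdict(set)
for index, rows in partitions.items():
    for key, _ in rows:
        homes[key].add(index)
```

Because every key has exactly one home partition, a per-key reduction can finish inside each partition without moving data again. In Spark this only holds while the partitioner is preserved: `mapValues` keeps it, whereas `map` discards it because the keys may have changed.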



## Timing comparison

In [4]:
speedup = t_nonopt / t_opt
saved = t_nonopt - t_opt

print("=== COMPARISON RESULTS ===")
print(f"Non-optimized time: {t_nonopt:.2f}s")
print(f"Optimized time:     {t_opt:.2f}s")
print(f"Speedup:            {speedup:.2f}x")
print(f"Time saved:         {saved:.2f}s")


=== COMPARISON RESULTS ===
Non-optimized time: 16.82s
Optimized time:     15.26s
Speedup:            1.10x
Time saved:         1.56s
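
The figures above come from one run of each pipeline, and single wall-clock measurements can be skewed by JVM warm-up, file-system caching, and other activity on the machine. One possible way to make the comparison more robust is to repeat each pipeline several times and compare medians. The helpers `time_runs` and `compare` below are hypothetical names introduced for this sketch, and the lambda is a stand-in workload.

```python
import statistics
import time

def time_runs(fn, repeats=5):
    """Run fn several times and return the wall-clock durations in seconds."""
    durations = []
    for _ in range(repeats):
        start = time.perf_counter()
        fn()
        durations.append(time.perf_counter() - start)
    return durations

def compare(baseline, candidate):
    """Summarize two lists of durations by their medians."""
    base_med = statistics.median(baseline)
    cand_med = statistics.median(candidate)
    return {"speedup": base_med / cand_med, "saved": base_med - cand_med}

# Stand-in workload; in the notebook each pipeline would be wrapped in a
# function and passed to time_runs instead.
sample = time_runs(lambda: sum(range(100_000)), repeats=3)

# With only the two single measurements reported above, the summary
# reduces to the same speedup and time saved.
single_run = compare([16.82], [15.26])
```

With several runs per pipeline, the spread of the durations also indicates whether a difference of about 1.5 s is larger than the run-to-run variation.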
