# Module 6 Homework — Apache Spark
Data Engineering Zoomcamp 2026

Dataset: Yellow Taxi November 2025

## Setup

In [11]:
import os
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

# Absolute paths — data/ is mounted at /home/jovyan/work/data/
DATA_DIR = '/home/jovyan/work/data'
TRIPS_PATH = os.path.join(DATA_DIR, 'yellow_tripdata_2025-11.parquet')
ZONES_PATH = os.path.join(DATA_DIR, 'taxi_zone_lookup.csv')
OUTPUT_PATH = os.path.join(DATA_DIR, 'yellow_2025_11_repartitioned')

spark = SparkSession.builder \
    .master('local[*]') \
    .appName('homework6') \
    .getOrCreate()

spark.conf.set('spark.sql.repl.eagerEval.enabled', True)
spark.sparkContext.setLogLevel('WARN')
print(f'Spark version: {spark.version}')

Spark version: 3.5.0


## Question 1 — Spark version

In [12]:
print(spark.version)

3.5.0


## Question 2 — Average parquet file size
Read November 2025 Yellow Taxi data, repartition to 4 and save as parquet.

In [13]:
df = spark.read.parquet(TRIPS_PATH)
print(f'Total rows: {df.count():,}')
df.printSchema()

Total rows: 4,181,444
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)



In [14]:
df.repartition(4).write.parquet(OUTPUT_PATH, mode='overwrite')

In [15]:
parquet_files = [f for f in os.listdir(OUTPUT_PATH) if f.endswith('.parquet')]
sizes_mb = [os.path.getsize(os.path.join(OUTPUT_PATH, f)) / (1024 * 1024) for f in parquet_files]

print(f'Number of parquet files: {len(parquet_files)}')
print(f'File sizes (MB): {[round(s, 2) for s in sizes_mb]}')
print(f'Average size: {sum(sizes_mb)/len(sizes_mb):.2f} MB')

Number of parquet files: 4
File sizes (MB): [24.41, 24.4, 24.41, 24.41]
Average size: 24.41 MB


## Question 3 — Trips on November 15th

In [16]:
count_nov15 = df.filter(
    F.to_date(F.col('tpep_pickup_datetime')) == '2025-11-15'
).count()

print(f'Trips on November 15th: {count_nov15:,}')

Trips on November 15th: 162,604


## Question 4 — Longest trip in hours

In [17]:
df_duration = df.withColumn(
    'duration_hours',
    (F.unix_timestamp('tpep_dropoff_datetime') - F.unix_timestamp('tpep_pickup_datetime')) / 3600
)

max_duration = df_duration.agg(F.max('duration_hours')).collect()[0][0]
print(f'Longest trip: {max_duration:.1f} hours')

Longest trip: 90.6 hours


## Question 5 — Spark UI port

The Spark UI runs on port **4040**.

Access it at: http://localhost:4040

## Question 6 — Least frequent pickup zone

In [18]:
zones = spark.read.option('header', 'true').csv(ZONES_PATH)
zones.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [19]:
pickup_counts = df.groupBy('PULocationID').count()

result = pickup_counts \
    .join(zones, pickup_counts['PULocationID'] == zones['LocationID'], 'left') \
    .select('Zone', 'count') \
    .orderBy('count') \
    .limit(10)

result.show(truncate=False)

+---------------------------------------------+-----+
|Zone                                         |count|
+---------------------------------------------+-----+
|Governor's Island/Ellis Island/Liberty Island|1    |
|Arden Heights                                |1    |
|Eltingville/Annadale/Prince's Bay            |1    |
|Port Richmond                                |3    |
|Rossville/Woodrow                            |4    |
|Rikers Island                                |4    |
|Green-Wood Cemetery                          |4    |
|Great Kills                                  |4    |
|Jamaica Bay                                  |5    |
|Westerleigh                                  |12   |
+---------------------------------------------+-----+



In [20]:
spark.stop()