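# Module 6 Homework - Spark Batch Processing

Answers to the DE Zoomcamp Module 6 homework, computed with PySpark on the NYC TLC yellow-taxi trip data for November 2025 and the taxi-zone lookup table.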

In [1]:
import os
import shutil
from urllib.parse import urlparse

import requests
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, to_date, unix_timestamp, max
from pyspark.sql.types import *

## Download Data

In [2]:
# Remote sources: November 2025 yellow-taxi trips and the taxi-zone lookup table.
yellow_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2025-11.parquet"
zone_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

# Each file is stored locally under the final path segment of its URL.
yellow_file, zone_file = (u.rsplit("/", 1)[-1] for u in (yellow_url, zone_url))

def download_file(url, filename, timeout=60):
    """Download ``url`` to ``filename`` unless ``filename`` already exists.

    The response is streamed into ``filename + ".part"``. It is renamed to
    ``filename`` only after the whole body has been written. A failed or
    interrupted download therefore never leaves a truncated file behind, which
    later runs would otherwise mistake for a complete one.

    Args:
        url: HTTP(S) URL to fetch.
        filename: Local destination path.
        timeout: Seconds passed to ``requests.get`` as ``timeout``. Without it,
            a stalled connection can hang the cell indefinitely.

    Raises:
        requests.HTTPError: if the server answers with an error status. Before
            this check existed, the error page was saved as if it were the data.
    """
    if os.path.exists(filename):
        print(f"{filename} already exists.")
        return

    print(f"Downloading {filename}...")
    partial = filename + ".part"
    try:
        # stream=True writes the body chunk by chunk instead of holding it all in memory.
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(partial, "wb") as f:
                for chunk in r.iter_content(chunk_size=1024 * 1024):
                    if chunk:
                        f.write(chunk)
        # Atomic rename: ``filename`` appears only once it is complete.
        os.replace(partial, filename)
    finally:
        # The temp file survives only when something failed before the rename.
        if os.path.exists(partial):
            os.remove(partial)
    print("Done.")

# Fetch both inputs; download_file skips any file that is already on disk.
for src_url, dest_file in ((yellow_url, yellow_file), (zone_url, zone_file)):
    download_file(src_url, dest_file)

yellow_tripdata_2025-11.parquet already exists.
taxi_zone_lookup.csv already exists.


## Create Spark Session

In [3]:
# Start (or reuse) a local Spark session that uses all available cores.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("DE-Zoomcamp-HW6")
    .getOrCreate()
)

# Report which Spark version the session is running.
print("\nSpark Version:")
print(spark.version)


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
26/03/01 10:55:41 WARN Utils: Your hostname, codespaces-58a1f5, resolves to a loopback address: 127.0.0.1; using 10.0.12.62 instead (on interface eth0)
26/03/01 10:55:41 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
26/03/01 10:55:42 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable



Spark Version:
4.1.1


## Read Yellow Data

In [4]:
# Load the November 2025 yellow-taxi trips, then show their schema and row count.
df = spark.read.parquet(yellow_file)

print("\nSchema:")
df.printSchema()

total_records = df.count()
print("\nTotal Records:")
print(total_records)

                                                                                


Schema:
root
 |-- VendorID: integer (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- Airport_fee: double (nullable = true)
 |-- cbd_congestion_fee: double (nullable = true)


Total Records:
4181444


## Question 2 - Repartition & Save

In [5]:
# Question 2: repartition the November trips, write them as parquet and report
# the average size of the resulting part files.
output_path = "yellow_2025_11_repartitioned"
NUM_PARTITIONS = 4

# mode("overwrite") replaces any previous output directory, so no manual
# shutil.rmtree is needed before writing.
df_repartitioned = df.repartition(NUM_PARTITIONS)
df_repartitioned.write.mode("overwrite").parquet(output_path)

# Average only the data files. The output directory may also contain non-data
# files (e.g. a _SUCCESS marker or .crc checksums) that must not skew the result.
sizes_mb = [
    entry.stat().st_size / (1024 * 1024)
    for entry in os.scandir(output_path)
    if entry.is_file() and entry.name.endswith(".parquet")
]
if not sizes_mb:
    # Fail with a clear message instead of a ZeroDivisionError below.
    raise RuntimeError(f"No .parquet files found in {output_path!r}")
avg_size = sum(sizes_mb) / len(sizes_mb)

print("\nQuestion 2 - Average Parquet File Size (MB):")
print(round(avg_size, 2))





Question 2 - Average Parquet File Size (MB):
25.33


                                                                                

## Question 3 - Trips on Nov 15

In [6]:
# Trips whose pickup timestamp falls on 15 November 2025; to_date drops the
# time of day before the comparison.
df_15 = df.where(to_date("tpep_pickup_datetime") == "2025-11-15")
count_15 = df_15.count()

print("\nQuestion 3 - Trips on 2025-11-15:")
print(count_15)


Question 3 - Trips on 2025-11-15:
162604


## Question 4 - Longest Trip (hours)

In [7]:
# Duration of each trip in hours: dropoff and pickup converted to epoch
# seconds, subtracted, then divided by the 3600 seconds in an hour.
df_duration = df.withColumn(
    "duration_hours",
    (
        unix_timestamp(col("tpep_dropoff_datetime"))
        - unix_timestamp(col("tpep_pickup_datetime"))
    )
    / 3600,
)

# Global aggregate (no groupBy), so the result is a single row holding the maximum.
max_duration = df_duration.select(max("duration_hours")).first()[0]

print("\nQuestion 4 - Longest Trip (hours):")
print(round(max_duration, 2))


Question 4 - Longest Trip (hours):
90.65


## Question 5 - Spark UI Port

In [9]:
sc = spark.sparkContext

# uiWebUrl is the full URL of this context's Spark UI, or None when the UI is
# disabled (e.g. spark.ui.enabled=false).
ui_url = sc.uiWebUrl

print("\nQuestion 5 - Spark UI URL:")
print(ui_url)

# The question asks for the port, so extract it explicitly from the URL
# instead of leaving the reader to parse it out of the hostname string.
ui_port = urlparse(ui_url).port if ui_url else None

print("\nQuestion 5 - Spark UI Port:")
print(ui_port if ui_port is not None else "Spark UI is not available")


Question 5 - Spark UI URL:
http://e4576300-65f6-46ea-a666-de14ee3b66e8.internal.cloudapp.net:4040


## Question 6 - Least Frequent Pickup Zone

In [10]:
# Taxi-zone lookup. Without inferSchema every CSV column is read as a string,
# so cast LocationID to int; the join then compares like types instead of
# relying on an implicit cast.
zones = (
    spark.read.option("header", "true").csv(zone_file)
    .withColumn("LocationID", col("LocationID").cast("int"))
)

# Inner join: pickups whose PULocationID has no lookup row are dropped, and
# zones with zero pickups never appear in the counts below.
joined = df.join(
    zones,
    df.PULocationID == zones.LocationID,
    "inner"
)

# Sort by count, then by zone name, so a tie for the minimum always resolves
# to the same zone instead of depending on partition order.
least_row = (
    joined.groupBy("Zone")
    .count()
    .orderBy(col("count").asc(), col("Zone").asc())
    .first()
)
least_zone = least_row["Zone"] if least_row is not None else None

print("\nQuestion 6 - Least Frequent Pickup Zone:")
print(least_zone)

print("\n=================================")
print("Homework Execution Complete.")
print("=================================")

[Stage 15:>                                                         (0 + 2) / 2]


Question 6 - Least Frequent Pickup Zone:
Governor's Island/Ellis Island/Liberty Island

Homework Execution Complete.


                                                                                