In [1]:
from pyspark.sql import SparkSession
spark = (SparkSession.builder.appName("VSCodeTest")
         .master("local[*]")
         .config("spark.sql.adaptive.enabled","true").getOrCreate())
spark.sparkContext.setLogLevel("ERROR")
spark.range(10).show()


Using Spark's default log4j profile: org/apache/spark/log4j2-defaults.properties
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/11/10 15:54:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


+---+
| id|
+---+
|  0|
|  1|
|  2|
|  3|
|  4|
|  5|
|  6|
|  7|
|  8|
|  9|
+---+



In [8]:
from pyspark.sql.functions import col, lit
from pyspark.sql import DataFrame

CANONICAL_TYPES = {
    "VendorID": "int",
    "tpep_pickup_datetime": "timestamp",
    "tpep_dropoff_datetime": "timestamp",
    "passenger_count": "int",
    "trip_distance": "double",
    "RatecodeID": "int",
    "store_and_fwd_flag": "string",
    "PULocationID": "int",
    "DOLocationID": "int",
    "payment_type": "int",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
    "airport_fee": "double",  
}

def normalize_columns(df: DataFrame) -> DataFrame:
    if "Airport_fee" in df.columns and "airport_fee" not in df.columns:
        df = df.withColumnRenamed("Airport_fee", "airport_fee")

    for c, t in CANONICAL_TYPES.items():
        if c not in df.columns:
            df = df.withColumn(c, lit(None).cast(t))

    for c, t in CANONICAL_TYPES.items():
        if c in df.columns:
            df = df.withColumn(c, col(c).cast(t))

    return df.select(*CANONICAL_TYPES.keys())


In [11]:

try:
    spark.stop()
except:
    pass

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("NYCTaxi-ETL")
    .master("local[*]")                           
    .config("spark.driver.memory", "8g")          
    .config("spark.sql.adaptive.enabled", "true") 
    .config("spark.sql.shuffle.partitions", "24") 
    .config("spark.sql.files.maxPartitionBytes", "32m")  
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")


In [12]:
from pyspark.sql.functions import col, lit
from pyspark.sql import DataFrame

CANONICAL_TYPES = {
    "VendorID": "int",
    "tpep_pickup_datetime": "timestamp",
    "tpep_dropoff_datetime": "timestamp",
    "passenger_count": "int",
    "trip_distance": "double",
    "RatecodeID": "int",
    "store_and_fwd_flag": "string",
    "PULocationID": "int",
    "DOLocationID": "int",
    "payment_type": "int",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
    "airport_fee": "double",
}

def normalize_columns(df: DataFrame) -> DataFrame:
    if "Airport_fee" in df.columns and "airport_fee" not in df.columns:
        df = df.withColumnRenamed("Airport_fee", "airport_fee")
    for c, t in CANONICAL_TYPES.items():
        if c not in df.columns:
            df = df.withColumn(c, lit(None).cast(t))
    for c, t in CANONICAL_TYPES.items():
        if c in df.columns:
            df = df.withColumn(c, col(c).cast(t))
    return df.select(*CANONICAL_TYPES.keys())


In [13]:
from pyspark.sql.functions import to_timestamp, expr, date_format

def etl_and_write_one(path: str, out_path: str):
    df = normalize_columns(spark.read.parquet(path))

    df2 = (
        df
        .withColumn("pickup_ts",  to_timestamp(col("tpep_pickup_datetime")))
        .withColumn("dropoff_ts", to_timestamp(col("tpep_dropoff_datetime")))
        .withColumn("trip_minutes", expr("timestampdiff(MINUTE, pickup_ts, dropoff_ts)"))
        .filter((col("trip_distance") > 0) & (col("fare_amount") > 0))
        .filter((col("trip_minutes") > 0) & (col("trip_minutes") < 360))
        .withColumn("year",  date_format(col("pickup_ts"), "yyyy"))
        .withColumn("month", date_format(col("pickup_ts"), "MM"))
        .select(
            "pickup_ts","dropoff_ts","trip_minutes","passenger_count","trip_distance",
            "payment_type","fare_amount","tip_amount","tolls_amount","improvement_surcharge",
            "total_amount","congestion_surcharge","airport_fee","PULocationID","DOLocationID",
            "year","month"
        )
    )

    (df2
     .repartition(4, "year", "month")       
     .write.mode("append")
     .option("compression","snappy")
     .partitionBy("year","month")
     .parquet(out_path))


In [14]:
import glob

raw_2019 = sorted(glob.glob("../data/raw/yellow_tripdata_2019-*.parquet"))
raw_2024 = sorted(glob.glob("../data/raw/yellow_tripdata_2024-*.parquet"))
raw_paths = raw_2019 + raw_2024

out_path = "../data/processed/yellow_bronze.parquet"

for p in raw_paths:
    print(">>> processing:", p)
    etl_and_write_one(p, out_path)

print("✅ All files processed and written to", out_path)


>>> processing: ../data/raw/yellow_tripdata_2019-01.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-02.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-03.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-04.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-05.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-06.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-07.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-08.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-09.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-10.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-11.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2019-12.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2024-01.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2024-02.parquet


                                                                                

>>> processing: ../data/raw/yellow_tripdata_2024-03.parquet




✅ All files processed and written to ../data/processed/yellow_bronze.parquet


                                                                                

In [15]:
bronze = spark.read.parquet("../data/processed/yellow_bronze.parquet")
bronze.printSchema()
bronze.limit(5).show(truncate=False)


from pyspark.sql.functions import col
jan2019 = bronze.filter((col("year")=="2019") & (col("month")=="01"))
jan2019.count()  


root
 |-- pickup_ts: timestamp (nullable = true)
 |-- dropoff_ts: timestamp (nullable = true)
 |-- trip_minutes: long (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- payment_type: integer (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- year: integer (nullable = true)
 |-- month: integer (nullable = true)

+-------------------+-------------------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+


7584818

In [6]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("NYCTaxi-Big")
    .master("local[*]")
    .config("spark.driver.memory", "6g")          
    .config("spark.sql.adaptive.enabled", "true") 
    .config("spark.sql.shuffle.partitions", "48") 
    .config("spark.sql.files.maxPartitionBytes", "64m") 
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")


Pipeline Starts here

In [16]:
try:
    spark.stop()
except:
    pass

from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("NYCTaxi-PIPELINE")
    .master("local[*]")
    .config("spark.driver.memory", "8g")          
    .config("spark.sql.adaptive.enabled", "true") 
    .config("spark.sql.shuffle.partitions", "24") 
    .config("spark.sql.files.maxPartitionBytes", "32m")  
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer")
    .getOrCreate()
)
spark.sparkContext.setLogLevel("ERROR")
print("UI:", spark.sparkContext.uiWebUrl)  


UI: http://patricks-air-4.lan:4040


In [19]:
from pyspark.sql.functions import col

bronze_path = "../data/processed/yellow_bronze.parquet"

bronze = (
    spark.read.parquet(bronze_path)
    .where(col("year").cast("string").isin("2019","2024"))
    .where(col("month").cast("string").isin("01","02","03"))
    .select(

        "pickup_ts","dropoff_ts","trip_minutes","passenger_count","trip_distance",
        "payment_type","fare_amount","tip_amount","tolls_amount","improvement_surcharge",
        "total_amount","congestion_surcharge","airport_fee","PULocationID","DOLocationID",
        "year","month"
    )
)

bronze.limit(5).show(truncate=False)
bronze.explain("formatted")   


+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+
|pickup_ts|dropoff_ts|trip_minutes|passenger_count|trip_distance|payment_type|fare_amount|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|PULocationID|DOLocationID|year|month|
+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+
+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+

== Physical Plan ==
* ColumnarToRow (2)
+- Scan parquet  (1)


(1) Scan parquet 
Output [17]: [pickup_ts#2960, dropoff_

In [20]:
from pyspark.sql.functions import when

df = (
    bronze
    .withColumn("trip_hours",  (col("trip_minutes")/60.0))
    .withColumn("avg_mph",     when(col("trip_hours") > 0, col("trip_distance")/col("trip_hours")).otherwise(None))
    .withColumn("tip_rate",    when(col("fare_amount") > 0, col("tip_amount")/col("fare_amount")).otherwise(None))
)

df_f = (
    df

    .filter((col("trip_distance") > 0) & (col("fare_amount") > 0))
    .filter((col("trip_minutes")  > 0) & (col("trip_minutes") < 360))
    .filter((col("passenger_count") >= 1) & (col("passenger_count") <= 6))
    .filter((col("avg_mph") >= 1) & (col("avg_mph") <= 80))
)

df_f.limit(5).show(truncate=False)
df_f.explain("formatted")   


+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+----------+-------+--------+
|pickup_ts|dropoff_ts|trip_minutes|passenger_count|trip_distance|payment_type|fare_amount|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|PULocationID|DOLocationID|year|month|trip_hours|avg_mph|tip_rate|
+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+----------+-------+--------+
+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+----------+-------+--------+

== Phys

In [22]:
from pyspark.sql.functions import col, when

bronze_path = "../data/processed/yellow_bronze.parquet"

bronze = (
    spark.read.parquet(bronze_path)
    .where(col("year").cast("string").isin("2019","2024"))
    .where(col("month").cast("string").isin("01","02","03"))
    .select(
        "pickup_ts","dropoff_ts","trip_minutes","passenger_count","trip_distance",
        "payment_type","fare_amount","tip_amount","tolls_amount","improvement_surcharge",
        "total_amount","congestion_surcharge","airport_fee","PULocationID","DOLocationID",
        "year","month"
    )
)

df = (
    bronze
    .withColumn("trip_hours",  col("trip_minutes") / 60.0)
    .withColumn("avg_mph",     when(col("trip_hours") > 0, col("trip_distance")/col("trip_hours")))
    .withColumn("tip_rate",    when(col("fare_amount") > 0, col("tip_amount")/col("fare_amount")))
)


df_f = (
    df
    .filter((col("trip_distance") > 0) & (col("fare_amount") > 0))
    .filter((col("trip_minutes")  > 0) & (col("trip_minutes") < 360))    
    .filter((col("passenger_count") >= 1) & (col("passenger_count") <= 6))
    .filter((col("avg_mph") >= 1) & (col("avg_mph") <= 80))              
)

df_f.limit(5).show(truncate=False)
df_f.explain("formatted")  


+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+----------+-------+--------+
|pickup_ts|dropoff_ts|trip_minutes|passenger_count|trip_distance|payment_type|fare_amount|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|PULocationID|DOLocationID|year|month|trip_hours|avg_mph|tip_rate|
+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+----------+-------+--------+
+---------+----------+------------+---------------+-------------+------------+-----------+----------+------------+---------------------+------------+--------------------+-----------+------------+------------+----+-----+----------+-------+--------+

== Phys

In [23]:
from pyspark.sql import functions as F

monthly_kpi = (
    df_f.groupBy("year","month","payment_type")
        .agg(
            F.count("*").alias("trips"),
            F.avg("trip_distance").alias("avg_dist"),
            F.avg("tip_rate").alias("avg_tip_rate"),
            F.sum("total_amount").alias("revenue"),
            F.sum("tolls_amount").alias("tolls")
        )
        .orderBy("year","month","payment_type")
)

monthly_kpi.show(12, truncate=False)  
monthly_kpi.explain("formatted")      

dist_quantiles = (
    df_f.groupBy("year","month")
        .agg(
            F.expr("percentile_approx(trip_distance, 0.5, 1000)").alias("p50_dist"),
            F.expr("percentile_approx(trip_distance, 0.95, 1000)").alias("p95_dist"),
            F.approx_count_distinct(F.struct("PULocationID","DOLocationID")).alias("unique_od_pairs")
        )
        .orderBy("year","month")
)

dist_quantiles.show(truncate=False)
dist_quantiles.explain("formatted")   


+----+-----+------------+-----+--------+------------+-------+-----+
|year|month|payment_type|trips|avg_dist|avg_tip_rate|revenue|tolls|
+----+-----+------------+-----+--------+------------+-------+-----+
+----+-----+------------+-----+--------+------------+-------+-----+

== Physical Plan ==
AdaptiveSparkPlan (9)
+- Sort (8)
   +- Exchange (7)
      +- HashAggregate (6)
         +- Exchange (5)
            +- HashAggregate (4)
               +- Project (3)
                  +- Filter (2)
                     +- Scan parquet  (1)


(1) Scan parquet 
Output [10]: [trip_minutes#3097L, passenger_count#3098, trip_distance#3099, payment_type#3100, fare_amount#3101, tip_amount#3102, tolls_amount#3103, total_amount#3105, year#3110, month#3111]
Batched: true
Location: InMemoryFileIndex [file:/Users/patrickzhu/Desktop/pyspark_pipeline_project/data/processed/yellow_bronze.parquet]
PartitionFilters: [cast(year#3110 as string) IN (2019,2024), cast(month#3111 as string) IN (01,02,03)]
PushedFilters:

In [24]:
df_f.createOrReplaceTempView("trips")

sql1 = spark.sql("""
SELECT year, month, payment_type,
       COUNT(*) AS trips,
       ROUND(AVG(tip_rate), 3) AS avg_tip_rate,
       ROUND(AVG(avg_mph),  2) AS avg_speed_mph
FROM trips
GROUP BY year, month, payment_type
ORDER BY year, month, payment_type
""")
sql1.show(truncate=False)
sql1.explain("formatted")             


sql2 = spark.sql("""
SELECT year, month, PULocationID, DOLocationID,
       COUNT(*) AS trips
FROM trips
GROUP BY year, month, PULocationID, DOLocationID
ORDER BY trips DESC
LIMIT 20
""")
sql2.show(truncate=False)
sql2.explain("formatted")               


+----+-----+------------+-----+------------+-------------+
|year|month|payment_type|trips|avg_tip_rate|avg_speed_mph|
+----+-----+------------+-----+------------+-------------+
+----+-----+------------+-----+------------+-------------+

== Physical Plan ==
AdaptiveSparkPlan (10)
+- Sort (9)
   +- Exchange (8)
      +- HashAggregate (7)
         +- Exchange (6)
            +- HashAggregate (5)
               +- Project (4)
                  +- Project (3)
                     +- Filter (2)
                        +- Scan parquet  (1)


(1) Scan parquet 
Output [8]: [trip_minutes#3097L, passenger_count#3098, trip_distance#3099, payment_type#3100, fare_amount#3101, tip_amount#3102, year#3110, month#3111]
Batched: true
Location: InMemoryFileIndex [file:/Users/patrickzhu/Desktop/pyspark_pipeline_project/data/processed/yellow_bronze.parquet]
PartitionFilters: [cast(year#3110 as string) IN (2019,2024), cast(month#3111 as string) IN (01,02,03)]
PushedFilters: [IsNotNull(trip_distance), IsNotNu

In [25]:
(
    monthly_kpi
    .coalesce(1)  
    .write.mode("overwrite").option("compression","snappy")
    .parquet("../data/processed/gold_monthly_kpi.parquet")
)

(
    dist_quantiles
    .coalesce(1)
    .write.mode("overwrite").option("compression","snappy")
    .parquet("../data/processed/gold_dist_quantiles.parquet")
)

print("✅ Gold outputs written.")


✅ Gold outputs written.


In [26]:
import time

df_2019_01 = df_f.filter((col("year")=="2019") & (col("month")=="01"))


t0 = time.perf_counter()
_ = (df_2019_01.groupBy("payment_type").agg(F.count("*").alias("trips"),
                                            F.avg("tip_rate").alias("avg_tip")).collect())
t1 = time.perf_counter()
print(f"First run (no cache): {t1 - t0:.2f}s")


df_2019_01.cache()
_ = df_2019_01.count()  

t2 = time.perf_counter()
_ = (df_2019_01.groupBy("payment_type").agg(F.count("*").alias("trips"),
                                            F.avg("tip_rate").alias("avg_tip")).collect())
t3 = time.perf_counter()
print(f"Second run (after cache): {t3 - t2:.2f}s")


df_2019_01.unpersist()


First run (no cache): 0.19s
Second run (after cache): 0.03s


DataFrame[pickup_ts: timestamp, dropoff_ts: timestamp, trip_minutes: bigint, passenger_count: int, trip_distance: double, payment_type: int, fare_amount: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, congestion_surcharge: double, airport_fee: double, PULocationID: int, DOLocationID: int, year: int, month: int, trip_hours: double, avg_mph: double, tip_rate: double]

In [27]:

t_demo = (bronze
          .select("trip_distance","fare_amount")
          .filter(col("trip_distance") > 0)
          .withColumn("fare_per_mile", col("fare_amount")/col("trip_distance")))


print("count():", t_demo.count())  
print("first():", t_demo.first())  


count(): 0
first(): None
