# Project 1 
### Transformation and enrichment part (points 2,3)

In [2]:
import os
from pathlib import Path
from pyspark.sql import SparkSession
import pyspark.sql.functions as F

if Path("data").exists():
    DATA_BASE = Path("data")
elif Path("../data").exists():
    DATA_BASE = Path("../data")
else:
    DATA_BASE = Path("/home/jovyan/data")

INBOX = DATA_BASE / "inbox"

spark = (
    SparkSession.builder
    .appName("Project1_Group_H")
    .enableHiveSupport()
    .config("spark.sql.autoBroadcastJoinThreshold", "10MB")
    .getOrCreate()
)

print(spark.version)

trip_files = sorted(str(p) for p in INBOX.glob("yellow_tripdata_*.parquet"))
print(f"files: {[Path(f).name for f in trip_files]}")

#read trips parquets
df_raw = (
    spark.read
    .format("parquet")
    .load(trip_files)
    .withColumn("source_file", F.input_file_name())
)

#read zones parquts
zones_raw = spark.read.parquet(str(INBOX / "taxi_zone_lookup.parquet"))

print(f"trips count: {df_raw.count()}")
print(f"zone count: {zones_raw.count()}")

4.1.0
files: ['yellow_tripdata_2025-01.parquet', 'yellow_tripdata_2025-02.parquet']
trips count: 7052769
zone count: 265


### Parse and cast types

In [3]:
df_casted = df_raw.select(
    F.col("VendorID").cast("int"),
    # timestamps
    F.col("tpep_pickup_datetime").cast("timestamp").alias("pickup_datetime"),
    F.col("tpep_dropoff_datetime").cast("timestamp").alias("dropoff_datetime"),

    # ints
    F.col("PULocationID").cast("int"),
    F.col("DOLocationID").cast("int"),
    F.col("passenger_count").cast("int"),

    # floats
    F.col("trip_distance").cast("double"),
    F.col("fare_amount").cast("double"),
    F.col("total_amount").cast("double"),

    #metadata    
    F.col("source_file"),
    F.current_timestamp().alias("ingested_at")
)

df_casted.printSchema()
df_casted.show(3, truncate=False)

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- source_file: string (nullable = false)
 |-- ingested_at: timestamp (nullable = false)

+--------+-------------------+-------------------+------------+------------+---------------+-------------+-----------+------------+--------------------------------------------------------------+--------------------------+
|VendorID|pickup_datetime    |dropoff_datetime   |PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|total_amount|source_file                                                   |ingested_at               |
+--------+-------------------+-----------------

In [4]:
zones_raw.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


### Clean 

Rules which we discussed during meetup.

| # | Rule | Column(s) | Condition kept |
|---|------|-----------|----------------|
| 1 | Non-null timestamps | `pickup_datetime`, `dropoff_datetime` | `isNotNull()` |
| 2 | Chronological trip | both timestamps | `dropoff > pickup` |
| 3 | Positive distance | `trip_distance` | `> 0` |
| 4 | Valid passenger count | `passenger_count` | `> 0` |
| 5 | Positive fare | `total_amount`, `fare_amount` | `> 0` |
| 6 | Valid location IDs | `PULocationID`, `DOLocationID` | `isNotNull()` |


In [5]:
df_cleaned = df_casted.filter(
    F.col("pickup_datetime").isNotNull() &
    F.col("dropoff_datetime").isNotNull() &
    (F.col("dropoff_datetime") > F.col("pickup_datetime")) &
    (F.col("trip_distance") > 0) &
    (F.col("passenger_count") > 0) &
    ((F.col("total_amount") > 0) & (F.col("fare_amount") > 0)) &
    F.col("PULocationID").isNotNull() &
    F.col("DOLocationID").isNotNull()
)

raw_count = df_casted.count()
after_clean_count = df_cleaned.count()

print(f"Rows input: {raw_count}")
print(f"Rows after clean: {after_clean_count}(removed {raw_count - after_clean_count})")

Rows input: 7052769
Rows after clean: 5472446(removed 1580323)


In [6]:
print("trip_distance <= 0:")
df_casted.filter(
    F.col("trip_distance") <= 0
).select(
    "pickup_datetime", "dropoff_datetime", "trip_distance", "passenger_count", "total_amount"
).show(2, truncate=False)

print("passenger_count < 1:")
df_casted.filter(
    F.col("passenger_count") < 1
).select(
    "pickup_datetime", "dropoff_datetime", "trip_distance", "passenger_count", "total_amount"
).show(2, truncate=False)

print("dropoff_datetime <= pickup_datetime:")
df_casted.filter(
    F.col("pickup_datetime").isNotNull() &
    (F.col("dropoff_datetime") <= F.col("pickup_datetime"))
).select(
    "pickup_datetime", "dropoff_datetime", "trip_distance", "passenger_count"
).show(1, truncate=False)
df_casted.filter(
    F.col("pickup_datetime").isNotNull() &
    (F.col("dropoff_datetime") < F.col("pickup_datetime"))
).select(
    "pickup_datetime", "dropoff_datetime", "trip_distance", "passenger_count"
).show(1, truncate=False)

trip_distance <= 0:
+-------------------+-------------------+-------------+---------------+------------+
|pickup_datetime    |dropoff_datetime   |trip_distance|passenger_count|total_amount|
+-------------------+-------------------+-------------+---------------+------------+
|2025-02-01 00:30:36|2025-02-01 00:31:28|0.0          |1              |8.75        |
|2025-02-01 00:35:53|2025-02-01 00:35:58|0.0          |1              |17.25       |
+-------------------+-------------------+-------------+---------------+------------+
only showing top 2 rows
passenger_count < 1:
+-------------------+-------------------+-------------+---------------+------------+
|pickup_datetime    |dropoff_datetime   |trip_distance|passenger_count|total_amount|
+-------------------+-------------------+-------------+---------------+------------+
|2025-02-01 00:06:09|2025-02-01 00:11:51|0.4          |0              |13.25       |
|2025-02-01 00:15:13|2025-02-01 00:20:19|0.7          |0              |14.95       |


# Do we need to clean zones as well? How should we handle them?

### Deduplication
**Key:** `(pickup_datetime, dropoff_datetime, PULocationID, DOLocationID, trip_distance, vendorID)`  
Two real trips cannot share the same origin, destination, exact times, and distance.


### Is this wrong? is the dataset perfect or am i doing it wrong?

In [7]:
deduplication_key = [
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "trip_distance",
    "VendorID",
]

df_dedup = df_cleaned.dropDuplicates(deduplication_key)

after_dedup_count = df_dedup.count()
print(f"Rows after clean: {after_clean_count}")
print(f"Rows after dedup: {after_dedup_count}(removed {after_clean_count - after_dedup_count} duplicates)")

Rows after clean: 5472446
Rows after dedup: 5472446(removed 0 duplicates)


### derived columns

In [8]:
df_derived = df_dedup.selectExpr(
    "*",
    "round((unix_timestamp(dropoff_datetime) - unix_timestamp(pickup_datetime)) / 60.0, 2) AS trip_duration_minutes",
    "to_date(pickup_datetime) AS pickup_date"
)

df_derived.select(
    "pickup_datetime", "dropoff_datetime",
    "trip_distance", "trip_duration_minutes", "pickup_date"
).show(5, truncate=False)

+-------------------+-------------------+-------------+---------------------+-----------+
|pickup_datetime    |dropoff_datetime   |trip_distance|trip_duration_minutes|pickup_date|
+-------------------+-------------------+-------------+---------------------+-----------+
|2025-02-01 00:26:21|2025-02-01 00:36:43|2.65         |10.37                |2025-02-01 |
|2025-02-01 00:13:13|2025-02-01 00:31:36|2.28         |18.38                |2025-02-01 |
|2025-02-01 00:19:55|2025-02-01 00:40:20|3.71         |20.42                |2025-02-01 |
|2025-02-01 00:03:37|2025-02-01 00:18:05|3.41         |14.47                |2025-02-01 |
|2025-02-01 00:29:11|2025-02-01 00:38:09|2.63         |8.97                 |2025-02-01 |
+-------------------+-------------------+-------------+---------------------+-----------+
only showing top 5 rows


### Zone Enrichment
Cast column names for the zones once, then create separate pickup and dropoff views. the zone table has 265 rows therefore smaller than main data aand we'll use broacast join



In [9]:
zones = zones_raw.select(
    F.col("LocationID").cast("int"),
    F.col("Zone").alias("zone_name"),
    F.col("Borough").alias("borough")
)

pickup_zones = zones.select(
    F.col("LocationID").alias("PULocationID"),
    F.col("zone_name").alias("pickup_zone"),
    F.col("borough").alias("pickup_borough")
)

dropoff_zones = zones.select(
    F.col("LocationID").alias("DOLocationID"),
    F.col("zone_name").alias("dropoff_zone"),
    F.col("borough").alias("dropoff_borough")
)

print("autoBroadcastJoinThreshold:", spark.conf.get("spark.sql.autoBroadcastJoinThreshold"))

df_enriched = (
    df_derived
    .join(F.broadcast(pickup_zones), on="PULocationID", how="left")
    .join(F.broadcast(dropoff_zones), on="DOLocationID", how="left")
)

df_enriched.select(
    "pickup_datetime", 
    "PULocationID", 
    "pickup_zone", 
    "pickup_borough",
    "DOLocationID", 
    "dropoff_zone", 
    "dropoff_borough"
).show(5, truncate=False)

autoBroadcastJoinThreshold: 10MB
+-------------------+------------+------------+--------------+------------+------------------------+---------------+
|pickup_datetime    |PULocationID|pickup_zone |pickup_borough|DOLocationID|dropoff_zone            |dropoff_borough|
+-------------------+------------+------------+--------------+------------+------------------------+---------------+
|2025-02-01 00:26:21|79          |East Village|Manhattan     |161         |Midtown Center          |Manhattan      |
|2025-02-01 00:13:13|249         |West Village|Manhattan     |87          |Financial District North|Manhattan      |
|2025-02-01 00:19:55|48          |Clinton East|Manhattan     |87          |Financial District North|Manhattan      |
|2025-02-01 00:03:37|90          |Flatiron    |Manhattan     |236         |Upper East Side North   |Manhattan      |
|2025-02-01 00:29:11|170         |Murray Hill |Manhattan     |236         |Upper East Side North   |Manhattan      |
+-------------------+----------

### End result

Required output fields must include at least:
* pickup and dropoff timestamps
* pickup and dropoff LocationID
* pickup and dropoff zone name (from lookup)
* passenger_count, trip_distance
* derived: trip_duration_minutes, pickup_date
* metadata: source_file, ingested_at

In [13]:
df_output = df_enriched.select(
    "VendorID",
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "pickup_zone",
    "pickup_borough",
    "dropoff_zone",
    "dropoff_borough",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "trip_duration_minutes",
    "pickup_date",
    "source_file",
    "ingested_at"
)

df_output.printSchema()
df_output.show(5, truncate=False)

final_count = df_output.count()
print(f"Input(raw): {raw_count}")
print(f"After cleaning: {after_clean_count}")
print(f"After dedup: {after_dedup_count}")
print(f"Final output: {final_count}")

root
 |-- VendorID: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- pickup_zone: string (nullable = true)
 |-- pickup_borough: string (nullable = true)
 |-- dropoff_zone: string (nullable = true)
 |-- dropoff_borough: string (nullable = true)
 |-- passenger_count: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- trip_duration_minutes: decimal(24,2) (nullable = true)
 |-- pickup_date: date (nullable = true)
 |-- source_file: string (nullable = false)
 |-- ingested_at: timestamp (nullable = false)

+--------+-------------------+-------------------+------------+------------+--------------+--------------+--------------+---------------+---------------+-------------+-----------+------------+------------------

In [12]:
df_output.explain(True)

== Parsed Logical Plan ==
'Project ['VendorID, 'pickup_datetime, 'dropoff_datetime, 'PULocationID, 'DOLocationID, 'pickup_zone, 'pickup_borough, 'dropoff_zone, 'dropoff_borough, 'passenger_count, 'trip_distance, 'fare_amount, 'total_amount, 'trip_duration_minutes, 'pickup_date, 'source_file, 'ingested_at]
+- Project [DOLocationID#64, PULocationID#63, VendorID#62, pickup_datetime#59, dropoff_datetime#60, passenger_count#65, trip_distance#66, fare_amount#67, total_amount#68, source_file#21, ingested_at#61, trip_duration_minutes#230, pickup_date#231, pickup_zone#262, pickup_borough#263, dropoff_zone#265, dropoff_borough#266]
   +- Join LeftOuter, (DOLocationID#64 = DOLocationID#264)
      :- Project [PULocationID#63, VendorID#62, pickup_datetime#59, dropoff_datetime#60, DOLocationID#64, passenger_count#65, trip_distance#66, fare_amount#67, total_amount#68, source_file#21, ingested_at#61, trip_duration_minutes#230, pickup_date#231, pickup_zone#262, pickup_borough#263]
      :  +- Join Left