In [70]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("NYC Taxi Lab").config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem").config("spark.hadoop.fs.s3a.aws.credentials.provider","com.amazonaws.auth.DefaultAWSCredentialsProviderChain").getOrCreate()

In [71]:
spark.version

'4.1.0'

In [72]:
import pandas as pd

s3_path = "s3://my-data-lake-lab-nandnioubt/raw/nyc_taxi/yellow_tripdata_2023-01.parquet"

pdf = pd.read_parquet(s3_path)

pdf.head()


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,airport_fee
0,2,2023-01-01 00:32:10,2023-01-01 00:40:36,1.0,0.97,1.0,N,161,141,2,9.3,1.0,0.5,0.0,0.0,1.0,14.3,2.5,0.0
1,2,2023-01-01 00:55:08,2023-01-01 01:01:27,1.0,1.1,1.0,N,43,237,1,7.9,1.0,0.5,4.0,0.0,1.0,16.9,2.5,0.0
2,2,2023-01-01 00:25:04,2023-01-01 00:37:49,1.0,2.51,1.0,N,48,238,1,14.9,1.0,0.5,15.0,0.0,1.0,34.9,2.5,0.0
3,1,2023-01-01 00:03:48,2023-01-01 00:13:25,0.0,1.9,1.0,N,138,7,1,12.1,7.25,0.5,0.0,0.0,1.0,20.85,0.0,1.25
4,2,2023-01-01 00:10:29,2023-01-01 00:21:19,1.0,1.43,1.0,N,107,79,1,11.4,1.0,0.5,3.28,0.0,1.0,19.68,2.5,0.0


In [73]:
pdf_5k = pdf.head(5000)

len(pdf_5k)

5000

In [74]:
#Convert to Spark DataFrame
trips_df = spark.createDataFrame(pdf_5k)

trips_df.show(5)
trips_df.count()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

5000

In [75]:
import pandas as pd
zones_pdf = pd.read_csv("taxi_zone_lookup.csv")
zones_pdf.head()



Unnamed: 0,LocationID,Borough,Zone,service_zone
0,1,EWR,Newark Airport,EWR
1,2,Queens,Jamaica Bay,Boro Zone
2,3,Bronx,Allerton/Pelham Gardens,Boro Zone
3,4,Manhattan,Alphabet City,Yellow Zone
4,5,Staten Island,Arden Heights,Boro Zone


In [76]:
##Convert it to Spark DataFrame for joining
zones_df = spark.createDataFrame(zones_pdf)
zones_df.show(5)


+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


Step 1: Data Cleaning / Filtering

Taxi data often has invalid trips. Filter them out:

In [78]:
from pyspark.sql.functions import col

clean_trips_df = trips_df.filter(
    (col("fare_amount") > 0) & 
    (col("passenger_count") > 0) & 
    (col("trip_distance") > 0)
)

clean_trips_df.show(5)
clean_trips_df.count()


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+
|       2| 2023-01-01 00:32:10|  2023-01-01 00:40:36|            1.0|         0.97|       1.0|                 N|         161|         141|           2|        9.3|  1.0|    0.5|       0.

4838

Step 2: Feature Engineering / Transformations

Extract useful information from pickup/dropoff timestamps:

In [80]:
from pyspark.sql.functions import year, month, dayofmonth, hour, unix_timestamp

trips_df = clean_trips_df.withColumn("pickup_year", year("tpep_pickup_datetime")) \
                          .withColumn("pickup_month", month("tpep_pickup_datetime")) \
                          .withColumn("pickup_day", dayofmonth("tpep_pickup_datetime")) \
                          .withColumn("pickup_hour", hour("tpep_pickup_datetime")) \
                          .withColumn("trip_duration_minutes", 
                                      (unix_timestamp("tpep_dropoff_datetime") - unix_timestamp("tpep_pickup_datetime")) / 60)
trips_df.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+----------+-----------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|pickup_year|pickup_month|pickup_day|pickup_hour|trip_duration_minutes|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-----------+------------+----------+-----------+

Step 3: Master Data Enrichment

Join with zones_df to get human-readable zone names:

In [82]:
#Create a fresh DataFrame with only the original trips columns
# Keep only the original trips columns to start fresh
trips_clean = trips_df.select(
    "VendorID", "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "passenger_count", "trip_distance", "RatecodeID",
    "store_and_fwd_flag", "PULocationID", "DOLocationID",
    "payment_type", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge",
    "total_amount", "congestion_surcharge"
)
trips_clean.head()

Row(VendorID=2, tpep_pickup_datetime=datetime.datetime(2023, 1, 1, 0, 32, 10), tpep_dropoff_datetime=datetime.datetime(2023, 1, 1, 0, 40, 36), passenger_count=1.0, trip_distance=0.97, RatecodeID=1.0, store_and_fwd_flag='N', PULocationID=161, DOLocationID=141, payment_type=2, fare_amount=9.3, extra=1.0, mta_tax=0.5, tip_amount=0.0, tolls_amount=0.0, improvement_surcharge=1.0, total_amount=14.3, congestion_surcharge=2.5)

In [83]:
##Enrich pickup zone (broadcast join, minimal)
from pyspark.sql.functions import broadcast, col

pickup_zones = zones_df.select(
    col("LocationID").alias("pickup_LocationID"),
    col("Zone").alias("pickup_zone_name")
)

trips_with_pickup = trips_clean.join(
    broadcast(pickup_zones),
    trips_clean.PULocationID == pickup_zones.pickup_LocationID,
    "left"
).select(
    trips_clean["*"],  # only original columns
    pickup_zones["pickup_zone_name"]
)

# Verify pickup enrichment
trips_with_pickup.select("PULocationID", "pickup_zone_name").show(5)


+------------+----------------+
|PULocationID|pickup_zone_name|
+------------+----------------+
|         161|  Midtown Center|
|          43|    Central Park|
|          48|    Clinton East|
|         107|        Gramercy|
|         161|  Midtown Center|
+------------+----------------+
only showing top 5 rows


In [84]:
#Enrich dropoff zone (broadcast join, minimal)
dropoff_zones = zones_df.select(
    col("LocationID").alias("dropoff_LocationID"),
    col("Zone").alias("dropoff_zone_name")
)

trips_enriched = trips_with_pickup.join(
    broadcast(dropoff_zones),
    trips_with_pickup.DOLocationID == dropoff_zones.dropoff_LocationID,
    "left"
).select(
    trips_with_pickup["*"],  # all original + pickup_zone_name
    dropoff_zones["dropoff_zone_name"]
)

# Verify dropoff enrichment
trips_enriched.select(
    "PULocationID", "pickup_zone_name",
    "DOLocationID", "dropoff_zone_name"
).show(5)


+------------+----------------+------------+--------------------+
|PULocationID|pickup_zone_name|DOLocationID|   dropoff_zone_name|
+------------+----------------+------------+--------------------+
|         161|  Midtown Center|         141|     Lenox Hill West|
|          43|    Central Park|         237|Upper East Side S...|
|          48|    Clinton East|         238|Upper West Side N...|
|         107|        Gramercy|          79|        East Village|
|         161|  Midtown Center|         137|            Kips Bay|
+------------+----------------+------------+--------------------+
only showing top 5 rows


Step 4: Add date/time and duration features

In [86]:
from pyspark.sql.functions import hour, dayofweek, month, col

# Step 4a: pickup hour
trips_enriched = trips_enriched.withColumn("pickup_hour", hour(col("tpep_pickup_datetime")))

# Step 4b: pickup day of week
trips_enriched = trips_enriched.withColumn("pickup_day_of_week", dayofweek(col("tpep_pickup_datetime")))

# Step 4c: pickup month
trips_enriched = trips_enriched.withColumn("pickup_month", month(col("tpep_pickup_datetime")))

# Step 4d: trip duration in minutes
trips_enriched = trips_enriched.withColumn(
    "trip_duration_minutes",
    (col("tpep_dropoff_datetime").cast("long") - col("tpep_pickup_datetime").cast("long")) / 60
)

# Verify results
trips_enriched.select(
    "tpep_pickup_datetime", "tpep_dropoff_datetime",
    "pickup_hour", "pickup_day_of_week", "pickup_month",
    "trip_duration_minutes"
).show(5)


+--------------------+---------------------+-----------+------------------+------------+---------------------+
|tpep_pickup_datetime|tpep_dropoff_datetime|pickup_hour|pickup_day_of_week|pickup_month|trip_duration_minutes|
+--------------------+---------------------+-----------+------------------+------------+---------------------+
| 2023-01-01 00:32:10|  2023-01-01 00:40:36|          0|                 1|           1|    8.433333333333334|
| 2023-01-01 00:55:08|  2023-01-01 01:01:27|          0|                 1|           1|    6.316666666666666|
| 2023-01-01 00:25:04|  2023-01-01 00:37:49|          0|                 1|           1|                12.75|
| 2023-01-01 00:10:29|  2023-01-01 00:21:19|          0|                 1|           1|   10.833333333333334|
| 2023-01-01 00:50:34|  2023-01-01 01:02:52|          0|                 1|           1|                 12.3|
+--------------------+---------------------+-----------+------------------+------------+---------------------+
o

Step 5: Filter invalid data

In [88]:
# Remove trips with negative or zero duration or distance
trips_enriched = trips_enriched.filter(
    (col("trip_duration_minutes") > 0) &
    (col("trip_distance") > 0)
)

# Optional: remove extreme outliers (e.g., trips longer than 3 hours)
trips_enriched = trips_enriched.filter(col("trip_duration_minutes") < 180)

trips_enriched.show(5)


+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------------+--------------------+-----------+------------------+------------+---------------------+
|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|RatecodeID|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|pickup_zone_name|   dropoff_zone_name|pickup_hour|pickup_day_of_week|pickup_month|trip_duration_minutes|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+----------------

In [89]:
trips_enriched \
    .coalesce(4) \
    .write \
    .mode("overwrite") \
    .parquet("/tmp/curated_nyc_taxi/")


In [24]:
#aws s3 cp /tmp/curated_nyc_taxi/ s3://my-data-lake-lab-nandnioubt/processed/nyc_taxi/ --recursive
#run this command in bash to avoid kernel restart

Step 7: Master Data Lookups (Vendor & Rate Codes)

Goal: Enrich your trips with descriptive names for VendorID and RatecodeID, and check for invalid references.

In [105]:
## Create Vendor master data (inline)
## NYC Taxi vendor data is small & static, so we define it directly.
vendor_df = spark.createDataFrame(
    [
        (1, "Creative Mobile Technologies"),
        (2, "VeriFone Inc")
    ],
    ["VendorID", "vendor_name"]
)

vendor_df.show()


+--------+--------------------+
|VendorID|         vendor_name|
+--------+--------------------+
|       1|Creative Mobile T...|
|       2|        VeriFone Inc|
+--------+--------------------+



In [107]:
## Create Rate Code master data
ratecode_df = spark.createDataFrame(
    [
        (1, "Standard rate"),
        (2, "JFK"),
        (3, "Newark"),
        (4, "Nassau or Westchester"),
        (5, "Negotiated fare"),
        (6, "Group ride")
    ],
    ["RatecodeID", "rate_code_desc"]
)

ratecode_df.show()


+----------+--------------------+
|RatecodeID|      rate_code_desc|
+----------+--------------------+
|         1|       Standard rate|
|         2|                 JFK|
|         3|              Newark|
|         4|Nassau or Westche...|
|         5|     Negotiated fare|
|         6|          Group ride|
+----------+--------------------+



In [28]:
## Join Vendor master (lookup enrichment)
trips_with_vendor = trips_enriched.join(
    vendor_df,
    on="VendorID",
    how="left"
)

trips_with_vendor.select("VendorID", "vendor_name").show(5)


+--------+------------+
|VendorID| vendor_name|
+--------+------------+
|       2|VeriFone Inc|
|       2|VeriFone Inc|
|       2|VeriFone Inc|
|       2|VeriFone Inc|
|       2|VeriFone Inc|
+--------+------------+
only showing top 5 rows


                                                                                

In [109]:
## Join Rate Code master
trips_with_master = trips_with_vendor.join(
    ratecode_df,
    on="RatecodeID",
    how="left"
)

trips_with_master.select(
    "RatecodeID", "rate_code_desc"
).show(5)


                                                                                

+----------+--------------+
|RatecodeID|rate_code_desc|
+----------+--------------+
|       1.0| Standard rate|
|       1.0| Standard rate|
|       1.0| Standard rate|
|       1.0| Standard rate|
|       1.0| Standard rate|
+----------+--------------+
only showing top 5 rows


In [111]:
##Identify invalid master references (important!)
from pyspark.sql.functions import col

invalid_master_records = trips_with_master.filter(
    col("vendor_name").isNull() | col("rate_code_desc").isNull()
)

invalid_master_records.show(5)


                                                                                

+----------+--------+--------------------+---------------------+---------------+-------------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+--------------------+--------------------+-----------+------------------+------------+---------------------+--------------------+--------------+
|RatecodeID|VendorID|tpep_pickup_datetime|tpep_dropoff_datetime|passenger_count|trip_distance|store_and_fwd_flag|PULocationID|DOLocationID|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|    pickup_zone_name|   dropoff_zone_name|pickup_hour|pickup_day_of_week|pickup_month|trip_duration_minutes|         vendor_name|rate_code_desc|
+----------+--------+--------------------+---------------------+---------------+-------------+------------------+------------+------------+------------+-----------+-----+-------+----------+-----

Step 8: Reference Data Validation
Goal

Validate allowed values for reference columns and flag bad data.

Typical NYC taxi reference fields:

payment_type

store_and_fwd_flag

In [114]:
# 8.1: Define reference data (allowed values)
# Payment Type reference
payment_type_ref = spark.createDataFrame(
    [
        (1, "Credit card"),
        (2, "Cash"),
        (3, "No charge"),
        (4, "Dispute"),
        (5, "Unknown"),
        (6, "Voided trip")
    ],
    ["payment_type", "payment_type_desc"]
)

payment_type_ref.show()


+------------+-----------------+
|payment_type|payment_type_desc|
+------------+-----------------+
|           1|      Credit card|
|           2|             Cash|
|           3|        No charge|
|           4|          Dispute|
|           5|          Unknown|
|           6|      Voided trip|
+------------+-----------------+



In [116]:
# Store-and-forward flag reference
store_fwd_ref = spark.createDataFrame(
    [
        ("Y", "Stored and forwarded"),
        ("N", "Not stored")
    ],
    ["store_and_fwd_flag", "store_and_fwd_desc"]
)

store_fwd_ref.show()


+------------------+--------------------+
|store_and_fwd_flag|  store_and_fwd_desc|
+------------------+--------------------+
|                 Y|Stored and forwarded|
|                 N|          Not stored|
+------------------+--------------------+



In [118]:
# Validate payment_type
# Find invalid payment types

invalid_payment_type = trips_with_master.join(
    payment_type_ref,
    on="payment_type",
    how="left_anti"
)

invalid_payment_type.select("payment_type").distinct().show()


+------------+
|payment_type|
+------------+
+------------+



In [120]:
#Validate store_and_fwd_flag
invalid_store_fwd = trips_with_master.join(
    store_fwd_ref,
    on="store_and_fwd_flag",
    how="left_anti"
)

invalid_store_fwd.select("store_and_fwd_flag").distinct().show()


+------------------+
|store_and_fwd_flag|
+------------------+
+------------------+



In [122]:
#Add validation flags 
from pyspark.sql.functions import when, col

validated_trips = trips_with_master \
    .withColumn(
        "is_valid_payment_type",
        when(col("payment_type").isin([1,2,3,4,5,6]), "Y").otherwise("N")
    ) \
    .withColumn(
        "is_valid_store_fwd",
        when(col("store_and_fwd_flag").isin(["Y","N"]), "Y").otherwise("N")
    )

validated_trips.select(
    "payment_type", "is_valid_payment_type",
    "store_and_fwd_flag", "is_valid_store_fwd"
).show(5)


                                                                                

+------------+---------------------+------------------+------------------+
|payment_type|is_valid_payment_type|store_and_fwd_flag|is_valid_store_fwd|
+------------+---------------------+------------------+------------------+
|           2|                    Y|                 N|                 Y|
|           1|                    Y|                 N|                 Y|
|           1|                    Y|                 N|                 Y|
|           1|                    Y|                 N|                 Y|
|           1|                    Y|                 N|                 Y|
+------------+---------------------+------------------+------------------+
only showing top 5 rows


In [124]:
#Data quality summary
validated_trips.groupBy(
    "is_valid_payment_type",
    "is_valid_store_fwd"
).count().show()


                                                                                

+---------------------+------------------+-----+
|is_valid_payment_type|is_valid_store_fwd|count|
+---------------------+------------------+-----+
|                    Y|                 Y| 4827|
+---------------------+------------------+-----+



Validated reference attributes using controlled value lists, flagged invalid values instead of deleting records, and produced data quality metrics for governance reporting

In [78]:
!pip install --upgrade boto3 botocore




In [126]:
import boto3
import pandas as pd
from io import BytesIO

pdf = validated_trips.toPandas()

buffer = BytesIO()
pdf.to_parquet(buffer, engine="pyarrow", index=False)

s3 = boto3.client("s3")
s3.put_object(
    Bucket="my-data-lake-lab-nandnioubt",
    Key="curated/nyc_taxi/validated_trips.parquet",
    Body=buffer.getvalue()
)


                                                                                

{'ResponseMetadata': {'RequestId': 'WZFWRKPYM95F6QAB',
  'HostId': 'wr4N5Mu1mdNoe7bJzTDQ5+664X1nmrMz+lZjckwOMS9hH8evPMGmdO4vC+eEUbvrGG4L6Z8q7MGzL5+cRacKqA==',
  'HTTPStatusCode': 200,
  'HTTPHeaders': {'x-amz-id-2': 'wr4N5Mu1mdNoe7bJzTDQ5+664X1nmrMz+lZjckwOMS9hH8evPMGmdO4vC+eEUbvrGG4L6Z8q7MGzL5+cRacKqA==',
   'x-amz-request-id': 'WZFWRKPYM95F6QAB',
   'date': 'Sun, 04 Jan 2026 03:40:41 GMT',
   'x-amz-server-side-encryption': 'AES256',
   'etag': '"9dbd9e89c983c716d5d8b4a3a324cdd6"',
   'x-amz-checksum-crc32': 'et5zOQ==',
   'x-amz-checksum-type': 'FULL_OBJECT',
   'content-length': '0',
   'server': 'AmazonS3'},
  'RetryAttempts': 0},
 'ETag': '"9dbd9e89c983c716d5d8b4a3a324cdd6"',
 'ChecksumCRC32': 'et5zOQ==',
 'ChecksumType': 'FULL_OBJECT',
 'ServerSideEncryption': 'AES256'}