In [1]:
import numpy as np
import pandas as pd

In [3]:
df = pd.read_parquet("../data/raw/yellow_tripdata_2025-01.parquet")

In [4]:
df

Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,cbd_congestion_fee
0,1,2025-01-01 00:18:38,2025-01-01 00:26:59,1.0,1.60,1.0,N,229,237,1,10.00,3.5,0.5,3.00,0.0,1.0,18.00,2.5,0.0,0.00
1,1,2025-01-01 00:32:40,2025-01-01 00:35:13,1.0,0.50,1.0,N,236,237,1,5.10,3.5,0.5,2.02,0.0,1.0,12.12,2.5,0.0,0.00
2,1,2025-01-01 00:44:04,2025-01-01 00:46:01,1.0,0.60,1.0,N,141,141,1,5.10,3.5,0.5,2.00,0.0,1.0,12.10,2.5,0.0,0.00
3,2,2025-01-01 00:14:27,2025-01-01 00:20:01,3.0,0.52,1.0,N,244,244,2,7.20,1.0,0.5,0.00,0.0,1.0,9.70,0.0,0.0,0.00
4,2,2025-01-01 00:21:34,2025-01-01 00:25:06,3.0,0.66,1.0,N,244,116,2,5.80,1.0,0.5,0.00,0.0,1.0,8.30,0.0,0.0,0.00
...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...,...
3475221,2,2025-01-31 23:01:48,2025-01-31 23:16:29,,3.35,,,79,237,0,15.85,0.0,0.5,0.00,0.0,1.0,20.60,,,0.75
3475222,2,2025-01-31 23:50:29,2025-02-01 00:17:27,,8.73,,,161,116,0,28.14,0.0,0.5,0.00,0.0,1.0,32.89,,,0.75
3475223,2,2025-01-31 23:26:59,2025-01-31 23:43:01,,2.64,,,144,246,0,14.91,0.0,0.5,0.00,0.0,1.0,19.66,,,0.75
3475224,2,2025-01-31 23:14:34,2025-01-31 23:34:52,,3.16,,,142,107,0,17.55,0.0,0.5,0.00,0.0,1.0,22.30,,,0.75


In [5]:
def clean_yellow_taxi_data(df):
    df = df.copy()

    # Renaming Columns (Ensuring Consistency in terms of Column Names)
    column_name_map = {
        'VendorID': 'vendor_id',
        'tpep_pickup_datetime': 'pickup_datetime',
        'tpep_dropoff_datetime': 'dropoff_datetime',
        'passenger_count': 'passenger_count',
        'trip_distance': 'trip_distance',
        'RatecodeID': 'rate_code_id',
        'store_and_fwd_flag': 'store_and_fwd_flag',
        'PULocationID': 'pu_location_id',
        'DOLocationID': 'do_location_id',
        'payment_type': 'payment_type',
        'fare_amount': 'fare_amount',
        'extra': 'extra',
        'mta_tax': 'mta_tax',
        'tip_amount': 'tip_amount',
        'tolls_amount': 'tolls_amount',
        'improvement_surcharge': 'improvement_surcharge_applied',
        'total_amount': 'total_amount',
        'congestion_surcharge': 'congestion_surcharge',
        'Airport_fee': 'airport_fee',
        'cbd_congestion_fee': 'cbd_congestion_fee'
    }
    df = df.rename(columns=column_name_map)

    # Category columns (no filtering dependency)
    category_columns = [
        'vendor_id',
        'rate_code_id',
        'store_and_fwd_flag',
        'payment_type'
    ]
    for col in category_columns:
        if col in df.columns:
            df[col] = df[col].astype('category')

    # Numeric coercion ONLY (no casting yet)
    numeric_columns = [
        'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
        'tolls_amount', 'improvement_surcharge_applied', 'total_amount',
        'congestion_surcharge', 'airport_fee', 'cbd_congestion_fee',
        'passenger_count', 'pu_location_id', 'do_location_id'
    ]
    for col in numeric_columns:
        if col in df.columns:
            df[col] = pd.to_numeric(df[col], errors='coerce')

    # Datetime parsing
    df['pickup_datetime'] = pd.to_datetime(df['pickup_datetime'], errors='coerce')
    df['dropoff_datetime'] = pd.to_datetime(df['dropoff_datetime'], errors='coerce')

    # Parsing Datetimes
    df = df.dropna(subset=['pickup_datetime', 'dropoff_datetime']).copy()

    # Validating Time Order
    df = df[df['dropoff_datetime'] > df['pickup_datetime']].copy()

    # Filtering Target Month (implicit manual logic)
    TARGET_YEAR = 2025
    TARGET_MONTH = 1  # January Month

    df = df[
        (df['pickup_datetime'].dt.year == TARGET_YEAR) &
        (df['pickup_datetime'].dt.month == TARGET_MONTH)
    ].copy()

    # Calculating Trip Duration
    df['trip_duration_min'] = (
        (df['dropoff_datetime'] - df['pickup_datetime'])
        .dt.total_seconds() / 60
    )

    # Duration & Distance Cleaning
    df = df[
        (df['trip_duration_min'] > 0) &
        (df['trip_duration_min'] <= 24 * 60)
    ].copy()

    df = df[
        (df['trip_distance'] > 0) &
        (df['trip_distance'] <= 100)
    ].copy()

    # Fare Checks
    df = df[
        (df['fare_amount'] >= 0) &
        (df['total_amount'] >= df['fare_amount'])
    ].copy()

    # Passenger Count Check
    df = df[
        (df['passenger_count'] > 0) &
        (df['passenger_count'] <= 6)
    ].copy()

    # Valid Rate Codes
    df = df[df['rate_code_id'].isin([1, 2, 3, 4, 5, 6])].copy()

    # Valid MTA Tax
    df = df[df['mta_tax'].isin([0.0, 0.5])].copy()

    # Payment filtering and mapping
    payment_map = {
        1: 'Credit Card',
        2: 'Cash',
        3: 'No Charge',
        4: 'Dispute'
    }
    df = df[df['payment_type'].isin(payment_map.keys())].copy()
    df['payment_type'] = df['payment_type'].map(payment_map).astype('category')

    # Location IDs Validation
    df = df[
        df['pu_location_id'].between(1, 263) &
        df['do_location_id'].between(1, 263)
    ].copy()

    # Tip Amount Validation
    df = df[
        (df['tip_amount'] >= 0) &
        (df['tip_amount'] <= 200)
    ].copy()

    # Rounding Tolls Amount
    df['tolls_amount'] = df['tolls_amount'].round(2)

    # Creating Amount Difference Column
    df['amount_diff'] = df['total_amount'] - (
        df['fare_amount']
        + df['extra']
        + df['mta_tax']
        + df['tip_amount']
        + df['tolls_amount']
        + df['congestion_surcharge']
        + df['airport_fee']
        + df['cbd_congestion_fee']
    )

    # FINAL dtype casting (manual-equivalent)
    df['passenger_count'] = df['passenger_count'].astype('Int8')
    df['pu_location_id'] = df['pu_location_id'].astype('Int16')
    df['do_location_id'] = df['do_location_id'].astype('Int16')

    float_cols = [
        'trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount',
        'tolls_amount', 'improvement_surcharge_applied', 'total_amount',
        'congestion_surcharge', 'airport_fee', 'cbd_congestion_fee'
    ]
    for col in float_cols:
        if col in df.columns:
            df[col] = df[col].astype('Float64')

    return df.reset_index(drop=True)


In [6]:
df = clean_yellow_taxi_data(df)

In [7]:
df.shape

(2752626, 22)

In [8]:
import pandas as pd
import numpy as np

def validate_cleaned_yellow_taxi_month(
    df,
    expected_year: int,
    expected_month: int
):
    """
    Validation & diagnosis for a SINGLE cleaned monthly dataset
    produced by clean_yellow_taxi_data().
    DOES NOT MODIFY DATA.
    """

    report = {}

    # 1. Structure and Schema Validation
    expected_columns = [
        'vendor_id', 'pickup_datetime', 'dropoff_datetime',
        'passenger_count', 'trip_distance', 'rate_code_id',
        'store_and_fwd_flag', 'pu_location_id', 'do_location_id',
        'payment_type', 'fare_amount', 'extra', 'mta_tax',
        'tip_amount', 'tolls_amount', 'improvement_surcharge_applied',
        'total_amount', 'congestion_surcharge', 'airport_fee',
        'cbd_congestion_fee', 'trip_duration_min', 'amount_diff'
    ]

    report['missing_columns'] = list(set(expected_columns) - set(df.columns))
    report['unexpected_columns'] = list(set(df.columns) - set(expected_columns))

    
    # 2. Datetime Guarantees
    
    report['pickup_nulls'] = int(df['pickup_datetime'].isna().sum())
    report['dropoff_nulls'] = int(df['dropoff_datetime'].isna().sum())

    report['pickup_after_dropoff'] = int(
        (df['dropoff_datetime'] <= df['pickup_datetime']).sum()
    )

    report['wrong_year_rows'] = int(
        (df['pickup_datetime'].dt.year != expected_year).sum()
    )

    report['wrong_month_rows'] = int(
        (df['pickup_datetime'].dt.month != expected_month).sum()
    )

    # 3. Trip Duration & Distance (Pipeline Filters)
    report['duration_le_0'] = int(
        (df['trip_duration_min'] <= 0).sum()
    )

    report['duration_gt_24h'] = int(
        (df['trip_duration_min'] > 1440).sum()
    )

    report['distance_le_0'] = int(
        (df['trip_distance'] <= 0).sum()
    )

    report['distance_gt_100'] = int(
        (df['trip_distance'] > 100).sum()
    )

    # 4. Passenger Count Validation
    report['passenger_le_0'] = int(
        (df['passenger_count'] <= 0).sum()
    )

    report['passenger_gt_6'] = int(
        (df['passenger_count'] > 6).sum()
    )

    # 5. Rate Code and Tax Logic
    report['invalid_rate_code'] = int(
        (~df['rate_code_id'].isin([1, 2, 3, 4, 5, 6])).sum()
    )

    report['invalid_mta_tax'] = int(
        (~df['mta_tax'].isin([0.0, 0.5])).sum()
    )

    # 6. Payment Type Consistency
    valid_payment_types = ['Credit Card', 'Cash', 'No Charge', 'Dispute']

    report['invalid_payment_type'] = int(
        (~df['payment_type'].isin(valid_payment_types)).sum()
    )

    # 7. Location ID Integrity
    report['invalid_pu_location'] = int(
        (~df['pu_location_id'].between(1, 263)).sum()
    )

    report['invalid_do_location'] = int(
        (~df['do_location_id'].between(1, 263)).sum()
    )

    # 8. Monetary Rules
    monetary_cols = [
        'fare_amount', 'extra', 'mta_tax', 'tip_amount',
        'tolls_amount', 'improvement_surcharge_applied',
        'congestion_surcharge', 'airport_fee',
        'cbd_congestion_fee', 'total_amount'
    ]

    for col in monetary_cols:
        report[f'negative_{col}'] = int((df[col] < 0).sum())


    report['tip_gt_200'] = int((df['tip_amount'] > 200).sum())

    report['total_lt_fare'] = int(
        (df['total_amount'] < df['fare_amount']).sum()
    )

    # 9. Amount Difference Diagnostics 
    recalculated_total = (
        df['fare_amount']
        + df['extra']
        + df['mta_tax']
        + df['tip_amount']
        + df['tolls_amount']
        + df['congestion_surcharge']
        + df['airport_fee']
        + df['cbd_congestion_fee']
    )

    report['amount_diff_mismatch'] = int(
        (np.abs(df['total_amount'] - recalculated_total) > 0.01).sum()
    )

    report['amount_diff_large_abs'] = int(
        (df['amount_diff'].abs() > 1).sum()
    )

    # 10. DTYPE Regression Check 
    expected_dtypes = {
        'passenger_count': 'Int8',
        'pu_location_id': 'Int16',
        'do_location_id': 'Int16',
        'trip_distance': 'Float64',
        'fare_amount': 'Float64',
        'total_amount': 'Float64'
    }

    for col, expected in expected_dtypes.items():
        report[f'dtype_{col}'] = str(df[col].dtype)
        report[f'dtype_{col}_ok'] = str(df[col].dtype) == expected


    # 11. Duplicates & Null Density
    report['exact_duplicates'] = int(df.duplicated().sum())

    report['high_null_rate_pct'] = (
        (df.isna().mean() * 100)
        .round(2)
        .loc[lambda x: x > 0]
        .to_dict()
    )

    # 12. Metadata
    report['row_count'] = int(df.shape[0])
    report['month'] = expected_month
    report['year'] = expected_year

    return report


In [9]:
monthly_report = validate_cleaned_yellow_taxi_month(
    df,
    expected_year=2025,
    expected_month=1
)

for k, v in monthly_report.items():
    print(f"{k}: {v}")


missing_columns: []
unexpected_columns: []
pickup_nulls: 0
dropoff_nulls: 0
pickup_after_dropoff: 0
wrong_year_rows: 0
wrong_month_rows: 0
duration_le_0: 0
duration_gt_24h: 0
distance_le_0: 0
distance_gt_100: 0
passenger_le_0: 0
passenger_gt_6: 0
invalid_rate_code: 0
invalid_mta_tax: 0
invalid_payment_type: 0
invalid_pu_location: 0
invalid_do_location: 0
negative_fare_amount: 0
negative_extra: 0
negative_mta_tax: 0
negative_tip_amount: 0
negative_tolls_amount: 0
negative_improvement_surcharge_applied: 0
negative_congestion_surcharge: 0
negative_airport_fee: 0
negative_cbd_congestion_fee: 0
negative_total_amount: 0
tip_gt_200: 0
total_lt_fare: 0
amount_diff_mismatch: 2752349
amount_diff_large_abs: 707991
dtype_passenger_count: Int8
dtype_passenger_count_ok: True
dtype_pu_location_id: Int16
dtype_pu_location_id_ok: True
dtype_do_location_id: Int16
dtype_do_location_id_ok: True
dtype_trip_distance: Float64
dtype_trip_distance_ok: True
dtype_fare_amount: Float64
dtype_fare_amount_ok: True


In [10]:
df.to_csv('Jan_2025_Cleaned.csv', index=False)