Part 1: Data Ingestion

In [5]:
import requests
from pathlib import Path
import polars as pl

# Remote sources for the trip data and the zone lookup table
taxi_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
zone_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

# Local landing directory for the raw downloads (created if absent)
data_dir = Path("data/raw")
data_dir.mkdir(parents=True, exist_ok=True)

# Each download keeps its remote file name inside data_dir
taxi_path = data_dir / taxi_url.rsplit("/", 1)[-1]
zone_path = data_dir / zone_url.rsplit("/", 1)[-1]

# Download a file and write it to the specified path
def download_file(url, path, timeout=30):
    """Download ``url`` to ``path`` unless ``path`` already exists.

    The response body is streamed into a temporary ``<name>.part`` file next
    to ``path`` and renamed into place only after the whole body has been
    written. An interrupted transfer therefore never leaves a truncated file
    at ``path`` that the existence check would later mistake for a finished
    download.

    Args:
        url: Address to fetch with ``requests.get``.
        path: Destination file (``str`` or ``pathlib.Path``).
        timeout: Value forwarded to ``requests.get(timeout=...)`` so a stalled
            connection raises instead of hanging the notebook.

    Raises:
        requests.HTTPError: Propagated from ``raise_for_status()``.
        Any exception raised while connecting, reading or writing; the partial
        ``.part`` file is removed before the exception propagates.
    """
    path = Path(path)
    if path.exists():
        return

    partial_path = path.with_name(path.name + ".part")
    try:
        with requests.get(url, stream=True, timeout=timeout) as r:
            r.raise_for_status()
            with open(partial_path, "wb") as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        # Publish the file only once every chunk has been written.
        partial_path.replace(path)
    finally:
        # After a successful rename the .part file is gone; anything still
        # here is the residue of a failed transfer.
        if partial_path.exists():
            partial_path.unlink()

# Fetch both source files; download_file returns early for a path that already exists.
for _url, _path in ((taxi_url, taxi_path), (zone_url, zone_path)):
    download_file(_url, _path)

In [6]:
# Timestamp columns whose dtype is checked by validate_data
datetime_columns = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]

# Every column the notebook reads from the trip file (timestamps first)
expected_columns = [
    *datetime_columns,
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "tip_amount",
    "total_amount",
    "payment_type",
]

# Load only the expected columns with Polars
df = pl.read_parquet(taxi_path, columns=expected_columns)

def validate_data(df):
    """Check that ``df`` has the expected columns and datetime dtypes.

    Reads the module-level ``expected_columns`` and ``datetime_columns`` lists.
    Any datetime column whose dtype is not ``pl.Datetime`` is cast to it.

    Args:
        df: Frame to validate.

    Returns:
        The validated frame. When a cast was needed this is a new frame, so
        callers must use the return value rather than the argument.

    Raises:
        ValueError: If an expected column is missing, or if a datetime column
            cannot be cast to ``pl.Datetime``. The original cast error is
            attached as ``__cause__``.
    """
    # Check for missing columns
    missing_cols = set(expected_columns) - set(df.columns)
    if missing_cols:
        raise ValueError(f"Missing expected columns: {missing_cols}")

    # Validate datetime columns
    for col in datetime_columns:
        if df[col].dtype == pl.Datetime:
            continue
        try:
            df = df.with_columns(pl.col(col).cast(pl.Datetime))
        except Exception as exc:
            # Chain explicitly so the underlying cast error stays visible.
            raise ValueError(f"Invalid datetime values detected in column: {col}") from exc
    return df

# Print Row Count and Summary
def print_summary(df):
    """Print the row count and shape of ``df``, then a validation banner.

    Args:
        df: Object that supports ``len()`` and exposes a ``shape`` attribute.
    """
    report_lines = (
        "\n=== Dataset Summary ===",
        f"Total rows: {len(df):,}",
        f"Shape: {df.shape}",
        "\nData Validated Successfully!",
    )
    for line in report_lines:
        print(line)

# validate_data returns the frame (re-cast when a datetime column needed it),
# so keep the result instead of discarding it.
df = validate_data(df)
print_summary(df)


=== Dataset Summary ===
Total rows: 2,964,624
Shape: (2964624, 10)

Data Validated Successfully!


Part 2: Data Transformation & Analysis

In [7]:
# Remove rows with nulls
def remove_nulls(df):
    """Drop rows that have a null in any of the critical columns.

    Args:
        df: Frame exposing ``drop_nulls`` and ``len()``.

    Returns:
        A ``(cleaned_frame, removed_count)`` tuple, where ``removed_count`` is
        the number of rows dropped.
    """
    rows_before = len(df)

    required = [
        "tpep_pickup_datetime",
        "tpep_dropoff_datetime",
        "PULocationID",
        "DOLocationID",
        "fare_amount",
    ]

    cleaned = df.drop_nulls(required)
    return cleaned, rows_before - len(cleaned)

# Filter out invalid trips, tracking how many rows each rule removes
def filter_trips(df):
    """Apply the distance and fare rules one after another, counting removals.

    Rules, in order: ``trip_distance > 0``, ``fare_amount >= 0``,
    ``fare_amount <= 500``. Each count covers only rows that survived the
    earlier rules.

    Args:
        df: Frame exposing ``filter`` and ``len()``.

    Returns:
        ``(filtered_frame, invalid_distance, negative_fare, exceeding_max)``.
    """
    rules = (
        pl.col("trip_distance") > 0,
        pl.col("fare_amount") >= 0,
        pl.col("fare_amount") <= 500,
    )

    removed_counts = []
    for rule in rules:
        rows_before = len(df)
        df = df.filter(rule)
        removed_counts.append(rows_before - len(df))

    invalid_distance, negative_fare, exceeding_max = removed_counts
    return df, invalid_distance, negative_fare, exceeding_max

# Filter out trips with dropoff before pickup
def filter_time(df):
    """Keep only trips whose dropoff is not earlier than their pickup.

    Args:
        df: Frame exposing ``filter`` and ``len()``.

    Returns:
        A ``(filtered_frame, removed_count)`` tuple.
    """
    pickup = pl.col("tpep_pickup_datetime")
    dropoff = pl.col("tpep_dropoff_datetime")

    kept = df.filter(dropoff >= pickup)
    return kept, len(df) - len(kept)

# Save cleaned data and print summary of removals
def save_and_print(df, total_removed, removed_nulls, invalid_distance, negative_fare, exceeding_max, removed_time, output_path=None):
    """Optionally save the cleaned frame, then print the removal summary.

    Args:
        df: Cleaned frame. Only used when ``output_path`` is given.
        total_removed: Total number of rows removed by cleaning.
        removed_nulls: Rows removed for null values.
        invalid_distance: Rows removed for an invalid distance.
        negative_fare: Rows removed for a negative fare.
        exceeding_max: Rows removed for a fare above $500.
        removed_time: Rows removed for invalid times.
        output_path: When not ``None``, ``df.write_parquet(output_path)`` is
            called before the summary is printed. The default ``None`` writes
            nothing, which matches how existing callers use this function.
    """
    if output_path is not None:
        df.write_parquet(output_path)

    print("\n=== Cleaned Dataset Summary ===")
    print(f"Total rows removed: {total_removed:,}")
    print(f"Removed null values: {removed_nulls:,}")
    print(f"Removed invalid distances: {invalid_distance:,}")
    print(f"Removed negative fares: {negative_fare:,}")
    print(f"Removed exceeding $500: {exceeding_max:,}")
    print(f"Removed invalid times: {removed_time:,}")

# Row count before cleaning, used for the overall removal total
original_rows = len(df)

# Run each cleaning step in turn, keeping the count it reports
df, removed_nulls = remove_nulls(df)
df, invalid_distance, negative_fare, exceeding_max = filter_trips(df)
df, removed_time = filter_time(df)

total_removed = original_rows - len(df)

save_and_print(
    df,
    total_removed=total_removed,
    removed_nulls=removed_nulls,
    invalid_distance=invalid_distance,
    negative_fare=negative_fare,
    exceeding_max=exceeding_max,
    removed_time=removed_time,
)


=== Cleaned Dataset Summary ===
Total rows removed: 94,522
Removed null values: 0
Removed invalid distances: 60,371
Removed negative fares: 34,065
Removed exceeding $500: 30
Removed invalid times: 56


In [None]:
def create_derived_columns(df):
    """Add duration, speed, pickup-hour and pickup-weekday columns.

    Added columns:
        trip_duration_minutes: Dropoff minus pickup, as total seconds / 60.
        trip_speed_mph: ``trip_distance`` divided by the duration in hours;
            0 when the duration is not positive.
        pickup_hour: ``dt.hour()`` of the pickup timestamp.
        pickup_day_of_week: ``dt.weekday()`` of the pickup timestamp.

    Args:
        df: Frame holding the pickup/dropoff timestamps and ``trip_distance``.

    Returns:
        The frame with the four columns added.
    """
    pickup = pl.col("tpep_pickup_datetime")
    elapsed = pl.col("tpep_dropoff_datetime") - pickup
    duration_expr = (elapsed.dt.total_seconds() / 60).alias("trip_duration_minutes")

    # The speed expression reads trip_duration_minutes, so that column must
    # exist before the second with_columns call is evaluated.
    minutes = pl.col("trip_duration_minutes")
    speed_expr = (
        pl.when(minutes > 0)
        .then(pl.col("trip_distance") / (minutes / 60))
        .otherwise(0)
        .alias("trip_speed_mph")
    )
    hour_expr = pickup.dt.hour().alias("pickup_hour")
    weekday_expr = pickup.dt.weekday().alias("pickup_day_of_week")

    with_duration = df.with_columns([duration_expr])
    return with_duration.with_columns([speed_expr, hour_expr, weekday_expr])
        
df = create_derived_columns(df)

# Kept under its own name rather than reusing `expected_columns`:
# validate_data reads that module-level list, so reassigning it here would
# change what any later validate_data call checks.
derived_columns = [
    "trip_duration_minutes",
    "trip_speed_mph",
    "pickup_hour",
    "pickup_day_of_week"
]

# Check existence
missing_cols = set(derived_columns) - set(df.columns)
if missing_cols:
    print(f"❌ Missing columns: {missing_cols}")
else:
    print("✅ All derived columns created successfully.")

df.select(derived_columns).head(10)