# Part 1: Data Ingestion
## Programmatic Download (5 marks)

In [1]:
pip install -r "requirements.txt"

Note: you may need to restart the kernel to use updated packages.


ERROR: Could not open requirements file: [Errno 2] No such file or directory: 'requirements.txt'


In [2]:
import os
import sys
import requests
import pandas as pd
import polars as pl
import duckdb
from datetime import datetime
import time
from pathlib import Path
import requests

In [3]:
data_dir = Path("data/raw")
data_dir.mkdir(parents=True, exist_ok=True)

trip_data_url = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
trip_data_file = data_dir / "yellow_tripdata_2024-01.parquet"

zone_data_url = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"
zone_data_file = data_dir / "taxi_zone_lookup.csv"

In [4]:
def download_file(url, file_path):
    if not file_path.exists():
        print(f"Downloading from: {url}")
        response = requests.get(url)
        
        with open(file_path, 'wb') as f:
            f.write(response.content)
        
        file_size_mb = os.path.getsize(file_path) / 1e6
        print(f"Downloaded: {file_path.name} ({file_size_mb:.1f} MB)")
        return True
    else:
        file_size_mb = os.path.getsize(file_path) / 1e6
        print(f"File already exists: {file_path.name} ({file_size_mb:.1f} MB)")
        return False

In [5]:
download_file(trip_data_url, trip_data_file)

download_file(zone_data_url, zone_data_file)


File already exists: yellow_tripdata_2024-01.parquet (50.0 MB)
File already exists: taxi_zone_lookup.csv (0.0 MB)


False

## Data Validation (10 marks)

In [6]:
EXPECTED_TRIP_COLUMNS = [
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'tip_amount',
    'total_amount',
    'payment_type'
]

EXPECTED_ZONE_COLUMNS = [
    'LocationID',
    'Borough',
    'Zone',
    'service_zone'
]


In [7]:
def load_data (file_path):
    try:
        if not os.path.exists(file_path):
            print(f"✗ Error file does not exist: {file_path}")
            return False
        
        df = pl.read_parquet(file_path)
        print(f"File loaded successfully")
        print(f"Shape: {df.shape[0]} rows × {df.shape[1]} columns")
    
    except Exception as e:
        print(f"Error loading file: {e}")
        return False

load_data(trip_data_file)

File loaded successfully
Shape: 2964624 rows × 19 columns


### a) Verify all expected columns exist in the dataset

In [8]:
df = pl.read_parquet(trip_data_file)
print(df.height)

def verify_trip_columns(file_path, expected_columns):
    print(f"\nValidating Trip Data...")
    print(f"File: {file_path}")

    load_data(trip_data_file)

    df = pl.read_parquet(file_path)
    
    #Part A
    
    actual_columns = df.columns
    
    missing_columns = []
    for col in expected_columns:
        if col not in actual_columns:
            missing_columns.append(col)
    
    if missing_columns:
        print(f"Their are {len(missing_columns)} missing Colums")
        print(f"Actual columns: {actual_columns}")
        return False
    else:
        print(f"The Expected Columns are their")
    
        return True

trip_columns_valid = verify_trip_columns(trip_data_file, EXPECTED_TRIP_COLUMNS)

        

2964624

Validating Trip Data...
File: data\raw\yellow_tripdata_2024-01.parquet
File loaded successfully
Shape: 2964624 rows × 19 columns
The Expected Columns are their


In [9]:
def verify_zone_columns(file_path, expected_columns):
    print(f"\nValidating Zone Lookup Table")
    print(f"File: {file_path}")
    
    try:
        # Check file exists
        if not os.path.exists(file_path):
            print(f"✗ ERROR: File not found: {file_path}")
            return False
        
        # Load CSV file with Pandas
        df = pd.read_csv(file_path)
        print(f"File loaded successfully")
        print(f"  Shape: {df.shape[0]} rows × {df.shape[1]} columns")
        
        actual_columns = df.columns.tolist()
        
        # Find missing columns
        missing_columns = [col for col in expected_columns if col not in actual_columns]
        
        if missing_columns:
            print(f"Their are {len(missing_columns)} missing Colums")
            print(f"  Available columns: {actual_columns}")
            return False
        else:
            print(f"The Expected Columns are their")
            
            # Show extra columns (if any)
            extra_columns = [col for col in actual_columns if col not in expected_columns]
            if extra_columns:
                print(f"  Note: Found {len(extra_columns)} additional columns:")
                for col in extra_columns:
                    print(f"    - {col}")
            
            return True
            
    except Exception as e:
        print(f"Error file did'nt load {e}")
        return False

zone_columns_valid = verify_zone_columns(zone_data_file, EXPECTED_ZONE_COLUMNS)



Validating Zone Lookup Table
File: data\raw\taxi_zone_lookup.csv
File loaded successfully
  Shape: 265 rows × 4 columns
The Expected Columns are their


In [10]:
def validate_datetime_cols(df, expected_cols):

    df = pl.scan_parquet(df)
    
    for col in expected_cols:
        dtype = df.schema.get(col)

        if dtype is None:
            print(f"{col} missing")
            return False

        if dtype.base_type() != pl.Datetime:
            print(f"{col} wrong type: {dtype}")
            return False

    return True

expected_datetime_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
validate_datetime_cols(trip_data_file, expected_datetime_cols)


  dtype = df.schema.get(col)


True

In [11]:
# Validate trip data columns
trip_columns_valid = verify_trip_columns(trip_data_file, EXPECTED_TRIP_COLUMNS)

# Validate zone data columns  
zone_columns_valid = verify_zone_columns(zone_data_file, EXPECTED_ZONE_COLUMNS)

# Validate datetime types
expected_datetime_cols = ["tpep_pickup_datetime", "tpep_dropoff_datetime"]
datatime_colums_valid = validate_datetime_cols(trip_data_file, expected_datetime_cols)

# Summary
print("\n\nSummary:\n\n")
if trip_columns_valid and zone_columns_valid and datatime_colums_valid:
    print("All required columns are present in both datasets")
else:
    print("Error unvalidated required columns")
    
    if not trip_columns_valid:
        print("  - Trip data: Column validation failed")
    
    if not zone_columns_valid:
        print("  - Zone lookup: Column validation failed")

    if not datatime_colums_valid:
        print("  - Datetime : Column validation failed")
    
    print("\nExiting due to validation failures.")
    sys.exit(1)


Validating Trip Data...
File: data\raw\yellow_tripdata_2024-01.parquet
File loaded successfully
Shape: 2964624 rows × 19 columns
The Expected Columns are their

Validating Zone Lookup Table
File: data\raw\taxi_zone_lookup.csv
File loaded successfully
  Shape: 265 rows × 4 columns
The Expected Columns are their


Summary:


All required columns are present in both datasets


  dtype = df.schema.get(col)


# Part 2: Data Transformation & Analysis (30 marks)

## 4. Data Cleaning (10 marks)

### e) Removing rows with null values in critical columns (pickup/dropoff times, locations, fare) 

In [12]:
df = pl.read_parquet(trip_data_file)
cur = df.height

print(f"This is the data before dropping null vaules: {cur}")

clean = df.drop_nulls(subset=["tpep_pickup_datetime", "tpep_dropoff_datetime", "PULocationID", "DOLocationID", "fare_amount"])
print(f"This is the data after dropping null vaules: {clean.height}")
print(f"Amount dropped: {cur - clean.height}")

This is the data before dropping null vaules: 2964624
This is the data after dropping null vaules: 2964624
Amount dropped: 0


### f) Filtering out invalid trips: trips with zero or negative distance, negative fares, or fares exceeding $500 

In [13]:
# Filtering out invalid trips: trips with zero or negative distance
cur = clean.height
print(f"The current rows before removing invaild distances are {cur}")
clean = clean.filter(pl.col("trip_distance").is_not_null() & (pl.col("trip_distance") > 0))
invalid_distances = cur - clean.height
print(f"The cleaned distances are {invalid_distances}")

# Filtering out invalid trips: trips with negative fares, or fares exceeding $500 
cur = clean.height
print(f"The current rows before removing invaild fares are {cur}")
clean = clean.filter((pl.col('fare_amount') > 0) & (pl.col('fare_amount') < 500))
invalid_fares = cur - clean.height
print(f"The cleaned fares are {invalid_fares}")

# Removing trips where dropoff time is before pickup time 
cur = clean.height
print(f"The current rows before removing invaild dates are {cur}")
clean = clean.filter(pl.col('tpep_dropoff_datetime') > pl.col('tpep_pickup_datetime'))
invalid_dates = cur - clean.height
print(f"The cleaned dates are {invalid_dates}")


The current rows before removing invaild distances are 2964624
The cleaned distances are 60371
The current rows before removing invaild fares are 2904253
The cleaned fares are 34573
The current rows before removing invaild dates are 2869680
The cleaned dates are 112


## Feature Engineering (10 marks)

In [14]:
clean = df.with_columns([
    # i) trip_duration_minutes: calculated from pickup and dropoff timestamps
    (
        (pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime"))
        .dt.total_seconds() / 60).alias("trip_duration_minutes"),

    # j) trip_speed_mph: distance divided by duration (handle division by zero) 
    pl.when(
        ((pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime"))
         .dt.total_seconds()) > 0
    )
    .then(
        pl.col("trip_distance") /
        (((pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime"))
          .dt.total_seconds()) / 3600)
    )
    .otherwise(None)
    .alias("trip_speed_mph"),

    # k) pickup_hour: hour of day (0-23) extracted from pickup timestamp
    pl.col("tpep_pickup_datetime").dt.hour().alias("pickup_hour"),

    # l) pickup_day_of_week: day name (Monday-Sunday) extracted from pickup timestamp
    pl.col("tpep_pickup_datetime").dt.strftime("%A").alias("pickup_day_of_week")
])

clean

VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee,trip_duration_minutes,trip_speed_mph,pickup_hour,pickup_day_of_week
i32,datetime[ns],datetime[ns],i64,f64,i64,str,i32,i32,i64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,f64,i8,str
2,2024-01-01 00:57:55,2024-01-01 01:17:43,1,1.72,1,"""N""",186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0,19.8,5.212121,0,"""Monday"""
1,2024-01-01 00:03:00,2024-01-01 00:09:36,1,1.8,1,"""N""",140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0,6.6,16.363636,0,"""Monday"""
1,2024-01-01 00:17:06,2024-01-01 00:35:01,1,4.7,1,"""N""",236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0,17.916667,15.739535,0,"""Monday"""
1,2024-01-01 00:36:38,2024-01-01 00:44:56,1,1.4,1,"""N""",79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0,8.3,10.120482,0,"""Monday"""
1,2024-01-01 00:46:51,2024-01-01 00:52:57,1,0.8,1,"""N""",211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0,6.1,7.868852,0,"""Monday"""
…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…,…
2,2024-01-31 23:45:59,2024-01-31 23:54:36,,3.18,,,107,263,0,15.77,0.0,0.5,2.0,0.0,1.0,21.77,,,8.616667,22.143133,23,"""Wednesday"""
1,2024-01-31 23:13:07,2024-01-31 23:27:52,,4.0,,,114,236,0,18.4,1.0,0.5,2.34,0.0,1.0,25.74,,,14.75,16.271186,23,"""Wednesday"""
2,2024-01-31 23:19:00,2024-01-31 23:38:00,,3.33,,,211,25,0,19.97,0.0,0.5,0.0,0.0,1.0,23.97,,,19.0,10.515789,23,"""Wednesday"""
2,2024-01-31 23:07:23,2024-01-31 23:25:14,,3.06,,,107,13,0,23.88,0.0,0.5,5.58,0.0,1.0,33.46,,,17.85,10.285714,23,"""Wednesday"""


## SQL Analysis (10 marks)

### m) What are the top 10 busiest pickup zones by total number of trips? (Include zone names from lookup table) 

In [15]:
con = duckdb.connect()

result = con.execute('''
    WITH trips AS (
        SELECT * FROM 'clean'
    ),
    zones AS (
        SELECT * FROM read_csv_auto('data/raw/taxi_zone_lookup.csv')
    )
    SELECT
        z.Zone       AS zone_name,
        COUNT(*)     AS trips_count
    FROM trips t
    JOIN zones z
      ON t.PULocationID = z.LocationID    -- adjust column names here if needed
    GROUP BY z.Zone
    ORDER BY trips_count DESC
    LIMIT 10;
''').fetchdf()

print(result)


                      zone_name  trips_count
0                   JFK Airport       145240
1                Midtown Center       143471
2         Upper East Side South       142708
3         Upper East Side North       136465
4                  Midtown East       106717
5     Times Sq/Theatre District       106324
6  Penn Station/Madison Sq West       104523
7           Lincoln Square East       104080
8             LaGuardia Airport        89533
9         Upper West Side South        88474


### n) What is the average fare amount for each hour of the day? (Order by hour) 

In [16]:
import duckdb

con = duckdb.connect()

result = con.execute('''
WITH trips AS (
    SELECT * FROM read_parquet('data/raw/yellow_tripdata_2024-01.parquet')
)
SELECT
    CAST(EXTRACT(hour FROM CAST(tpep_pickup_datetime AS TIMESTAMP)) AS INTEGER) AS pickup_hour,
    AVG(CAST(fare_amount AS DOUBLE)) AS avg_fare_amount,
    COUNT(*) AS trips_count
FROM trips
WHERE tpep_pickup_datetime IS NOT NULL
  AND fare_amount IS NOT NULL
GROUP BY pickup_hour
ORDER BY pickup_hour;
''').fetchdf()

print(result)


    pickup_hour  avg_fare_amount  trips_count
0             0        19.202658        79094
1             1        17.527296        53627
2             2        16.482882        37517
3             3        18.150135        24811
4             4        22.518645        16742
5             5        26.619918        18764
6             6        21.650399        41429
7             7        18.539180        83719
8             8        17.654683       117209
9             9        17.708415       128970
10           10        17.753394       138778
11           11        17.432059       150542
12           12        17.476313       164559
13           13        18.116046       169903
14           14        18.945787       182898
15           15        18.774634       189359
16           16        19.121762       190201
17           17        17.838371       206257
18           18        16.723151       212788
19           19        17.294794       184032
20           20        17.695375  

### o) What percentage of trips use each payment type?

In [17]:
import duckdb

con = duckdb.connect()

result = con.execute('''
WITH trips AS (
    SELECT * FROM read_parquet('data/raw/yellow_tripdata_2024-01.parquet')
),
counts AS (
    SELECT
        COALESCE(payment_type, 'UNKNOWN') AS payment_type,
        COUNT(*) AS trips_count
    FROM trips
    GROUP BY COALESCE(payment_type, 'UNKNOWN')
)
SELECT
    payment_type,
    trips_count,
    ROUND(100.0 * trips_count / SUM(trips_count) OVER (), 2) AS pct_trips
FROM counts
ORDER BY trips_count DESC;
''').fetchdf()

print(result)


   payment_type  trips_count  pct_trips
0             1      2319046      78.22
1             2       439191      14.81
2             0       140162       4.73
3             4        46628       1.57
4             3        19597       0.66


### p) What is the average tip percentage (tip_amount/fare_amount) by day of week, for credit card payments only?

In [18]:
import duckdb

con = duckdb.connect()

result = con.execute('''
WITH trips AS (
    SELECT * FROM read_parquet('data/raw/yellow_tripdata_2024-01.parquet')
)
SELECT
    EXTRACT(ISODOW FROM CAST(tpep_pickup_datetime AS TIMESTAMP)) AS iso_dow,  -- 1=Mon .. 7=Sun
    DAYNAME(CAST(tpep_pickup_datetime AS TIMESTAMP)) AS day_name,
    ROUND(AVG(CAST(tip_amount AS DOUBLE) / NULLIF(CAST(fare_amount AS DOUBLE), 0)) * 100, 2) AS avg_tip_pct,
    ROUND(AVG(CAST(tip_amount AS DOUBLE) / NULLIF(CAST(fare_amount AS DOUBLE), 0)), 4) AS avg_tip_ratio,
    COUNT(*) AS trips_count
FROM trips
WHERE CAST(payment_type AS INTEGER) = 1    -- 1 = Credit card
  AND tpep_pickup_datetime IS NOT NULL
  AND fare_amount IS NOT NULL
  AND CAST(fare_amount AS DOUBLE) > 0
GROUP BY iso_dow, day_name
ORDER BY iso_dow;
''').fetchdf()

print(result)


   iso_dow   day_name  avg_tip_pct  avg_tip_ratio  trips_count
0        1     Monday        39.83         0.3983       312675
1        2    Tuesday        26.07         0.2607       362261
2        3  Wednesday        25.76         0.2576       392563
3        4   Thursday        30.11         0.3011       339953
4        5     Friday        27.77         0.2777       321784
5        6   Saturday        26.61         0.2661       327420
6        7     Sunday        25.34         0.2534       262202


### q) What are the top 5 most common pickup-dropoff zone pairs? (Include zone names)

In [19]:
import duckdb

con = duckdb.connect()

result = con.execute('''
WITH trips AS (
    SELECT * FROM read_parquet('data/raw/yellow_tripdata_2024-01.parquet')
),
zones AS (
    SELECT * FROM read_csv_auto('data/raw/taxi_zone_lookup.csv')
)
SELECT
  COALESCE(zp.Zone, CAST(t.PULocationID AS VARCHAR)) AS pickup_zone,
  COALESCE(zd.Zone, CAST(t.DOLocationID AS VARCHAR)) AS dropoff_zone,
  COUNT(*) AS trips_count
FROM trips t
LEFT JOIN zones zp ON t.PULocationID = zp.LocationID
LEFT JOIN zones zd ON t.DOLocationID = zd.LocationID
GROUP BY pickup_zone, dropoff_zone
ORDER BY trips_count DESC
LIMIT 5;
''').fetchdf()

print(result)


             pickup_zone           dropoff_zone  trips_count
0  Upper East Side South  Upper East Side North        21883
1  Upper East Side North  Upper East Side South        19402
2  Upper East Side North  Upper East Side North        15955
3  Upper East Side South  Upper East Side South        14938
4         Midtown Center  Upper East Side South        10275


# Part 3: Visualization Dashboard (40 marks)

## Dashboard Structure (5 marks)