# Assignment 1: NYC Yellow Taxi Trip Data Pipeline
# Part 1: Data Ingestion

In [15]:
import requests
import os


# Folder that receives the downloaded source files; created up front so the
# later writes cannot fail on a missing directory.
raw_dir = "data/raw"
os.makedirs(raw_dir, exist_ok=True)

# Remote sources: the January 2024 yellow-taxi trip file and the zone lookup.
source_host = "https://d37ci6vzurychx.cloudfront.net"
taxi_url = f"{source_host}/trip-data/yellow_tripdata_2024-01.parquet"
zone_url = f"{source_host}/misc/taxi_zone_lookup.csv"

# Local destinations, named after the remote files.
taxi_path = f"{raw_dir}/yellow_tripdata_2024-01.parquet"
zone_path = f"{raw_dir}/taxi_zone_lookup.csv"


def download_file(url, path, timeout=60, chunk_size=8192):
    """Stream ``url`` to ``path``, printing progress when the size is known.

    The body is written to ``<path>.part`` first and moved onto ``path`` only
    after the whole response has been consumed, so a failed or interrupted
    download never leaves a truncated file at the final location.

    Args:
        url: Address of the file to fetch.
        path: Destination file path; its parent directory must already exist.
        timeout: Seconds passed to ``requests.get`` as its ``timeout``.
        chunk_size: Number of bytes requested per ``iter_content`` chunk.

    Raises:
        Whatever ``requests`` raises for connection problems or for a 4xx/5xx
        status (via ``raise_for_status``), and ``OSError`` for local file
        errors. The temporary file is removed before the error propagates.
    """
    print(f"Downloading {url}...")
    tmp_path = f"{path}.part"
    try:
        # The context manager releases the streamed connection even on error.
        with requests.get(url, stream=True, timeout=timeout) as response:
            # Without this an HTML error page would be saved as the dataset.
            response.raise_for_status()
            total = int(response.headers.get('content-length', 0))
            downloaded = 0
            with open(tmp_path, 'wb') as f:
                for chunk in response.iter_content(chunk_size=chunk_size):
                    f.write(chunk)
                    downloaded += len(chunk)
                    # Progress needs a total; skip it when the server sent none.
                    if total:
                        percent = downloaded / total * 100
                        print(f"  Progress: {percent:.1f}%", end='\r')
        # Publish the finished file under its real name in one step.
        os.replace(tmp_path, path)
    except BaseException:
        # Also covers KeyboardInterrupt, i.e. a notebook cell being stopped.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    print(f"\nSaved to {path}")

#Fetch the trip file first, then the zone lookup
for source_url, target_path in ((taxi_url, taxi_path), (zone_url, zone_path)):
    download_file(source_url, target_path)

Downloading https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet...
  Progress: 100.0%
Saved to data/raw/yellow_tripdata_2024-01.parquet
Downloading https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv...

Saved to data/raw/taxi_zone_lookup.csv


# Data Validation

In [16]:
import pandas as pd

#Loading both files...
print("Loading data...")
taxi_df = pd.read_parquet("data/raw/yellow_tripdata_2024-01.parquet")
zone_df = pd.read_csv("data/raw/taxi_zone_lookup.csv")

#Expected columns
expected_columns = [
    'tpep_pickup_datetime', 'tpep_dropoff_datetime',
    'PULocationID', 'DOLocationID', 'passenger_count',
    'trip_distance', 'fare_amount', 'tip_amount',
    'total_amount', 'payment_type'
]
#Zone lookup columns that the SQL joins and SELECT lists further down use
expected_zone_columns = ['LocationID', 'Borough', 'Zone']

#Check 1: All expected columns exist (trips and zone lookup)
print("\n--- Column Check ---")
missing_cols = [col for col in expected_columns if col not in taxi_df.columns]
if missing_cols:
    #ValueError is still an Exception, so existing handlers keep working
    raise ValueError(f"Missing columns: {missing_cols}")
missing_zone_cols = [col for col in expected_zone_columns if col not in zone_df.columns]
if missing_zone_cols:
    #Fail here instead of deep inside a later SQL query
    raise ValueError(f"Missing zone lookup columns: {missing_zone_cols}")
print("All expected columns present")

#Check 2: Date columns are datetime types
print("\n--- Date Type Check ---")
for date_col in ['tpep_pickup_datetime', 'tpep_dropoff_datetime']:
    if not pd.api.types.is_datetime64_any_dtype(taxi_df[date_col]):
        raise TypeError(f"{date_col} is not a datetime type")
    print(f"{date_col} is valid datetime")

#Check 3: Row count and summary
print("\n--- Dataset Summary ---")
print(f"Total rows: {len(taxi_df):,}")
print(f"Total columns: {len(taxi_df.columns)}")
#The raw range can extend outside the target month; the cleaning step filters it
print(f"Date range: {taxi_df['tpep_pickup_datetime'].min()} to {taxi_df['tpep_pickup_datetime'].max()}")
print(f"Zone lookup table rows: {len(zone_df):,}")
print("\n--- First 5 rows ---")
taxi_df.head()

Loading data...

--- Column Check ---
All expected columns present

--- Date Type Check ---
tpep_pickup_datetime is valid datetime
tpep_dropoff_datetime is valid datetime

--- Dataset Summary ---
Total rows: 2,964,624
Total columns: 19
Date range: 2002-12-31 22:59:39 to 2024-02-01 00:01:15
Zone lookup table rows: 265

--- First 5 rows ---


Unnamed: 0,VendorID,tpep_pickup_datetime,tpep_dropoff_datetime,passenger_count,trip_distance,RatecodeID,store_and_fwd_flag,PULocationID,DOLocationID,payment_type,fare_amount,extra,mta_tax,tip_amount,tolls_amount,improvement_surcharge,total_amount,congestion_surcharge,Airport_fee
0,2,2024-01-01 00:57:55,2024-01-01 01:17:43,1.0,1.72,1.0,N,186,79,2,17.7,1.0,0.5,0.0,0.0,1.0,22.7,2.5,0.0
1,1,2024-01-01 00:03:00,2024-01-01 00:09:36,1.0,1.8,1.0,N,140,236,1,10.0,3.5,0.5,3.75,0.0,1.0,18.75,2.5,0.0
2,1,2024-01-01 00:17:06,2024-01-01 00:35:01,1.0,4.7,1.0,N,236,79,1,23.3,3.5,0.5,3.0,0.0,1.0,31.3,2.5,0.0
3,1,2024-01-01 00:36:38,2024-01-01 00:44:56,1.0,1.4,1.0,N,79,211,1,10.0,3.5,0.5,2.0,0.0,1.0,17.0,2.5,0.0
4,1,2024-01-01 00:46:51,2024-01-01 00:52:57,1.0,0.8,1.0,N,211,148,1,7.9,3.5,0.5,3.2,0.0,1.0,16.1,2.5,0.0


# Part 2: Data Transformation & Analysis
# Data Cleaning

In [26]:

#NOTE: this cell overwrites `taxi_df`, so it is not idempotent. Re-running it on
#an already-cleaned frame reports zero removals; re-run the load cell first
#(or Restart & Run All) to reproduce the per-step counts.
original_count = len(taxi_df)
print(f"Starting rows: {original_count:,}")

#Removing nulls in critical columns
critical_cols = ['tpep_pickup_datetime', 'tpep_dropoff_datetime',
                 'PULocationID', 'DOLocationID', 'fare_amount']
before = len(taxi_df)
taxi_df = taxi_df.dropna(subset=critical_cols)
removed_nulls = before - len(taxi_df)
print(f"\nRemoved {removed_nulls:,} rows with null values in critical columns")

#Filtering invalid trips: positive distance, fare in (0, max_fare]
max_fare = 500
before = len(taxi_df)
has_distance = taxi_df['trip_distance'] > 0
fare_in_range = (taxi_df['fare_amount'] > 0) & (taxi_df['fare_amount'] <= max_fare)
taxi_df = taxi_df[has_distance & fare_in_range]
removed_invalid = before - len(taxi_df)
print(f"Removed {removed_invalid:,} rows with invalid distance or fare")

#Remove trips whose dropoff is not strictly after pickup. Zero-duration trips
#go too, which keeps the later speed calculation free of division by zero.
before = len(taxi_df)
taxi_df = taxi_df[taxi_df['tpep_dropoff_datetime'] > taxi_df['tpep_pickup_datetime']]
removed_time = before - len(taxi_df)
print(f"Removed {removed_time:,} rows where dropoff time is not after pickup time")

#Filtering to only January 2024 (dataset has some bad dates)
before = len(taxi_df)
in_january = (
    (taxi_df['tpep_pickup_datetime'] >= '2024-01-01') &
    (taxi_df['tpep_pickup_datetime'] < '2024-02-01')
)
#.copy() detaches the result from the frame it was selected from, so the
#feature-engineering cell can add columns without a chained-assignment warning.
taxi_df = taxi_df[in_january].copy()
removed_dates = before - len(taxi_df)
print(f"Removed {removed_dates:,} rows outside January 2024")

#Summary...
total_removed = original_count - len(taxi_df)
#Guard the percentage so an empty input frame does not raise ZeroDivisionError
retained_pct = len(taxi_df) / original_count * 100 if original_count else 0.0
print(f"\n--- Cleaning Summary ---")
print(f"Original rows:  {original_count:,}")
print(f"Rows removed:   {total_removed:,}")
print(f"Remaining rows: {len(taxi_df):,}")
print(f"Data retained:  {retained_pct:.1f}%")

Starting rows: 2,869,555

Removed 0 rows with null values in critical columns
Removed 0 rows with invalid distance or fare
Removed 0 rows where dropoff time is before pickup time
Removed 0 rows outside January 2024

--- Cleaning Summary ---
Original rows:  2,869,555
Rows removed:   0
Remaining rows: 2,869,555
Data retained:  100.0%


# Feature Engineering

In [18]:
#Derive the four required columns from the pickup/dropoff timestamps and distance
pickup_ts = taxi_df['tpep_pickup_datetime']
dropoff_ts = taxi_df['tpep_dropoff_datetime']

#1.Trip duration in minutes
duration_minutes = (dropoff_ts - pickup_ts).dt.total_seconds() / 60

#2.Trip speed in mph; rows without a positive duration get 0 instead of inf/NaN
duration_hours = duration_minutes / 60
speed_mph = (taxi_df['trip_distance'] / duration_hours).where(duration_minutes > 0, other=0)

derived_columns = {
    'trip_duration_minutes': duration_minutes,
    'trip_speed_mph': speed_mph,
    #3.Hour of day (0-23)
    'pickup_hour': pickup_ts.dt.hour,
    #4.Day of week (Monday-Sunday)
    'pickup_day_of_week': pickup_ts.dt.day_name(),
}
for column_name, column_values in derived_columns.items():
    taxi_df[column_name] = column_values

print("New columns created successfully!")
print("\nSample values:")
print(taxi_df[list(derived_columns)].head(5))

avg_duration = taxi_df['trip_duration_minutes'].mean()
avg_speed = taxi_df['trip_speed_mph'].mean()
busiest_hour = taxi_df['pickup_hour'].mode()[0]
busiest_day = taxi_df['pickup_day_of_week'].mode()[0]

print("\n--- Feature Summary ---")
print(f"Avg trip duration: {avg_duration:.1f} minutes")
print(f"Avg trip speed:    {avg_speed:.1f} mph")
print(f"Most common hour:  {busiest_hour}:00")
print(f"Most common day:   {busiest_day}")

New columns created successfully!

Sample values:
   trip_duration_minutes  trip_speed_mph  pickup_hour pickup_day_of_week
0              19.800000        5.212121            0             Monday
1               6.600000       16.363636            0             Monday
2              17.916667       15.739535            0             Monday
3               8.300000       10.120482            0             Monday
4               6.100000        7.868852            0             Monday

--- Feature Summary ---
Avg trip duration: 15.8 minutes
Avg trip speed:    14.0 mph
Most common hour:  18:00
Most common day:   Wednesday


# SQL Analysis

# Query 1: What are the top 10 busiest pickup zones by total number of trips?

In [19]:
import duckdb

#Open a DuckDB connection and register both DataFrames under the table names
#used by the queries below
con = duckdb.connect()
registered_frames = {'trips': taxi_df, 'zones': zone_df}
for table_name, frame in registered_frames.items():
    con.register(table_name, frame)

print("Data loaded into DuckDB successfully!")
for table_name, frame in registered_frames.items():
    print(f"   {table_name.capitalize()} table: {len(frame):,} rows")




Data loaded into DuckDB successfully!
   Trips table: 2,869,555 rows
   Zones table: 265 rows


In [20]:
#Count trips per pickup zone (joined to the lookup for readable names), keep the 10 largest
top_pickup_zones_sql = """
    SELECT 
        z.Zone,
        z.Borough,
        COUNT(*) AS total_trips
    FROM trips t
    JOIN zones z ON t.PULocationID = z.LocationID
    GROUP BY z.Zone, z.Borough
    ORDER BY total_trips DESC
    LIMIT 10
"""
query1 = con.execute(top_pickup_zones_sql).df()

print("Top 10 Busiest Pickup Zones:", query1.to_string(index=False), sep="\n")

Top 10 Busiest Pickup Zones:
                        Zone   Borough  total_trips
              Midtown Center Manhattan       140139
       Upper East Side South Manhattan       140117
                 JFK Airport    Queens       138427
       Upper East Side North Manhattan       133961
                Midtown East Manhattan       104342
   Times Sq/Theatre District Manhattan       102958
Penn Station/Madison Sq West Manhattan       102151
         Lincoln Square East Manhattan       101794
           LaGuardia Airport    Queens        87690
       Upper West Side South Manhattan        86466


# Query 2: What is the average fare amount for each hour of the day?

In [21]:
#Mean fare per pickup hour, listed in clock order
avg_fare_by_hour_sql = """
    SELECT 
        pickup_hour,
        ROUND(AVG(fare_amount), 2) AS avg_fare
    FROM trips
    GROUP BY pickup_hour
    ORDER BY pickup_hour
"""
query2 = con.execute(avg_fare_by_hour_sql).df()

print("Average Fare by Hour of Day:", query2.to_string(index=False), sep="\n")

Average Fare by Hour of Day:
 pickup_hour  avg_fare
           0     19.68
           1     17.74
           2     16.63
           3     18.54
           4     23.45
           5     27.50
           6     22.03
           7     18.75
           8     17.83
           9     17.95
          10     18.05
          11     17.63
          12     17.80
          13     18.42
          14     19.27
          15     19.11
          16     19.46
          17     18.12
          18     17.02
          19     17.63
          20     18.05
          21     18.30
          22     19.11
          23     20.25


# Query 3: What percentage of trips use each payment type?

In [22]:
#Label each trip first, then aggregate on the label. Grouping on the raw
#payment_type code would emit one 'Other' row per unmapped code, splitting that
#category's count and percentage across several identically named rows.
query3 = con.execute("""
    WITH labeled AS (
        SELECT
            CASE payment_type
                WHEN 1 THEN 'Credit Card'
                WHEN 2 THEN 'Cash'
                WHEN 3 THEN 'No Charge'
                WHEN 4 THEN 'Dispute'
                WHEN 5 THEN 'Unknown'
                ELSE 'Other'
            END AS payment_name
        FROM trips
    )
    SELECT
        payment_name,
        COUNT(*) AS total_trips,
        ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER (), 2) AS percentage
    FROM labeled
    GROUP BY payment_name
    ORDER BY total_trips DESC
""").df()

print("Payment Type Breakdown:")
print(query3.to_string(index=False))

Payment Type Breakdown:
payment_name  total_trips  percentage
 Credit Card      2298337       80.09
        Cash       422707       14.73
       Other       115195        4.01
     Dispute        22756        0.79
   No Charge        10560        0.37


# Query 4: What is the average tip percentage by day of week for credit card payments only?

In [23]:
#Per-trip tip/fare ratio averaged by weekday, credit-card trips only, highest first
tip_pct_by_day_sql = """
    SELECT 
        pickup_day_of_week,
        ROUND(AVG(tip_amount / fare_amount * 100), 2) AS avg_tip_percentage
    FROM trips
    WHERE payment_type = 1
      AND fare_amount > 0
    GROUP BY pickup_day_of_week
    ORDER BY avg_tip_percentage DESC
"""
query4 = con.execute(tip_pct_by_day_sql).df()

print(
    "Average Tip Percentage by Day of Week (Credit Card Only):",
    query4.to_string(index=False),
    sep="\n",
)

Average Tip Percentage by Day of Week (Credit Card Only):
pickup_day_of_week  avg_tip_percentage
          Thursday               29.73
          Saturday               26.29
           Tuesday               25.73
         Wednesday               25.71
            Friday               25.60
            Monday               25.51
            Sunday               25.10


# Query 5: What are the top 5 most common pickup-dropoff zone pairs?

In [24]:
#Join the zone lookup twice (pickup and dropoff side) and keep the 5 most frequent pairs
top_zone_pairs_sql = """
    SELECT 
        pu.Zone AS pickup_zone,
        dropoff.Zone AS dropoff_zone,
        COUNT(*) AS total_trips
    FROM trips t
    JOIN zones pu ON t.PULocationID = pu.LocationID
    JOIN zones dropoff ON t.DOLocationID = dropoff.LocationID
    GROUP BY pu.Zone, dropoff.Zone
    ORDER BY total_trips DESC
    LIMIT 5
"""
query5 = con.execute(top_zone_pairs_sql).df()

print(
    "Top 5 Most Common Pickup-Dropoff Zone Pairs:",
    query5.to_string(index=False),
    sep="\n",
)

Top 5 Most Common Pickup-Dropoff Zone Pairs:
          pickup_zone          dropoff_zone  total_trips
Upper East Side South Upper East Side North        21641
Upper East Side North Upper East Side South        19199
Upper East Side North Upper East Side North        15193
Upper East Side South Upper East Side South        14112
       Midtown Center Upper East Side South        10139


# Save Cleaned Data for Dashboard

In [25]:
#Write the cleaned frame (without its index) to the parquet file named below
cleaned_path = 'data/raw/taxi_cleaned.parquet'
taxi_df.to_parquet(cleaned_path, index=False)

saved_rows = len(taxi_df)
saved_cols = len(taxi_df.columns)
print(f"Cleaned data saved to {cleaned_path}")
print(f"   Rows saved: {saved_rows:,}")
print(f"   Columns saved: {saved_cols}")


Cleaned data saved to data/raw/taxi_cleaned.parquet
   Rows saved: 2,869,555
   Columns saved: 23


# Claude AI was used for some parts of this project: fixing the DuckDB reserved-keyword conflict in Query 5, resolving the Streamlit date-filter index error, and helping me refresh my knowledge of how to write some of the SQL queries. I also used it for inspiration on how to organize the dashboard and its interactive filters.