In [None]:
# 816040436-Micah Hosein-Assignment 1-Notebooks

In [None]:
# Part 1: Data Ingestion

In [None]:
import os
import requests
import pandas as pd


# Remote sources: the January 2024 yellow-taxi trip file and the zone lookup table.
TAXI_URL  = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
ZONES_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"


# Local paths: raw downloads go under RAW_DIR; the cleaned, feature-enriched
# dataset is written to CLEAN_PATH.
RAW_DIR        = "data/raw"
TAXI_RAW_PATH  = os.path.join(RAW_DIR, "yellow_tripdata_2024-01.parquet")
ZONES_RAW_PATH = os.path.join(RAW_DIR, "taxi_zone_lookup.csv")
CLEAN_PATH     = "data/cleaned_taxi_data.parquet"


# The columns that must exist in the trip data (checked by validate_and_load).
# NOTE(review): the tip-percentage query later in this notebook reads
# `tip_amount`, which is not validated here — confirm whether it should be listed.
REQUIRED_COLUMNS = [
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "total_amount",
    "payment_type",
]

In [35]:
def download_file(url: str, dest_path: str) -> None:
    """Download ``url`` to ``dest_path`` unless the file already exists.

    The payload is streamed into a temporary ``<dest_path>.part`` file and only
    renamed to ``dest_path`` once the download finished, so an interrupted
    download never leaves a truncated file that a later run would skip.

    Args:
        url: HTTP(S) address of the file to fetch.
        dest_path: Local path to write to; parent folders are created if needed.

    Raises:
        requests.HTTPError: If the server answers with an error status.
    """
    # os.path.dirname() is "" for a bare filename, and os.makedirs("") raises.
    dest_dir = os.path.dirname(dest_path)
    if dest_dir:
        os.makedirs(dest_dir, exist_ok=True)

    if os.path.exists(dest_path):
        print(f"  [SKIP] Already exists: {dest_path}")
        return

    print(f"  [DOWNLOAD] {url}")
    tmp_path = dest_path + ".part"
    try:
        # The context manager releases the streamed connection even on errors.
        with requests.get(url, stream=True, timeout=120) as response:
            response.raise_for_status()
            with open(tmp_path, "wb") as f:
                for chunk in response.iter_content(chunk_size=1024 * 1024):
                    f.write(chunk)
        os.replace(tmp_path, dest_path)
    finally:
        # After a successful os.replace the temp file is gone; this only
        # cleans up a partial file left behind by a failure.
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
    print(f"  [SAVED]  → {dest_path}")

In [None]:
# The cell above defines download_file, which downloads a file from a URL to a local path.
# It uses os.path.exists to check whether the file has already been downloaded and skips it if so.

In [34]:
# Fetch both raw inputs into data/raw/; files that already exist are skipped.
print("Downloading raw data files")
download_file(TAXI_URL,  TAXI_RAW_PATH)
download_file(ZONES_URL, ZONES_RAW_PATH)
print("Downloaded.\n")

Downloading raw data files
  [SKIP] Already exists: data/raw\yellow_tripdata_2024-01.parquet
  [SKIP] Already exists: data/raw\taxi_zone_lookup.csv
Downloaded.



In [None]:
# This cell calls download_file for both URLs and saves the files in the data/raw/ folder.

In [None]:
# Validate & load Function

In [36]:
def validate_and_load(parquet_path: str) -> pd.DataFrame:
    """Load the trip parquet file and run basic schema checks.

    Checks performed:
      1. Every name in REQUIRED_COLUMNS is present.
      2. The pickup/dropoff columns can be converted to datetime.
      3. The row count is printed.

    Args:
        parquet_path: Path of the parquet file to read.

    Returns:
        The loaded DataFrame with both datetime columns converted.

    Raises:
        ValueError: If a required column is missing, or if a datetime column
            holds values but none of them can be parsed as a datetime.
    """
    print("Loading and Validating trip data")
    df = pd.read_parquet(parquet_path)

    # 1. Check all required columns are present
    missing_cols = [c for c in REQUIRED_COLUMNS if c not in df.columns]
    if missing_cols:
        raise ValueError(f"Missing required columns: {missing_cols}")
    print(f"All {len(REQUIRED_COLUMNS)} required columns present.")

    # 2. Convert datetime columns (in case they loaded as object/string)
    for dt_col in ["tpep_pickup_datetime", "tpep_dropoff_datetime"]:
        had_value = df[dt_col].notna()
        converted = pd.to_datetime(df[dt_col], errors="coerce")
        # errors="coerce" always yields a datetime dtype, so the dtype check
        # alone can never fail; count values that were silently turned to NaT.
        coerced = int((had_value & converted.isna()).sum())
        nothing_parsed = bool(had_value.any()) and coerced == int(had_value.sum())
        if nothing_parsed or not pd.api.types.is_datetime64_any_dtype(converted):
            raise ValueError(f"Column '{dt_col}' could not be converted to datetime.")
        if coerced:
            # Partial failures stay best-effort: the rows are kept with NaT.
            print(f"  [WARN] {coerced:,} unparseable values in '{dt_col}' were set to NaT.")
        df[dt_col] = converted
    print("Datetime columns validated.")

    # 3. Report row count
    total_rows = len(df)
    print(f"Total rows loaded: {total_rows:,}")

    return df



# Load the validated trip data and the zone lookup table used for the joins later on.
df_raw = validate_and_load(TAXI_RAW_PATH)
df_zones = pd.read_csv(ZONES_RAW_PATH)
print(f"\nZone lookup table: {len(df_zones):,} rows\n")

Loading and Validating trip data
All 9 required columns present.
Datetime columns validated.
Total rows loaded: 2,964,624

Zone lookup table: 265 rows



In [None]:
# This part of the code loads the parquet file into pandas and performs 3 checks.
# Check 1: looks for any columns from the required list that are missing and raises a ValueError if any are absent.
# Check 2: converts the pickup and dropoff columns to proper datetime format and checks the result for errors.
# Check 3: prints the total number of rows loaded.
# NB: 2,964,624 trip rows and 265 zone lookup rows were loaded.

In [None]:
# Part 2: Data Transformation & Analysis

In [40]:
def clean_data(df: pd.DataFrame) -> pd.DataFrame:
    """Filter out invalid trips and report how many rows each rule removed.

    The rules run in order, each on the output of the previous one:
      1. drop rows with nulls in the critical columns,
      2. keep trip_distance > 0,
      3. keep fare_amount >= 0,
      4. keep fare_amount <= 500,
      5. keep rows whose dropoff is strictly after pickup.

    Args:
        df: Raw trip data; it is not modified.

    Returns:
        A new DataFrame with the surviving rows and a fresh 0..n-1 index.
    """
    initial_count = len(df)
    print("=== Data Cleaning ===")
    print(f"Starting row count: {initial_count:,}\n")

    critical_cols = [
        "tpep_pickup_datetime", "tpep_dropoff_datetime",
        "PULocationID", "DOLocationID", "fare_amount",
    ]

    # (reason shown in the log, filter returning the rows to keep)
    steps = [
        ("null values in critical columns",
         lambda d: d.dropna(subset=critical_cols)),
        # A NaN distance compares False, so it is dropped by this step as well.
        ("trip_distance <= 0",
         lambda d: d[d["trip_distance"] > 0]),
        ("fare_amount < 0",
         lambda d: d[d["fare_amount"] >= 0]),
        ("fare_amount > 500",
         lambda d: d[d["fare_amount"] <= 500]),
        # Strict ">" also drops zero-duration trips, hence "at or before".
        ("dropoff time at or before pickup time",
         lambda d: d[d["tpep_dropoff_datetime"] > d["tpep_pickup_datetime"]]),
    ]

    current = df
    for reason, apply_filter in steps:
        kept = apply_filter(current)
        print(f"  Removed {len(current) - len(kept):,} rows  → {reason}")
        current = kept

    total_removed = initial_count - len(current)
    # Guard against an empty input frame (would otherwise divide by zero).
    pct_removed = total_removed / initial_count * 100 if initial_count else 0.0
    print(f"\nTotal rows removed: {total_removed:,}  ({pct_removed:.1f}%)")
    print(f"Remaining rows:     {len(current):,}\n")

    return current.reset_index(drop=True)



# Run the cleaning pipeline; df_raw is left untouched so the cell can be re-run.
df_clean = clean_data(df_raw)

=== Data Cleaning ===
Starting row count: 2,964,624

  Removed 0 rows  → null values in critical columns
  Removed 60,371 rows  → trip_distance <= 0
  Removed 34,065 rows  → fare_amount < 0
  Removed 30 rows  → fare_amount > 500
  Removed 112 rows  → dropoff time before pickup time

Total rows removed: 94,578  (3.2%)
Remaining rows:     2,870,046



In [None]:
# This code contains the clean_data function, which is a data cleaning pipeline.
# It runs five sequential filter steps and after each one prints exactly how many rows were removed and why.
# Step 1 removes rows where pickup time, dropoff time, pickup location, dropoff location, or fare are null.
# Steps 2, 3 and 4 remove rows where distance is zero or negative, fare is negative, or fare exceeds $500.
# Step 5 removes rows where the dropoff timestamp is not later than the pickup timestamp (earlier or equal).
# Each step filters the output of the previous step, so each count only covers rows that survived the earlier steps.
# The reset_index(drop=True) at the end renumbers the remaining rows starting from 0.
# NB: 2,870,046 rows remain after cleaning; 94,578 rows were removed.

In [42]:
# Sanity check: confirm the pickup column is a datetime dtype and inspect the first rows.
print(df_clean["tpep_pickup_datetime"].dtype)
print(df_clean["tpep_pickup_datetime"].head())

datetime64[us]
0   2024-01-01 00:57:55
1   2024-01-01 00:03:00
2   2024-01-01 00:17:06
3   2024-01-01 00:36:38
4   2024-01-01 00:46:51
Name: tpep_pickup_datetime, dtype: datetime64[us]


In [None]:
# This cell checks the dtype and the first 5 rows of the pickup column, because I thought there was an error.

In [44]:
def engineer_features(df: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of ``df`` with four derived trip columns added.

    Added columns:
      * trip_duration_minutes - dropoff minus pickup, in minutes (2 dp)
      * trip_speed_mph        - distance / duration in hours (2 dp); 0.0 when
                                the duration is not positive
      * pickup_hour           - hour of the pickup timestamp (0-23)
      * pickup_day_of_week    - day name of the pickup timestamp

    Args:
        df: Cleaned trip data with the pickup/dropoff timestamps and
            ``trip_distance``; it is not modified.

    Returns:
        A new DataFrame containing the original columns plus the four above.
    """
    print("Feature Engineering")
    df = df.copy()

    # Strip any timezone info so both timestamp columns are timezone-naive.
    df["tpep_pickup_datetime"]  = pd.to_datetime(df["tpep_pickup_datetime"]).dt.tz_localize(None)
    df["tpep_dropoff_datetime"] = pd.to_datetime(df["tpep_dropoff_datetime"]).dt.tz_localize(None)

    # 1. Trip duration in minutes (2 decimal places)
    df["trip_duration_minutes"] = (
        (df["tpep_dropoff_datetime"] - df["tpep_pickup_datetime"])
        .dt.total_seconds() / 60
    ).round(2)

    # 2. Trip speed in mph — guard against zero duration (2 decimal places).
    #    Vectorised instead of a row-wise apply: the division may yield inf/NaN
    #    for non-positive or missing durations, and .where() replaces exactly
    #    those rows with 0.0, matching the previous row-by-row rule.
    duration_min = df["trip_duration_minutes"]
    df["trip_speed_mph"] = (
        (df["trip_distance"] / (duration_min / 60))
        .where(duration_min > 0, 0.0)
        .round(2)
    )

    # 3. Hour of day (0-23)
    df["pickup_hour"] = df["tpep_pickup_datetime"].dt.hour

    # 4. Day of week name (Monday–Sunday)
    df["pickup_day_of_week"] = df["tpep_pickup_datetime"].dt.day_name()

    print("Confirm trip_duration_minutes")
    print("Confirm trip_speed_mph")
    print("Confirm pickup_hour")
    print("Confirm pickup_day_of_week")
    print(f"\nSample: \n{df[['trip_duration_minutes','trip_speed_mph','pickup_hour','pickup_day_of_week']].head()}\n")
    return df

# Build the feature-enriched frame used by the save step, the SQL queries and the charts.
df_featured = engineer_features(df_clean)

Feature Engineering
Confirm trip_duration_minutes
Confirm trip_speed_mph
Confirm pickup_hour
Confirm pickup_day_of_week

Sample: 
   trip_duration_minutes  trip_speed_mph  pickup_hour pickup_day_of_week
0                  19.80            5.21            0             Monday
1                   6.60           16.36            0             Monday
2                  17.92           15.74            0             Monday
3                   8.30           10.12            0             Monday
4                   6.10            7.87            0             Monday



In [None]:
# This is the engineer_features function, which adds four derived columns to the cleaned data.
# 1) Trip duration is calculated by subtracting pickup from dropoff and converting to minutes
# using .dt.total_seconds() / 60.
# 2) Trip speed divides distance by the duration converted to hours,
# with a guard that sets the speed to 0 when the duration is not positive, to avoid dividing by zero.
# 3) Pickup hour uses .dt.hour, which extracts the 0–23 integer from the full datetime.
# 4) Pickup day of week uses .dt.day_name() to give Monday, Tuesday, Wednesday, etc., as strings.

In [9]:
# Create the output folder from CLEAN_PATH itself (instead of a hard-coded
# "data") so the save keeps working if CLEAN_PATH is pointed elsewhere.
os.makedirs(os.path.dirname(CLEAN_PATH) or ".", exist_ok=True)
df_featured.to_parquet(CLEAN_PATH, index=False)
print(f"Cleaned dataset saved to: {CLEAN_PATH}")

Cleaned dataset saved to: data/cleaned_taxi_data.parquet


In [None]:
# The code above saves the cleaned dataset as a parquet file for the Streamlit app to use.

In [10]:
import duckdb

# Register DataFrames as DuckDB views
# `con` is a DuckDB connection created with no arguments; the two DataFrames
# become queryable in SQL under the names `trips` and `zones`.
con = duckdb.connect()
con.register("trips", df_featured)
con.register("zones", df_zones)
print("DuckDB connection ready. Tables: trips, zones\n")

DuckDB connection ready. Tables: trips, zones



In [None]:
# Imports DuckDB, opens a connection, and registers df_featured and df_zones as the tables "trips" and "zones".

In [26]:
def query_top_pickup_zones(con) -> pd.DataFrame:
    """Return the 10 pickup zones with the most trips.

    Args:
        con: Connection exposing ``execute(sql).fetchdf()``; in this notebook a
            DuckDB connection with ``trips`` and ``zones`` registered.

    Returns:
        Columns ``pickup_zone``, ``borough`` and ``total_trips``, busiest first.
    """
    top_zones_query = """
        SELECT
            z.Zone          AS pickup_zone,
            z.Borough       AS borough,
            COUNT(*)        AS total_trips
        FROM trips t
        JOIN zones z ON t.PULocationID = z.LocationID
        GROUP BY z.Zone, z.Borough
        ORDER BY total_trips DESC
        LIMIT 10
    """
    cursor = con.execute(top_zones_query)
    return cursor.fetchdf()

# Run question 1 and print the table as plain text without the index column.
q1_result = query_top_pickup_zones(con)
print("Question 1) Top 10 Busiest Pickup Zones:")
print(q1_result.to_string(index=False))
print()

Question 1) Top 10 Busiest Pickup Zones:
                 pickup_zone   borough  total_trips
              Midtown Center Manhattan       140161
       Upper East Side South Manhattan       140131
                 JFK Airport    Queens       138474
       Upper East Side North Manhattan       133975
                Midtown East Manhattan       104353
   Times Sq/Theatre District Manhattan       102972
Penn Station/Madison Sq West Manhattan       102160
         Lincoln Square East Manhattan       101800
           LaGuardia Airport    Queens        87714
       Upper West Side South Manhattan        86473



In [None]:
# This function returns the 10 zones with the most pickups, including the zone names.
# It JOINs the trips table to the zones table on the location ID, groups by zone name and borough, counts the rows,
# orders by the trip count in descending order and keeps the first 10.

In [29]:
def query_avg_fare_by_hour(con) -> pd.DataFrame:
    """Return the average fare for each pickup hour.

    Args:
        con: Connection exposing ``execute(sql).fetchdf()``; in this notebook a
            DuckDB connection with ``trips`` registered.

    Returns:
        Columns ``pickup_hour`` and ``avg_fare`` (rounded to 2 dp), ordered by hour.
    """
    hourly_fare_query = """
        SELECT
            pickup_hour,
            ROUND(AVG(fare_amount), 2) AS avg_fare
        FROM trips
        GROUP BY pickup_hour
        ORDER BY pickup_hour
    """
    cursor = con.execute(hourly_fare_query)
    return cursor.fetchdf()

# Run question 2 and print the table as plain text without the index column.
q2_result = query_avg_fare_by_hour(con)
print("Question 2) Average Fare by Hour: ")
print(q2_result.to_string(index=False))
print()

Question 2) Average Fare by Hour: 
 pickup_hour  avg_fare
           0     19.68
           1     17.73
           2     16.62
           3     18.53
           4     23.44
           5     27.49
           6     22.03
           7     18.75
           8     17.82
           9     17.94
          10     18.05
          11     17.63
          12     17.80
          13     18.42
          14     19.27
          15     19.11
          16     19.46
          17     18.12
          18     17.01
          19     17.63
          20     18.05
          21     18.29
          22     19.11
          23     20.24



In [None]:
# This code returns the average fare for each pickup hour from 0 to 23, ordered by hour.
# It groups all trips by their pickup_hour column, then computes the mean fare for each group,
# rounds it to 2 dp and sorts by the hour.

In [30]:
def query_payment_type_pct(con) -> pd.DataFrame:
    """Return the trip count and percentage share of each payment type.

    Args:
        con: Connection exposing ``execute(sql).fetchdf()``; in this notebook a
            DuckDB connection with ``trips`` registered.

    Returns:
        Columns ``payment_type``, ``trip_count`` and ``pct_of_total`` (2 dp),
        ordered by trip count, largest first.
    """
    payment_share_query = """
        SELECT
            payment_type,
            COUNT(*)                                          AS trip_count,
            ROUND(COUNT(*) * 100.0 / SUM(COUNT(*)) OVER(), 2) AS pct_of_total
        FROM trips
        GROUP BY payment_type
        ORDER BY trip_count DESC
    """
    cursor = con.execute(payment_share_query)
    return cursor.fetchdf()

# Run question 3 and print the table as plain text without the index column.
q3_result = query_payment_type_pct(con)
print("Question 3) Payment Type Distribution:")
print(q3_result.to_string(index=False))
print()


Question 3) Payment Type Distribution:
 payment_type  trip_count  pct_of_total
            1     2298379         80.08
            2      422905         14.74
            0      115239          4.02
            4       22876          0.80
            3       10647          0.37



In [None]:
# Returns the count and percentage share for each payment type.
# Uses a window function (Given by Claude), SUM(COUNT(*)) OVER()
# to calculate the total trip count across all groups in the same query.
# This computes each payment type's percentage share in a single SQL statement without a subquery.

In [31]:
def query_tip_pct_by_day(con) -> pd.DataFrame:
    """Return the average tip percentage per day of week for payment_type = 1.

    The percentage is the mean of ``tip_amount / fare_amount`` over trips with
    a positive fare, multiplied by 100.

    Args:
        con: Connection exposing ``execute(sql).fetchdf()``; in this notebook a
            DuckDB connection with ``trips`` registered.

    Returns:
        Columns ``day_of_week`` and ``avg_tip_pct`` (2 dp), highest percentage first.
    """
    tip_share_query = """
        SELECT
            pickup_day_of_week                                    AS day_of_week,
            ROUND(AVG(tip_amount / NULLIF(fare_amount, 0)) * 100, 2) AS avg_tip_pct
        FROM trips
        WHERE payment_type = 1
          AND fare_amount > 0
        GROUP BY pickup_day_of_week
        ORDER BY avg_tip_pct DESC
    """
    cursor = con.execute(tip_share_query)
    return cursor.fetchdf()

# Run question 4 and print the table as plain text without the index column.
q4_result = query_tip_pct_by_day(con)
print("Question 4) Average Tip Percentage by Day of The Week (Credit Card Only): ")
print(q4_result.to_string(index=False))
print()


Question 4) Average Tip Percentage by Day of The Week (Credit Card Only): 
day_of_week  avg_tip_pct
   Thursday        29.73
   Saturday        26.29
    Tuesday        25.73
  Wednesday        25.71
     Friday        25.60
     Monday        25.51
     Sunday        25.10



In [None]:
# Returns the average tip % per day of the week for credit card trips exclusively.
# Filters to payment_type = 1, then calculates the average tip as a fraction of the fare,
# using NULLIF(fare_amount, 0) to avoid dividing by zero.
# Multiplies by 100 to express it as a percentage. Groups by day of the week and orders by the
# average tip percentage, highest first.

In [32]:
def query_top_zone_pairs(con) -> pd.DataFrame:
    """Return the 5 most frequent pickup-to-dropoff zone pairs.

    The zones table is joined twice: once on the pickup location ID and once
    on the dropoff location ID.

    Args:
        con: Connection exposing ``execute(sql).fetchdf()``; in this notebook a
            DuckDB connection with ``trips`` and ``zones`` registered.

    Returns:
        Columns ``pickup_zone``, ``dropoff_zone`` and ``trip_count``, busiest first.
    """
    zone_pairs_query = """
        SELECT
            pu.Zone     AS pickup_zone,
            dropoff.Zone AS dropoff_zone,
            COUNT(*)    AS trip_count
        FROM trips t
        JOIN zones pu      ON t.PULocationID = pu.LocationID
        JOIN zones dropoff ON t.DOLocationID = dropoff.LocationID
        GROUP BY pu.Zone, dropoff.Zone
        ORDER BY trip_count DESC
        LIMIT 5
    """
    cursor = con.execute(zone_pairs_query)
    return cursor.fetchdf()

# Run question 5 and print the table as plain text without the index column.
q5_result = query_top_zone_pairs(con)
print("Question 5) Top 5 Pickup-Dropoff Zone Pairs: ")
print(q5_result.to_string(index=False))
print()

Question 5) Top 5 Pickup-Dropoff Zone Pairs: 
          pickup_zone          dropoff_zone  trip_count
Upper East Side South Upper East Side North       21642
Upper East Side North Upper East Side South       19199
Upper East Side North Upper East Side North       15200
Upper East Side South Upper East Side South       14116
       Midtown Center Upper East Side South       10139



In [None]:
# Returns the 5 most common pickup→dropoff zone pairs with zone names.
# JOINs the zones table twice: once for the pickup location and once for the dropoff location.
# Used Claude to come up with this solution.

In [None]:
# Part 3: Dashboard Development

In [None]:
import plotly.express as px


# Chart 1 — Top 10 pickup zones
fig1 = px.bar(
    q1_result,
    x="total_trips", y="pickup_zone",
    orientation="h", title="Top 10 Pickup Zones",
    labels={"total_trips": "Number of Trips", "pickup_zone": "Zone"},
    color="total_trips", color_continuous_scale="Blues",
)
# Reverse the y axis so the busiest zone is drawn at the top.
fig1.update_layout(yaxis=dict(autorange="reversed"))
fig1.show()


# Chart 2 — Average fare by hour
fig2 = px.line(
    q2_result, x="pickup_hour", y="avg_fare",
    title="Average Fare by Hour of Day",
    labels={"pickup_hour": "Hour (0–23)", "avg_fare": "Avg Fare ($)"},
    markers=True,
)
fig2.show()


# Chart 3 — Trip distance histogram (sample for speed in notebook)
# Cap the sample at the number of available rows: DataFrame.sample raises
# when asked for more rows than exist (without replacement).
HIST_SAMPLE_SIZE = 50_000
fig3 = px.histogram(
    df_featured.sample(min(HIST_SAMPLE_SIZE, len(df_featured)), random_state=42),
    x="trip_distance",
    nbins=60,
    title="Distribution of Trip Distances",
    labels={"trip_distance": "Distance (miles)"},
    range_x=[0, 30],
)
fig3.show()


# Chart 4 — Payment type pie chart
# Codes without an entry in PAYMENT_LABELS are shown as "Other".
PAYMENT_LABELS = {1: "Credit Card", 2: "Cash", 3: "No Charge", 4: "Dispute", 0: "Unknown", 6: "Voided Trip"}
q3_plot = q3_result.copy()
q3_plot["payment_label"] = q3_plot["payment_type"].map(PAYMENT_LABELS).fillna("Other")
fig4 = px.pie(
    q3_plot, names="payment_label", values="trip_count",
    title="Payment Type Breakdown",
)
fig4.show()


# Chart 5 — Trips by day of week and hour (heatmap)
# Pre-aggregate to one row per (day, hour) so plotly only receives the summary.
pivot = (
    df_featured.groupby(["pickup_day_of_week", "pickup_hour"])
    .size()
    .reset_index(name="trip_count")
)
DAY_ORDER = ["Monday", "Tuesday", "Wednesday", "Thursday", "Friday", "Saturday", "Sunday"]
fig5 = px.density_heatmap(
    pivot, x="pickup_hour", y="pickup_day_of_week",
    z="trip_count", category_orders={"pickup_day_of_week": DAY_ORDER},
    # Request one bin per hour so automatic binning does not merge adjacent hours.
    nbinsx=24,
    title="Trip Volume by Day of Week and Hour",
    labels={"pickup_hour": "Hour", "pickup_day_of_week": "Day", "trip_count": "Trips"},
    color_continuous_scale="YlOrRd",
)
fig5.show()

In [None]:
# Used Plotly Express to test the charts inside the notebook before they go into the Streamlit app.
# Charts 1, 2 and 4 use the query results from Q1, Q2 and Q3; the histogram and the heatmap are built from df_featured.
# I made Payment Type Breakdown a pie chart for the sake of learning the code for a pie chart, even
# though data scientists don't like using pie charts.
# Using Claude: The heatmap at the end requires one extra step — a groupby on the full DataFrame to create a pivot-style summary of trips per day-hour combination, which is then passed to px.density_heatmap.

In [None]:
# AI Tools Used:
# ChatGPT was used to read through the assignment PDF, to summarize it and to help with my
# planning. It was also used to get the Streamlit URL up.
# Claude was used heavily in certain areas of coding; those areas are explained in the comments throughout
# the notebook. It was also used for debugging my code if I was unable to figure it out myself.
# Claude was used more for the app.py dashboard design to speed up the process. I tried to do the requirements
# for the assignment myself, but if I was struggling I would use Claude to teach me how it should be done.