<a href="https://colab.research.google.com/github/Xn00bslayerX/Big-Data-A1/blob/main/assignment1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Part 1 Data Ingestion & Storage

Download data files


In [7]:
import requests
import os

# Define the URLs for the trip data and taxi zone lookup
trip_data_file = "https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet"
taxi_zone = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

# Create the data/raw/ directory if it doesn't exist
os.makedirs('data/raw', exist_ok=True)

# Define local filenames for the downloaded files, now within the data/raw directory
local_trip_parquet_file = "data/raw/yellow_tripdata_2024-01.parquet"
local_taxi_zone_csv_file = "data/raw/taxi_zone_lookup.csv"

import requests

def download_file(url, local_path):
    response = requests.get(url, stream=True)
    response.raise_for_status()
    with open(local_path, 'wb') as f:
        for chunk in response.iter_content(chunk_size=8192):
            f.write(chunk)

download_file(trip_data_file, local_trip_parquet_file)
download_file(taxi_zone, local_taxi_zone_csv_file)

# Now, parse the local parquet file using polars.scan_parquet
import polars as pl
trip_df = pl.scan_parquet(local_trip_parquet_file)

# The original cell included a `trip_df.collect()`. While `collect()` materializes the DataFrame,
# `scan_parquet` creates a LazyFrame, which is often more efficient for chained operations.
# We will remove `collect()` here to keep it as a LazyFrame for potential lazy evaluation benefits,
# and only call collect when necessary (e.g., for actions like `shape`).

Validate the data

In [8]:
import polars as pl

expected_columns = [
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'passenger_count',
    'trip_distance',
    'fare_amount',
    'tip_amount',
    'total_amount',
    'payment_type'
]

# For Polars LazyFrame, use .collect_schema().names() for better performance
actual_columns = trip_df.collect_schema().names()

missing_columns = [col for col in expected_columns if col not in actual_columns]

if not missing_columns:
    print("All specified columns are present in the DataFrame.")
else:
    print(f"The following specified columns are missing: {', '.join(missing_columns)}");
    # exit or raise an error if critical columns are missing
    exit(1)

# For Polars LazyFrame, use pl.len()
total_rows = trip_df.select(pl.len()).collect().item()
print(f"Total number of rows: {total_rows}")

# Save the files to raw using Polars write_csv method
# Note: Polars write_csv does not have an 'index' argument, as it doesn't write an index by default.
# We need to collect the LazyFrame before writing to CSV.
# trip_df.collect().write_csv('data/raw/trip_data.csv')
# print("Trip data saved to data/raw/trip_data.csv")

All specified columns are present in the DataFrame.
Total number of rows: 2964624


# Part 2: Data Transformation & Analysis

We need to clean the data file by removing shit data

In [9]:
import polars as pl

# Get initial total rows using Polars LazyFrame method
initial_total_rows = trip_df.select(pl.len()).collect().item()

# Ensure datetime columns are in the correct format for comparison later
# For Polars LazyFrame, use with_columns and cast
trip_df = trip_df.with_columns([
    pl.col('tpep_pickup_datetime').cast(pl.Datetime),
    pl.col('tpep_dropoff_datetime').cast(pl.Datetime)
])

# Remove rows with null values in essential columns
columns_to_check_for_nulls = [
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'fare_amount',
    'tip_amount',
    'total_amount'
]

rows_before_filter = trip_df.select(pl.len()).collect().item()
# For Polars LazyFrame, use filter with is_not_null for each column
trip_df = trip_df.filter(
    pl.all_horizontal([pl.col(c).is_not_null() for c in columns_to_check_for_nulls])
)
removed_rows_nulls = rows_before_filter - trip_df.select(pl.len()).collect().item()
print(f"Removed {removed_rows_nulls} rows with null values in essential columns.")

# Remove rows with non-positive trip distance
rows_before_filter = trip_df.select(pl.len()).collect().item()
trip_df = trip_df.filter(pl.col('trip_distance') > 0)
removed_rows_distance = rows_before_filter - trip_df.select(pl.len()).collect().item()
print(f"Removed {removed_rows_distance} rows with non-positive trip distance.")

# Remove rows with non-positive fare amount
rows_before_filter = trip_df.select(pl.len()).collect().item()
trip_df = trip_df.filter(pl.col('fare_amount') > 0)
removed_rows_negative_fare = rows_before_filter - trip_df.select(pl.len()).collect().item()
print(f"Removed {removed_rows_negative_fare} rows with non-positive fare amount.")

# Remove rows with ludicrous fare amounts (e.g., > 500)
rows_before_filter = trip_df.select(pl.len()).collect().item()
trip_df = trip_df.filter(pl.col('fare_amount') <= 500)
removed_rows_ludicrous_fare = rows_before_filter - trip_df.select(pl.len()).collect().item()
print(f"Removed {removed_rows_ludicrous_fare} rows with ludicrous fare amounts (above $500).")

# Remove rows where dropoff datetime is not after pickup datetime
rows_before_filter = trip_df.select(pl.len()).collect().item()
trip_df = trip_df.filter(pl.col('tpep_dropoff_datetime') > pl.col('tpep_pickup_datetime'))
removed_rows_invalid_time = rows_before_filter - trip_df.select(pl.len()).collect().item()
print(f"Removed {removed_rows_invalid_time} rows where dropoff datetime is not after pickup datetime.")

final_rows = trip_df.select(pl.len()).collect().item()
total_removed = initial_total_rows - final_rows
print(f"\nInitial total rows: {initial_total_rows}")
print(f"Final total rows after cleaning: {final_rows}")
print(f"Total rows removed: {total_removed}")

Removed 0 rows with null values in essential columns.
Removed 60371 rows with non-positive trip distance.
Removed 34539 rows with non-positive fare amount.
Removed 30 rows with ludicrous fare amounts (above $500).
Removed 112 rows where dropoff datetime is not after pickup datetime.

Initial total rows: 2964624
Final total rows after cleaning: 2869572
Total rows removed: 95052


Now, we are going to derive columns.
- trip_duration_minutes: calculated from pickup and dropoff timestamps
- trip_speed_mph: distance divided by duration (handle division by zero)
- pickup_hour: hour of day (0-23) extracted from pickup timestamp
- pickup_day_of_week: day name (Monday-Sunday) extracted from pickuptimestamp

In [10]:
# trip_df.explain(optimized=True)

# Calculate trip_duration_minutes:
# 1. Get the duration as a Polars Duration type.
# 2. Convert that Duration to total seconds (which is a numerical float).
# 3. Divide by 60 to get minutes.
trip_df = trip_df.with_columns(
    (pl.col("tpep_dropoff_datetime") - pl.col("tpep_pickup_datetime")) # Result is Duration
    .dt.total_seconds() # Result is Float64 (seconds)
    .truediv(60) # Result is Float64 (minutes)
    .alias("trip_duration_minutes")
)

# Now, calculate trip_speed_mph using the established 'trip_duration_minutes' column
trip_df = trip_df.with_columns(
    (pl.col("trip_distance").truediv(pl.col("trip_duration_minutes")) * 60).alias("trip_speed_mph")
)

# Add pickup_hour and pickup_day_of_week in a separate step
trip_df = trip_df.with_columns(
    pl.col("tpep_pickup_datetime").dt.hour().alias("pickup_hour"),
    pl.col("tpep_pickup_datetime").dt.strftime("%A").alias("pickup_day_of_week")
)

# The derived columns are expected to be correctly added to the LazyFrame.
# NOTE: Removed print(trip_df.collect_schema().names()) as it was causing ColumnNotFoundError due to Polars' internal schema resolution of datetime operations.

Now that the cleaned Data is in polars, I will load it into DuckDB for analysis

In [11]:
import duckdb
import polars as pl

# Establish an in-memory DuckDB connection
con = duckdb.connect(database=':memory:', read_only=False)

# Drop original timestamp columns right before collection
# This ensures they are available for all derivations but removed before DuckDB sees them.
trip_df_final = trip_df.drop("tpep_pickup_datetime", "tpep_dropoff_datetime")

# Materialize the Polars LazyFrame into a DataFrame first to resolve all lazy operations and types
# This can be memory intensive for very large datasets, but ensures type consistency for DuckDB.
trip_df_collected = trip_df_final.collect()

# Register the collected Polars DataFrame as a view in DuckDB
con.register('trip_data_view', trip_df_collected)

# Load the taxi zone lookup CSV into DuckDB
# 'local_taxi_zone_csv_file' variable contains the path to the downloaded CSV
con.execute(f"CREATE OR REPLACE VIEW lookup AS SELECT * FROM read_csv_auto('{local_taxi_zone_csv_file}');")

print("DuckDB connection established and Polars DataFrame 'trip_data_view' is queryable.")
print(f"Taxi zone lookup data from '{local_taxi_zone_csv_file}' registered as 'lookup'.")

ModuleNotFoundError: No module named 'duckdb'

the top 10 busiest pickup zones by total number of trips

In [None]:
result = con.sql (
    """SELECT
        count(*) AS trip_count,
        PULocationID,
        lookup.Zone,
        (count(*) * 100.0 / (SELECT count(*) FROM trip_data_view)) AS percentage_of_total_trips
    FROM
        trip_data_view
    INNER JOIN
        lookup ON lookup.LocationID = trip_data_view.PULocationID
    GROUP BY
        trip_data_view.PULocationID, lookup.Zone
    ORDER BY
        trip_count DESC
    LIMIT 10"""
)
print (result)

┌────────────┬──────────────┬──────────────────────────────┬───────────────────────────┐
│ trip_count │ PULocationID │             Zone             │ percentage_of_total_trips │
│   int64    │    int32     │           varchar            │          double           │
├────────────┼──────────────┼──────────────────────────────┼───────────────────────────┤
│     140141 │          161 │ Midtown Center               │        4.8836899718843085 │
│     140118 │          237 │ Upper East Side South        │          4.88288845862728 │
│     138427 │          132 │ JFK Airport                  │         4.823959810034388 │
│     133962 │          236 │ Upper East Side North        │         4.668361692963271 │
│     104342 │          162 │ Midtown East                 │         3.636152011519488 │
│     102958 │          230 │ Times Sq/Theatre District    │        3.5879218224878136 │
│     102152 │          186 │ Penn Station/Madison Sq West │        3.5598340100893093 │
│     101794 │       

the average fare amount for each hour of the day

In [None]:
print (con.sql (
    "select AVG(fare_amount),pickup_hour from trip_data_view group by pickup_hour"
))

┌────────────────────┬─────────────┐
│  avg(fare_amount)  │ pickup_hour │
│       double       │    int8     │
├────────────────────┼─────────────┤
│ 19.681288148659945 │           0 │
│ 17.735780705229207 │           1 │
│  16.62933009153335 │           2 │
│ 18.536211857018404 │           3 │
│ 23.451589629435443 │           4 │
│ 27.500120061745992 │           5 │
│  22.02714416197275 │           6 │
│ 18.753927078437655 │           7 │
│  17.82651547863499 │           8 │
│  17.94713425800613 │           9 │
│          ·         │           · │
│          ·         │           · │
│          ·         │           · │
│ 19.273210301008266 │          14 │
│  19.11408553522087 │          15 │
│ 19.459183333604155 │          16 │
│  18.12056111444039 │          17 │
│  17.01523955666761 │          18 │
│ 17.629133484352987 │          19 │
│ 18.052625129388932 │          20 │
│ 18.295467556211637 │          21 │
│ 19.112202793914957 │          22 │
│ 20.246206988836548 │          23 │
├

percentage of trips use each payment type

In [None]:
print (con.sql ("select count(*),payment_type from trip_data_view GROUP by payment_type"))


┌──────────────┬──────────────┐
│ count_star() │ payment_type │
│    int64     │    int64     │
├──────────────┼──────────────┤
│       422713 │            2 │
│       115195 │            0 │
│      2298347 │            1 │
│        10561 │            3 │
│        22756 │            4 │
└──────────────┴──────────────┘



average tip percentage (tip_amount/fare_amount) by day of week, for credit card payments only?

In [None]:
print (con.sql ("select avg(tip_amount/fare_amount),pickup_day_of_week from trip_data_view where payment_type=1 group by pickup_day_of_week"))

┌─────────────────────────────────┬────────────────────┐
│ avg((tip_amount / fare_amount)) │ pickup_day_of_week │
│             double              │      varchar       │
├─────────────────────────────────┼────────────────────┤
│              0.2551411585527094 │ Monday             │
│              0.2570662471216131 │ Wednesday          │
│              0.2573012403104684 │ Tuesday            │
│             0.29734457519313146 │ Thursday           │
│             0.25101118352420015 │ Sunday             │
│              0.2559570091258144 │ Friday             │
│             0.26293994783252816 │ Saturday           │
└─────────────────────────────────┴────────────────────┘



Most common pickup-dropoff pairs

In [None]:
print (
    con.sql ("""
    SELECT
        count(*) AS trip_count,
        t.PULocationID,
        plookup.Zone AS PickupZone,
        t.DOLocationID,
        dlookup.Zone AS DropoffZone
    FROM
        trip_data_view AS t
    INNER JOIN
        lookup AS plookup ON plookup.LocationID = t.PULocationID
    INNER JOIN
        lookup AS dlookup ON dlookup.LocationID = t.DOLocationID
    GROUP BY
        t.PULocationID, plookup.Zone, t.DOLocationID, dlookup.Zone
    ORDER BY
        trip_count DESC
    LIMIT 5
    """)
)

┌────────────┬──────────────┬───────────────────────┬──────────────┬───────────────────────┐
│ trip_count │ PULocationID │      PickupZone       │ DOLocationID │      DropoffZone      │
│   int64    │    int32     │        varchar        │    int32     │        varchar        │
├────────────┼──────────────┼───────────────────────┼──────────────┼───────────────────────┤
│      21641 │          237 │ Upper East Side South │          236 │ Upper East Side North │
│      19199 │          236 │ Upper East Side North │          237 │ Upper East Side South │
│      15193 │          236 │ Upper East Side North │          236 │ Upper East Side North │
│      14112 │          237 │ Upper East Side South │          237 │ Upper East Side South │
│      10139 │          161 │ Midtown Center        │          237 │ Upper East Side South │
└────────────┴──────────────┴───────────────────────┴──────────────┴───────────────────────┘



#Part 3: Dashboard Development

# AI Tools Used
In notebook Gemini used to get code snippets.
ChatGPT for understanding and getting conceptual knowledge
Github Copilot for debugging