# COMP 3610: Big Data Analytics
## Assignment 1

Build an end-to-end data pipeline that ingests, transforms, and
analyzes the NYC Yellow Taxi Trip dataset, culminating in an interactive visualization
dashboard. This assignment integrates the skills covered in weeks 1-3 of the course: Python
data engineering, SQL querying, and data visualization.

### Part 1: Data Ingestion

#### Step 1: Download Data Files
Download `taxi_zone_lookup.csv` and `yellow_tripdata_2024-01.parquet` using the Python `requests` library.

In [None]:
import os
import requests
import polars as pl
# Using pyarrow.parquet to read only the metadata from the file
import pyarrow.parquet as pq
import pyarrow as pa
import duckdb

DATA_DIR: str = './data/raw'

# Define the request URLs to send the GET requests to:
YELLOW_TRIP_DATA_URL: str = 'https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2024-01.parquet'
YELLOW_TRIP_FILENAME: str = 'yellow_tripdata_2024-01.parquet'

TAXI_ZONE_LOOKUP_URL: str = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv'
TAXI_ZONE_FILENAME: str   = 'taxi_zone_lookup.csv'

# Ensure the data directory exists
os.makedirs(DATA_DIR, exist_ok=True)

# Fetch taxi_zone_lookup.csv
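try:
    # A timeout keeps the request from hanging indefinitely on a stalled connection
    r = requests.get(TAXI_ZONE_LOOKUP_URL, timeout=30)
    r.raise_for_status()

    with open(os.path.join(DATA_DIR, TAXI_ZONE_FILENAME), 'wb') as f:
        f.write(r.content)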

    print('Successfully downloaded taxi_zone_lookup.csv')
except requests.RequestException as e:
    print(f'Failed to fetch taxi_zone_lookup.csv: {e}')

# Fetch yellow_tripdata_2024-01.parquet (using data streaming)
try:
    # stream=True avoids loading the whole parquet file into memory at once
    r = requests.get(YELLOW_TRIP_DATA_URL, stream=True, timeout=30)
    r.raise_for_status()

    with open(os.path.join(DATA_DIR, YELLOW_TRIP_FILENAME), 'wb') as f:
        for chunk in r.iter_content(chunk_size=8192):
            if chunk:
                f.write(chunk)

    print('Successfully downloaded yellow_tripdata_2024-01.parquet')
except requests.RequestException as e:
    print(f'Failed to fetch yellow_tripdata_2024-01.parquet: {e}')


Successfully downloaded taxi_zone_lookup.csv
Successfully downloaded yellow_tripdata_2024-01.parquet
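
One caveat with a streamed download: if the connection drops partway through, a truncated `.parquet` file can be left in `DATA_DIR` and later be mistaken for a complete one. A minimal sketch of one way to guard against this is to write to a temporary `.part` file and rename it only after every chunk has been written. The helper name `write_chunks_atomically` is hypothetical and not part of `requests`. It accepts any iterable of byte chunks, so `r.iter_content(chunk_size=8192)` could be passed in directly.

```python
import os
from typing import Iterable


def write_chunks_atomically(chunks: Iterable[bytes], dest_path: str) -> int:
    """Write byte chunks to dest_path via a temporary '.part' file.

    Hypothetical helper for illustration. Returns the number of bytes written.
    """
    tmp_path = dest_path + '.part'
    bytes_written = 0
    try:
        with open(tmp_path, 'wb') as f:
            for chunk in chunks:
                if chunk:  # skip empty keep-alive chunks
                    bytes_written += f.write(chunk)
        # Rename only after all chunks are on disk, so the destination
        # path never holds a half-written file
        os.replace(tmp_path, dest_path)
    except BaseException:
        # Remove the partial file, then re-raise the original error
        if os.path.exists(tmp_path):
            os.remove(tmp_path)
        raise
    return bytes_written
```

With this in place, a failed download leaves no file at the destination path, so a later "does the file exist?" check remains meaningful.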


#### Step 2: Data Validation

Validate the downloaded dataset against the expected schema below.

##### Schema

| Column | Description |
|--------|-------------|
| `tpep_pickup_datetime` | Timestamp when the meter was engaged |
| `tpep_dropoff_datetime` | Timestamp when the meter was disengaged |
| `PULocationID` | TLC Taxi Zone ID for pickup location (join with lookup table) |
| `DOLocationID` | TLC Taxi Zone ID for dropoff location (join with lookup table) |
| `passenger_count` | Number of passengers (driver-entered) |
| `trip_distance` | Trip distance in miles (from taximeter) |
| `fare_amount` | Time-and-distance fare calculated by the meter |
| `tip_amount` | Tip amount (auto-populated for credit card payments only) |
| `total_amount` | Total amount charged to passengers (excludes cash tips) |
| `payment_type` | 1=Credit card, 2=Cash, 3=No charge, 4=Dispute, 5=Unknown |

##### Requirements

- Verify all expected columns exist in the dataset
- Check that date columns are valid datetime types
- Report total row count and print a summary to the console
- Raise an exception or exit with an error message if validation fails

In [20]:
# Read yellow trip parquet file
yt_file = pq.ParquetFile(os.path.join(DATA_DIR, YELLOW_TRIP_FILENAME))

# Report row and column counts
print(f'Rows: {yt_file.metadata.num_rows} row/s\nColumns: {yt_file.metadata.num_columns} column/s')

# Verify yellow trip data schema
try:
    required_columns = [
        'tpep_pickup_datetime', 
        'tpep_dropoff_datetime', 
        'PULocationID', 
        'DOLocationID', 
        'passenger_count', 
        'trip_distance',
        'fare_amount', 
        'tip_amount', 
        'total_amount', 
        'payment_type'
        ]
    
    schema = yt_file.schema_arrow

    # Verify that the schema contains the column
    for col in required_columns:
        if col not in schema.names:
            raise IndexError(f'column "{col}" could not be found in the schema')
        
    # Verify that the datetime columns are actually datetime types
    for col in required_columns[:2]:
        field = schema.field(col)
        if not pa.types.is_timestamp(field.type):
            raise TypeError(f'"{col}" is not a datetime type, got {field.type}')
    
    print(f'Validated {YELLOW_TRIP_FILENAME} successfully!')

except Exception as e:
    # Fail loudly so later cells do not run against an invalid dataset
    raise RuntimeError(f'Invalid schema for yellow trip data: {e}') from e


Rows: 2964624 row/s
Columns: 19 column/s
Validated yellow_tripdata_2024-01.parquet successfully!
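
The validation cell above stops at the first problem it finds. As an alternative, the checks can be written so that every problem is collected and reported at once. The sketch below is illustrative only: `find_schema_problems` is a hypothetical helper, and it takes the schema as a plain mapping of column name to type string so that it runs without third-party packages. For a `pyarrow` schema, a mapping along the lines of `{f.name: str(f.type) for f in schema}` is assumed to be a suitable input.

```python
from typing import Mapping, Sequence


def find_schema_problems(
    schema: Mapping[str, str],
    required_columns: Sequence[str],
    datetime_columns: Sequence[str],
) -> list[str]:
    """Return a list of human-readable schema problems (empty if valid)."""
    problems: list[str] = []
    # Check 1: every required column must be present
    for col in required_columns:
        if col not in schema:
            problems.append(f'column "{col}" is missing')
    # Check 2: present datetime columns must have a timestamp type
    for col in datetime_columns:
        col_type = schema.get(col)
        if col_type is not None and not col_type.startswith('timestamp'):
            problems.append(f'"{col}" is not a datetime type, got {col_type}')
    return problems


# Toy schema (made up) with one missing column and one wrongly typed column
toy_schema = {
    'tpep_pickup_datetime': 'timestamp[us]',
    'tpep_dropoff_datetime': 'string',
    'PULocationID': 'int32',
}
problems = find_schema_problems(
    toy_schema,
    required_columns=['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'fare_amount'],
    datetime_columns=['tpep_pickup_datetime', 'tpep_dropoff_datetime'],
)
for problem in problems:
    print(problem)
```

A caller could then raise a single exception listing all problems whenever the returned list is non-empty.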


### Part 2: Data Transformation & Analysis

##### Data Cleaning & Feature Engineering

Using `polars`, remove all rows with null values in critical columns (`tpep_pickup_datetime`, `tpep_dropoff_datetime`, `PULocationID`, `DOLocationID`, `fare_amount`). Also filter out invalid trips, i.e., rows where any of the following holds:
- `trip_distance <= 0`
- `fare_amount < 0 || fare_amount > 500.0`
- `tpep_dropoff_datetime < tpep_pickup_datetime`

Additionally, create 4 new derived columns, i.e.,
- `trip_duration_minutes` = (`tpep_dropoff_datetime` - `tpep_pickup_datetime`) in minutes
- `trip_speed_mph` = `trip_distance` / (`trip_duration_minutes` / 60)
- `pickup_hour` = `tpep_pickup_datetime`.hour
- `pickup_day_of_week` = `tpep_pickup_datetime`.weekday
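
To make the derived columns concrete, here is a small worked example for a single made-up trip, written in plain Python rather than `polars`. The helper `derive_trip_features` is hypothetical, and returning `None` for the speed of a zero-duration trip is an assumption of this sketch, not something the assignment specifies.

```python
from datetime import datetime
from typing import Any


def derive_trip_features(pickup: datetime, dropoff: datetime, trip_distance: float) -> dict[str, Any]:
    """Compute the derived columns for one trip (illustrative helper)."""
    duration_minutes = (dropoff - pickup).total_seconds() / 60
    # Assumption: a zero-length trip has no meaningful speed, so use None
    # instead of dividing by zero
    if duration_minutes > 0:
        speed_mph = trip_distance / (duration_minutes / 60)
    else:
        speed_mph = None
    return {
        'trip_duration_minutes': duration_minutes,
        'trip_speed_mph': speed_mph,
        'pickup_hour': pickup.hour,
        'pickup_day_of_week': pickup.strftime('%A'),
    }


# Made-up trip: 6 miles in 30 minutes on 1 January 2024
features = derive_trip_features(
    pickup=datetime(2024, 1, 1, 8, 15),
    dropoff=datetime(2024, 1, 1, 8, 45),
    trip_distance=6.0,
)
print(features)
```

For this trip the duration is 30 minutes, so the speed is 6.0 / (30 / 60) = 12 mph, the pickup hour is 8, and the pickup day is Monday.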

In [21]:
df = pl.scan_parquet(os.path.join(DATA_DIR, YELLOW_TRIP_FILENAME))
initial_row_count = yt_file.metadata.num_rows

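# Drop rows with nulls in critical columns, then filter out invalid trips
critical_columns = ['tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID', 'fare_amount']
df = df.drop_nulls(subset=critical_columns).filter(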
    ~(
        (pl.col('trip_distance') <= 0) |
        (pl.col('fare_amount') < 0) |
        (pl.col('fare_amount') > 500.0) |
        (pl.col('tpep_dropoff_datetime') < pl.col('tpep_pickup_datetime'))
    )
).with_columns(
    (pl.col('tpep_dropoff_datetime') - pl.col('tpep_pickup_datetime')).dt.total_minutes().alias('trip_duration_minutes'),
).with_columns(
    (pl.col('trip_distance') / (pl.col('trip_duration_minutes') / 60)).alias('trip_speed_mph'),
    pl.col('tpep_pickup_datetime').dt.hour().alias('pickup_hour'),
    pl.col('tpep_pickup_datetime').dt.to_string('%A').alias('pickup_day_of_week'),
)

df = df.collect()
final_row_count = df.shape[0]

print(f'Initial rows:              {initial_row_count}')
print(f'Total rows removed:        {initial_row_count - final_row_count}')
print(f'Remaining rows:            {final_row_count}')

Initial rows:              2964624
Total rows removed:        94522
Remaining rows:            2870102
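
The cleaning rules can also be read as a single per-row predicate. The plain-Python sketch below mirrors the null and validity checks from the task description; the `is_valid_trip` helper and the sample records are made up for illustration. It treats a missing `trip_distance` as invalid, which is an assumption of this sketch.

```python
from datetime import datetime
from typing import Any, Mapping

CRITICAL_COLUMNS = (
    'tpep_pickup_datetime',
    'tpep_dropoff_datetime',
    'PULocationID',
    'DOLocationID',
    'fare_amount',
)


def is_valid_trip(row: Mapping[str, Any]) -> bool:
    """Return True if the row passes every cleaning rule (illustrative helper)."""
    # Rule 1: no nulls in critical columns
    if any(row.get(col) is None for col in CRITICAL_COLUMNS):
        return False
    # Rule 2: distance must be positive (assumption: missing counts as invalid)
    distance = row.get('trip_distance')
    if distance is None or distance <= 0:
        return False
    # Rule 3: fare must lie in [0, 500]
    if row['fare_amount'] < 0 or row['fare_amount'] > 500.0:
        return False
    # Rule 4: dropoff must not precede pickup
    if row['tpep_dropoff_datetime'] < row['tpep_pickup_datetime']:
        return False
    return True


base = {
    'tpep_pickup_datetime': datetime(2024, 1, 1, 8, 0),
    'tpep_dropoff_datetime': datetime(2024, 1, 1, 8, 20),
    'PULocationID': 161,
    'DOLocationID': 237,
    'fare_amount': 14.5,
    'trip_distance': 2.3,
}
sample_rows = [
    base,                                 # valid
    {**base, 'trip_distance': 0.0},       # zero distance
    {**base, 'fare_amount': 750.0},       # fare above 500
    {**base, 'PULocationID': None},       # null in a critical column
    {**base, 'tpep_dropoff_datetime': datetime(2024, 1, 1, 7, 0)},  # dropoff before pickup
]
kept = [row for row in sample_rows if is_valid_trip(row)]
print(f'Kept {len(kept)} of {len(sample_rows)} rows')
```

Writing the rules this way also makes it straightforward to count how many rows each individual rule removes, which can help explain the "Total rows removed" figure.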


##### 1. What are the top 10 busiest pickup zones by total number of trips?

In [None]:
zones = pl.read_csv(os.path.join(DATA_DIR, TAXI_ZONE_FILENAME))

duckdb.sql("""
    SELECT z.Zone as zone, COUNT(*) as trip_count 
    FROM df 
    JOIN zones z ON df.PULocationID = z.LocationID
    GROUP BY PULocationID, z.Zone 
    ORDER BY trip_count DESC
    LIMIT 10
""")

┌──────────────────────────────┬────────────┐
│             zone             │ trip_count │
│           varchar            │   int64    │
├──────────────────────────────┼────────────┤
│ Midtown Center               │     140161 │
│ Upper East Side South        │     140134 │
│ JFK Airport                  │     138478 │
│ Upper East Side North        │     133975 │
│ Midtown East                 │     104356 │
│ Times Sq/Theatre District    │     102972 │
│ Penn Station/Madison Sq West │     102161 │
│ Lincoln Square East          │     101800 │
│ LaGuardia Airport            │      87715 │
│ Upper West Side South        │      86475 │
├──────────────────────────────┴────────────┤
│ 10 rows                         2 columns │
└───────────────────────────────────────────┘

The top 10 pickup zones are `Midtown Center`, `Upper East Side South`, `JFK Airport`, `Upper East Side North`, `Midtown East`, `Times Sq/Theatre District`, `Penn Station/Madison Sq West`, `Lincoln Square East`, `LaGuardia Airport`, and `Upper West Side South`.
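
The join-then-aggregate pattern used in this query can be tried on toy data. The sketch below uses the standard library's `sqlite3` module instead of DuckDB so that it runs without extra packages; the table contents and zone names are made up. One detail it illustrates is that an inner `JOIN` drops trips whose `PULocationID` has no match in the lookup table.

```python
import sqlite3

conn = sqlite3.connect(':memory:')
conn.execute('CREATE TABLE trips (PULocationID INTEGER)')
conn.execute('CREATE TABLE zones (LocationID INTEGER, Zone TEXT)')

# Toy data: location 99 has no entry in the zones lookup table
conn.executemany(
    'INSERT INTO trips VALUES (?)',
    [(1,), (1,), (1,), (2,), (2,), (3,), (99,)],
)
conn.executemany(
    'INSERT INTO zones VALUES (?, ?)',
    [(1, 'Zone A'), (2, 'Zone B'), (3, 'Zone C')],
)

# Same shape as the query above: join, count per zone, keep the busiest
top_zones = conn.execute("""
    SELECT z.Zone AS zone, COUNT(*) AS trip_count
    FROM trips t
    JOIN zones z ON t.PULocationID = z.LocationID
    GROUP BY t.PULocationID, z.Zone
    ORDER BY trip_count DESC
    LIMIT 2
""").fetchall()

# Number of trips that survive the inner join (the trip at location 99 does not)
joined_trip_count = conn.execute("""
    SELECT COUNT(*)
    FROM trips t
    JOIN zones z ON t.PULocationID = z.LocationID
""").fetchone()[0]
conn.close()

print(top_zones)
print(joined_trip_count)
```

If unmatched pickup locations should still be counted, a `LEFT JOIN` from the trips table would keep them, with a null zone name.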

##### 2. What is the average fare amount for each hour of the day?

In [40]:
duckdb.sql("""
    SELECT PRINTF('$%.2f', AVG(CAST(fare_amount AS float))) as average_fare_amount, pickup_hour
    FROM df
    GROUP BY pickup_hour
    ORDER BY pickup_hour ASC
""")

┌─────────────────────┬─────────────┐
│ average_fare_amount │ pickup_hour │
│       varchar       │    int8     │
├─────────────────────┼─────────────┤
│ $19.68              │           0 │
│ $17.73              │           1 │
│ $16.62              │           2 │
│ $18.53              │           3 │
│ $23.44              │           4 │
│ $27.49              │           5 │
│ $22.03              │           6 │
│ $18.75              │           7 │
│ $17.82              │           8 │
│ $17.94              │           9 │
│   ·                 │           · │
│   ·                 │           · │
│   ·                 │           · │
│ $19.27              │          14 │
│ $19.11              │          15 │
│ $19.46              │          16 │
│ $18.12              │          17 │
│ $17.01              │          18 │
│ $17.63              │          19 │
│ $18.05              │          20 │
│ $18.29              │          21 │
│ $19.11              │          22 │
│ $20.24    