# Comprehensive NYC Taxi Analysis — Part 2

Twelve extension modules focusing on temporal, spatial, behavioral, and anomaly dimensions. Data source: curated S3 parquet (`nyc-yellowcab-data-as-2025/tlc/curated/...`). All plots save to `data/local_output/analytics/`.

In [1]:
import os
import sys
from pathlib import Path
from io import BytesIO
import re
from datetime import datetime
from collections import defaultdict

import pandas as pd
import numpy as np
import boto3
import matplotlib.pyplot as plt
import seaborn as sns
from scipy import stats
from sklearn.ensemble import IsolationForest
from sklearn.cluster import KMeans

from dotenv import load_dotenv
load_dotenv(Path('../.env'))

BUCKET = 'nyc-yellowcab-data-as-2025'
s3_client = boto3.client('s3')

plt.style.use('seaborn-v0_8-darkgrid')
sns.set_palette('husl')
plt.rcParams['figure.figsize'] = (14, 6)
plt.rcParams['font.size'] = 10

print(f"AWS Region: {os.getenv('AWS_DEFAULT_REGION')}")
print(f"Bucket: {BUCKET}")
print("✓ Libraries loaded")

ModuleNotFoundError: No module named 'sklearn'

In [None]:
def list_curated_files(cab_type):
    prefix = f'tlc/curated/{cab_type}/'
    paginator = s3_client.get_paginator('list_objects_v2')
    keys = []
    for page in paginator.paginate(Bucket=BUCKET, Prefix=prefix):
        for obj in page.get('Contents', []):
            if obj['Key'].endswith('.parquet'):
                keys.append(obj['Key'])
    return keys


def read_parquet_from_s3(key, columns=None):
    resp = s3_client.get_object(Bucket=BUCKET, Key=key)
    return pd.read_parquet(BytesIO(resp['Body'].read()), columns=columns)


def extract_year_month_day_hour(df, pickup_col):
    if pickup_col not in df.columns:
        return df
    ts = pd.to_datetime(df[pickup_col])
    df = df.copy()
    df['year'] = ts.dt.year
    df['month'] = ts.dt.month
    df['day'] = ts.dt.day
    df['hour'] = ts.dt.hour
    df['date'] = ts.dt.date
    return df


def sample_trips(cab_types, files_per_cab=2, rows_per_file=50000, columns=None):
    samples = []
    for cab in cab_types:
        keys = list_curated_files(cab)
        if not keys:
            print(f"⚠ No files for {cab}")
            continue
        chosen = np.random.choice(keys, min(files_per_cab, len(keys)), replace=False)
        for key in chosen:
            try:
                df = read_parquet_from_s3(key, columns=columns)
                df['cab_type'] = cab
                samples.append(df.sample(min(rows_per_file, len(df))))
            except Exception as e:
                print(f"⚠ Error reading {key}: {e}")
    if not samples:
        return pd.DataFrame()
    return pd.concat(samples, ignore_index=True)


def ensure_output_dir():
    outdir = Path('../data/local_output/analytics')
    outdir.mkdir(parents=True, exist_ok=True)
    return outdir

outdir = ensure_output_dir()
print(f"Output dir: {outdir}")

In [None]:
def pick_pickup_column(df):
    candidates = [
        'tpep_pickup_datetime',
        'lpep_pickup_datetime',
        'pickup_datetime',
        'pickup_datetime_utc'
    ]
    for c in candidates:
        if c in df.columns:
            return c
    return None

print("✓ Helper pick_pickup_column ready")

## Import Required Libraries and Load Data
Uses sampled trip-level data from curated S3 parquet. Sampling keeps memory low while preserving distribution shape. Adjust `files_per_cab` and `rows_per_file` if needed.

In [None]:
CAB_TYPES = ['yellow', 'green', 'fhv', 'fhvhv']
BASE_COLUMNS = [
    'tpep_pickup_datetime', 'lpep_pickup_datetime', 'pickup_datetime', 'pickup_datetime_utc',
    'PULocationID', 'DOLocationID', 'borough', 'fare_amount', 'tip_amount', 'total_amount',
    'trip_distance', 'passenger_count', 'trip_duration_seconds'
]

print("Sampling trip data (may take 1-2 minutes)...")
trips_sample = sample_trips(CAB_TYPES, files_per_cab=2, rows_per_file=40000, columns=None)
if trips_sample.empty:
    print("⚠ No sampled data; downstream cells will skip.")
else:
    pickup_col = pick_pickup_column(trips_sample)
    if pickup_col:
        trips_sample = extract_year_month_day_hour(trips_sample, pickup_col)
        trips_sample['pickup_hour'] = trips_sample['hour']
        print(f"Using pickup column: {pickup_col}")
    else:
        print("⚠ No pickup timestamp column found.")
    print(trips_sample.head())

trips_sample.shape

In [None]:
OUTPUT_DIR = str(ensure_output_dir())
print(f"OUTPUT_DIR set to {OUTPUT_DIR}")

## Analysis 1: Hour-of-Day Demand Patterns
- Compute pickup counts by hour for each cab type.
- Plot line charts to compare temporal profiles.
- Save figures under `../data/local_output/analytics/`.


In [None]:
if trips_sample.empty or 'pickup_hour' not in trips_sample.columns:
    print('Skipping: no sampled data or missing pickup_hour')
else:
    ensure_output_dir()
    hour_counts = trips_sample.groupby(['cab_type', 'pickup_hour']).size().reset_index(name='trips')
    plt.figure(figsize=(10,6))
    sns.lineplot(data=hour_counts, x='pickup_hour', y='trips', hue='cab_type', marker='o')
    plt.title('Hourly Demand by Cab Type')
    plt.xlabel('Hour of Day')
    plt.ylabel('Trips')
    plt.xticks(range(0,24))
    plt.tight_layout()
    out_path = os.path.join(OUTPUT_DIR, 'analysis1_hourly_demand.png')
    plt.savefig(out_path)
    plt.show()
    print(f'Saved plot to {out_path}')
    hour_counts.head()

## Analysis 2: Tips vs Fare Distribution
- Compare tip percentages across cab types.
- Plot boxen/violin to visualize distribution.
- Save output figure to analytics folder.


In [None]:
if trips_sample.empty or 'fare_amount' not in trips_sample.columns or 'tip_amount' not in trips_sample.columns:
    print('Skipping: tip/fare columns missing or no data')
else:
    ensure_output_dir()
    tips_df = trips_sample.copy()
    tips_df = tips_df[(tips_df['fare_amount'] > 0) & (tips_df['tip_amount'] >= 0)]
    tips_df['tip_pct'] = (tips_df['tip_amount'] / tips_df['fare_amount']).clip(upper=1.0)
    plt.figure(figsize=(10,6))
    sns.violinplot(data=tips_df, x='cab_type', y='tip_pct', inner='quart', cut=0)
    plt.title('Tip Percentage by Cab Type')
    plt.xlabel('Cab Type')
    plt.ylabel('Tip % of Fare')
    plt.tight_layout()
    out_path = os.path.join(OUTPUT_DIR, 'analysis2_tip_pct_violin.png')
    plt.savefig(out_path)
    plt.show()
    print(f'Saved plot to {out_path}')
    tips_df[['cab_type','fare_amount','tip_amount','tip_pct']].head()

## Analysis 3: Zone-Level Demand
- Rank pickup zones by trip volume (top 20 overall).
- Compare composition by cab type.
- Save bar chart to analytics folder.


In [None]:
if trips_sample.empty or 'PULocationID' not in trips_sample.columns:
    print('Skipping: missing PULocationID or no data')
else:
    ensure_output_dir()
    zone_counts = (
        trips_sample.groupby(['PULocationID', 'cab_type'])
        .size()
        .reset_index(name='trips')
    )
    top_zones = zone_counts.groupby('PULocationID')['trips'].sum().nlargest(20).index
    top_df = zone_counts[zone_counts['PULocationID'].isin(top_zones)]
    plt.figure(figsize=(12,6))
    sns.barplot(data=top_df, x='PULocationID', y='trips', hue='cab_type')
    plt.title('Top 20 Pickup Zones by Cab Type')
    plt.xlabel('Pickup Zone (ID)')
    plt.ylabel('Trips')
    plt.xticks(rotation=60)
    plt.tight_layout()
    out_path = os.path.join(OUTPUT_DIR, 'analysis3_zone_demand.png')
    plt.savefig(out_path)
    plt.show()
    print(f'Saved plot to {out_path}')
    top_df.head()

## Analysis 4: Origin-Destination Flows
- Identify top pickup→dropoff pairs by trip volume.
- Show stacked bar for top 15 OD pairs.
- Save chart to analytics folder.


In [None]:
if trips_sample.empty or 'PULocationID' not in trips_sample.columns or 'DOLocationID' not in trips_sample.columns:
    print('Skipping: missing PULocationID/DOLocationID or no data')
else:
    ensure_output_dir()
    od = (
        trips_sample.groupby(['PULocationID', 'DOLocationID', 'cab_type'])
        .size()
        .reset_index(name='trips')
    )
    od['od_pair'] = od['PULocationID'].astype(str) + '→' + od['DOLocationID'].astype(str)
    top_pairs = od.groupby('od_pair')['trips'].sum().nlargest(15).index
    top_od = od[od['od_pair'].isin(top_pairs)]
    plt.figure(figsize=(14,6))
    sns.barplot(data=top_od, x='od_pair', y='trips', hue='cab_type')
    plt.title('Top 15 Origin-Destination Pairs')
    plt.xlabel('OD Pair (PU→DO)')
    plt.ylabel('Trips')
    plt.xticks(rotation=60)
    plt.tight_layout()
    out_path = os.path.join(OUTPUT_DIR, 'analysis4_od_pairs.png')
    plt.savefig(out_path)
    plt.show()
    print(f'Saved plot to {out_path}')
    top_od.head()

## Analysis 5: Congestion Proxy (Speed by Hour)
- Compute median speed (mph) by hour and cab type.
- Use trip_distance and trip_duration_seconds as inputs.
- Save line plot to analytics folder.


In [None]:
required_cols = {'trip_distance', 'trip_duration_seconds', 'hour', 'cab_type'}
if trips_sample.empty or not required_cols.issubset(set(trips_sample.columns)):
    print('Skipping: missing distance/duration/hour columns or no data')
else:
    ensure_output_dir()
    speed_df = trips_sample.copy()
    speed_df = speed_df[(speed_df['trip_distance'] > 0) & (speed_df['trip_duration_seconds'] > 0)]
    speed_df['speed_mph'] = speed_df['trip_distance'] / (speed_df['trip_duration_seconds'] / 3600)
    agg = speed_df.groupby(['cab_type', 'hour'])['speed_mph'].median().reset_index()
    plt.figure(figsize=(10,6))
    sns.lineplot(data=agg, x='hour', y='speed_mph', hue='cab_type', marker='o')
    plt.title('Median Speed by Hour (mph)')
    plt.xlabel('Pickup Hour')
    plt.ylabel('Median Speed (mph)')
    plt.xticks(range(0,24))
    plt.tight_layout()
    out_path = os.path.join(OUTPUT_DIR, 'analysis5_speed_by_hour.png')
    plt.savefig(out_path)
    plt.show()
    print(f'Saved plot to {out_path}')
    agg.head()

## Analysis 6: Airport Demand Patterns
- Filter trips touching airport zones (JFK/LGA/EWR).
- Show hourly pickup counts and cab-type mix.
- Save stacked line/area-style plot.


In [None]:
airport_zones = {132, 138, 1, 140}  # JFK, LGA, EWR, Newark area
if trips_sample.empty or not {'PULocationID', 'DOLocationID', 'hour'}.issubset(trips_sample.columns):
    print('Skipping: missing zone/hour columns or no data')
else:
    ensure_output_dir()
    airport_df = trips_sample[
        trips_sample['PULocationID'].isin(airport_zones) | trips_sample['DOLocationID'].isin(airport_zones)
    ].copy()
    if airport_df.empty:
        print('No airport-touching trips in sample')
    else:
        hourly = airport_df.groupby(['cab_type', 'hour']).size().reset_index(name='trips')
        plt.figure(figsize=(10,6))
        sns.lineplot(data=hourly, x='hour', y='trips', hue='cab_type', marker='o')
        plt.title('Airport-Related Trips by Hour')
        plt.xlabel('Hour of Day')
        plt.ylabel('Trips touching airports')
        plt.xticks(range(0,24))
        plt.tight_layout()
        out_path = os.path.join(OUTPUT_DIR, 'analysis6_airport_hourly.png')
        plt.savefig(out_path)
        plt.show()
        print(f'Saved plot to {out_path}')
        hourly.head()

## Analysis 7: Nightlife Hotspots (Late Hours)
- Filter pickups 8pm–4am.
- Rank top 15 pickup zones for nightlife window.
- Save bar chart to analytics folder.


In [None]:
if trips_sample.empty or not {'hour', 'PULocationID'}.issubset(trips_sample.columns):
    print('Skipping: missing hour/PULocationID or no data')
else:
    ensure_output_dir()
    nightlife_hours = set(range(20, 24)) | set(range(0, 5))
    night_df = trips_sample[trips_sample['hour'].isin(nightlife_hours)].copy()
    if night_df.empty:
        print('No nightlife-window trips in sample')
    else:
        top_night = (
            night_df.groupby(['PULocationID', 'cab_type'])
            .size()
            .reset_index(name='trips')
        )
        top_ids = top_night.groupby('PULocationID')['trips'].sum().nlargest(15).index
        plot_df = top_night[top_night['PULocationID'].isin(top_ids)]
        plt.figure(figsize=(12,6))
        sns.barplot(data=plot_df, x='PULocationID', y='trips', hue='cab_type')
        plt.title('Nightlife Pickup Hotspots (20:00-04:00)')
        plt.xlabel('Pickup Zone (ID)')
        plt.ylabel('Trips')
        plt.xticks(rotation=60)
        plt.tight_layout()
        out_path = os.path.join(OUTPUT_DIR, 'analysis7_nightlife_hotspots.png')
        plt.savefig(out_path)
        plt.show()
        print(f'Saved plot to {out_path}')
        plot_df.head()

## Analysis 8: Outlier Detection (Isolation Forest)
- Use trip_distance, trip_duration_seconds, total_amount as features.
- Flag anomalies per cab type; visualize scatter.
- Save plot to analytics folder.


In [None]:
feature_cols = ['trip_distance', 'trip_duration_seconds', 'total_amount']
if trips_sample.empty or not set(feature_cols).issubset(trips_sample.columns):
    print('Skipping: missing distance/duration/total_amount or no data')
else:
    ensure_output_dir()
    plot_sample = trips_sample[feature_cols + ['cab_type']].dropna().copy()
    plot_sample = plot_sample[(plot_sample['trip_distance'] > 0) & (plot_sample['trip_duration_seconds'] > 0)]
    if plot_sample.empty:
        print('No usable rows for outlier detection')
    else:
        plot_sample = plot_sample.sample(min(len(plot_sample), 5000), random_state=42)
        anomalies = []
        for cab, sub in plot_sample.groupby('cab_type'):
            iso = IsolationForest(n_estimators=100, contamination=0.02, random_state=42)
            preds = iso.fit_predict(sub[feature_cols])
            tmp = sub.copy()
            tmp['anomaly'] = np.where(preds == -1, 'anomaly', 'normal')
            anomalies.append(tmp)
        scored = pd.concat(anomalies, ignore_index=True)
        plt.figure(figsize=(10,6))
        sns.scatterplot(data=scored, x='trip_distance', y='total_amount', hue='anomaly', style='cab_type', alpha=0.6)
        plt.title('Isolation Forest Anomalies (sampled)')
        plt.xlabel('Trip Distance (miles)')
        plt.ylabel('Total Amount ($)')
        plt.tight_layout()
        out_path = os.path.join(OUTPUT_DIR, 'analysis8_outliers.png')
        plt.savefig(out_path)
        plt.show()
        print(f'Saved plot to {out_path}')
        scored['anomaly'].value_counts()

## Analysis 9: Trip Pattern Clustering
- Cluster trips by distance, duration, and cost.
- Use KMeans (k=4) on a sample; label clusters.
- Save scatter plot with cluster hues.


In [None]:
cluster_cols = ['trip_distance', 'trip_duration_seconds', 'total_amount']
if trips_sample.empty or not set(cluster_cols).issubset(trips_sample.columns):
    print('Skipping: missing columns for clustering or no data')
else:
    ensure_output_dir()
    cluster_df = trips_sample[cluster_cols + ['cab_type']].dropna().copy()
    cluster_df = cluster_df[(cluster_df['trip_distance'] > 0) & (cluster_df['trip_duration_seconds'] > 0)]
    if cluster_df.empty:
        print('No usable rows for clustering')
    else:
        cluster_df = cluster_df.sample(min(len(cluster_df), 5000), random_state=7)
        X = np.log1p(cluster_df[cluster_cols])
        kmeans = KMeans(n_clusters=4, random_state=7, n_init='auto')
        cluster_df['cluster'] = kmeans.fit_predict(X)
        plt.figure(figsize=(10,6))
        sns.scatterplot(data=cluster_df, x='trip_distance', y='total_amount', hue='cluster', style='cab_type', alpha=0.6)
        plt.title('Trip Pattern Clusters (log-scaled features)')
        plt.xlabel('Trip Distance (miles)')
        plt.ylabel('Total Amount ($)')
        plt.tight_layout()
        out_path = os.path.join(OUTPUT_DIR, 'analysis9_clusters.png')
        plt.savefig(out_path)
        plt.show()
        print(f'Saved plot to {out_path}')
        cluster_df[['trip_distance','trip_duration_seconds','total_amount','cluster','cab_type']].head()

## Analysis 10: Price per Mile vs Hour (Surge-Like Signal)
- Compute unit price = total_amount / trip_distance.
- Track median unit price by hour and cab type.
- Save line chart to analytics folder.


In [None]:
price_cols = {'trip_distance', 'total_amount', 'hour', 'cab_type'}
if trips_sample.empty or not price_cols.issubset(trips_sample.columns):
    print('Skipping: missing distance/total/hour columns or no data')
else:
    ensure_output_dir()
    price_df = trips_sample.copy()
    price_df = price_df[price_df['trip_distance'] > 0]
    if price_df.empty:
        print('No positive-distance rows')
    else:
        price_df['unit_price'] = (price_df['total_amount'] / price_df['trip_distance']).clip(upper=150)
        hourly_price = price_df.groupby(['cab_type', 'hour'])['unit_price'].median().reset_index()
        plt.figure(figsize=(10,6))
        sns.lineplot(data=hourly_price, x='hour', y='unit_price', hue='cab_type', marker='o')
        plt.title('Median Price per Mile by Hour')
        plt.xlabel('Hour of Day')
        plt.ylabel('Median $/mile')
        plt.xticks(range(0,24))
        plt.tight_layout()
        out_path = os.path.join(OUTPUT_DIR, 'analysis10_price_per_mile.png')
        plt.savefig(out_path)
        plt.show()
        print(f'Saved plot to {out_path}')
        hourly_price.head()

## Analysis 11: Weather Correlation (if available)
- Check for weather columns (e.g., `temp_f`, `precipitation_inches`).
- Correlate daily trip volume with weather metrics.
- Save scatter/regression plot when data exists.


In [None]:
weather_cols = ['temp_f', 'precipitation_inches', 'wind_mph', 'snow_inches']
if trips_sample.empty or 'date' not in trips_sample.columns:
    print('Skipping: no date column to aggregate by')
else:
    available_weather = [c for c in weather_cols if c in trips_sample.columns]
    if not available_weather:
        print('Weather columns not found; add weather joins to enable this analysis.')
    else:
        ensure_output_dir()
        daily_trips = trips_sample.groupby('date').size().reset_index(name='trips')
        weather_daily = trips_sample.groupby('date')[available_weather].mean().reset_index()
        merged = pd.merge(daily_trips, weather_daily, on='date', how='inner')
        if merged.empty:
            print('No merged weather rows')
        else:
            metric = available_weather[0]
            plt.figure(figsize=(8,5))
            sns.regplot(data=merged, x=metric, y='trips', scatter_kws={'alpha':0.5})
            plt.title(f'Daily Trips vs {metric}')
            plt.xlabel(metric)
            plt.ylabel('Trips')
            plt.tight_layout()
            out_path = os.path.join(OUTPUT_DIR, 'analysis11_weather_correlation.png')
            plt.savefig(out_path)
            plt.show()
            print(f'Saved plot to {out_path}')
            merged[['date','trips',metric]].head()

## Analysis 12: Extreme Event Detection by Day
- Flag unusually high/low trip days (volume z-scores).
- Plot top/bottom 5 anomalous dates.
- Save bar chart to analytics folder.


In [None]:
if trips_sample.empty or 'date' not in trips_sample.columns:
    print('Skipping: no date column to aggregate by')
else:
    ensure_output_dir()
    daily = trips_sample.groupby('date').size().reset_index(name='trips')
    if daily.empty:
        print('No daily data available')
    else:
        daily['z'] = stats.zscore(daily['trips'])
        top = daily.nlargest(5, 'z')
        bottom = daily.nsmallest(5, 'z')
        extreme = pd.concat([top, bottom]).sort_values('date')
        plt.figure(figsize=(12,5))
        sns.barplot(data=extreme, x='date', y='trips', hue=pd.cut(extreme['z'], [-np.inf,-2,2,np.inf], labels=['low','normal','high']))
        plt.title('Extreme Trip Volume Days (z-score)')
        plt.xlabel('Date')
        plt.ylabel('Trips')
        plt.xticks(rotation=45)
        plt.tight_layout()
        out_path = os.path.join(OUTPUT_DIR, 'analysis12_extreme_days.png')
        plt.savefig(out_path)
        plt.show()
        print(f'Saved plot to {out_path}')
        extreme[['date','trips','z']]
