# TLC Hourly Demand Processing (Optimized for Memory)
This notebook processes TLC Parquet files efficiently by:
- Processing files in chunks (avoids loading all data at once)
- Saving intermediate hourly aggregates
- Combining results and adding features
- Saving final partitioned Parquet files
- Optional cleanup of intermediate files


In [1]:
from pathlib import Path
import pandas as pd
import numpy as np
import glob
import os
import holidays
import re
from tqdm import tqdm

## Step 2: Setup Directories
We create separate directories for intermediate and final outputs:
- `final_dir` for final partitioned Parquet files
- `intermediate_dir` for intermediate hourly aggregates


In [2]:
output_dir = "data/daily_demand_partitioned"
final_dir = Path(output_dir) / "final"
intermediate_dir = Path(output_dir) / "intermediate_location"
final_dir.mkdir(parents=True, exist_ok=True)
intermediate_dir.mkdir(parents=True, exist_ok=True)



## Step 3: Infer Date Range from Filenames
We infer the date range from filenames to filter data later.


In [3]:
# Input files
input_pattern = "data/tlc_*/**/*.parquet"
input_files = glob.glob(input_pattern, recursive=True)

# Infer date range
dates = []
for f in input_files:
    match = re.search(r'(\d{4})[-_]?(\d{2})', f)
    if match:
        year, month = int(match.group(1)), int(match.group(2))
        dates.append(pd.Timestamp(year=year, month=month, day=1))
start_date, end_date = min(dates), max(dates) + pd.offsets.MonthEnd(1)

print(f"✅ Date range inferred: {start_date} to {end_date}")


✅ Date range inferred: 2023-01-01 00:00:00 to 2025-06-30 00:00:00


In [4]:
import pyarrow.parquet as pq
import glob

input_files = glob.glob("data/tlc_*/**/*.parquet", recursive=True)

total_rows = 0
for f in input_files:
    metadata = pq.ParquetFile(f).metadata
    total_rows += metadata.num_rows

print("Total rows:", total_rows)


Total rows: 105314015


## Step 4: Process Files in Chunks and Save Intermediate Aggregates
We process each file individually, normalize schema, compute hourly aggregates, and save intermediate results.


In [4]:
# Process files in chunks -> DAILY aggregates
for f in tqdm(input_files, desc="Processing files"):
    chunk = pd.read_parquet(f)

    # Normalize schema
    if 'tpep_pickup_datetime' in chunk.columns:
        chunk.rename(columns={'tpep_pickup_datetime': 'pickup_datetime'}, inplace=True)
    elif 'lpep_pickup_datetime' in chunk.columns:
        chunk.rename(columns={'lpep_pickup_datetime': 'pickup_datetime'}, inplace=True)

    # Keep necessary columns
    chunk = chunk[['pickup_datetime', 'PULocationID', 'DOLocationID']]
    chunk['pickup_datetime'] = pd.to_datetime(chunk['pickup_datetime'])

    # Aggregate by DATE
    chunk['date'] = chunk['pickup_datetime'].dt.date
    daily = chunk.groupby(['date', 'PULocationID', 'DOLocationID']).size().reset_index(name='rides')

    # Save intermediate
    daily.to_parquet(intermediate_dir / f"daily_{os.path.basename(f)}", index=False)

Processing files: 100%|██████████| 60/60 [03:17<00:00,  3.30s/it]


## Step 5: Combine Intermediate Results
We combine all intermediate hourly aggregates and filter by date range.


In [5]:

# Combine intermediate results
daily_files = glob.glob(str(intermediate_dir / "daily_*.parquet"))
daily_demand = pd.concat([pd.read_parquet(f) for f in daily_files], ignore_index=True)
daily_demand = daily_demand.groupby(['date', 'PULocationID', 'DOLocationID'], as_index=False)['rides'].sum()

# Filter by date range
daily_demand['date'] = pd.to_datetime(daily_demand['date'])
daily_demand = daily_demand[(daily_demand['date'] >= start_date) & (daily_demand['date'] <= end_date)]


## Step 6-9: Add Features (Time-based, Lag, Holiday, Weather)
We add additional features for modeling and analysis.


In [6]:

# Add features
daily_demand['year'] = daily_demand['date'].dt.year.astype('int16')
daily_demand['month'] = daily_demand['date'].dt.month.astype('int8')
daily_demand['weekday'] = daily_demand['date'].dt.weekday.astype('int8')

# Lag features
daily_demand.sort_values('date', inplace=True)
for lag in [1, 2, 7]:
    daily_demand[f'lag_{lag}'] = daily_demand['rides'].shift(lag)

# Holiday flag
years = daily_demand['year'].unique()
us_holidays = holidays.US(years=years)
daily_demand['is_holiday'] = daily_demand['date'].dt.date.astype(str).isin(us_holidays).astype('int8')

In [7]:
daily_demand.head(2)

Unnamed: 0,date,PULocationID,DOLocationID,rides,year,month,weekday,lag_1,lag_2,lag_7,is_holiday
148,2023-01-01,1,1,37,2023,1,6,,,,0
5562,2023-01-01,181,33,2,2023,1,6,37.0,,,0


## Step 10: Save Final Partitioned Parquet Files
We save the final dataset partitioned by year and month.


In [8]:
# Save final partitioned Parquet
daily_demand.dropna(inplace=True)
daily_demand.iloc[:-1, :].to_parquet(final_dir, engine="pyarrow", partition_cols=['year', 'month'], index=False)
print(f"✅ Daily processing complete. Final data saved to {final_dir}")

✅ Daily processing complete. Final data saved to data\daily_demand_partitioned\final
