 ### HVFHV Data Download Pipeline - Stage 0

 **Pipeline Position:** Stage 0 of 4

 - Stage 0: Data Download ← **Current**
 - Stage 1: Data Validation
 - Stage 2: Exploratory Analysis
 - Stage 3: Modeling

**Overview**
 
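 This notebook downloads NYC High-Volume For-Hire Vehicle (HVFHV) trip data published by the NYC Taxi and Limousine Commission (TLC), using the trip-data host configured in `BASE_URL`.
 HVFHV covers app-based rideshare services: Uber, Lyft, Via, and Juno. Only Uber, Lyft, and Via appear in the 2020-2024 files (see Company Distribution).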

**What This Notebook Does**

 1. Downloads monthly HVFHV Parquet files for 2020-2024 from the TLC trip-data host (`BASE_URL`)
 2. Combines the files with DuckDB, which streams them instead of loading everything into memory and is typically faster than pandas
 3. Exports the combined data in Parquet format

**Dataset**

 - Source: NYC TLC HVFHV data (Uber, Lyft, Via)  
 - Files: `fhvhv_tripdata_*.parquet`  
 - Output: `data/final/combined_fhvhv_tripdata.parquet`

**Next Step**

  - After data is loaded, run `01_data_validation.ipynb` to validate data quality.

 #### 1. Setup

 ##### 1.1 Import Libraries

In [None]:
import urllib.request
from pathlib import Path
import pandas as pd
import duckdb

##### 1.2 Configuration
 All constants and configuration parameters are defined here for easy modification.

In [39]:
# === Configuration ===
DATA_ROOT = Path("..").resolve()

# Full dataset: 2020-2024 (5 years, 60 months)
YEARS = [2020, 2021, 2022, 2023, 2024]

# Directory structure
raw_folder = DATA_ROOT / "data" / "raw"
final_folder = DATA_ROOT / "data" / "final"

# External resources
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

# Dataset configuration
DATASET_TYPE = "fhvhv"  # High-Volume For-Hire Vehicle (Uber/Lyft)
OUTPUT_FILENAME = f"combined_{DATASET_TYPE}_tripdata"

# Display configuration
print("Configuration:")
print(f"  Dataset: {DATASET_TYPE.upper()} (High-Volume FHV - Uber/Lyft/Via/Juno)")
print(f"  Years: {YEARS[0]}-{YEARS[-1]} ({len(YEARS)} years)")
print(f"  Months per year: 12")
print(f"  Total files: {len(YEARS) * 12}")
print(f"  Output base name: {OUTPUT_FILENAME}")
print(f"\n  Using DuckDB for memory-efficient processing")


Configuration:
  Dataset: FHVHV (High-Volume FHV - Uber/Lyft/Via/Juno)
  Years: 2020-2024 (5 years)
  Months per year: 12
  Total files: 60
  Output base name: combined_fhvhv_tripdata

  Using DuckDB for memory-efficient processing


 ##### 1.3 Create Directory Structure

In [40]:
raw_folder.mkdir(parents=True, exist_ok=True)
final_folder.mkdir(parents=True, exist_ok=True)


 #### 2. Download Data

 ##### 2.1 Generate Download Tasks
 Create a list of filenames to download. The URL and save path for each file are
 derived from its filename at download time, so they are not stored twice.

In [41]:
# Generate download tasks (store only filename)
download_tasks = [
    {'filename': f"{DATASET_TYPE}_tripdata_{year}-{month:02d}.parquet"}
    for year in YEARS
    for month in range(1, 13)
]

print(f"Generated {len(download_tasks)} download tasks")


Generated 60 download tasks
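
The task list above always spans twelve months for every year in `YEARS`. When the range reaches into the current year, some of those files will not have been published yet and their downloads will fail. A minimal sketch of one way to cap the list at a cutoff month, assuming a hypothetical helper `build_filenames` and a `last_available` date that you set by hand (neither is part of this notebook):

```python
from datetime import date


def build_filenames(years, dataset_type, last_available):
    """Return monthly Parquet filenames up to and including `last_available`.

    `last_available` is an assumption supplied by the caller (the most recent
    month believed to be published); it is not discovered from the server.
    """
    cutoff = (last_available.year, last_available.month)
    return [
        f"{dataset_type}_tripdata_{year}-{month:02d}.parquet"
        for year in years
        for month in range(1, 13)
        if (year, month) <= cutoff
    ]


# Example: pretend March 2024 is the latest published month
names = build_filenames([2023, 2024], "fhvhv", date(2024, 3, 1))
print(len(names))   # 12 months of 2023 + 3 months of 2024
print(names[-1])
```

For the fixed 2020-2024 range used here the cutoff changes nothing, since all 60 files are available.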


 ##### 2.2 Execute Downloads
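 Download each file from the TLC trip-data host, deriving the URL and save path from the filename.
 **Note:** Files that already exist in `data/raw` are skipped automatically; only existence is checked, not completeness.
 Months that have not been published yet will fail to download and are listed in the failure summary.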

In [42]:
downloaded_files = []
failed_files = []

for i, task in enumerate(download_tasks, 1):
    filename = task['filename']
    url = f"{BASE_URL}/{filename}"
    save_path = raw_folder / filename
    
    # Skip existing files
    if save_path.exists():
        downloaded_files.append(save_path)
        continue
    
    # Download with error handling
    try:
        print(f"[{i}/{len(download_tasks)}] {filename}...", end=" ")
        urllib.request.urlretrieve(url, save_path)
        downloaded_files.append(save_path)
        print("✓")
    except Exception as e:
        print(f"✗ {str(e)[:50]}")
        failed_files.append(filename)

# Summary
print(f"\nDownload complete: {len(downloaded_files)} files available")
if failed_files:
    print(f"Failed: {len(failed_files)} files (may not exist yet)")
    for fname in failed_files[:5]:  # Show first 5
        print(f"  - {fname}")
    if len(failed_files) > 5:
        print(f"  ... and {len(failed_files) - 5} more")



Download complete: 60 files available
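
Because the loop above skips any file that already exists, an interrupted download can leave a truncated file that later runs treat as complete. One possible safeguard, sketched here with a hypothetical helper `download_atomic` (not part of this notebook), is to write to a temporary `.part` file and rename it only after the transfer succeeds:

```python
import urllib.request
from pathlib import Path


def download_atomic(url, dest, fetch=urllib.request.urlretrieve):
    """Download `url` to `dest` via a temporary ".part" file.

    The final name only appears once the transfer has finished, so an
    "if dest.exists(): skip" check never mistakes a partial file for a
    complete one. `fetch` is injectable so the logic can be tested offline.
    """
    dest = Path(dest)
    tmp = dest.with_name(dest.name + ".part")
    try:
        fetch(url, tmp)
        tmp.replace(dest)  # rename onto the final name (atomic on POSIX systems)
    except Exception:
        tmp.unlink(missing_ok=True)  # discard whatever was partially written
        raise
    return dest
```

In the download loop, `download_atomic(url, save_path)` would take the place of the direct `urllib.request.urlretrieve(url, save_path)` call; the surrounding `try`/`except` can stay as it is.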


 #### 3. Combine Files with DuckDB
 Use DuckDB to combine the files without loading everything into memory.
 DuckDB streams the Parquet files, so this approach works for datasets larger than RAM and is typically much faster than concatenating the files in pandas.

 ##### 3.1 Initialize DuckDB Connection

In [43]:
# Create DuckDB connection
con = duckdb.connect()
print("✓ DuckDB connection established")

✓ DuckDB connection established


 ##### 3.2 Analyze Available Data
 Query the downloaded files to understand what data we have before combining.

In [44]:
print("Analyzing downloaded files...")

# Pattern to match all downloaded files
parquet_pattern = str(raw_folder / f"{DATASET_TYPE}_tripdata_*.parquet")

# Query to select all data from all files
query = f"""
SELECT *
FROM read_parquet('{parquet_pattern}')
"""

# Get total row count (fast - doesn't load data into memory)
row_count = con.execute(f"SELECT COUNT(*) FROM ({query})").fetchone()[0]
print(f"✓ Total rows across all files: {row_count:,}")

# Get column information
columns = con.execute(f"SELECT * FROM ({query}) LIMIT 0").description
column_names = [col[0] for col in columns]
print(f"✓ Dataset has {len(column_names)} columns")

# Get date range
date_range = con.execute(f"""
    SELECT 
        MIN(pickup_datetime) as start_date,
        MAX(pickup_datetime) as end_date
    FROM ({query})
""").fetchone()
print(f"✓ Date range: {date_range[0]} to {date_range[1]}")


Analyzing downloaded files...
✓ Total rows across all files: 1,002,283,074
✓ Dataset has 24 columns
✓ Date range: 2020-01-01 00:00:00 to 2024-12-31 23:59:59
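
The query above interpolates the glob path directly into the SQL text. That works for typical paths, but a project directory whose name contains a single quote would break the statement. A minimal sketch of a quoting helper, assuming the standard SQL rule of doubling single quotes inside a string literal (`sql_string_literal` and the folder name are made up for illustration):

```python
from pathlib import Path


def sql_string_literal(value):
    """Quote a Python string as a SQL string literal by doubling single quotes."""
    return "'" + str(value).replace("'", "''") + "'"


# Hypothetical folder name containing an apostrophe
pattern = Path("bob's data") / "raw" / "fhvhv_tripdata_*.parquet"
safe_query = f"SELECT * FROM read_parquet({sql_string_literal(pattern.as_posix())})"
print(safe_query)
```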


 #### 4. Save Output

 ##### 4.1 Export Combined Dataset with DuckDB
 Save the combined dataset directly from DuckDB to Parquet, without a pandas round trip.
 **Note:** `csv_path` is defined for an optional CSV copy, but this cell writes only the Parquet file.
 A CSV export of a dataset this size may take several minutes.

In [None]:
# Define output paths
parquet_path = final_folder / f"{OUTPUT_FILENAME}.parquet"
csv_path = final_folder / f"{OUTPUT_FILENAME}.csv"  # not written below; reserved for an optional CSV export

# Save to Parquet (fast!)
print(f"Saving to Parquet: {parquet_path.name}...")
con.execute(f"""
    COPY ({query})
    TO '{str(parquet_path)}'
    (FORMAT PARQUET, COMPRESSION SNAPPY)
""")
parquet_size_mb = parquet_path.stat().st_size / 1024**2
print(f"✓ Saved Parquet: {parquet_size_mb:.1f} MB")

Saving to Parquet: combined_fhvhv_tripdata.parquet...


 ##### 4.2 Pipeline Health Check
 Verify that the pipeline completed successfully and data has expected structure.

In [None]:
# Verify expected HVFHV columns are present
expected_columns = ['hvfhs_license_num', 'pickup_datetime', 'dropoff_datetime', 
                   'trip_miles', 'base_passenger_fare']
missing_cols = [col for col in expected_columns if col not in column_names]

if missing_cols:
    print(f"⚠️  Missing expected columns: {missing_cols}")
    print(f"Available columns: {column_names}")
else:
    print(f"✓ Pipeline complete: {row_count:,} rows saved")
    print(f"✓ All expected HVFHV columns present")


✓ Pipeline complete: 1,002,283,074 rows saved
✓ All expected HVFHV columns present
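
The check above confirms the column names, but the "rows saved" figure is the row count of the source files, not of the exported file. A minimal sketch of a stricter check that compares the two counts, using a hypothetical helper `verify_export` that accepts any row-counting callable:

```python
def verify_export(count_rows, source_pattern, output_path):
    """Compare row counts of the source files and the exported file.

    `count_rows` is any callable that maps a path or glob to a row count,
    for example a thin wrapper around a DuckDB COUNT(*) query.
    """
    source_rows = count_rows(source_pattern)
    output_rows = count_rows(output_path)
    if source_rows != output_rows:
        raise ValueError(
            f"Row count mismatch: source={source_rows:,}, output={output_rows:,}"
        )
    return output_rows


# Offline demonstration with made-up counts instead of real files
fake_counts = {"raw/*.parquet": 1000, "final/combined.parquet": 1000}
print(verify_export(fake_counts.get, "raw/*.parquet", "final/combined.parquet"))
```

With the notebook's connection, `count_rows` could follow the same pattern as the existing queries, for example `lambda p: con.execute(f"SELECT COUNT(*) FROM read_parquet('{p}')").fetchone()[0]`, called with `parquet_pattern` and `str(parquet_path)`.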


 ##### 4.3 Data Summary
 Comprehensive overview of the combined dataset using DuckDB queries.
 DuckDB can compute statistics without loading the entire dataset into memory.

##### 4.3.1 Dataset Dimensions

In [None]:
print("="*70)
print("DATASET OVERVIEW")
print("="*70)

# Basic statistics
print(f"\nDimensions:")
print(f"  Total rows: {row_count:,}")
print(f"  Total columns: {len(column_names)}")

# File size
print(f"\nFile Size:")
print(f"  Parquet: {parquet_size_mb:.1f} MB ({parquet_size_mb / 1024:.2f} GB)")

DATASET OVERVIEW

Dimensions:
  Total rows: 1,002,283,074
  Total columns: 24

File Size:
  Parquet: 28114.2 MB (27.46 GB)


##### 4.3.2 Date Coverage

In [None]:
# Date coverage
date_stats = con.execute(f"""
    SELECT 
        MIN(pickup_datetime) as start_date,
        MAX(pickup_datetime) as end_date,
        DATEDIFF('day', MIN(pickup_datetime), MAX(pickup_datetime)) as span_days
    FROM ({query})
""").fetchone()

print(f"\nDate Range:")
print(f"  Start: {date_stats[0]}")
print(f"  End: {date_stats[1]}")
print(f"  Span: {date_stats[2]} days ({date_stats[2] / 365:.1f} years)")


Date Range:
  Start: 2020-01-01 00:00:00
  End: 2024-12-31 23:59:59
  Span: 1826 days (5.0 years)
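
The minimum and maximum timestamps show the outer bounds of the data, but they would look the same if a month in the middle were missing. A small sketch of a gap check, assuming a hypothetical helper `missing_months` that compares observed `(year, month)` pairs against the expected calendar:

```python
def missing_months(observed, years):
    """Return (year, month) pairs in `years` that are absent from `observed`."""
    expected = {(y, m) for y in years for m in range(1, 13)}
    return sorted(expected - set(observed))


# Made-up observation set: 2020 complete except for June
observed = [(2020, m) for m in range(1, 13) if m != 6]
print(missing_months(observed, [2020]))
```

The observed pairs could come from a query such as `SELECT DISTINCT year(pickup_datetime), month(pickup_datetime) FROM (...)` run through `con.execute(...).fetchall()`, with `YEARS` passed as the second argument.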


##### 4.3.3 Company Distribution

In [None]:

# Query company trip counts from data
company_stats = con.execute(f"""
    SELECT 
        hvfhs_license_num as code,
        COUNT(*) as trips
    FROM ({query})
    GROUP BY hvfhs_license_num
    ORDER BY trips DESC
""").fetchall()

# Company name mapping
company_map = {
    'HV0002': 'Juno',
    'HV0003': 'Uber',
    'HV0004': 'Via',
    'HV0005': 'Lyft'
}

# Display results
print(f"Company Distribution ({len(company_stats)} companies):\n")

total_trips = sum(trips for _, trips in company_stats)

for code, trips in company_stats:
    name = company_map.get(code, code)
    pct = (trips / total_trips) * 100
    print(f"  {name:10s}: {trips:>15,} trips ({pct:>5.1f}%)")


Company Distribution (3 companies):

  Uber      :     729,414,181 trips ( 72.8%)
  Lyft      :     269,104,518 trips ( 26.8%)
  Via       :       3,764,375 trips (  0.4%)


##### 4.3.4 Dataset Metrics

In [None]:
data_stats = con.execute(f"""
    SELECT 
        SUM(trip_miles) as total_miles,
        AVG(trip_miles) as avg_miles,
        SUM(base_passenger_fare) as total_revenue,
        AVG(base_passenger_fare) as avg_fare,
        SUM(tips) as total_tips,
        AVG(tips) as avg_tip
    FROM ({query})
""").fetchone()

print("Dataset Metrics:")
print(f"  Total miles:        {data_stats[0]:>15,.0f}")
print(f"  Avg trip distance:  {data_stats[1]:>15.2f} miles")
print(f"  Total revenue:      ${data_stats[2]:>14,.2f}")
print(f"  Avg fare:           ${data_stats[3]:>14.2f}")
print(f"  Total tips:         ${data_stats[4]:>14,.2f}")
print(f"  Avg tip:            ${data_stats[5]:>14.2f}")

# Tip percentage
if data_stats[3] > 0:
    tip_pct = (data_stats[5] / data_stats[3]) * 100
    print(f"  Avg tip percentage: {tip_pct:>14.1f}%")

Dataset Metrics:
  Total miles:          4,967,337,155
  Avg trip distance:             4.96 miles
  Total revenue:      $23,393,211,267.39
  Avg fare:           $         23.34
  Total tips:         $999,829,665.77
  Avg tip:            $          1.00
  Avg tip percentage:            4.3%
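
The "Avg tip percentage" above divides the average tip by the average fare. That is the overall tip-to-fare ratio, which is not the same as the average of each trip's own tip percentage. A small worked example with made-up numbers shows how far the two can diverge:

```python
# Two made-up trips: (fare, tip)
trips = [(10.0, 5.0), (40.0, 0.0)]

avg_fare = sum(fare for fare, _ in trips) / len(trips)   # 25.0
avg_tip = sum(tip for _, tip in trips) / len(trips)      # 2.5

# What the notebook reports: ratio of the two averages
ratio_of_means = avg_tip / avg_fare * 100

# Alternative: average of per-trip percentages (skipping zero fares)
per_trip = [tip / fare * 100 for fare, tip in trips if fare > 0]
mean_of_ratios = sum(per_trip) / len(per_trip)

print(f"ratio of means: {ratio_of_means:.1f}%")   # 10.0%
print(f"mean of ratios: {mean_of_ratios:.1f}%")   # 25.0%
```

Neither figure is wrong; they answer different questions. The ratio of means weights every dollar of fare equally, while the mean of ratios weights every trip equally.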


##### 4.3.5 Location Statistics

In [None]:
# Location statistics
location_stats = con.execute(f"""
    SELECT 
        COUNT(DISTINCT PULocationID) as pickup_zones,
        COUNT(DISTINCT DOLocationID) as dropoff_zones
    FROM ({query})
""").fetchone()

print(f"\nUnique Locations:")
print(f"  Pickup zones: {location_stats[0]}")
print(f"  Dropoff zones: {location_stats[1]}")


Unique Locations:
  Pickup zones: 263
  Dropoff zones: 264


##### 4.3.6 Column Inventory

In [None]:
# Column inventory
print(f"\nColumns ({len(column_names)}):")
for i, col in enumerate(column_names, 1):
    print(f"  {i:2d}. {col}")

##### 4.3.7 Missing Value Analysis

In [None]:

# Columns to check for missing values
columns_to_check = [
    'hvfhs_license_num',
    'pickup_datetime',
    'dropoff_datetime',
    'trip_miles',
    'base_passenger_fare',
    'tips',
    'PULocationID',
    'DOLocationID'
]

print("Missing Values (Key Columns):")

has_nulls = False
for col in columns_to_check:
    # Simple query for each column
    null_count = con.execute(f"""
        SELECT COUNT(*) 
        FROM ({query}) 
        WHERE {col} IS NULL
    """).fetchone()[0]
    
    if null_count > 0:
        has_nulls = True
        pct = (null_count / row_count) * 100
        print(f"  {col:30s}: {null_count:>12,} ({pct:5.2f}%)")

if not has_nulls:
    print("  ✓ No missing values in key columns")

Missing Values (Key Columns):
  ✓ No missing values in key columns
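
The loop above runs one query per column, so the files are scanned eight times. Since `COUNT(col)` ignores NULLs, all the counts can also be collected in a single scan. A minimal sketch of a query builder, assuming a hypothetical helper `null_count_sql` (not part of this notebook):

```python
def null_count_sql(columns, source_sql):
    """Build one query that counts NULLs for every column in a single scan.

    COUNT(col) ignores NULLs, so COUNT(*) - COUNT(col) is the NULL count.
    Column names are interpolated as-is, so pass trusted names only.
    """
    parts = ",\n    ".join(
        f'COUNT(*) - COUNT("{col}") AS "{col}_nulls"' for col in columns
    )
    return f"SELECT\n    {parts}\nFROM ({source_sql})"


sql = null_count_sql(
    ["tips", "PULocationID"],
    "SELECT * FROM read_parquet('data/raw/fhvhv_tripdata_*.parquet')",
)
print(sql)
```

In the notebook this could be used as `row = con.execute(null_count_sql(columns_to_check, query)).fetchone()`, followed by `dict(zip(columns_to_check, row))` to pair each column with its NULL count.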


##### 4.3.8 Sample Records

In [None]:
# Display sample records (all columns) in a transposed format so all fields are visible
sample_df = con.execute(f"""
    SELECT *
    FROM ({query})
    LIMIT 4
""").df()

print(f"Sample Records - Transposed ({len(sample_df.columns)} columns, {len(sample_df)} rows):\n")
print(sample_df.T.to_string())

##### 4.3.9 Summary Complete

In [None]:
print(f"\n{'='*70}")
print(f"✓ Data summary complete")
print(f"→ Next step: Run notebooks/01_data_validation.ipynb")
print(f"{'='*70}")

# Close DuckDB connection
con.close()

### Summary

**Completed Steps**
1. Downloaded HVFHV trip data (2020-2024) from the TLC trip-data host
2. Combined monthly files using DuckDB (memory-efficient)
3. Exported to Parquet format
4. Verified pipeline success
5. Generated data summary

**Output Files**
- `data/raw/fhvhv_tripdata_*.parquet` - Individual monthly files (60 files)
- `data/final/combined_fhvhv_tripdata.parquet` - Combined dataset

**Why DuckDB?**
DuckDB streams Parquet files and can process datasets larger than RAM. This makes it practical to analyze the combined dataset (about 27.5 GB as Parquet, over one billion rows) on a standard laptop.

**Next Step**
Run `01_data_validation.ipynb` to validate data quality and flag invalid records.