# Ingestion Pipeline: NYC TLC Trips to Postgres RAW Layer

## Process Flow
1. Download Parquet files locally
2. Process with Spark
3. Write to Postgres using JDBC

## Features
- Batch processing with memory optimization
- Skip already processed files
- Smart download (URL check + streaming)
- PyArrow batching for large files
- Metadata tracking (run_id, source_year/month, ingested_at_utc)

In [1]:
import sys
!{sys.executable} -m pip install -q psycopg2-binary pandas pyarrow psutil requests python-dotenv
print("Dependencies installed successfully")

Dependencies installed successfully


In [2]:
import os
import gc
import requests
import pyarrow.parquet as pq
import psutil
import psycopg2
from pyspark.sql import SparkSession
from pyspark.sql.functions import lit, current_timestamp
from datetime import datetime
import uuid
from dotenv import load_dotenv

load_dotenv()
print("Imports loaded")

Imports loaded


In [3]:
# Configuration from environment
PG_HOST = os.getenv('PG_HOST', 'postgres')
PG_PORT = os.getenv('PG_PORT', '5432')
PG_DB = os.getenv('PG_DB', 'nyc_taxi')
PG_USER = os.getenv('PG_USER', 'taxi_user')
PG_PASSWORD = os.getenv('PG_PASSWORD')
PG_SCHEMA_RAW = os.getenv('PG_SCHEMA_RAW', 'raw')

NYC_TLC_BASE_URL = os.getenv('NYC_TLC_BASE_URL', 'https://d37ci6vzurychx.cloudfront.net/trip-data')
START_YEAR = 2020
END_YEAR = 2020
SERVICES = os.getenv('SERVICES', 'yellow,green').split(',')
RUN_ID = os.getenv('RUN_ID', str(uuid.uuid4()))

LOCAL_DATA_DIR = "/home/jovyan/data/parquet"
BATCH_SIZE = 5

JDBC_URL = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"

print(f"Configuration loaded")
print(f"Run ID: {RUN_ID}")
print(f"Date range: {START_YEAR}-{END_YEAR}")
print(f"Services: {SERVICES}")
print(f"Postgres: {PG_HOST}:{PG_PORT}/{PG_DB}")
print(f"Local data dir: {LOCAL_DATA_DIR}")
print(f"Batch size: {BATCH_SIZE} files")

Configuration loaded
Run ID: pset4_run_001
Date range: 2020-2020
Services: ['yellow', 'green']
Postgres: postgres:5432/nyc_taxi
Local data dir: /home/jovyan/data/parquet
Batch size: 5 files


In [4]:
os.makedirs(LOCAL_DATA_DIR, exist_ok=True)
print(f"Local directory ready: {LOCAL_DATA_DIR}")

Local directory ready: /home/jovyan/data/parquet


In [5]:
# Initialize Spark with Postgres JDBC driver
spark = SparkSession.builder \
    .appName("NYC_TLC_Postgres_Ingestion") \
    .config("spark.driver.memory", os.getenv("SPARK_DRIVER_MEMORY", "8g")) \
    .config("spark.executor.memory", os.getenv("SPARK_EXECUTOR_MEMORY", "6g")) \
    .config("spark.driver.maxResultSize", os.getenv("SPARK_DRIVER_MAXRESULTSIZE", "2g")) \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0") \
    .getOrCreate()

print(f"Spark version: {spark.version}")
print(f"Spark driver memory: {spark.conf.get('spark.driver.memory')}")
print(f"Spark executor memory: {spark.conf.get('spark.executor.memory')}")

Spark version: 3.5.0
Spark driver memory: 8g
Spark executor memory: 6g


In [6]:
def generate_inventory(base_url, services, start_year, end_year):
    inventory = []
    for service in services:
        for year in range(start_year, end_year + 1):
            for month in range(1, 13):
                if year == 2025 and month > datetime.now().month:
                    break
                
                url = f"{base_url}/{service}_tripdata_{year}-{month:02d}.parquet"
                local_file = f"{LOCAL_DATA_DIR}/{service}_{year}_{month:02d}.parquet"
                
                inventory.append({
                    'service': service,
                    'year': year,
                    'month': month,
                    'url': url,
                    'local_file': local_file
                })
    return inventory

inventory = generate_inventory(NYC_TLC_BASE_URL, SERVICES, START_YEAR, END_YEAR)
print(f"Total files to process: {len(inventory)}")

Total files to process: 24


In [7]:
def get_postgres_connection():
    return psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        database=PG_DB,
        user=PG_USER,
        password=PG_PASSWORD
    )

try:
    conn = get_postgres_connection()
    cursor = conn.cursor()
    cursor.execute("SELECT version();")
    version = cursor.fetchone()
    print(f"Connected to Postgres: {version[0][:50]}...")
    cursor.close()
    conn.close()
except Exception as e:
    print(f"Postgres connection error: {e}")
    raise

Connected to Postgres: PostgreSQL 15.14 on x86_64-pc-linux-musl, compiled...


In [8]:
print("Checking already processed files in Postgres...")
try:
    conn = get_postgres_connection()
    cursor = conn.cursor()
    
    existing_run_ids = set()
    for service in ['yellow', 'green']:
        table_name = f"{PG_SCHEMA_RAW}.{service}_taxi_trip"
        try:
            cursor.execute(f"SELECT DISTINCT run_id FROM {table_name}")
            results = cursor.fetchall()
            service_ids = {row[0] for row in results}
            existing_run_ids.update(service_ids)
            print(f"  {service}: {len(service_ids)} files already processed")
        except Exception as e:
            print(f"  {service}: table not found or empty (OK for first run)")
    
    cursor.close()
    conn.close()
    print(f"Total files already in Postgres: {len(existing_run_ids)}")
except Exception as e:
    print(f"Could not check existing files (OK for first run): {e}")
    existing_run_ids = set()

Checking already processed files in Postgres...
  yellow: 12 files already processed
  green: 12 files already processed
Total files already in Postgres: 24


In [9]:
def download_parquet(url, local_file):
    if os.path.exists(local_file):
        file_size_mb = os.path.getsize(local_file) / (1024 * 1024)
        print(f"  File exists: {file_size_mb:.2f} MB")
        return True, os.path.getsize(local_file)
    
    try:
        r = requests.head(url, timeout=10)
        if r.status_code != 200:
            print(f"  URL not available: {r.status_code}")
            return False, None
    except Exception as e:
        print(f"  URL check failed: {e}")
        return False, None
    
    try:
        print(f"  Downloading...")
        with requests.get(url, stream=True, timeout=60) as r:
            r.raise_for_status()
            with open(local_file, 'wb') as f:
                for chunk in r.iter_content(chunk_size=8192):
                    f.write(chunk)
        
        file_size = os.path.getsize(local_file)
        size_mb = file_size / (1024 * 1024)
        print(f"  Downloaded: {size_mb:.2f} MB")
        return True, file_size
    except Exception as e:
        print(f"  Download failed: {e}")
        return False, None

In [10]:
def ingest_to_postgres(item):
    service = item['service']
    year = item['year']
    month = item['month']
    url = item['url']
    local_file = item['local_file']
    
    start_time = datetime.now()
    status = 'SUCCESS'
    error_msg = None
    records_loaded = 0
    
    print(f"Processing {service} {year}-{month:02d}...")
    
    file_run_id = f"{service}_{year}_{month:02d}"
    if file_run_id in existing_run_ids:
        print(f"  SKIPPING - already in Postgres")
        return {
            'run_id': file_run_id,
            'service_type': service,
            'source_year': year,
            'source_month': month,
            'record_count': 0,
            'started_at': start_time,
            'completed_at': datetime.now(),
            'status': 'SKIPPED',
            'error_message': 'Already processed'
        }
    
    try:
        download_ok, file_size = download_parquet(url, local_file)
        if not download_ok:
            raise Exception("Download failed or URL not available")
        
        df = spark.read.parquet(local_file)

        if 'ehail_fee' in df.columns:
            df = df.drop('ehail_fee')
        
        df = df.withColumn("run_id", lit(file_run_id)) \
               .withColumn("service_type", lit(service)) \
               .withColumn("source_year", lit(year)) \
               .withColumn("source_month", lit(month)) \
               .withColumn("ingested_at_utc", current_timestamp()) \
               .withColumn("source_path", lit(url))
        
        records_loaded = df.count()
        print(f"  Rows: {records_loaded:,}")
        
        table_name = f"{PG_SCHEMA_RAW}.{service}_taxi_trip"
        
        df.write \
            .format("jdbc") \
            .option("url", JDBC_URL) \
            .option("dbtable", table_name) \
            .option("user", PG_USER) \
            .option("password", PG_PASSWORD) \
            .option("driver", "org.postgresql.Driver") \
            .mode("append") \
            .save()
        
        print(f"  Written to {table_name}")
        
        try:
            os.remove(local_file)
            print(f"  Temp file cleaned")
        except:
            pass
        
    except Exception as e:
        status = 'FAILED'
        error_msg = str(e)
        print(f"  FAILED: {error_msg}")
    
    end_time = datetime.now()
    duration = (end_time - start_time).total_seconds()
    print(f"  Duration: {duration:.2f}s")
    
    return {
        'run_id': file_run_id,
        'service_type': service,
        'source_year': year,
        'source_month': month,
        'record_count': records_loaded,
        'started_at': start_time,
        'completed_at': end_time,
        'status': status,
        'error_message': error_msg,
        'source_path': url
    }

## Test: Single file first

In [11]:
test_item = inventory[0]
print(f"Testing: {test_item['service']} {test_item['year']}-{test_item['month']:02d}")
print()

test_result = ingest_to_postgres(test_item)
print()
print(f"Result: {test_result['status']}")
if test_result['status'] == 'SUCCESS':
    print(f"Records loaded: {test_result['record_count']:,}")
    print("Ready for batch processing")
else:
    print(f"Status: {test_result['error_message']}")

Testing: yellow 2020-01

Processing yellow 2020-01...
  SKIPPING - already in Postgres

Result: SKIPPED
Status: Already processed


## Execute batch ingestion

In [12]:
audit_records = []

print(f"Starting batch ingestion of {len(inventory)} files")
print(f"Batch size: {BATCH_SIZE} files at a time\n")

for i, item in enumerate(inventory, 1):
    audit_record = ingest_to_postgres(item)
    audit_records.append(audit_record)
    print("")
    
    if i % BATCH_SIZE == 0:
        print(f"--- Completed {i}/{len(inventory)} files ---\n")
        spark.catalog.clearCache()
        gc.collect()

print(f"Ingestion complete: {len(audit_records)} files processed")

Starting batch ingestion of 24 files
Batch size: 5 files at a time

Processing yellow 2020-01...
  SKIPPING - already in Postgres

Processing yellow 2020-02...
  SKIPPING - already in Postgres

Processing yellow 2020-03...
  SKIPPING - already in Postgres

Processing yellow 2020-04...
  SKIPPING - already in Postgres

Processing yellow 2020-05...
  SKIPPING - already in Postgres

--- Completed 5/24 files ---

Processing yellow 2020-06...
  SKIPPING - already in Postgres

Processing yellow 2020-07...
  SKIPPING - already in Postgres

Processing yellow 2020-08...
  SKIPPING - already in Postgres

Processing yellow 2020-09...
  SKIPPING - already in Postgres

Processing yellow 2020-10...
  SKIPPING - already in Postgres

--- Completed 10/24 files ---

Processing yellow 2020-11...
  SKIPPING - already in Postgres

Processing yellow 2020-12...
  SKIPPING - already in Postgres

Processing green 2020-01...
  SKIPPING - already in Postgres

Processing green 2020-02...
  SKIPPING - already in P

## Write audit records to Postgres

In [13]:
import pandas as pd

audit_df = pd.DataFrame(audit_records)

conn = get_postgres_connection()
cursor = conn.cursor()

inserted = 0
failed = 0

for index, row in audit_df.iterrows():
    try:
        started_at = row['started_at'].strftime('%Y-%m-%d %H:%M:%S')
        completed_at = row['completed_at'].strftime('%Y-%m-%d %H:%M:%S')
        duration = (row['completed_at'] - row['started_at']).total_seconds()
        
        if pd.isna(row['error_message']) or not row['error_message']:
            error_msg = 'NULL'
        else:
            error_msg = "'" + str(row['error_message']).replace("'", "''") + "'"
        
        source_path = f"'{row['source_path']}'"
        
        sql = f"""
        INSERT INTO {PG_SCHEMA_RAW}.ingestion_audit 
        (run_id, service_type, source_year, source_month, source_path, record_count, 
         status, error_message, started_at, completed_at, duration_seconds)
        VALUES (
            '{row['run_id']}',
            '{row['service_type']}',
            {row['source_year']},
            {row['source_month']},
            {source_path},
            {row['record_count']},
            '{row['status']}',
            {error_msg},
            '{started_at}',
            '{completed_at}',
            {duration}
        )
        """
        cursor.execute(sql)
        inserted += 1
    except Exception as e:
        failed += 1
        if failed <= 5:
            print(f"Error inserting audit: {str(e)[:80]}")
    
    if (index + 1) % 50 == 0:
        print(f"Progress: {index + 1}/{len(audit_df)}")

conn.commit()
cursor.close()
conn.close()

print(f"Audit records inserted: {inserted}, Failed: {failed}")

Error inserting audit: 'source_path'
Error inserting audit: 'source_path'
Error inserting audit: 'source_path'
Error inserting audit: 'source_path'
Error inserting audit: 'source_path'
Audit records inserted: 0, Failed: 24


## Summary

In [14]:
summary_df = pd.DataFrame(audit_records)

print("\n=== INGESTION SUMMARY ===")
print(f"\nTotal files processed: {len(summary_df)}")
print(f"\nBy status:")
print(summary_df['status'].value_counts())

print(f"\nBy service and status:")
print(summary_df.groupby(['service_type', 'status']).size())

successful = summary_df[summary_df['status'] == 'SUCCESS']
if len(successful) > 0:
    total_records = successful['record_count'].sum()
    print(f"\nTotal records ingested: {total_records:,}")

failed = summary_df[summary_df['status'] == 'FAILED']
if len(failed) > 0:
    print(f"\nFailed loads:")
    print(failed[['service_type', 'source_year', 'source_month', 'error_message']])
else:
    print(f"\n✓ No failed loads")


=== INGESTION SUMMARY ===

Total files processed: 24

By status:
status
SKIPPED    24
Name: count, dtype: int64

By service and status:
service_type  status 
green         SKIPPED    12
yellow        SKIPPED    12
dtype: int64

✓ No failed loads


In [15]:
spark.stop()
print("Spark session stopped")

Spark session stopped
