# Notebook 01: Parquet → Postgres RAW ingestion
## NYC TLC Taxi Trips 2015-2025 (Yellow & Green)

**Goal**: Full backfill of NYC TLC data from the public Parquet files into PostgreSQL, in the `raw` schema.

**Scope**:
- Yellow Taxi: 2015-2025
- Green Taxi: 2015-2025
- Taxi Zone Lookup (zone catalog)

**Process**:
1. Configuration and Postgres connection
2. Download of the Parquet files from NYC TLC
3. Column standardization
4. **Data cleaning** (business-rule validations)
5. Insertion into `raw.yellow_taxi_trip` and `raw.green_taxi_trip`
6. Loading of `raw.taxi_zone_lookup`
7. Validation and reports

## 1. Initial configuration and environment variables

In [1]:
import os
import time
import tempfile
from datetime import datetime
from urllib.request import urlretrieve
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit, current_timestamp, when, unix_timestamp, abs as spark_abs
from pyspark.sql.types import StringType, IntegerType, DoubleType, TimestampType

# Load environment variables
PG_HOST = os.getenv('PG_HOST', 'postgres')
PG_PORT = os.getenv('PG_PORT', '5432')
PG_DB = os.getenv('PG_DB', 'nyc_tlc')
PG_USER = os.getenv('PG_USER', 'postgres')
PG_PASSWORD = os.getenv('PG_PASSWORD', 'postgres')
PG_SCHEMA_RAW = os.getenv('PG_SCHEMA_RAW', 'raw')

# Ingestion parameters
YEARS = list(range(2015, 2026))  # 2015-2025
MONTHS = list(range(1, 13))      # 1-12
SERVICES = ['yellow']   # Taxi services to ingest; add 'green' to also load Green Taxi
RUN_ID = os.getenv('RUN_ID', f"ingesta_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}")

# Base URL for the NYC TLC Parquet files
BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"

print("Configuración cargada:")
print(f"  - Postgres: {PG_USER}@{PG_HOST}:{PG_PORT}/{PG_DB}")
print(f"  - Schema RAW: {PG_SCHEMA_RAW}")
print(f"  - Años: {YEARS[0]}-{YEARS[-1]}")
print(f"  - Servicios: {SERVICES}")
print(f"  - RUN_ID: {RUN_ID}")

Configuración cargada:
  - Postgres: admin@postgres:5432/nyc_taxi
  - Schema RAW: raw
  - Años: 2015-2025
  - Servicios: ['yellow']
  - RUN_ID: localtest
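With these parameters, the backfill visits one Parquet file per (service, year, month). The following minimal sketch enumerates that file list up front. It assumes the same `{service}_tripdata_{YYYY}-{MM}.parquet` naming that `build_url` uses in section 3. Not every listed file is guaranteed to exist, since the most recent months may not be published yet. The helper `monthly_urls` is hypothetical.

```python
from itertools import product

BASE_URL = "https://d37ci6vzurychx.cloudfront.net/trip-data"
YEARS = list(range(2015, 2026))
MONTHS = list(range(1, 13))


def monthly_urls(services, years, months):
    """Return one Parquet URL per (service, year, month), in the loop's order."""
    return [
        f"{BASE_URL}/{service}_tripdata_{year}-{month:02d}.parquet"
        for service, year, month in product(services, years, months)
    ]


urls = monthly_urls(["yellow"], YEARS, MONTHS)
print(len(urls))  # 11 years x 12 months = 132 files per service
print(urls[0])
```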


## 2. Spark Session initialization with Postgres connectivity

In [2]:
from pyspark.sql import SparkSession

# Create the SparkSession, pulling the Postgres JDBC driver via spark.jars.packages
spark = (
    SparkSession.builder
    .appName("NYC_TLC_Ingesta_Raw")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.6.0")
    .config("spark.driver.memory", "4g")
    .config("spark.executor.memory", "4g")
    .config("spark.sql.adaptive.enabled", "true")
    .getOrCreate()
)

# Log level
spark.sparkContext.setLogLevel("WARN")

print(f"Spark {spark.version} inicializado correctamente")
print(f"Master: {spark.sparkContext.master}")


Spark 3.5.0 inicializado correctamente
Master: local[*]


## 3. Utility functions

In [3]:
def build_url(service: str, year: int, month: int) -> str:
    """Build the Parquet file URL for a given service, year and month."""
    filename = f"{service}_tripdata_{year}-{month:02d}.parquet"
    return f"{BASE_URL}/{filename}"

def download_to_temp(url: str) -> str:
    """Download a Parquet file into the temp directory and return its local path.

    The file is first written to a `.part` path and renamed only when the
    download completes, so an interrupted download never leaves a truncated
    file behind that the cache check would later reuse.
    """
    temp_dir = tempfile.gettempdir()
    filename = url.split('/')[-1]
    local_path = os.path.join(temp_dir, filename)
    if not os.path.exists(local_path):
        print(f"  Descargando: {filename}...", end=" ")
        part_path = local_path + ".part"
        urlretrieve(url, part_path)
        os.replace(part_path, local_path)  # atomic rename within the same filesystem
        print("OK")
    else:
        print(f"  Usando caché local: {filename}")
    return local_path

def get_postgres_jdbc_url() -> str:
    """Build the JDBC URL for Postgres."""
    return f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"

def get_postgres_properties() -> dict:
    """Return the JDBC connection properties."""
    return {
        "user": PG_USER,
        "password": PG_PASSWORD,
        "driver": "org.postgresql.Driver"
    }

print("Funciones de utilidad cargadas correctamente")

Funciones de utilidad cargadas correctamente


## 4. Per-service column standardization

In [4]:
def standardize_columns(df, service: str):
    """
    Estandariza nombres de columnas y tipos de datos según el servicio.

    Yellow: tpep_pickup_datetime, tpep_dropoff_datetime
    Green: lpep_pickup_datetime, lpep_dropoff_datetime, trip_type
    """
    # Mapeo de columnas comunes (minúsculas y snake_case)
    column_mapping = {
        'VendorID': 'VendorID',
        'RatecodeID': 'RatecodeID',
        'PULocationID': 'PULocationID',
        'DOLocationID': 'DOLocationID',
        'passenger_count': 'passenger_count',
        'trip_distance': 'trip_distance',
        'fare_amount': 'fare_amount',
        'extra': 'extra',
        'mta_tax': 'mta_tax',
        'tip_amount': 'tip_amount',
        'tolls_amount': 'tolls_amount',
        'improvement_surcharge': 'improvement_surcharge',
        'total_amount': 'total_amount',
        'payment_type': 'payment_type',
        'congestion_surcharge': 'congestion_surcharge',
        'airport_fee': 'airport_fee',
        'store_and_fwd_flag': 'store_and_fwd_flag'
    }
    # Rename the columns present in the DataFrame (only when the target name differs)
    for old_name, new_name in column_mapping.items():
        if old_name in df.columns and old_name != new_name:
            df = df.withColumnRenamed(old_name, new_name)
    # Service-specific timestamp columns
    if service == 'yellow':
        if 'tpep_pickup_datetime' in df.columns:
            df = df.withColumn('tpep_pickup_datetime', col('tpep_pickup_datetime').cast(TimestampType()))
        if 'tpep_dropoff_datetime' in df.columns:
            df = df.withColumn('tpep_dropoff_datetime', col('tpep_dropoff_datetime').cast(TimestampType()))
    elif service == 'green':
        if 'lpep_pickup_datetime' in df.columns:
            df = df.withColumn('lpep_pickup_datetime', col('lpep_pickup_datetime').cast(TimestampType()))
        if 'lpep_dropoff_datetime' in df.columns:
            df = df.withColumn('lpep_dropoff_datetime', col('lpep_dropoff_datetime').cast(TimestampType()))
        if 'trip_type' in df.columns:
            df = df.withColumn('trip_type', col('trip_type').cast(IntegerType()))
    # Cast the common numeric columns
    numeric_int_cols = ['VendorID', 'RatecodeID', 'PULocationID', 'DOLocationID', 'passenger_count', 'payment_type']
    numeric_double_cols = ['trip_distance', 'fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount',
                           'improvement_surcharge', 'total_amount', 'congestion_surcharge', 'airport_fee']
    for col_name in numeric_int_cols:
        if col_name in df.columns:
            df = df.withColumn(col_name, col(col_name).cast(IntegerType()))
    for col_name in numeric_double_cols:
        if col_name in df.columns:
            df = df.withColumn(col_name, col(col_name).cast(DoubleType()))
    if 'store_and_fwd_flag' in df.columns:
        df = df.withColumn('store_and_fwd_flag', col('store_and_fwd_flag').cast(StringType()))
    return df

print("Función de estandarización cargada")

Función de estandarización cargada
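Because the mapping above is keyed on exact names, a column whose capitalization changes between monthly files would pass through unchanged and be treated as a different column. The following minimal sketch shows a case-insensitive alternative. It assumes that every source column should be matched against one canonical list. The helper `canonical_renames` and the sample column names are hypothetical.

```python
CANONICAL = [
    "VendorID", "RatecodeID", "PULocationID", "DOLocationID",
    "passenger_count", "trip_distance", "fare_amount", "extra", "mta_tax",
    "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount",
    "payment_type", "congestion_surcharge", "airport_fee", "store_and_fwd_flag",
]


def canonical_renames(columns, canonical=CANONICAL):
    """Map each source column to its canonical spelling, ignoring case.

    Only columns whose spelling differs from the canonical one are returned.
    Unknown columns are left out, so they keep their original name.
    """
    by_lower = {name.lower(): name for name in canonical}
    renames = {}
    for name in columns:
        target = by_lower.get(name.lower())
        if target is not None and target != name:
            renames[name] = target
    return renames


# Hypothetical file header where two columns use a different capitalization
source_columns = ["VendorID", "Airport_fee", "pulocationid", "trip_distance"]
renames = canonical_renames(source_columns)
print(renames)  # {'Airport_fee': 'airport_fee', 'pulocationid': 'PULocationID'}
```

Inside `standardize_columns`, the rename loop would then become `for old, new in canonical_renames(df.columns).items(): df = df.withColumnRenamed(old, new)`.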


## 5. Data cleaning and validation

In [5]:
def clean_taxi_data(df, service: str):
    """
    Limpieza robusta de datos NYC TLC Yellow/Green taxi trips.

    Reglas de validación:
    1. Timestamps válidos y lógicos (pickup < dropoff)
    2. LocationID válidos (1-263 según NYC TLC)
    3. Passenger count razonable (1-6, casos extremos hasta 9)
    4. Trip distance positiva y realista (< 500 millas)
    5. Montos no negativos y realistas
    6. Rate codes válidos (1-6)
    7. Payment types válidos (1-6)
    8. Vendor IDs válidos (1-2, algunos 4)
    9. Duración de viaje razonable (> 1 min, < 24 hrs)
    10. Consistencia en montos
    """
    initial_count = df.count()
    print(f"    [CLEAN] Filas iniciales: {initial_count:,}")
    pickup_col = 'tpep_pickup_datetime' if service == 'yellow' else 'lpep_pickup_datetime'
    dropoff_col = 'tpep_dropoff_datetime' if service == 'yellow' else 'lpep_dropoff_datetime'
    df = df.filter((col(pickup_col).isNotNull()) & (col(dropoff_col).isNotNull()))
    df = df.filter(col(pickup_col) < col(dropoff_col))
    # Exclusive upper bound so pickups on 2025-12-31 after midnight are kept
    df = df.filter((col(pickup_col) >= lit('2015-01-01')) & (col(pickup_col) < lit('2026-01-01')))
    trip_duration_seconds = unix_timestamp(col(dropoff_col)) - unix_timestamp(col(pickup_col))
    df = df.filter((trip_duration_seconds >= 60) & (trip_duration_seconds <= 86400))
    df = df.filter((col('PULocationID').between(1, 263)) & (col('DOLocationID').between(1, 263)))
    df = df.withColumn('passenger_count',
        when(col('passenger_count').isNull(), 1)
        .when(col('passenger_count') < 1, 1)
        .when(col('passenger_count') > 9, 9)
        .otherwise(col('passenger_count'))
    )
    df = df.filter((col('trip_distance') > 0) & (col('trip_distance') < 500))
    df = df.filter(col('fare_amount') > 0)
    df = df.filter((col('total_amount') > 0) & (col('total_amount') < 1000))
    money_columns = ['extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge', 'congestion_surcharge', 'airport_fee']
    for col_name in money_columns:
        if col_name in df.columns:
            df = df.withColumn(col_name, when(col(col_name).isNull(), 0).when(col(col_name) < 0, 0).otherwise(col(col_name)))
    df = df.withColumn('RatecodeID', when(col('RatecodeID').isNull(), 1).when(~col('RatecodeID').between(1, 6), 1).otherwise(col('RatecodeID')))
    df = df.filter(col('payment_type').between(1, 6))
    df = df.filter(col('VendorID').isin([1, 2, 4]))
    if service == 'green' and 'trip_type' in df.columns:
        df = df.withColumn('trip_type', when(col('trip_type').isNull(), 1).when(~col('trip_type').isin([1, 2]), 1).otherwise(col('trip_type')))
    required_cols = ['fare_amount', 'extra', 'mta_tax', 'tip_amount', 'tolls_amount', 'improvement_surcharge']
    if all(c in df.columns for c in required_cols):
        expected_total = (col('fare_amount') + col('extra') + col('mta_tax') + col('tip_amount') + col('tolls_amount') + col('improvement_surcharge'))
        if 'congestion_surcharge' in df.columns:
            expected_total = expected_total + col('congestion_surcharge')
        if 'airport_fee' in df.columns:
            expected_total = expected_total + col('airport_fee')
        df = df.filter(spark_abs(col('total_amount') - expected_total) <= 5)
    if 'store_and_fwd_flag' in df.columns:
        df = df.withColumn('store_and_fwd_flag',
            when(col('store_and_fwd_flag').isNull(), 'N')
            .when(col('store_and_fwd_flag').isin(['Y', 'y', '1', 'true', 'True']), 'Y')
            .otherwise('N')
        )
    final_count = df.count()
    removed = initial_count - final_count
    pct_removed = (removed / initial_count * 100) if initial_count > 0 else 0
    print(f"    [CLEAN] Filas válidas: {final_count:,} ({removed:,} removidas, {pct_removed:.2f}%)")
    return df

print("Función de limpieza cargada")

Función de limpieza cargada
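The following plain-Python sketch makes the filter rules concrete. It applies a subset of them to a single trip stored as a dict: duration, location range, distance, fare and total ranges, and the amount-consistency tolerance. It is illustrative only. The function `trip_passes_filters` and the sample trip are hypothetical, and the Spark version above remains the one the pipeline runs.

```python
from datetime import datetime

COMPONENTS = ["fare_amount", "extra", "mta_tax", "tip_amount", "tolls_amount",
              "improvement_surcharge", "congestion_surcharge", "airport_fee"]


def trip_passes_filters(trip: dict, tolerance: float = 5.0) -> bool:
    """Return True if the trip survives a subset of clean_taxi_data's row filters."""
    duration = (trip["dropoff"] - trip["pickup"]).total_seconds()
    if not 60 <= duration <= 86400:              # 1 minute to 24 hours
        return False
    if not (1 <= trip["PULocationID"] <= 263 and 1 <= trip["DOLocationID"] <= 263):
        return False
    if not 0 < trip["trip_distance"] < 500:
        return False
    if not (trip["fare_amount"] > 0 and 0 < trip["total_amount"] < 1000):
        return False
    # Missing or negative surcharges count as 0, mirroring the Spark imputation
    expected = sum(max(trip.get(c) or 0.0, 0.0) for c in COMPONENTS)
    return abs(trip["total_amount"] - expected) <= tolerance


# Hypothetical trip whose components add up to the total
ok_trip = {
    "pickup": datetime(2024, 5, 1, 10, 0, 0),
    "dropoff": datetime(2024, 5, 1, 10, 12, 0),
    "PULocationID": 161, "DOLocationID": 236,
    "trip_distance": 2.5,
    "fare_amount": 12.0, "extra": 1.0, "mta_tax": 0.5, "tip_amount": 3.0,
    "tolls_amount": 0.0, "improvement_surcharge": 0.3, "congestion_surcharge": 2.5,
    "total_amount": 19.3,
}
print(trip_passes_filters(ok_trip))
print(trip_passes_filters({**ok_trip, "total_amount": 30.0}))  # off by 10.7 > 5
```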


## 6. Add ingestion metadata

In [6]:
from pyspark.sql.functions import lit, current_timestamp

def add_metadata(df, year: int, month: int, run_id: str):
    """Agrega columnas de metadatos para trazabilidad de la ingesta."""
    return (
        df
        .withColumn('run_id', lit(run_id))
        .withColumn('source_year', lit(year))
        .withColumn('source_month', lit(month))
        .withColumn('ingested_at_utc', current_timestamp())
    )

print("Función de metadatos cargada")


Función de metadatos cargada


## 7. Writing to Postgres

In [7]:
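def write_batch(df, table_name: str, mode: str = "append", writers: int = 4, batchsize: int = 5000):
    """Write a DataFrame to a Postgres table over JDBC.

    mode="overwrite" drops and recreates the table (unless the JDBC `truncate`
    option is set), so per-month loads must use mode="append".
    """
    # Reduce to at most `writers` partitions; each partition writes over its own JDBC connection
    df_to_write = df.coalesce(writers)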

    (df_to_write.write
        .format("jdbc")
        .option("url", get_postgres_jdbc_url())
        .option("dbtable", table_name)
        .option("user", PG_USER)
        .option("password", PG_PASSWORD)
        .option("driver", "org.postgresql.Driver")
        .option("batchsize", str(batchsize))   # tamaño de lote JDBC
        # .option("isolationLevel", "READ_COMMITTED")  # opcional
        .mode(mode)
        .save()
    )


print("Función de escritura a Postgres cargada")

Función de escritura a Postgres cargada
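When a month is loaded with `mode="append"`, Spark writes each partition in its own transaction. A failure partway through can therefore leave part of the month committed, and re-running the month would duplicate those rows. One common pattern is to delete the month's rows before appending them again. This is offered as an assumption, not as part of the original pipeline. The helper `delete_month` is hypothetical and takes any DB-API connection, for example a `psycopg2` connection, whose placeholder is `%s`. The demo uses an in-memory `sqlite3` database (placeholder `?`) only so that the sketch runs without Postgres.

```python
import sqlite3


def delete_month(conn, table, service, year, month, placeholder="%s"):
    """Delete one (service, year, month) slice so it can be re-appended safely.

    `table` is interpolated into the SQL text, so it must come from trusted
    configuration; the filter values are passed as query parameters.
    Returns the number of deleted rows.
    """
    sql = (
        f"DELETE FROM {table} WHERE service_type = {placeholder} "
        f"AND source_year = {placeholder} AND source_month = {placeholder}"
    )
    cur = conn.cursor()
    cur.execute(sql, (service, year, month))
    deleted = cur.rowcount
    conn.commit()
    cur.close()
    return deleted


# Demo on an in-memory SQLite table with the same metadata columns
conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE yellow_taxi_trip (service_type TEXT, source_year INT, source_month INT)")
conn.executemany(
    "INSERT INTO yellow_taxi_trip VALUES (?, ?, ?)",
    [("yellow", 2015, 1), ("yellow", 2015, 1), ("yellow", 2015, 2)],
)
conn.commit()
deleted = delete_month(conn, "yellow_taxi_trip", "yellow", 2015, 1, placeholder="?")
remaining = conn.execute("SELECT COUNT(*) FROM yellow_taxi_trip").fetchone()[0]
print(deleted, remaining)
```

In the ingestion loop, this helper would be called with a `psycopg2` connection right before `write_batch(...)`, so that a rerun replaces the month instead of adding to it.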


## 8. Creating the schemas in Postgres

Before ingestion, we create the `raw` and `analytics` schemas if they do not exist. The trip tables themselves are created by Spark's JDBC writer on the first write.

In [8]:
import psycopg2
from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT

def create_schemas():
    """Crea los esquemas raw y analytics si no existen."""
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        database=PG_DB,
        user=PG_USER,
        password=PG_PASSWORD
    )
    conn.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
    cursor = conn.cursor()
    cursor.execute(f"CREATE SCHEMA IF NOT EXISTS {PG_SCHEMA_RAW};")
    cursor.execute("CREATE SCHEMA IF NOT EXISTS analytics;")
    print(f"Esquemas '{PG_SCHEMA_RAW}' y 'analytics' verificados/creados")
    cursor.close()
    conn.close()

# Create the schemas
create_schemas()

Esquemas 'raw' y 'analytics' verificados/creados


## 9. MAIN INGESTION: Yellow & Green Taxi Trips (2015-2025)

**Per-month process**:
1. Skip the month if it is already present in the table
2. Download the Parquet file
3. Standardize columns
4. **Clean and validate the data**
5. Add metadata
6. Insert into Postgres (`raw.yellow_taxi_trip` or `raw.green_taxi_trip`)

In [9]:
import time
import pandas as pd
from pyspark.sql.functions import lit, col

# Global statistics
total_rows = 0
total_removed = 0
total_errors = 0
ingestion_log = []

print("="*80)
print("INICIANDO BACKFILL 2015-2025 CON LIMPIEZA DE DATOS Y VERIFICACIÓN DE EXISTENCIA")
print("="*80)

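def is_already_ingested(service, year, month, table):
    """
    Return True if the table already holds rows for this service, year and month.
    """
    # Filter on the metadata columns written by add_metadata() (source_year, source_month);
    # LIMIT 1 lets Postgres stop at the first matching row instead of counting them all.
    query = (
        f"SELECT 1 FROM {table} "
        f"WHERE service_type = '{service}' AND source_year = {int(year)} "
        f"AND source_month = {int(month)} LIMIT 1"
    )
    try:
        df_check = (
            spark.read
            .format("jdbc")
            .option("url", get_postgres_jdbc_url())  # a jdbc:postgresql:// URL, not user@host
            .option("query", query)
            .option("user", PG_USER)
            .option("password", PG_PASSWORD)
            .option("driver", "org.postgresql.Driver")
            .load()
        )
        return len(df_check.take(1)) > 0
    except Exception as e:
        # On the first run the table does not exist yet, so the query fails
        print(f"    [WARNING] No se pudo verificar existencia: {e}")
        return False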

for service in SERVICES:
    table = f"{PG_SCHEMA_RAW}.{service}_taxi_trip"
    print(f"\n{'='*80}")
    print(f"SERVICIO: {service.upper()} → {table}")
    print(f"{'='*80}\n")
    service_rows = 0
    service_removed = 0

    for y in YEARS:
        for m in MONTHS:

            # 0) Check whether the month is already in the DB
            if is_already_ingested(service, y, m, table):
                print(f"[{y}-{m:02d}] Ya existe en DB → saltando")
                log_entry = {
                    'service': service,
                    'year': y,
                    'month': m,
                    'raw_count': 0,
                    'clean_count': 0,
                    'removed': 0,
                    'pct_removed': 0,
                    'duration_sec': 0,
                    'status': 'SKIPPED'
                }
                ingestion_log.append(log_entry)
                continue

            try:
                t0 = time.time()
                print(f"[{y}-{m:02d}] Procesando {service}...")

                # 1) Download and read the Parquet file
                tmp_path = download_to_temp(build_url(service, y, m))
                raw_df = spark.read.parquet(tmp_path)
                raw_count = raw_df.count()
                print(f"    Filas descargadas: {raw_count:,}")

                # 2) Standardize columns
                df = standardize_columns(raw_df, service)

                # 3) Data cleaning
                df_clean = clean_taxi_data(df, service)
                clean_count = df_clean.count()
                removed = raw_count - clean_count

                # 4) Check that rows remain after cleaning
                if clean_count == 0:
                    print("    [SKIP] 0 filas después de limpieza\n")
                    log_entry = {
                        'service': service,
                        'year': y,
                        'month': m,
                        'raw_count': raw_count,
                        'clean_count': 0,
                        'removed': removed,
                        'pct_removed': 100.0,
                        'duration_sec': time.time() - t0,
                        'status': 'SKIPPED'
                    }
                    ingestion_log.append(log_entry)
                    continue

                # 5) Add metadata
                df_final = add_metadata(df_clean, y, m, RUN_ID)
                df_final = df_final.withColumn("service_type", lit(service))  # 'yellow' or 'green'

                # 6) Write to Postgres (append: "overwrite" would drop the months already loaded)
                print(f"    Escribiendo a {table}...", end=" ")
                write_batch(df_final, table, mode="append", writers=4, batchsize=5000)
                print("OK")

                # 7) Statistics
                total_rows += clean_count
                total_removed += removed
                service_rows += clean_count
                service_removed += removed

                dt = time.time() - t0
                pct_removed = (removed / raw_count * 100) if raw_count > 0 else 0

                log_entry = {
                    'service': service,
                    'year': y,
                    'month': m,
                    'raw_count': raw_count,
                    'clean_count': clean_count,
                    'removed': removed,
                    'pct_removed': pct_removed,
                    'duration_sec': dt,
                    'status': 'OK'
                }
                ingestion_log.append(log_entry)

                print(f"    [OK] {clean_count:,} filas insertadas ")
                print(f"         ({removed:,} removidas, {pct_removed:.2f}%) en {dt:.1f}s\n")

            except Exception as e:
                total_errors += 1
                print(f"    [ERROR] {e}\n")
                log_entry = {
                    'service': service,
                    'year': y,
                    'month': m,
                    'raw_count': 0,
                    'clean_count': 0,
                    'removed': 0,
                    'pct_removed': 0,
                    'duration_sec': 0,
                    'status': 'ERROR',
                    'error': str(e)
                }
                ingestion_log.append(log_entry)

    print(f"\n{'='*80}")
    print(f"RESUMEN {service.upper()}:")
    print(f"  Filas insertadas: {service_rows:,}")
    print(f"  Filas removidas: {service_removed:,}")
    if service_rows + service_removed > 0:
        pct_valid = service_rows / (service_rows + service_removed) * 100
        print(f"  Tasa de validez: {pct_valid:.2f}%")
    print(f"{'='*80}\n")

print("\n" + "="*80)
print("RESUMEN GLOBAL DE INGESTA")
print("="*80)
print(f"Total filas insertadas: {total_rows:,}")
print(f"Total filas removidas: {total_removed:,}")
if total_rows + total_removed > 0:
    pct_valid = total_rows / (total_rows + total_removed) * 100
    print(f"Tasa de validez global: {pct_valid:.2f}%")
print(f"Total errores: {total_errors}")
print(f"RUN_ID: {RUN_ID}")
print("="*80)

# Convert the log to a pandas DataFrame for analysis
log_df = pd.DataFrame(ingestion_log)
print("\nLog de ingesta guardado en variable 'log_df'")


INICIANDO BACKFILL 2015-2025 CON LIMPIEZA DE DATOS Y VERIFICACIÓN DE EXISTENCIA

SERVICIO: YELLOW → raw.yellow_taxi_trip

: java.sql.SQLException: No suitable driver
	at java.sql/java.sql.DriverManager.getDriver(DriverManager.java:299)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.$anonfun$driverClass$2(JDBCOptions.scala:109)
	at scala.Option.getOrElse(Option.scala:189)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:109)
	at org.apache.spark.sql.execution.datasources.jdbc.JDBCOptions.<init>(JDBCOptions.scala:41)
	at org.apache.spark.sql.execution.datasources.jdbc.JdbcRelationProvider.createRelation(JdbcRelationProvider.scala:34)
	at org.apache.spark.sql.execution.datasources.DataSource.resolveRelation(DataSource.scala:346)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:229)
	at org.apache.spark.sql.DataFrameReader.$anonfun$load$2(DataFrameReader.scala:211)
	at scala.Option.getOrElse(Option.scala:18
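The loop above builds the same log dictionary in four branches. A small helper keeps the keys consistent across OK, SKIPPED and ERROR entries. `make_log_entry` is a hypothetical refactoring sketch, not part of the original notebook.

```python
def make_log_entry(service, year, month, status, raw_count=0, clean_count=0,
                   duration_sec=0.0, error=None):
    """Build one ingestion-log row with the same keys the loop uses."""
    removed = raw_count - clean_count
    entry = {
        "service": service,
        "year": year,
        "month": month,
        "raw_count": raw_count,
        "clean_count": clean_count,
        "removed": removed,
        "pct_removed": (removed / raw_count * 100) if raw_count > 0 else 0,
        "duration_sec": duration_sec,
        "status": status,
    }
    if error is not None:
        entry["error"] = str(error)  # only ERROR rows carry this key, as in the loop
    return entry


ok = make_log_entry("yellow", 2015, 1, "OK", raw_count=1000, clean_count=975, duration_sec=12.5)
err = make_log_entry("yellow", 2015, 3, "ERROR", error=ValueError("boom"))
print(ok["removed"], ok["pct_removed"], err["error"])
```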

## 10. Ingestion log analysis

In [10]:
# Summary by service
print("\nRESUMEN POR SERVICIO:")
if len(log_df) > 0:
    print(log_df.groupby('service').agg({
        'raw_count': 'sum',
        'clean_count': 'sum',
        'removed': 'sum',
        'duration_sec': 'sum'
    }).round(2))
else:
    print("Sin datos en el log.")

# Summary by year
print("\n\nRESUMEN POR AÑO:")
if len(log_df) > 0:
    print(log_df.groupby('year').agg({
        'raw_count': 'sum',
        'clean_count': 'sum',
        'removed': 'sum'
    }).round(2))
else:
    print("Sin datos en el log.")

# Errors
if len(log_df) > 0:
    errors = log_df[log_df['status'] == 'ERROR']
    if len(errors) > 0:
        print(f"\n\nERRORES ({len(errors)} archivos):")
        print(errors[['service', 'year', 'month', 'error']])
    else:
        print("\n\n✓ Sin errores en la ingesta")


RESUMEN POR SERVICIO:
         raw_count  clean_count   removed  duration_sec
service                                                
yellow   726330010    696203512  30126498      23694.84


RESUMEN POR AÑO:
      raw_count  clean_count  removed
year                                 
2015  119538603    116430747  3107856
2016  131131805    127838281  3293524
2017  113500327    110474320  3026007
2018  102871387     99937427  2933960
2019   84598444     82020060  2578384
2020   24649092     23138248  1510844
2021   30904308     28488148  2416160
2022   39656098     36817964  2838134
2023   38310226     35531127  2779099
2024   41169720     35527190  5642530
2025          0            0        0


ERRORES (14 archivos):
    service  year  month                                              error
2    yellow  2015      3  An error occurred while calling o877.parquet.\...
4    yellow  2015      5  An error occurred while calling o1308.parquet....
120  yellow  2025      1  Column cbd_conges
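The per-year table is easier to compare as a removal rate, computed as `removed / raw_count`. The following plain-Python sketch uses the counts printed above. In the notebook, the same ratio could be computed directly on `log_df`.

```python
# (raw_count, removed) per year, copied from the RESUMEN POR AÑO output above
by_year = {
    2015: (119538603, 3107856),
    2016: (131131805, 3293524),
    2017: (113500327, 3026007),
    2018: (102871387, 2933960),
    2019: (84598444, 2578384),
    2020: (24649092, 1510844),
    2021: (30904308, 2416160),
    2022: (39656098, 2838134),
    2023: (38310226, 2779099),
    2024: (41169720, 5642530),
    2025: (0, 0),
}

# Percentage of rows removed by cleaning; years with no raw rows are skipped
removal_pct = {
    year: removed / raw * 100
    for year, (raw, removed) in by_year.items()
    if raw > 0
}

for year, pct in sorted(removal_pct.items()):
    print(f"{year}: {pct:.2f}% removed")
```

With these figures, 2024 stands out at about 13.7% removed, versus roughly 2.5% to 7.8% in the other years. It may be worth checking which cleaning rule drives that increase.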

## 11. Taxi Zone Lookup ingestion

We load the zone catalog, which maps `LocationID` → `zone`, `borough`, `service_zone`.

In [11]:
import os
import tempfile
from urllib.request import urlretrieve
from pyspark.sql.functions import lit, current_timestamp

# URL of the taxi_zone_lookup file
ZONE_LOOKUP_URL = "https://d37ci6vzurychx.cloudfront.net/misc/taxi+_zone_lookup.csv"

print("Descargando Taxi Zone Lookup...")
try:
    # Download the CSV
    temp_dir = tempfile.gettempdir()
    zone_path = os.path.join(temp_dir, "taxi_zone_lookup.csv")
    urlretrieve(ZONE_LOOKUP_URL, zone_path)

    # Read with Spark
    zones_df = spark.read.csv(zone_path, header=True, inferSchema=True)

    # Standardize column names (lowercase, spaces → underscores)
    zones_df = zones_df.toDF(*[c.lower().replace(' ', '_') for c in zones_df.columns])

    # Add metadata
    zones_df = zones_df.withColumn('run_id', lit(RUN_ID)) \
                       .withColumn('ingested_at_utc', current_timestamp())

    # Count rows
    zone_count = zones_df.count()
    print(f"Zonas cargadas: {zone_count}")

    # Write to Postgres (overwrite keeps the catalog up to date)
    zones_table = f"{PG_SCHEMA_RAW}.taxi_zone_lookup"
    write_batch(zones_df, zones_table, mode="overwrite")

    print(f"✓ Taxi Zone Lookup insertado en {zones_table}")

    # Show a preview
    zones_df.show(10, truncate=False)

except Exception as e:
    print(f"[ERROR] No se pudo cargar Taxi Zone Lookup: {e}")


Descargando Taxi Zone Lookup...
Zonas cargadas: 265
✓ Taxi Zone Lookup insertado en raw.taxi_zone_lookup
+----------+-------------+-----------------------+------------+---------+--------------------------+
|locationid|borough      |zone                   |service_zone|run_id   |ingested_at_utc           |
+----------+-------------+-----------------------+------------+---------+--------------------------+
|1         |EWR          |Newark Airport         |EWR         |localtest|2025-11-04 01:57:07.524996|
|2         |Queens       |Jamaica Bay            |Boro Zone   |localtest|2025-11-04 01:57:07.524996|
|3         |Bronx        |Allerton/Pelham Gardens|Boro Zone   |localtest|2025-11-04 01:57:07.524996|
|4         |Manhattan    |Alphabet City          |Yellow Zone |localtest|2025-11-04 01:57:07.524996|
|5         |Staten Island|Arden Heights          |Boro Zone   |localtest|2025-11-04 01:57:07.524996|
|6         |Staten Island|Arrochar/Fort Wadsworth|Boro Zone   |localtest|2025-11-04 01:
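The normalization `c.lower().replace(' ', '_')` produces the lowercase names shown in the preview. As a result, the join key is `locationid` in the catalog but `PULocationID`/`DOLocationID` in the trip tables. The following sketch shows the rename and a dictionary lookup that mirrors that join. It assumes the CSV header is `LocationID,Borough,Zone,service_zone`. The two zone rows come from the preview above, and the trip is hypothetical.

```python
def normalize_columns(columns):
    """Same rule as the Spark cell: lowercase and replace spaces with underscores."""
    return [c.lower().replace(" ", "_") for c in columns]


header = normalize_columns(["LocationID", "Borough", "Zone", "service_zone"])
print(header)

# Two rows from the preview above, keyed by locationid
zones = {
    1: {"borough": "EWR", "zone": "Newark Airport"},
    4: {"borough": "Manhattan", "zone": "Alphabet City"},
}

# PULocationID / DOLocationID share the integer domain of locationid
trip = {"PULocationID": 4, "DOLocationID": 1}
pickup_zone = zones[trip["PULocationID"]]["zone"]
dropoff_zone = zones[trip["DOLocationID"]]["zone"]
print(pickup_zone, "->", dropoff_zone)
```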

## 12. Final validation: row counts in Postgres

In [12]:
def get_table_count(table_name: str) -> int:
    """Obtiene el conteo de filas de una tabla en Postgres."""
    jdbc_url = get_postgres_jdbc_url()
    properties = get_postgres_properties()
    query = f"(SELECT COUNT(*) as count FROM {table_name}) as subquery"
    df = spark.read.jdbc(url=jdbc_url, table=query, properties=properties)
    return df.collect()[0]['count']

print("\n" + "="*80)
print("VALIDACIÓN FINAL: Conteos en Postgres")
print("="*80)

try:
    yellow_count = get_table_count(f"{PG_SCHEMA_RAW}.yellow_taxi_trip")
    print(f"\n{PG_SCHEMA_RAW}.yellow_taxi_trip: {yellow_count:,} filas")
except Exception as e:
    print(f"Error leyendo yellow_taxi_trip: {e}")
    yellow_count = 0

try:
    green_count = get_table_count(f"{PG_SCHEMA_RAW}.green_taxi_trip")
    print(f"{PG_SCHEMA_RAW}.green_taxi_trip: {green_count:,} filas")
except Exception as e:
    print(f"Error leyendo green_taxi_trip: {e}")
    green_count = 0

try:
    zones_count = get_table_count(f"{PG_SCHEMA_RAW}.taxi_zone_lookup")
    print(f"{PG_SCHEMA_RAW}.taxi_zone_lookup: {zones_count:,} filas")
except Exception as e:
    print(f"Error leyendo taxi_zone_lookup: {e}")
    zones_count = 0

print(f"\nTOTAL TRIPS: {yellow_count + green_count:,} filas")
print("="*80)


VALIDACIÓN FINAL: Conteos en Postgres

raw.yellow_taxi_trip: 708,576,559 filas
raw.green_taxi_trip: 68,262,085 filas
raw.taxi_zone_lookup: 265 filas

TOTAL TRIPS: 776,838,644 filas
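A useful follow-up check is to reconcile the Postgres counts against the number of rows the log says were inserted. The following sketch uses the numbers printed in sections 10 and 12. It covers yellow only, since this run ingested only `yellow`.

```python
# Counts printed by the notebook
db_counts = {"yellow": 708_576_559, "green": 68_262_085}
logged_clean = {"yellow": 696_203_512}   # clean_count summed over log_df for this run

total_trips = sum(db_counts.values())
print(f"TOTAL TRIPS: {total_trips:,}")

for service, logged in logged_clean.items():
    diff = db_counts[service] - logged
    print(f"{service}: db={db_counts[service]:,} log={logged:,} diff={diff:+,}")
```

A non-zero difference means the table holds rows that this run's log does not account for. Possible causes include rows left over from earlier runs, or months skipped as already present (these are logged with `clean_count = 0`). Here, the yellow table has 12,373,047 more rows than the log reports.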


## 13. Quick exploration of the loaded data

In [13]:
# Read a sample of Yellow Taxi
print("\nMUESTRA: raw.yellow_taxi_trip (primeras 5 filas)")
print("="*80)

jdbc_url = get_postgres_jdbc_url()
properties = get_postgres_properties()

try:
    yellow_sample = spark.read.jdbc(
        url=jdbc_url,
        table=f"(SELECT * FROM {PG_SCHEMA_RAW}.yellow_taxi_trip LIMIT 5) as sample",
        properties=properties
    )
    yellow_sample.show(5, truncate=False, vertical=True)
except Exception as e:
    print(f"Error leyendo muestra de yellow_taxi_trip: {e}")


MUESTRA: raw.yellow_taxi_trip (primeras 5 filas)
-RECORD 0----------------------------------------------
 VendorID              | 1                             
 tpep_pickup_datetime  | 2015-01-01 00:11:33           
 tpep_dropoff_datetime | 2015-01-01 00:16:48           
 passenger_count       | 1                             
 trip_distance         | 1.0                           
 RatecodeID            | 1                             
 store_and_fwd_flag    | N                             
 PULocationID          | 41                            
 DOLocationID          | 166                           
 payment_type          | 1                             
 fare_amount           | 5.7                           
 extra                 | 0.5                           
 mta_tax               | 0.5                           
 tip_amount            | 1.4                           
 tolls_amount          | 0.0                           
 improvement_surcharge | 0.0                          

In [14]:
# Read a sample of Green Taxi
print("\nMUESTRA: raw.green_taxi_trip (primeras 5 filas)")
print("="*80)

try:
    green_sample = spark.read.jdbc(
        url=jdbc_url,
        table=f"(SELECT * FROM {PG_SCHEMA_RAW}.green_taxi_trip LIMIT 5) as sample",
        properties=properties
    )
    green_sample.show(5, truncate=False, vertical=True)
except Exception as e:
    print(f"Error leyendo muestra de green_taxi_trip: {e}")


MUESTRA: raw.green_taxi_trip (primeras 5 filas)
-RECORD 0-------------------------------------------
 VendorID              | 1                          
 lpep_pickup_datetime  | 2024-10-11 10:36:29        
 lpep_dropoff_datetime | 2024-10-11 10:47:38        
 store_and_fwd_flag    | N                          
 RatecodeID            | 1                          
 PULocationID          | 97                         
 DOLocationID          | 181                        
 passenger_count       | 1                          
 trip_distance         | 2.3                        
 fare_amount           | 13.5                       
 extra                 | 0.0                        
 mta_tax               | 1.5                        
 tip_amount            | 3.75                       
 tolls_amount          | 0.0                        
 ehail_fee             | NULL                       
 improvement_surcharge | 1.0                        
 total_amount          | 18.75                    

In [15]:
# Distribution by year and service
print("\nDISTRIBUCIÓN POR AÑO Y SERVICIO:")
print("="*80)

try:
    query_yellow = f"""
    (SELECT 
        source_year as year,
        'yellow' as service,
        COUNT(*) as trips
     FROM {PG_SCHEMA_RAW}.yellow_taxi_trip
     GROUP BY source_year
     ORDER BY source_year) as stats
    """

    query_green = f"""
    (SELECT 
        source_year as year,
        'green' as service,
        COUNT(*) as trips
     FROM {PG_SCHEMA_RAW}.green_taxi_trip
     GROUP BY source_year
     ORDER BY source_year) as stats
    """

    stats_yellow = spark.read.jdbc(url=jdbc_url, table=query_yellow, properties=properties)
    stats_green = spark.read.jdbc(url=jdbc_url, table=query_green, properties=properties)

    stats_all = stats_yellow.union(stats_green).orderBy('year', 'service')
    stats_all.show(30, truncate=False)
except Exception as e:
    print(f"Error calculando distribución: {e}")


DISTRIBUCIÓN POR AÑO Y SERVICIO:
+----+-------+---------+
|year|service|trips    |
+----+-------+---------+
|2015|green  |23194726 |
|2015|yellow |128803794|
|2016|green  |15933312 |
|2016|yellow |127838281|
|2017|green  |11445721 |
|2017|yellow |110474320|
|2018|green  |8588553  |
|2018|yellow |99937427 |
|2019|green  |5412312  |
|2019|yellow |82020060 |
|2020|green  |1122600  |
|2020|yellow |23138248 |
|2021|green  |606589   |
|2021|yellow |28488148 |
|2022|green  |686860   |
|2022|yellow |36817964 |
|2023|green  |681369   |
|2023|yellow |35531127 |
|2024|green  |590043   |
|2024|yellow |35527190 |
+----+-------+---------+



## 14. Data quality checks

In [16]:
# Check value ranges in Yellow Taxi
print("\nCALIDAD DE DATOS: raw.yellow_taxi_trip")
print("="*80)

try:
    quality_query_yellow = f"""
    (SELECT
        MIN(tpep_pickup_datetime) as min_pickup,
        MAX(tpep_pickup_datetime) as max_pickup,
        MIN(passenger_count) as min_passengers,
        MAX(passenger_count) as max_passengers,
        MIN(trip_distance) as min_distance,
        MAX(trip_distance) as max_distance,
        MIN(total_amount) as min_total,
        MAX(total_amount) as max_total,
        AVG(total_amount) as avg_total
     FROM {PG_SCHEMA_RAW}.yellow_taxi_trip) as quality
    """

    quality_yellow = spark.read.jdbc(url=jdbc_url, table=quality_query_yellow, properties=properties)
    quality_yellow.show(truncate=False, vertical=True)
except Exception as e:
    print(f"Error en calidad de datos (yellow): {e}")


CALIDAD DE DATOS: raw.yellow_taxi_trip
-RECORD 0-----------------------------
 min_pickup     | 2015-01-01 00:00:00 
 max_pickup     | 2025-03-23 20:42:06 
 min_passengers | 1                   
 max_passengers | 9                   
 min_distance   | 0.01                
 max_distance   | 482.1               
 min_total      | 0.01                
 max_total      | 999.8               
 avg_total      | 18.007333323369625  



In [17]:
# Check value ranges in the Green Taxi table
print(f"\nDATA QUALITY: {PG_SCHEMA_RAW}.green_taxi_trip")
print("="*80)

try:
    quality_query_green = f"""
    (SELECT
        MIN(lpep_pickup_datetime) as min_pickup,
        MAX(lpep_pickup_datetime) as max_pickup,
        MIN(passenger_count) as min_passengers,
        MAX(passenger_count) as max_passengers,
        MIN(trip_distance) as min_distance,
        MAX(trip_distance) as max_distance,
        MIN(total_amount) as min_total,
        MAX(total_amount) as max_total,
        AVG(total_amount) as avg_total
     FROM {PG_SCHEMA_RAW}.green_taxi_trip) as quality
    """

    quality_green = spark.read.jdbc(url=jdbc_url, table=quality_query_green, properties=properties)
    quality_green.show(truncate=False, vertical=True)
except Exception as e:
    print(f"Data quality check failed (green): {e}")


DATA QUALITY: raw.green_taxi_trip
-RECORD 0-----------------------------
 min_pickup     | 2015-01-01 00:00:02 
 max_pickup     | 2025-01-01 22:21:15 
 min_passengers | 1                   
 max_passengers | 9                   
 min_distance   | 0.01                
 max_distance   | 450.21              
 min_total      | 0.01                
 max_total      | 999.99              
 avg_total      | 15.028900739400548  



## 15. Export the ingestion log to CSV (optional)

In [18]:
# Save the ingestion log as CSV for documentation
log_filename = f"ingestion_log_{RUN_ID}.csv"
try:
    log_df.to_csv(log_filename, index=False)
    print(f"\n✓ Ingestion log saved to: {log_filename}")
except Exception as e:
    print(f"Could not save the log: {e}")


✓ Ingestion log saved to: ingestion_log_localtest.csv
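If the pandas `log_df` is not available, the same file can be written with the standard library. The sketch below does that and takes an explicit output directory. The column names in the demo row (`service`, `year`, `month`, `rows_inserted`, `status`) and its values are made up. `write_ingestion_log` is a hypothetical helper.

```python
import csv
import tempfile
from pathlib import Path


def write_ingestion_log(rows, run_id, out_dir="."):
    """Write log rows (dicts sharing the same keys) to ingestion_log_<run_id>.csv."""
    if not rows:
        raise ValueError("no log rows to write")
    path = Path(out_dir) / f"ingestion_log_{run_id}.csv"
    path.parent.mkdir(parents=True, exist_ok=True)
    with path.open("w", newline="", encoding="utf-8") as fh:
        writer = csv.DictWriter(fh, fieldnames=list(rows[0]))
        writer.writeheader()
        writer.writerows(rows)
    return path


# Demo with made-up values, written to a throwaway directory.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = write_ingestion_log(
        [{"service": "yellow", "year": 2015, "month": 1, "rows_inserted": 100, "status": "ok"}],
        run_id="localtest",
        out_dir=tmp,
    )
    with demo_path.open(newline="", encoding="utf-8") as fh:
        demo_rows = list(csv.DictReader(fh))
print(demo_path.name, demo_rows)
```

CSV stores everything as text, so values read back through `csv.DictReader` are strings.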


## 16. Final summary and next steps

In [19]:
print("\n" + "="*80)
print("INGESTION COMPLETE")
print("="*80)
try:
    print(f"\n✓ RAW schema populated: {PG_SCHEMA_RAW}")
    print("  - Check the row counts and quality results in the previous cells.")
except Exception as e:
    print(f"Partial summary: {e}")

print(f"✓ RUN_ID: {RUN_ID}")
print(f"\n{'='*80}")
print("NEXT STEPS:")
print("="*80)
print("1. Run obt-builder to build analytics.obt_trips")
print("   Command: docker compose run obt-builder --mode full-rebuild")
print("\n2. Open the ML notebook: ml_total_amount_regression.ipynb")
print("   - Train from-scratch models (SGD, Ridge, Lasso, Elastic Net)")
print("   - Compare against scikit-learn")
print("   - Evaluate and select the best model")
print("="*80)


INGESTION COMPLETE

✓ RAW schema populated: raw
  - Check the row counts and quality results in the previous cells.
✓ RUN_ID: localtest

NEXT STEPS:
1. Run obt-builder to build analytics.obt_trips
   Command: docker compose run obt-builder --mode full-rebuild

2. Open the ML notebook: ml_total_amount_regression.ipynb
   - Train from-scratch models (SGD, Ridge, Lasso, Elastic Net)
   - Compare against scikit-learn
   - Evaluate and select the best model


## Implementation notes

### Data cleaning applied:
1. **Timestamps**: pickup < dropoff, within 2015-2025, duration between 1 minute and 24 hours  
2. **LocationID**: valid between 1 and 263 (official NYC zones)  
3. **Passenger count**: 1-9 passengers (nulls imputed as 1)  
4. **Trip distance**: positive and < 500 miles  
5. **Amounts**: non-negative, total_amount < $1,000  
6. **Codes**: rate code 1-6, payment type 1-6, vendor 1/2/4  
7. **Consistency**: difference between total_amount and the sum of its components ≤ $5
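The seven rules can be expressed as a single row-level filter. The plain-Python sketch below is for illustration only, since the notebook applies the rules with Spark column expressions. Several details are assumptions rather than the notebook's code: the generic key names, the component list used for rule 7, inclusive duration bounds, and checking only the pickup year. `clean_trip` is a hypothetical helper.

```python
from datetime import datetime, timedelta

# Assumed components for rule 7; the exact column list is not shown in this section.
COMPONENTS = (
    "fare_amount", "extra", "mta_tax", "tip_amount",
    "tolls_amount", "improvement_surcharge", "congestion_surcharge",
)


def clean_trip(trip):
    """Return a cleaned copy of `trip`, or None if it violates any of rules 1-7.

    `trip` is a dict with hypothetical generic keys: pickup, dropoff (datetime),
    pu_location_id, do_location_id, passenger_count, trip_distance,
    rate_code_id, payment_type, vendor_id, total_amount and the COMPONENTS.
    Only passenger_count may be None.
    """
    t = dict(trip)  # never mutate the caller's row

    # 1. Pickup year in 2015-2025 and 1 min <= duration <= 24 h
    #    (a positive duration already implies pickup < dropoff).
    duration = t["dropoff"] - t["pickup"]
    if not 2015 <= t["pickup"].year <= 2025:
        return None
    if not timedelta(minutes=1) <= duration <= timedelta(hours=24):
        return None

    # 2. Both zone IDs must be official NYC zones 1-263.
    if not all(1 <= t[k] <= 263 for k in ("pu_location_id", "do_location_id")):
        return None

    # 3. Impute a missing passenger_count as 1, then require 1-9.
    if t["passenger_count"] is None:
        t["passenger_count"] = 1
    if not 1 <= t["passenger_count"] <= 9:
        return None

    # 4. Distance strictly positive and under 500 miles.
    if not 0 < t["trip_distance"] < 500:
        return None

    # 5. No negative amounts; total under $1,000.
    amounts = [t[c] for c in COMPONENTS] + [t["total_amount"]]
    if any(a < 0 for a in amounts) or t["total_amount"] >= 1000:
        return None

    # 6. Categorical codes.
    if t["rate_code_id"] not in range(1, 7) or t["payment_type"] not in range(1, 7):
        return None
    if t["vendor_id"] not in (1, 2, 4):
        return None

    # 7. total_amount within $5 of the sum of its components.
    if abs(t["total_amount"] - sum(t[c] for c in COMPONENTS)) > 5:
        return None

    return t


# Made-up example row: a 20-minute trip with a missing passenger_count.
row = {
    "pickup": datetime(2019, 5, 1, 8, 0), "dropoff": datetime(2019, 5, 1, 8, 20),
    "pu_location_id": 132, "do_location_id": 236, "passenger_count": None,
    "trip_distance": 17.5, "rate_code_id": 2, "payment_type": 1, "vendor_id": 2,
    "fare_amount": 52.0, "extra": 0.0, "mta_tax": 0.5, "tip_amount": 10.0,
    "tolls_amount": 5.76, "improvement_surcharge": 0.3, "congestion_surcharge": 2.5,
    "total_amount": 71.06,
}
print(clean_trip(row)["passenger_count"])  # prints 1
```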

### Metadata columns added:
- `run_id`: unique identifier for this run  
- `source_year`: year of the source file  
- `source_month`: month of the source file  
- `ingested_at_utc`: UTC timestamp of the ingestion
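Per row, the four metadata fields amount to a small merge, sketched below in plain Python. `add_ingestion_metadata` is a hypothetical helper. In Spark, the equivalent would presumably use `withColumn` with `lit(...)` and `current_timestamp()`, both of which the notebook imports.

```python
from datetime import datetime, timezone


def add_ingestion_metadata(record, run_id, source_year, source_month, now=None):
    """Return a copy of `record` with run_id, source_year, source_month, ingested_at_utc."""
    if not 1 <= source_month <= 12:
        raise ValueError(f"invalid source_month: {source_month}")
    # Timezone-aware UTC timestamp; `now` can be injected for reproducible tests.
    ingested_at = now if now is not None else datetime.now(timezone.utc)
    return {
        **record,
        "run_id": run_id,
        "source_year": source_year,
        "source_month": source_month,
        "ingested_at_utc": ingested_at,
    }


example = add_ingestion_metadata({"trip_distance": 1.2}, "ingesta_20250103_120000", 2015, 1)
print(example["ingested_at_utc"].tzinfo)  # prints UTC
```

The sketch uses `datetime.now(timezone.utc)` instead of `datetime.utcnow()`. The latter returns a naive value and is deprecated since Python 3.12.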

### Environment variables (read with `os.getenv`; unset ones fall back to the defaults in the configuration cell):
```
PG_HOST=postgres
PG_PORT=5432
PG_DB=nyc_tlc
PG_USER=postgres
PG_PASSWORD=<your_password>
PG_SCHEMA_RAW=raw
RUN_ID=ingesta_20250103_120000
```
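The quality cells pass `jdbc_url` and `properties` to `spark.read.jdbc`, but their construction is not shown in this section. The sketch below builds them from the variables listed above. It assumes the standard PostgreSQL JDBC URL form and the `org.postgresql.Driver` class, with defaults mirroring the configuration cell. `jdbc_config` is a hypothetical helper.

```python
import os


def jdbc_config(env=None):
    """Build (jdbc_url, properties) for spark.read.jdbc from PG_* variables."""
    env = os.environ if env is None else env
    host = env.get("PG_HOST", "postgres")
    port = env.get("PG_PORT", "5432")
    db = env.get("PG_DB", "nyc_tlc")
    url = f"jdbc:postgresql://{host}:{port}/{db}"
    properties = {
        "user": env.get("PG_USER", "postgres"),
        "password": env.get("PG_PASSWORD", "postgres"),
        "driver": "org.postgresql.Driver",
    }
    return url, properties


print(jdbc_config({"PG_HOST": "postgres", "PG_DB": "nyc_tlc"})[0])  # prints jdbc:postgresql://postgres:5432/nyc_tlc
```

Passing a plain dict instead of `os.environ` keeps the function testable without touching the real environment.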

usage: ipykernel_launcher.py [-h] --mode {full,by-partition}
                             [--year-start YEAR_START] [--year-end YEAR_END]
                             [--months MONTHS] [--services SERVICES]
                             [--run-id RUN_ID] [--overwrite]
ipykernel_launcher.py: error: the following arguments are required: --mode


SystemExit: 2
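The error above comes from running the script's `argparse` entry point inside the kernel. Called with no argument, `parse_args()` reads `sys.argv`, which under Jupyter holds ipykernel's own launcher arguments, so the required `--mode` is missing. The minimal sketch below mirrors the flags in that usage message and is called with an explicit argument list, as one would in a notebook. The types and defaults are assumptions, and `build_parser` is hypothetical.

```python
import argparse


def build_parser():
    # Flags copied from the usage message above; types and defaults are assumptions.
    p = argparse.ArgumentParser(description="NYC TLC Parquet to Postgres RAW ingestion")
    p.add_argument("--mode", required=True, choices=["full", "by-partition"])
    p.add_argument("--year-start", type=int, default=2015)
    p.add_argument("--year-end", type=int, default=2025)
    p.add_argument("--months", default=None, help="assumed comma-separated, e.g. 1,2,3")
    p.add_argument("--services", default=None, help="assumed comma-separated, e.g. yellow,green")
    p.add_argument("--run-id", default=None)
    p.add_argument("--overwrite", action="store_true")
    return p


# Inside a notebook, pass the arguments explicitly instead of relying on sys.argv.
args = build_parser().parse_args(
    ["--mode", "by-partition", "--year-start", "2019", "--year-end", "2020", "--overwrite"]
)
print(args.mode, args.year_start, args.year_end, args.overwrite)
```

`parse_known_args()` would not help here, because it still exits when the required `--mode` is missing.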

