In [5]:
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("PZ4-IngestaTest")
    .config("spark.driver.extraClassPath", "/opt/jars/postgresql-42.7.4.jar")
    .config("spark.jars", "/opt/jars/postgresql-42.7.4.jar")
    .getOrCreate()
)

spark


In [2]:
%pip install psycopg2-binary

Collecting psycopg2-binary
  Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl.metadata (4.9 kB)
Downloading psycopg2_binary-2.9.11-cp311-cp311-manylinux2014_x86_64.manylinux_2_17_x86_64.whl (4.2 MB)
[2K   [90m‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ‚îÅ[0m [32m4.2/4.2 MB[0m [31m28.7 MB/s[0m eta [36m0:00:00[0m00:01[0m00:01[0m
[?25hInstalling collected packages: psycopg2-binary
Successfully installed psycopg2-binary-2.9.11
Note: you may need to restart the kernel to use updated packages.


In [8]:
import os
from datetime import datetime, timezone
from pyspark.sql import functions as F
import psycopg2

# Config
DATA_ROOT = "/home/jovyan/work/data/trip-data"
service = "yellow"
year, month = 2015, 1

PG_HOST = os.getenv("PG_HOST", "postgres")
PG_PORT = os.getenv("PG_PORT", "5432")
PG_DB   = os.getenv("PG_DB", "nyctaxi")
PG_USER = os.getenv("PG_USER", "pset")
PG_PWD  = os.getenv("PG_PASSWORD", "pset_password")
PG_URL  = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"
PG_PROPS = {"driver": "org.postgresql.Driver", "user": PG_USER, "password": PG_PWD}

# 1) Borrar partici√≥n previa en Postgres (idempotencia)
conn = psycopg2.connect(
    host=PG_HOST,
    port=PG_PORT,
    dbname=PG_DB,
    user=PG_USER,
    password=PG_PWD,
)
cur = conn.cursor()
cur.execute(
    "DELETE FROM raw.yellow_taxi_trip WHERE source_year=%s AND source_month=%s;",
    (year, month),
)
deleted = cur.rowcount
conn.commit()
cur.close()
conn.close()
print(f"BORRADOS anteriores: {deleted} filas de yellow {year}-{month:02d}")

# 2) Leer parquet
fname = f"{service}_tripdata_{year}-{month:02d}.parquet"
path  = f"{DATA_ROOT}/{fname}"
print("Leyendo parquet:", path)
df = spark.read.parquet(path)

# 3) Normalizaci√≥n m√≠nima + metadatos
run_id = datetime.now(timezone.utc).strftime("ingest-%Y%m%dT%H%M%SZ")

dfn = (df
    .withColumnRenamed("tpep_pickup_datetime",  "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
    .withColumn("service_type",    F.lit(service))
    .withColumn("source_year",     F.lit(int(year)))
    .withColumn("source_month",    F.lit(int(month)))
    .withColumn("ingested_at_utc", F.lit(datetime.now(timezone.utc)))
    .withColumn("run_id",          F.lit(run_id))
)

# 4) Escribir a Postgres (append)
table = "raw.yellow_taxi_trip"
(dfn.write
    .mode("append")
    .jdbc(PG_URL, table, properties=PG_PROPS))

cnt = dfn.count()
print(f"INSERTADAS: {cnt} filas nuevas en {table} para {year}-{month:02d}")

# 5) Verificar conteo final directamente en Postgres
conn = psycopg2.connect(
    host=PG_HOST,
    port=PG_PORT,
    dbname=PG_DB,
    user=PG_USER,
    password=PG_PWD,
)
cur = conn.cursor()
cur.execute(
    "SELECT COUNT(*) FROM raw.yellow_taxi_trip WHERE source_year=%s AND source_month=%s;",
    (year, month),
)
final_cnt = cur.fetchone()[0]
cur.close()
conn.close()
print(f"COUNT final en Postgres (yellow {year}-{month:02d}): {final_cnt}")


BORRADOS anteriores: 0 filas de yellow 2015-01
Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2015-01.parquet
INSERTADAS: 12741035 filas nuevas en raw.yellow_taxi_trip para 2015-01
COUNT final en Postgres (yellow 2015-01): 12741035


In [10]:
path = "/home/jovyan/work/data/trip-data/green_tripdata_2015-01.parquet"
dfg = spark.read.parquet(path)

dfg.printSchema()


root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)



In [11]:
import os
from datetime import datetime, timezone
from pyspark.sql import functions as F
import psycopg2

# Config
DATA_ROOT = "/home/jovyan/work/data/trip-data"
service = "green"
year, month = 2015, 1

PG_HOST = os.getenv("PG_HOST", "postgres")
PG_PORT = os.getenv("PG_PORT", "5432")
PG_DB   = os.getenv("PG_DB", "nyctaxi")
PG_USER = os.getenv("PG_USER", "pset")
PG_PWD  = os.getenv("PG_PASSWORD", "pset_password")
PG_URL  = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"
PG_PROPS = {"driver": "org.postgresql.Driver", "user": PG_USER, "password": PG_PWD}

# 1) Borrar partici√≥n previa SOLO si la tabla ya existe
conn = psycopg2.connect(
    host=PG_HOST,
    port=PG_PORT,
    dbname=PG_DB,
    user=PG_USER,
    password=PG_PWD,
)
cur = conn.cursor()

cur.execute("""
    SELECT EXISTS (
      SELECT 1
      FROM information_schema.tables
      WHERE table_schema = 'raw'
        AND table_name   = 'green_taxi_trip'
    );
""")
exists = cur.fetchone()[0]

if exists:
    cur.execute(
        "DELETE FROM raw.green_taxi_trip WHERE source_year=%s AND source_month=%s;",
        (year, month),
    )
    deleted = cur.rowcount
    print(f"BORRADOS anteriores: {deleted} filas de green {year}-{month:02d}")
else:
    print("Tabla raw.green_taxi_trip a√∫n no existe; no hay nada que borrar.")

conn.commit()
cur.close()
conn.close()

# 2) Leer parquet
fname = f"{service}_tripdata_{year}-{month:02d}.parquet"
path  = f"{DATA_ROOT}/{fname}"
print("Leyendo parquet:", path)
df = spark.read.parquet(path)

# 3) Normalizaci√≥n m√≠nima + metadatos
run_id = datetime.now(timezone.utc).strftime("ingest-%Y%m%dT%H%M%SZ")

dfn = (df
    .withColumnRenamed("lpep_pickup_datetime",  "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
    .withColumn("service_type",    F.lit(service))
    .withColumn("source_year",     F.lit(int(year)))
    .withColumn("source_month",    F.lit(int(month)))
    .withColumn("ingested_at_utc", F.lit(datetime.now(timezone.utc)))
    .withColumn("run_id",          F.lit(run_id))
)

# 4) Escribir a Postgres (append)
table = "raw.green_taxi_trip"
(dfn.write
    .mode("append")
    .jdbc(PG_URL, table, properties=PG_PROPS))

cnt = dfn.count()
print(f"INSERTADAS: {cnt} filas nuevas en {table} para {year}-{month:02d}")

# 5) Verificar conteo final directamente en Postgres
conn = psycopg2.connect(
    host=PG_HOST,
    port=PG_PORT,
    dbname=PG_DB,
    user=PG_USER,
    password=PG_PWD,
)
cur = conn.cursor()
cur.execute(
    "SELECT COUNT(*) FROM raw.green_taxi_trip WHERE source_year=%s AND source_month=%s;",
    (year, month),
)
final_cnt = cur.fetchone()[0]
cur.close()
conn.close()
print(f"COUNT final en Postgres (green {year}-{month:02d}): {final_cnt}")


Tabla raw.green_taxi_trip a√∫n no existe; no hay nada que borrar.
Leyendo parquet: /home/jovyan/work/data/trip-data/green_tripdata_2015-01.parquet
INSERTADAS: 1508493 filas nuevas en raw.green_taxi_trip para 2015-01
COUNT final en Postgres (green 2015-01): 1508493


In [3]:
def ingest_month(service: str, year: int, month: int):
    """
    Ingesta idempotente de un (service, year, month) desde parquet ‚Üí raw.<service>_taxi_trip.
    service: 'yellow' o 'green'
    """
    import os
    from datetime import datetime, timezone
    import psycopg2
    from pyspark.sql import functions as F

    DATA_ROOT = "/home/jovyan/work/data/trip-data"

    PG_HOST = os.getenv("PG_HOST", "postgres")
    PG_PORT = os.getenv("PG_PORT", "5432")
    PG_DB   = os.getenv("PG_DB", "nyctaxi")
    PG_USER = os.getenv("PG_USER", "pset")
    PG_PWD  = os.getenv("PG_PASSWORD", "pset_password")

    # üëá CAMBIO 1: activamos reWriteBatchedInserts en el JDBC URL
    PG_URL  = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}?reWriteBatchedInserts=true"
    PG_PROPS = {"driver": "org.postgresql.Driver", "user": PG_USER, "password": PG_PWD}

    # 1) Borrar partici√≥n previa solo si la tabla existe
    conn = psycopg2.connect(
        host=PG_HOST,
        port=PG_PORT,
        dbname=PG_DB,
        user=PG_USER,
        password=PG_PWD,
    )
    cur = conn.cursor()
    table_name = f"{service}_taxi_trip"

    cur.execute("""
        SELECT EXISTS (
          SELECT 1
          FROM information_schema.tables
          WHERE table_schema = 'raw'
            AND table_name   = %s
        );
    """, (table_name,))
    exists = cur.fetchone()[0]

    if exists:
        cur.execute(
            f"DELETE FROM raw.{table_name} WHERE source_year=%s AND source_month=%s;",
            (year, month),
        )
        deleted = cur.rowcount
        print(f"[{service} {year}-{month:02d}] BORRADOS anteriores: {deleted}")
    else:
        print(f"[{service} {year}-{month:02d}] Tabla raw.{table_name} a√∫n no existe; no hay nada que borrar.")

    conn.commit()
    cur.close()
    conn.close()

    # 2) Leer parquet
    fname = f"{service}_tripdata_{year}-{month:02d}.parquet"
    path  = f"{DATA_ROOT}/{fname}"
    print(f"[{service} {year}-{month:02d}] Leyendo parquet: {path}")
    df = spark.read.parquet(path)

    # 3) Normalizar timestamps seg√∫n servicio
    run_id = datetime.now(timezone.utc).strftime("ingest-%Y%m%dT%H%M%SZ")

    if service == "yellow":
        df = (df
            .withColumnRenamed("tpep_pickup_datetime",  "pickup_datetime")
            .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime")
        )
    elif service == "green":
        df = (df
            .withColumnRenamed("lpep_pickup_datetime",  "pickup_datetime")
            .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime")
        )
    else:
        raise ValueError(f"Servicio no soportado: {service}")

    dfn = (df
        .withColumn("service_type",    F.lit(service))
        .withColumn("source_year",     F.lit(int(year)))
        .withColumn("source_month",    F.lit(int(month)))
        .withColumn("ingested_at_utc", F.lit(datetime.now(timezone.utc)))
        .withColumn("run_id",          F.lit(run_id))
    )

    # 4) Escribir a Postgres (optimizado)
    table = f"raw.{service}_taxi_trip"
    (dfn.write
        .mode("append")
        .option("batchsize", 10000)     # üëà CAMBIO 2: batches m√°s grandes
        .option("numPartitions", 8)     # üëà CAMBIO 3: control de paralelismo JDBC
        .jdbc(PG_URL, table, properties=PG_PROPS))

    cnt = dfn.count()
    print(f"[{service} {year}-{month:02d}] INSERTADAS: {cnt} filas")

    return cnt


In [13]:
ingest_month("yellow", 2015, 2)


[yellow 2015-02] BORRADOS anteriores: 0
[yellow 2015-02] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2015-02.parquet
[yellow 2015-02] INSERTADAS: 12442394 filas


12442394

In [14]:
ingest_month("green", 2015, 2)

[green 2015-02] BORRADOS anteriores: 0
[green 2015-02] Leyendo parquet: /home/jovyan/work/data/trip-data/green_tripdata_2015-02.parquet
[green 2015-02] INSERTADAS: 1574830 filas


1574830

In [15]:
import os
import pandas as pd
from pyspark.sql import functions as F
from datetime import datetime, timezone

DATA_ROOT = "/home/jovyan/work/data/trip-data"
zones_path = os.path.join(DATA_ROOT, "taxi_zone_lookup.csv")

PG_HOST = os.getenv("PG_HOST", "postgres")
PG_PORT = os.getenv("PG_PORT", "5432")
PG_DB   = os.getenv("PG_DB", "nyctaxi")
PG_USER = os.getenv("PG_USER", "pset")
PG_PWD  = os.getenv("PG_PASSWORD", "pset_password")
PG_URL  = f"jdbc:postgresql://{PG_HOST}:{PG_PORT}/{PG_DB}"
PG_PROPS = {"driver": "org.postgresql.Driver", "user": PG_USER, "password": PG_PWD}

print("Leyendo CSV:", zones_path)
pdf = pd.read_csv(zones_path)

run_id = datetime.now(timezone.utc).strftime("zones-%Y%m%dT%H%M%SZ")

sdf = (spark.createDataFrame(pdf)
       .withColumn("ingested_at_utc", F.lit(datetime.now(timezone.utc)))
       .withColumn("run_id", F.lit(run_id)))

# overwrite porque es una tabla de cat√°logo chica
table = "raw.taxi_zone_lookup"
sdf.write.mode("overwrite").jdbc(PG_URL, table, properties=PG_PROPS)

print(f"Cargadas {sdf.count()} filas en {table}")


Leyendo CSV: /home/jovyan/work/data/trip-data/taxi_zone_lookup.csv
Cargadas 265 filas en raw.taxi_zone_lookup


In [16]:
import time
from datetime import timedelta

services = ["yellow", "green"]
year = 2015
months = list(range(1, 13))

resumen = []
total_jobs = len(services) * len(months)
done = 0
start_global = time.time()

print(f"== Iniciando ingesta a√±o {year} ==")

for svc in services:
    for m in months:
        print("\n" + "="*70)
        print(f">>> Ingestando {svc.upper()} {year}-{m:02d}")
        
        start = time.time()
        cnt = ingest_month(svc, year, m)
        elapsed = time.time() - start

        done += 1
        global_elapsed = time.time() - start_global
        avg_per_job = global_elapsed / done
        remaining_jobs = total_jobs - done
        eta = remaining_jobs * avg_per_job

        print(f"{svc} {year}-{m:02d}: {cnt} filas")
        print(f"Tiempo: {timedelta(seconds=int(elapsed))}")
        print(f"Progreso: {done}/{total_jobs}")
        print(f"ETA aproximada: {timedelta(seconds=int(eta))}")

        # Barra de progreso
        bar_length = 50
        filled = int(bar_length * done / total_jobs)
        bar = "‚ñà" * filled + "-" * (bar_length - filled)
        print(f"[{bar}]")

        resumen.append((svc, year, m, cnt))

print("\n== RESUMEN FINAL 2015 ==")
for svc, y, m, cnt in resumen:
    print(f"{svc} {y}-{m:02d}: {cnt} filas")

print("\nDuraci√≥n total:", timedelta(seconds=int(time.time() - start_global)))


== Iniciando ingesta a√±o 2015 ==

>>> Ingestando YELLOW 2015-01
[yellow 2015-01] BORRADOS anteriores: 12741035
[yellow 2015-01] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2015-01.parquet
[yellow 2015-01] INSERTADAS: 12741035 filas
yellow 2015-01: 12741035 filas
Tiempo: 0:07:09
Progreso: 1/24
ETA aproximada: 2:44:36
[‚ñà‚ñà------------------------------------------------]

>>> Ingestando YELLOW 2015-02
[yellow 2015-02] BORRADOS anteriores: 12442394
[yellow 2015-02] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2015-02.parquet
[yellow 2015-02] INSERTADAS: 12442394 filas
yellow 2015-02: 12442394 filas
Tiempo: 0:06:55
Progreso: 2/24
ETA aproximada: 2:34:54
[‚ñà‚ñà‚ñà‚ñà----------------------------------------------]

>>> Ingestando YELLOW 2015-03
[yellow 2015-03] BORRADOS anteriores: 0
[yellow 2015-03] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2015-03.parquet
[yellow 2015-03] INSERTADAS: 13342951 filas
yellow 2015-03: 1334

In [18]:
import time
from datetime import timedelta

start = time.time()
cnt = ingest_month("yellow", 2016, 1)
elapsed = time.time() - start
print("Tiempo yellow 2016-01:", timedelta(seconds=int(elapsed)), "filas:", cnt)

[yellow 2016-01] BORRADOS anteriores: 0
[yellow 2016-01] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2016-01.parquet
[yellow 2016-01] INSERTADAS: 10905067 filas
Tiempo yellow 2016-01: 0:02:52 filas: 10905067


In [19]:
ingest_month("yellow", 2016, 1)
ingest_month("green", 2016, 1)

[yellow 2016-01] BORRADOS anteriores: 10905067
[yellow 2016-01] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2016-01.parquet
[yellow 2016-01] INSERTADAS: 10905067 filas
[green 2016-01] BORRADOS anteriores: 0
[green 2016-01] Leyendo parquet: /home/jovyan/work/data/trip-data/green_tripdata_2016-01.parquet
[green 2016-01] INSERTADAS: 1445292 filas


1445292

In [4]:
import time
from datetime import timedelta

services = ["yellow", "green"]
years = list(range(2016, 2026))  # 2016‚Äì2024
months = list(range(1, 13))

jobs = []
for y in years:
    for svc in services:
        for m in months:
            jobs.append((svc, y, m))

total_jobs = len(jobs)
done = 0
start_global = time.time()
resumen = []

print(f"== Iniciando ingesta a√±os {years[0]}‚Äì{years[-1]} ==")

for svc, y, m in jobs:
    print("\n" + "="*70)
    print(f">>> Ingestando {svc.upper()} {y}-{m:02d}")

    start = time.time()
    cnt = ingest_month(svc, y, m)
    elapsed = time.time() - start

    done += 1
    global_elapsed = time.time() - start_global
    avg_per_job = global_elapsed / done
    remaining_jobs = total_jobs - done
    eta = remaining_jobs * avg_per_job

    print(f"{svc} {y}-{m:02d}: {cnt} filas")
    print(f"Tiempo: {timedelta(seconds=int(elapsed))}")
    print(f"Progreso: {done}/{total_jobs}")
    print(f"ETA aproximada: {timedelta(seconds=int(eta))}")

    # Barra de progreso
    bar_length = 50
    filled = int(bar_length * done / total_jobs)
    bar = "‚ñà" * filled + "-" * (bar_length - filled)
    print(f"[{bar}]")

    resumen.append((svc, y, m, cnt))

print("\n== RESUMEN FINAL MULTIANUAL ==")
for svc, y, m, cnt in resumen:
    print(f"{svc} {y}-{m:02d}: {cnt} filas")

print("\nDuraci√≥n total:", timedelta(seconds=int(time.time() - start_global)))


== Iniciando ingesta a√±os 2016‚Äì2025 ==

>>> Ingestando YELLOW 2016-01
[yellow 2016-01] BORRADOS anteriores: 10905067
[yellow 2016-01] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2016-01.parquet
[yellow 2016-01] INSERTADAS: 10905067 filas
yellow 2016-01: 10905067 filas
Tiempo: 0:04:47
Progreso: 1/240
ETA aproximada: 19:06:44
[--------------------------------------------------]

>>> Ingestando YELLOW 2016-02
[yellow 2016-02] BORRADOS anteriores: 11375412
[yellow 2016-02] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2016-02.parquet
[yellow 2016-02] INSERTADAS: 11375412 filas
yellow 2016-02: 11375412 filas
Tiempo: 0:04:58
Progreso: 2/240
ETA aproximada: 19:23:14
[--------------------------------------------------]

>>> Ingestando YELLOW 2016-03
[yellow 2016-03] BORRADOS anteriores: 12203824
[yellow 2016-03] Leyendo parquet: /home/jovyan/work/data/trip-data/yellow_tripdata_2016-03.parquet
[yellow 2016-03] INSERTADAS: 12203824 filas
yellow 2016-0

AnalysisException: Column cbd_congestion_fee not found in schema Some(StructType(StructField(VendorID,LongType,true),StructField(pickup_datetime,TimestampType,true),StructField(dropoff_datetime,TimestampType,true),StructField(passenger_count,LongType,true),StructField(trip_distance,DoubleType,true),StructField(RatecodeID,LongType,true),StructField(store_and_fwd_flag,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(payment_type,LongType,true),StructField(fare_amount,DoubleType,true),StructField(extra,DoubleType,true),StructField(mta_tax,DoubleType,true),StructField(tip_amount,DoubleType,true),StructField(tolls_amount,DoubleType,true),StructField(improvement_surcharge,DoubleType,true),StructField(total_amount,DoubleType,true),StructField(congestion_surcharge,IntegerType,true),StructField(airport_fee,IntegerType,true),StructField(service_type,StringType,false),StructField(source_year,IntegerType,false),StructField(source_month,IntegerType,false),StructField(ingested_at_utc,TimestampType,false),StructField(run_id,StringType,false))).