In [6]:
import argparse
import os
import tempfile
import time
from datetime import datetime

import pandas as pd
import psycopg2
import requests
from dotenv import load_dotenv
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [5]:
# Load variables from a local .env file (if present) into the process environment.
load_dotenv()

# PostgreSQL connection settings, read from the environment.
# The fallbacks are the same ones used when the JDBC options are built, so both
# views of the configuration agree when a variable is not set.
PG_HOST = os.getenv("PG_HOST", "postgres-warehouse")
PG_PORT = os.getenv("PG_PORT", "5432")
PG_DB = os.getenv("PG_DB", "warehouse_db")
PG_USER = os.getenv("PG_USER", "postgres")
PG_PASSWORD = os.getenv("PG_PASSWORD", "postgres")

# Target schema for the raw trip tables. Without a fallback an unset
# PG_SCHEMA_RAW produced the literal table name "None.<SERVICE>_TRIPS".
# "RAW" is the schema the taxi-zone load in this notebook writes to.
PG_SCHEMA = os.getenv("PG_SCHEMA_RAW", "RAW")



In [7]:
# Parquet -> PostgreSQL ingestion: obtain (or reuse) a Spark session that pulls
# the PostgreSQL JDBC driver through spark.jars.packages.
spark = (
    SparkSession.builder
    .appName("IngestaParquetPostgres")
    .config("spark.jars.packages", "org.postgresql:postgresql:42.2.18")
    .getOrCreate()
)

 

In [6]:
# JDBC options handed to every Spark read/write against PostgreSQL.
# Each value comes from the environment, with a fallback when the variable is unset.
_jdbc_host = os.getenv('PG_HOST', 'postgres-warehouse')
_jdbc_port = os.getenv('PG_PORT', '5432')
_jdbc_db = os.getenv('PG_DB', 'warehouse_db')

pgOptions = dict(
    url=f"jdbc:postgresql://{_jdbc_host}:{_jdbc_port}/{_jdbc_db}",
    user=os.getenv("PG_USER", "postgres"),
    password=os.getenv("PG_PASSWORD", "postgres"),
    driver="org.postgresql.Driver",
)

In [None]:

# Ingest the monthly trip-data parquet files (yellow and green services) into
# PostgreSQL, one file per (service, year, month). Each file is downloaded to a
# temporary path, read with Spark, tagged with run metadata and appended to
# <PG_SCHEMA>.<SERVICE>_TRIPS. A month whose RUN_ID is already in the table is skipped.

years = [str(y) for y in range(2015, 2026)]
months = [f"{m:02d}" for m in range(1, 13)]
services = ['yellow', 'green']

# Years for which only a subset of months is requested. Looked up per year so
# the full `months` list is never overwritten (overwriting it truncated every
# later year/service to the same subset).
PARTIAL_YEAR_MONTHS = {'2025': months[:8]}

MAX_ATTEMPTS = 3             # full ingest attempts per file
HEAD_ATTEMPTS = 5            # availability probes per ingest attempt
HTTP_TIMEOUT_S = 60          # connect/read timeout for every HTTP call
DOWNLOAD_CHUNK_BYTES = 1 << 20

# Column order of the target tables, per service.
ORDERED_COLS = {
    'yellow': [
        "VENDORID", "TPEP_PICKUP_DATETIME", "TPEP_DROPOFF_DATETIME",
        "PASSENGER_COUNT", "TRIP_DISTANCE", "RATECODEID",
        "STORE_AND_FWD_FLAG", "PULOCATIONID", "DOLOCATIONID",
        "PAYMENT_TYPE", "FARE_AMOUNT", "EXTRA", "MTA_TAX",
        "TIP_AMOUNT", "TOLLS_AMOUNT", "IMPROVEMENT_SURCHARGE",
        "TOTAL_AMOUNT", "CONGESTION_SURCHARGE", "AIRPORT_FEE",
        "CBD_CONGESTION_FEE",
        "RUN_ID", "SERVICE_TYPE", "SOURCE_YEAR", "SOURCE_MONTH",
        "INGESTED_AT_UTC", "SOURCE_PATH",
    ],
    'green': [
        "VENDORID", "LPEP_PICKUP_DATETIME", "LPEP_DROPOFF_DATETIME",
        "STORE_AND_FWD_FLAG", "RATECODEID",
        "PULOCATIONID", "DOLOCATIONID", "PASSENGER_COUNT", "TRIP_DISTANCE",
        "FARE_AMOUNT", "EXTRA", "MTA_TAX", "TIP_AMOUNT", "TOLLS_AMOUNT",
        "EHAIL_FEE", "IMPROVEMENT_SURCHARGE", "TOTAL_AMOUNT",
        "PAYMENT_TYPE", "TRIP_TYPE", "CONGESTION_SURCHARGE",
        "CBD_CONGESTION_FEE",
        "RUN_ID", "SERVICE_TYPE", "SOURCE_YEAR", "SOURCE_MONTH",
        "INGESTED_AT_UTC", "SOURCE_PATH",
    ],
}

# Columns that are cast to timestamp, per service.
TIMESTAMP_COLS = {
    'yellow': ['TPEP_PICKUP_DATETIME', 'TPEP_DROPOFF_DATETIME'],
    'green': ['LPEP_PICKUP_DATETIME', 'LPEP_DROPOFF_DATETIME'],
}


def _remove_quietly(path):
    """Delete `path`, ignoring the case where it is already gone."""
    try:
        os.remove(path)
    except FileNotFoundError:
        pass


def _source_exists(url):
    """Return True if a HEAD request on `url` answers 200 within HEAD_ATTEMPTS tries.

    Network errors raised by `requests` propagate to the caller.
    """
    for head_try in range(HEAD_ATTEMPTS):
        if requests.head(url, timeout=HTTP_TIMEOUT_S).status_code == 200:
            return True
        print(f"No existe: {url}")
        if head_try < HEAD_ATTEMPTS - 1:
            time.sleep(1)
    return False


def _download_to_temp(url):
    """Stream `url` into a new temporary .parquet file and return its path.

    Raises on a non-2xx response; the temporary file is removed on failure.
    """
    tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".parquet")
    try:
        with tmp_file, requests.get(url, stream=True, timeout=HTTP_TIMEOUT_S) as response:
            response.raise_for_status()
            # Stream in chunks so the whole file is never held in memory.
            for chunk in response.iter_content(chunk_size=DOWNLOAD_CHUNK_BYTES):
                tmp_file.write(chunk)
    except Exception:
        _remove_quietly(tmp_file.name)
        raise
    return tmp_file.name


def _run_already_loaded(table_name, run_id):
    """Return True if `table_name` already holds at least one row with this RUN_ID.

    The value is quoted as a SQL string literal (unquoted, PostgreSQL parses it
    as a column name). The column is double-quoted because Spark's JDBC writer
    inserts into quoted, case-sensitive column names.
    JDBC errors (presumably including a missing table) propagate to the caller.
    """
    query_check = (
        f"SELECT COUNT(1) FROM {table_name} "
        f"""WHERE "RUN_ID" = '{run_id}'"""
    )
    existing_count = (
        spark.read
        .format("jdbc")
        .options(**pgOptions)
        .option("dbtable", f"({query_check}) AS check_table")
        .load()
        .collect()[0][0]
    )
    return existing_count > 0


def _ingest_month(service, year, month):
    """Download, transform and append one monthly file; raises on any failure."""
    url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/{service}_tripdata_{year}-{month}.parquet"
    if not _source_exists(url):
        # Nothing to download; do not fetch an error page and feed it to Spark.
        return
    print(f"Procesando {service}: {year}-{month}")

    run_id = f"run_{year}_{month}"
    table_name = f"{PG_SCHEMA}.{service.upper()}_TRIPS"

    # Idempotency: check before downloading so already-loaded months cost one query.
    if _run_already_loaded(table_name, run_id):
        print(f"Datos para {service} {year}-{month} ya existen en la tabla. Saltando...")
        return

    tmp_path = _download_to_temp(url)
    try:
        df = spark.read.parquet(tmp_path)
        print(f"Archivo {url} cargado en DataFrame")

        # Attach run metadata.
        df = (
            df.withColumn("RUN_ID", F.lit(run_id))
            .withColumn("SERVICE_TYPE", F.lit(service))
            .withColumn("SOURCE_YEAR", F.lit(int(year)))
            .withColumn("SOURCE_MONTH", F.lit(int(month)))
            .withColumn("INGESTED_AT_UTC", F.current_timestamp())
            .withColumn("SOURCE_PATH", F.lit(url))
        )

        # Normalise column names to upper case.
        df = df.toDF(*[c.upper() for c in df.columns])

        # Make sure the pickup/dropoff columns are timestamps.
        for col_name in TIMESTAMP_COLS[service]:
            if col_name in df.columns:
                df = df.withColumn(col_name, F.col(col_name).cast("timestamp"))

        # Add the column as NULL when the file does not carry it.
        if "CBD_CONGESTION_FEE" not in df.columns:
            df = df.withColumn("CBD_CONGESTION_FEE", F.lit(None).cast(T.DoubleType()))

        # Keep only known columns, in table order.
        df = df.select([c for c in ORDERED_COLS[service] if c in df.columns])

        (
            df.write
            .format("jdbc")
            .options(**pgOptions)
            .option("dbtable", table_name)
            .mode("append")
            .save()
        )
    finally:
        # Spark reads lazily, so the file must outlive the write; always clean up after.
        _remove_quietly(tmp_path)

    print(f"Datos de {service} para {year}-{month} cargados en PostgreSQL")


for service in services:
    for year in years:
        for month in PARTIAL_YEAR_MONTHS.get(year, months):
            # Retry the whole file up to MAX_ATTEMPTS times with a growing pause.
            for attempt in range(MAX_ATTEMPTS):
                try:
                    _ingest_month(service, year, month)
                    break
                except Exception as e:
                    print(f"Error al procesar {service} {year}-{month}: {e}")
                    if attempt < MAX_ATTEMPTS - 1:
                        print("Reintentando...")
                        time.sleep(10 * (attempt + 1))
                    else:
                        print("Máximo de intentos alcanzado. Pasando al siguiente.")


Procesando yellow: 2015-01
Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2015-01.parquet cargado en DataFrame
Datos de yellow para 2015-01 cargados en PostgreSQL
Procesando yellow: 2015-02
Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2015-02.parquet cargado en DataFrame
Datos de yellow para 2015-02 cargados en PostgreSQL
Procesando yellow: 2015-03
Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2015-03.parquet cargado en DataFrame
Datos de yellow para 2015-03 cargados en PostgreSQL
Procesando yellow: 2015-04
Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2015-04.parquet cargado en DataFrame
Datos de yellow para 2015-04 cargados en PostgreSQL
Procesando yellow: 2015-05
Archivo https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_2015-05.parquet cargado en DataFrame
Datos de yellow para 2015-05 cargados en PostgreSQL
Procesando yellow: 2015-06
Archivo https://d37ci6vzurychx.cl

### Carga de taxi_zone

In [11]:
# Load the taxi-zone lookup CSV into PostgreSQL, replacing RAW.TAXI_ZONES.
url_zones = "https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv"

# Download to a temporary file so Spark can read it from a local path.
tmp_file = tempfile.NamedTemporaryFile(delete=False, suffix=".csv")
tmp_path = tmp_file.name
try:
    with tmp_file:
        r = requests.get(url_zones, timeout=60)
        # Fail here on an HTTP error instead of loading an error page as CSV.
        r.raise_for_status()
        tmp_file.write(r.content)

    df_zones = spark.read.csv(tmp_path, header=True, inferSchema=True)
    print(f"Archivo {url_zones} cargado en DataFrame")

    (
        df_zones.write
        .format("jdbc")
        .options(**pgOptions)
        .option("dbtable", "RAW.TAXI_ZONES")
        .mode("overwrite")
        .save()
    )

    print("Datos de taxi zones cargados en PostgreSQL")
finally:
    # Remove the temporary file whether or not the load succeeded.
    os.remove(tmp_path)


Archivo https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv cargado en DataFrame
Datos de taxi zones cargados en PostgreSQL
