In [1]:
from utils.snowflake_utils import get_spark_session, get_snowflake_options

# Spark session shared by the cells below; "ingesta_raw" is the argument handed to
# the project helper (presumably the application name — confirm in utils.snowflake_utils).
spark = get_spark_session("ingesta_raw")
# Snowflake connector options for the RAW schema. Later cells unpack this mapping
# into the connector and read its "sfDatabase" / "sfSchema" entries.
sf_options = get_snowflake_options(schema="RAW")


✓ Snowflake context activo: DB=SPARK_DATA, SCHEMA=SPARK_DATA.RAW, WH=spark_wh, ROLE=ACCOUNTADMIN


## 0) Configuración Spark consciente del contenedor

In [2]:
# --- Container-aware Spark config (non-intrusive) ---
# Tunes partitioning, spill and memory-related SQL settings for the container
# (driver=5g, executor=3g, mem_limit=10g).
import os
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

_MB = 1024 * 1024

# (key, value) pairs, applied in this exact order. Entries that read an
# environment variable fall back to the literal shown when it is unset.
# NOTE(review): "spark.sql.adaptive.shuffle.targetPostShuffleInputSize" is reported
# as deprecated on Spark 3.x in favour of "spark.sql.adaptive.advisoryPartitionSizeInBytes",
# and "spark.sql.files.ignoreCorruptFiles" makes unreadable input files be skipped
# rather than fail the read — confirm both are intended for a RAW ingest.
_container_conf = (
    ("spark.sql.adaptive.enabled", "true"),
    ("spark.sql.adaptive.coalescePartitions.enabled", "true"),
    ("spark.sql.adaptive.shuffle.targetPostShuffleInputSize", str(64 * _MB)),  # ~64MB
    ("spark.sql.shuffle.partitions", os.getenv("SHUFFLE_PARTITIONS", "1200")),
    ("spark.sql.shuffle.spill.enabled", "true"),
    ("spark.sql.files.maxPartitionBytes", os.getenv("MAX_PART_BYTES", str(128 * _MB))),  # 128MB
    ("spark.sql.autoBroadcastJoinThreshold", os.getenv("BROADCAST_THRESHOLD", str(64 * _MB))),  # 64MB
    ("spark.sql.files.ignoreCorruptFiles", "true"),
)
for _conf_key, _conf_value in _container_conf:
    spark.conf.set(_conf_key, _conf_value)

# Read the values back so the log shows what the session actually holds.
_aqe_state = spark.conf.get("spark.sql.adaptive.enabled")
_shuffle_parts = spark.conf.get("spark.sql.shuffle.partitions")
print("[Spark] AQE:", _aqe_state, "| shuffle.partitions:", _shuffle_parts)

[Spark] AQE: true | shuffle.partitions: 1200


In [3]:
# --- SAFE MODE: estabilidad del kernel y límites de recursos ---
import os, time, gc
# Resource limits for this run; each one can be overridden through the
# environment variable of the same name. SAFE_MODE (on by default) selects the
# more conservative defaults for upload parallelism and pacing.
SAFE_MODE = os.environ.get("SAFE_MODE", "true").lower() == "true"
if SAFE_MODE:
    _put_default, _sleep_default = "4", "1"
else:
    _put_default, _sleep_default = "8", "0"
PUT_PARALLEL = int(os.environ.get("PUT_PARALLEL", _put_default))
SLEEP_BETWEEN = float(os.environ.get("SLEEP_BETWEEN", _sleep_default))
MAX_RSS_MB = int(os.environ.get("MAX_RSS_MB", "8192"))
MAX_FAILS_ALLOWED = int(os.environ.get("MAX_FAILS_ALLOWED", "5"))
STREAM_SUMMARY = os.environ.get("STREAM_SUMMARY", "false").lower() == "true"

# Best-effort memory probe: use psutil when it can be loaded, otherwise fall back
# to a stub. The fallback is announced, because a silent constant 0 would make
# every later memory reading look healthy.
try:
    import psutil, os as _os
    _proc = psutil.Process(_os.getpid())

    def current_rss_mb():
        """Return the resident set size of this kernel process, in whole MiB."""
        return int(_proc.memory_info().rss / (1024*1024))
except Exception as _rss_err:
    print("[SAFE MODE][WARN] psutil no disponible: current_rss_mb() siempre reporta 0. Detalle:", _rss_err)

    def current_rss_mb():
        """Fallback used when psutil cannot be set up: always returns 0."""
        return 0

def maybe_sleep():
    """Pause for SLEEP_BETWEEN seconds; do nothing unless that value is positive."""
    if not SLEEP_BETWEEN > 0:
        return
    time.sleep(SLEEP_BETWEEN)

# Echo the effective SAFE MODE settings into the cell output
# (SLEEP_BETWEEN and MAX_FAILS_ALLOWED are not part of this line).
print(f"[SAFE MODE] SAFE_MODE={SAFE_MODE} PUT_PARALLEL={PUT_PARALLEL} MAX_RSS_MB={MAX_RSS_MB} STREAM_SUMMARY={STREAM_SUMMARY}")

[SAFE MODE] SAFE_MODE=True PUT_PARALLEL=4 MAX_RSS_MB=8192 STREAM_SUMMARY=False


In [4]:
# --- Helpers: retry/backoff, transacciones y monitor de memoria ---
import time, functools, random

def retry(max_tries=3, base=0.5, factor=2.0, jitter=0.1, exceptions=(Exception,)):
    """Decorator factory that re-invokes a callable with exponential backoff.

    Args:
        max_tries: total number of attempts (first call included); must be >= 1.
        base: delay in seconds before the first retry.
        factor: multiplier applied to the delay after every failed attempt.
        jitter: relative random spread; each delay is scaled by a factor drawn
            uniformly from [1 - jitter, 1 + jitter].
        exceptions: exception class or tuple of classes that trigger a retry;
            anything else propagates immediately.

    Returns:
        A decorator. The wrapped callable returns whatever the original returns
        and re-raises the last exception once all attempts are used up.

    Raises:
        ValueError: if max_tries < 1. Without this check the wrapped callable
            would never be invoked and the wrapper would quietly return None.
    """
    if max_tries < 1:
        raise ValueError(f"max_tries must be >= 1, got {max_tries!r}")

    def deco(fn):
        @functools.wraps(fn)
        def wrapper(*args, **kwargs):
            delay = base
            for attempt in range(1, max_tries + 1):
                try:
                    return fn(*args, **kwargs)
                except exceptions as e:
                    if attempt == max_tries:
                        raise
                    # Clamp first so the logged pause is the one actually slept.
                    pause = max(0.1, delay * (1 + random.uniform(-jitter, jitter)))
                    print(f"   ↻ retry {attempt}/{max_tries-1} in {pause:,.2f}s → {e}")
                    time.sleep(pause)
                    delay *= factor
        return wrapper
    return deco

class snowflake_tx:
    """Context manager that brackets a block with BEGIN and COMMIT / ROLLBACK.

    `session` must expose `sql(text)` returning an object with `collect()`.
    Entering issues BEGIN and yields the session. Leaving issues ROLLBACK when
    the block raised, COMMIT otherwise.

    Raises:
        Whatever BEGIN or COMMIT raise. Both are logged and re-raised: a block
        that ran without a transaction, or whose COMMIT failed, must not look
        successful to the caller. A failing ROLLBACK is only logged so that the
        exception from the block itself is the one that propagates.
    """

    def __init__(self, session):
        self.session = session

    def _run(self, statement):
        """Execute one transaction-control statement on the session."""
        self.session.sql(statement).collect()

    def __enter__(self):
        try:
            self._run("BEGIN")
        except Exception as e:
            print("BEGIN falló:", e)
            raise
        return self.session

    def __exit__(self, exc_type, exc, tb):
        if exc_type:
            try:
                self._run("ROLLBACK")
            except Exception as e:
                print("ROLLBACK falló:", e)
            return False  # never suppress the exception raised inside the block
        try:
            self._run("COMMIT")
        except Exception as e:
            print("COMMIT falló:", e)
            raise
        return False

def mem_guard(prefix=""):
    """Print the current RSS reading, prefixed with `prefix`, and return it in MB.

    Any error while reading or printing is swallowed and 0 is returned instead.
    The function only reports; it does not compare the reading to a limit.
    """
    try:
        usage_mb = current_rss_mb()
        print(f"{prefix} RSS={usage_mb} MB")
    except Exception:
        usage_mb = 0
    return usage_mb

In [5]:
# Parallelism-derived tuning.
# The container config applied earlier in this notebook reads SHUFFLE_PARTITIONS,
# MAX_PART_BYTES and BROADCAST_THRESHOLD from the environment. This step used to
# overwrite those three settings unconditionally, so an explicit override was
# silently lost. The same variables are honoured here; when they are unset the
# values below are exactly the ones this step always applied.
import os

_default_par = spark.sparkContext.defaultParallelism

spark.conf.set("spark.sql.shuffle.partitions",
               os.getenv("SHUFFLE_PARTITIONS", str(max(200, _default_par * 3))))
# NOTE(review): spark.default.parallelism is a core setting that is normally fixed
# when the SparkContext starts; setting it on a live session may have no effect — confirm.
spark.conf.set("spark.default.parallelism", str(_default_par * 2))
spark.conf.set("spark.sql.files.maxPartitionBytes",
               os.getenv("MAX_PART_BYTES", str(128 * 1024 * 1024)))  # 128 MB per split

spark.conf.set("spark.sql.adaptive.enabled",           "true")
spark.conf.set("spark.sql.adaptive.skewJoin.enabled",  "true")
spark.conf.set("spark.sql.autoBroadcastJoinThreshold",
               os.getenv("BROADCAST_THRESHOLD", str(100 * 1024 * 1024)))  # 100 MB

spark.conf.set("spark.sql.parquet.compression.codec",  "snappy")


In [6]:
import os

# Policy for partitions that are already present in RAW:
#   SKIP    -> an existing (service, year, month) is left alone; nothing is written
#   REPLACE -> an existing (service, year, month) is deleted and written again
_ALLOWED_MODES = ("SKIP", "REPLACE")
_requested_mode = os.getenv("IDEMPOTENCY_MODE", "SKIP")
IDEMPOTENCY_MODE = _requested_mode.upper()
assert IDEMPOTENCY_MODE in _ALLOWED_MODES, "IDEMPOTENCY_MODE debe ser SKIP o REPLACE"

print("IDEMPOTENCY_MODE =", IDEMPOTENCY_MODE)


IDEMPOTENCY_MODE = SKIP


In [37]:
from pyspark.sql import functions as F

def sf_count_partition(sf_options, table, service, year, month):
    """Count the rows already stored in RAW for one (service, year, month).

    Args:
        sf_options: connector options; "sfDatabase" and "sfSchema" qualify the table.
        table: table name inside that schema (taken as-is; treat as trusted config).
        service: value compared against the service_type column.
        year, month: compared against source_year / source_month; anything
            `int()` accepts (e.g. "2022" or 2022).

    Returns:
        The row count as an int.

    Raises:
        ValueError: if year or month is not an integer value.
    """
    db = sf_options["sfDatabase"]
    sc = sf_options["sfSchema"]
    # The query is sent as plain SQL text, so every interpolated value is made
    # safe here: numbers are forced through int(), and the string literal has
    # backslashes and single quotes escaped.
    year_num = int(year)
    month_num = int(month)
    service_lit = str(service).replace("\\", "\\\\").replace("'", "''")
    q = f"""
    SELECT COUNT(*) AS CNT
    FROM {db}.{sc}.{table}
    WHERE service_type = '{service_lit}'
      AND source_year  = {year_num}
      AND source_month = {month_num}
    """
    df_cnt = (spark.read.format("net.snowflake.spark.snowflake")
              .options(**sf_options)
              .option("query", q)
              .load())
    return int(df_cnt.collect()[0]["CNT"])

def sf_exec_sql(sf_options, sql):
    """Run one SQL statement (DML/DDL such as DELETE, MERGE or CREATE) in Snowflake.

    The options are copied into a java.util.HashMap and handed, together with
    the statement, to the connector's JVM-side `Utils.runQuery`. Returns None.
    """
    gateway = spark._jvm
    java_opts = gateway.java.util.HashMap()
    for key in sf_options:
        java_opts.put(key, sf_options[key])
    gateway.net.snowflake.spark.snowflake.Utils.runQuery(java_opts, sql)


In [38]:
# Natural key used to drop duplicate trips inside a single batch.
NATURAL_KEY = [
    "pickup_datetime",
    "dropoff_datetime",
    "PULocationID",
    "DOLocationID",
    "VendorID",
]

def drop_dupes_by_key(df, key_cols):
    """Deduplicate `df` on whichever of `key_cols` it actually contains.

    Columns are used in the order given by `key_cols`. If none of them exist in
    `df`, the frame is returned unchanged.

    NOTE(review): when only part of the key is present, rows are deduplicated
    on that partial key — confirm this is the intended behaviour.
    """
    usable = []
    for name in key_cols:
        if name in df.columns:
            usable.append(name)
    return df.dropDuplicates(usable) if usable else df


In [138]:
import os, uuid, datetime, subprocess, sys
from pathlib import Path

# --- Run parameters (each one overridable through an environment variable) ---
def parse_months(raw):
    """Parse a comma-separated month list such as "1,2,3" into a list of ints.

    Blank items (e.g. from a trailing comma) are ignored and repeated months are
    kept once, in first-seen order, so the same month is never staged twice.

    Raises:
        ValueError: if an item is not an integer, is outside 1-12, or if no
            month remains after parsing.
    """
    months = []
    for token in str(raw).split(","):
        token = token.strip()
        if not token:
            continue
        value = int(token)
        if not 1 <= value <= 12:
            raise ValueError(f"DM_MONTHS: mes fuera de rango 1-12: {value}")
        if value not in months:
            months.append(value)
    if not months:
        raise ValueError("DM_MONTHS no contiene ningún mes válido")
    return months


SERVICE = os.environ.get("DM_SERVICE", "green").strip().lower()  # "yellow" or "green"
if SERVICE not in ("yellow", "green"):
    raise ValueError(f"DM_SERVICE debe ser 'yellow' o 'green', no {SERVICE!r}")

# YEAR stays a string (it is interpolated into file names and SQL later on),
# but it must be purely numeric.
YEAR = os.environ.get("DM_YEAR", "2022").strip()
if not YEAR.isdigit():
    raise ValueError(f"DM_YEAR debe ser numérico, no {YEAR!r}")

months_str = os.environ.get("DM_MONTHS", "1,2,3")  # default: small smoke-test range
MONTHS = parse_months(months_str)
RUN_ID = os.environ.get("RUN_ID", str(uuid.uuid4()))
BASE_URL = os.environ.get("TLC_BASE_URL", "https://d37ci6vzurychx.cloudfront.net/trip-data")
# The default table follows the selected service, so a yellow run cannot land in
# the green table just because RAW_TABLE was left unset.
RAW_TABLE = os.environ.get("RAW_TABLE", f"RAW_TLC_TRIPS_{SERVICE}")

# Working directories inside the Jupyter container.
DATA_DIR = Path("/home/jovyan/work/data")  # matches the docker-compose volume
EVID_DIR = Path("/home/jovyan/work/evidencias")
# Create both (and any missing parents); existing directories are fine.
for _workdir in (DATA_DIR, EVID_DIR):
    _workdir.mkdir(parents=True, exist_ok=True)

# Echo the effective run parameters so the cell output records them.
print("Parámetros:")
for _label, _shown in (
    ("service", SERVICE),
    ("year", YEAR),
    ("months", MONTHS),
    ("run_id", RUN_ID),
    ("base_url", BASE_URL),
    ("raw_table", RAW_TABLE),
    ("data_dir", DATA_DIR),
):
    # Labels are left-aligned to 10 columns so the "=" signs line up.
    print(f" {_label:<10}=", _shown)


Parámetros:
 service   = green
 year      = 2022
 months    = [1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12]
 run_id    = 984827f6-9d04-40cd-a60d-1ef0e61fca24
 base_url  = https://d37ci6vzurychx.cloudfront.net/trip-data
 raw_table = RAW_TLC_TRIPS_green
 data_dir  = /home/jovyan/work/data


In [139]:
def wget(url: str, out_dir: Path):
    """Download `url` into `out_dir` with wget, unless the file is already there.

    The target name is the last path component of `url`. If that file already
    exists in `out_dir`, wget is not started and 0 is returned, so "already
    downloaded" does not depend on how a given wget build reports `-nc`.

    Returns:
        0 when the file was already present, otherwise wget's exit status
        (0 on success, non-zero on failure).
    """
    target = Path(out_dir) / url.rstrip("/").rsplit("/", 1)[-1]
    if target.exists():
        print(f"✓ Ya existe {target}; no se descarga de nuevo")
        return 0
    # -nc (no clobber): never overwrite an existing file
    # -P  : output directory
    cmd = ["wget", "-nc", "-P", str(out_dir), url]
    print("→", " ".join(cmd))
    return subprocess.call(cmd)

# Fetch one parquet file per requested month into DATA_DIR.
for m in MONTHS:
    ym = f"{YEAR}-{m:02d}"
    file_name = f"{SERVICE}_tripdata_{ym}.parquet"
    url = f"{BASE_URL}/{file_name}"
    # Decide "already downloaded" here rather than from wget's exit status.
    if (DATA_DIR / file_name).exists():
        print(f"✓ {file_name} ya existe en {DATA_DIR}; se omite la descarga")
        continue
    print(f"Descargando {file_name}")
    rc = wget(url, DATA_DIR)
    # Only 0 means success. wget documents exit status 8 as "server issued an
    # error response" (e.g. HTTP 404/403), so it must be reported, not ignored.
    if rc != 0:
        print(f"[WARN] wget devolvió código {rc} para {url}", file=sys.stderr)


Descargando green_tripdata_2022-01.parquet
→ wget -nc -P /home/jovyan/work/data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-01.parquet
Descargando green_tripdata_2022-02.parquet
→ wget -nc -P /home/jovyan/work/data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-02.parquet
Descargando green_tripdata_2022-03.parquet
→ wget -nc -P /home/jovyan/work/data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-03.parquet
Descargando green_tripdata_2022-04.parquet
→ wget -nc -P /home/jovyan/work/data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-04.parquet
Descargando green_tripdata_2022-05.parquet
→ wget -nc -P /home/jovyan/work/data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-05.parquet
Descargando green_tripdata_2022-06.parquet
→ wget -nc -P /home/jovyan/work/data https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_2022-06.parquet
Descargando green_tripdata_2022-07.parquet
→ w

In [140]:
from pyspark.sql import functions as F
from pyspark.sql.functions import col
dfs = []  # per-month DataFrames that still have to be written to RAW
for m in MONTHS:
    ym = f"{YEAR}-{m:02d}"
    file_name = f"{SERVICE}_tripdata_{ym}.parquet"
    local_path = str(DATA_DIR / file_name)
    source_url = f"{BASE_URL}/{file_name}"

    print(f"\n=== Procesando {file_name} ===")

    # 4.1) Idempotency: is this (service, year, month) already present in RAW?
    # NOTE(review): a failed count is treated as "nothing loaded yet"; a transient
    # Snowflake error here lets the month be appended again — confirm acceptable.
    try:
        existing = sf_count_partition(sf_options, RAW_TABLE, SERVICE, YEAR, m)
    except Exception as e:
        existing = 0
        print("[INFO] No pude contar en RAW (¿sin credenciales?). Continuo como si no existiera. Detalle:", e)

    if existing > 0 and IDEMPOTENCY_MODE == "SKIP":
        print(f"↪ SKIP: Ya hay {existing} filas en RAW para ({SERVICE}, {YEAR}-{m:02d}). Saltando este mes.")
        continue

    if existing > 0 and IDEMPOTENCY_MODE == "REPLACE":
        db = sf_options["sfDatabase"]; sc = sf_options["sfSchema"]
        # The statement is destructive and built as plain text: force the numbers
        # through int() and escape the string literal before interpolating.
        service_lit = str(SERVICE).replace("\\", "\\\\").replace("'", "''")
        sql_del = f"""
        DELETE FROM {db}.{sc}.{RAW_TABLE}
        WHERE service_type = '{service_lit}' AND source_year = {int(YEAR)} AND source_month = {int(m)}
        """
        print(f"↪ REPLACE: Borrando partición existente en RAW ({SERVICE}, {YEAR}-{m:02d})…")
        try:
            sf_exec_sql(sf_options, sql_del)
            print("   ✓ DELETE completado")
        except Exception as e:
            # The old rows are still in RAW; appending on top of them would
            # duplicate the partition, so this month is left out of the batch.
            print("   [WARN] No se pudo borrar; se omite este mes para no duplicar filas. Detalle:", e)
            continue

    # 4.2) Read the local copy when it exists, otherwise straight from the URL
    path_to_read = local_path if Path(local_path).exists() else source_url
    df = spark.read.parquet(path_to_read)

    # 4.3) Normalise the timestamps under service-independent names and drop the
    #      service-specific originals
    pickup_col  = "tpep_pickup_datetime"  if SERVICE == "yellow" else "lpep_pickup_datetime"
    dropoff_col = "tpep_dropoff_datetime" if SERVICE == "yellow" else "lpep_dropoff_datetime"

    df_std = (
        df
        .withColumn("pickup_datetime",  F.col(pickup_col).cast("timestamp"))
        .withColumn("dropoff_datetime", F.col(dropoff_col).cast("timestamp"))
        .drop(pickup_col, dropoff_col)  # avoids type clashes when writing
    )

    # 4.4) Mandatory lineage metadata
    # Same "YYYY-MM-DDTHH:MM:SSZ" text as before, built without the deprecated utcnow().
    ingested_at = datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")
    df_enriched = (
        df_std
        .withColumn("run_id",          F.lit(RUN_ID))
        .withColumn("service_type",    F.lit(SERVICE))
        .withColumn("source_year",     F.lit(YEAR))
        .withColumn("source_month",    F.lit(m))
        .withColumn("ingested_at_utc", F.lit(ingested_at))
        .withColumn("source_path",     F.lit(source_url))
    )

    # 4.5) De-duplicate within the month on the natural key
    df_enriched = drop_dupes_by_key(df_enriched, NATURAL_KEY)

    # 4.6) Trigger an action (shows up in the Spark UI) and stage the month
    print("Conteo (acción → visible en Spark UI):", df_enriched.count())
    dfs.append(df_enriched)

# 4.7) Combine the staged months by column name (missing columns become nulls)
if not dfs:
    # Clear df_all explicitly: a DataFrame left over from an earlier execution of
    # this kernel must not be picked up (and written again) by later cells.
    df_all = None
    print("No hubo nada que escribir (todos los meses estaban ya cargados o falló la carga).")
else:
    df_all = dfs[0]
    for extra in dfs[1:]:
        df_all = df_all.unionByName(extra, allowMissingColumns=True)

    print("\n=== Total filas combinadas:", df_all.count(), "===\n")
    df_all.printSchema()




=== Procesando green_tripdata_2022-01.parquet ===
↪ SKIP: Ya hay 62495 filas en RAW para (green, 2022-01). Saltando este mes.

=== Procesando green_tripdata_2022-02.parquet ===
↪ SKIP: Ya hay 69399 filas en RAW para (green, 2022-02). Saltando este mes.

=== Procesando green_tripdata_2022-03.parquet ===
↪ SKIP: Ya hay 78537 filas en RAW para (green, 2022-03). Saltando este mes.

=== Procesando green_tripdata_2022-04.parquet ===
Conteo (acción → visible en Spark UI): 75962

=== Procesando green_tripdata_2022-05.parquet ===
Conteo (acción → visible en Spark UI): 76725

=== Procesando green_tripdata_2022-06.parquet ===
Conteo (acción → visible en Spark UI): 73534

=== Procesando green_tripdata_2022-07.parquet ===
↪ SKIP: Ya hay 64038 filas en RAW para (green, 2022-07). Saltando este mes.

=== Procesando green_tripdata_2022-08.parquet ===
↪ SKIP: Ya hay 65776 filas en RAW para (green, 2022-08). Saltando este mes.

=== Procesando green_tripdata_2022-09.parquet ===
Conteo (acción → visible e

In [141]:
# Escritura a Snowflake

In [142]:
# Append the combined batch to the RAW table in Snowflake.
if not dfs:
    # Nothing was staged in this run. Without this guard the cell would act on
    # whatever df_all happens to hold (possibly a DataFrame from an earlier
    # execution of the kernel) or fail on an undefined name.
    print("↪ Nada que escribir en Snowflake en esta ejecución.")
else:
    try:
        # Spread the rows over 16 partitions before the upload. A local name is
        # used so re-running this cell does not keep re-partitioning df_all.
        df_to_write = df_all.repartition(16)

        (df_to_write.write
         .format("net.snowflake.spark.snowflake")
         .options(**sf_options)
         .option("dbtable", RAW_TABLE)
         .option("parallelism", os.getenv("SNOWFLAKE_PARALLELISM", "16"))  # upload thread pool
         # .option("onError", "CONTINUE")  # optional: tolerate bad rows
         .mode("append")  # safe because the idempotency check ran beforehand
         .save())

        print(f"✓ Escritura completada en RAW → {RAW_TABLE}")

    except Exception as e:
        print("[INFO] Aún no escribimos en Snowflake (probable falta de credenciales, permisos o memoria).")
        print("Detalle:", e)
        # Re-raise so a failed load stops the run; otherwise the following cells
        # would carry on as if the rows had been written.
        raise


✓ Escritura completada en RAW → RAW_TLC_TRIPS_green


In [143]:
import csv, datetime
# Append one audit row per run to a CSV kept under the evidence directory.
audit_csv = EVID_DIR / "audit_ingesta_raw.csv"
headers = ["run_id","service_type","source_year","source_months","rows","when_utc","raw_table","mode"]

if dfs:
    # Record the months that are really in this batch. MONTHS also contains the
    # months the idempotency check skipped, which would overstate what was loaded.
    loaded_months = sorted(
        r["source_month"] for r in df_all.select("source_month").distinct().collect()
    )
    # NOTE(review): nothing in this cell verifies that the rows reached Snowflake —
    # confirm the write step succeeded before trusting this entry.
    row = [RUN_ID, SERVICE, YEAR, ",".join(map(str, loaded_months)),
           df_all.count(),
           # Same "YYYY-MM-DDTHH:MM:SSZ" text as before, without the deprecated utcnow().
           datetime.datetime.now(datetime.timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
           RAW_TABLE, IDEMPOTENCY_MODE]
    # Write the header for a new file and also for an existing but empty one.
    write_headers = (not audit_csv.exists()) or audit_csv.stat().st_size == 0
    with open(audit_csv, "a", newline="", encoding="utf-8") as f:
        w = csv.writer(f)
        if write_headers:
            w.writerow(headers)
        w.writerow(row)
    print("✓ Auditoría actualizada →", str(audit_csv))


✓ Auditoría actualizada → /home/jovyan/work/evidencias/audit_ingesta_raw.csv
