Variables de entorno y prueba de conexión a Snowflake

In [1]:
import os
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session for this notebook.
spark = (
    SparkSession.builder
    .appName("SnowflakeConnectionTest")
    .getOrCreate()
)

# Snowflake connector options, each read from an environment variable
# (the original note says they come from .env).
sfOptions = dict(
    sfURL=os.getenv("SNOWFLAKE_HOST"),          # host only, without ":443"
    sfPort=os.getenv("SNOWFLAKE_PORT", "443"),  # falls back to "443" when unset
    sfDatabase=os.getenv("SNOWFLAKE_DATABASE"),
    sfSchema=os.getenv("SNOWFLAKE_SCHEMA_RAW"),
    sfWarehouse=os.getenv("SNOWFLAKE_WAREHOUSE"),
    sfRole=os.getenv("SNOWFLAKE_ROLE"),
    sfUser=os.getenv("SNOWFLAKE_USER"),
    sfPassword=os.getenv("SNOWFLAKE_PASSWORD"),
)

# Read-only probe: ask Snowflake which account/region/role/database/schema
# the session resolved to.
probe_reader = spark.read.format("snowflake").options(**sfOptions)
probe = probe_reader.option(
    "query",
    "select current_account() as account, current_region() as region, current_role() as role, current_database() as db, current_schema() as sch",
).load()

probe.show(truncate=False)


+--------+-------------+--------+--------+---+
|ACCOUNT |REGION       |ROLE    |DB      |SCH|
+--------+-------------+--------+--------+---+
|XPC24435|AWS_US_EAST_1|SYSADMIN|DM_PSET3|RAW|
+--------+-------------+--------+--------+---+



In [2]:
from pyspark.sql.types import StructType, StructField, StringType, TimestampType

# One probe row; the third value is None because ts_current is declared nullable.
data = [("hello_snowflake", "probe", None)]

# Schema built from (name, type, nullable) triples.
schema = StructType([
    StructField(field_name, field_type, nullable=is_nullable)
    for field_name, field_type, is_nullable in (
        ("msg", StringType(), False),
        ("run_id", StringType(), False),
        ("ts_current", TimestampType(), True),
    )
])

df_probe = spark.createDataFrame(data, schema=schema)

# Show the schema and the single row.
df_probe.printSchema()
df_probe.show(truncate=False)


root
 |-- msg: string (nullable = false)
 |-- run_id: string (nullable = false)
 |-- ts_current: timestamp (nullable = true)

+---------------+------+----------+
|msg            |run_id|ts_current|
+---------------+------+----------+
|hello_snowflake|probe |NULL      |
+---------------+------+----------+



In [3]:
target_table = "RAW._CONNECT_PROBE"

# Write: replace the probe table with the contents of df_probe.
probe_writer = (
    df_probe.write
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
)
probe_writer.mode("overwrite").save()

# Read the same table back through the connector.
probe_reader = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
)
df_back = probe_reader.load()

rows_read = df_back.count()
print("Filas leídas:", rows_read)
df_back.show(truncate=False)


Filas leídas: 1
+---------------+------+----------+
|MSG            |RUN_ID|TS_CURRENT|
+---------------+------+----------+
|hello_snowflake|probe |NULL      |
+---------------+------+----------+



In [4]:
import os, pathlib, urllib.request
from pyspark.sql import SparkSession

spark = SparkSession.builder.getOrCreate()

# 1) Prepare the local folder that holds the downloaded parquet files.
local_dir = "/home/jovyan/work/datasets/trip-data"
pathlib.Path(local_dir).mkdir(parents=True, exist_ok=True)

# 2) Download one sample file (yellow taxi, January 2019) unless it is already on disk.
base_url = os.getenv("DATA_BASE_URL")
remote = f"{base_url}/yellow_tripdata_2019-01.parquet"
local  = f"{local_dir}/yellow_tripdata_2019-01.parquet"
if not os.path.exists(local):
    # Fail early with a clear message instead of requesting "None/yellow_...".
    if not base_url:
        raise RuntimeError("Falta DATA_BASE_URL en .env")
    print("Descargando:", remote)
    # Download to a side file and rename only on success, so an interrupted or
    # failed transfer never leaves a truncated parquet at `local` (which the
    # exists() guard above would otherwise accept as complete on the next run).
    partial = f"{local}.part"
    try:
        urllib.request.urlretrieve(remote, partial)
        os.replace(partial, local)
    finally:
        # No-op after a successful rename; removes the leftover on any failure.
        if os.path.exists(partial):
            os.remove(partial)

# 3) Read the parquet from local disk (the original note calls this a seekable source).
print("Leyendo local:", local)
df_probe_read = spark.read.parquet(local)
df_probe_read.printSchema()
df_probe_read.show(5, truncate=False)


Leyendo local: /home/jovyan/work/datasets/trip-data/yellow_tripdata_2019-01.parquet
root
 |-- VendorID: long (nullable = true)
 |-- tpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- tpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- passenger_count: double (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- RatecodeID: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: integer (nullable = true)

+--------+--------------------+--------------

In [6]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import TimestampType
from datetime import datetime, timezone
import os

# 1) Cast both trip timestamps to Spark's `timestamp` type
#    (the parquet schema printed above reports them as timestamp_ntz).
df_fixed = df_probe_read
for ts_name in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    df_fixed = df_fixed.withColumn(ts_name, col(ts_name).cast(TimestampType()))

# 2) Minimal lineage metadata: RUN_ID from the environment, or a UTC-stamped manual id.
manual_stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
run_id = os.getenv("RUN_ID") or f"manual_{manual_stamp}"

lineage_columns = (
    ("run_id",          run_id),
    ("service_type",    "yellow"),
    ("source_year",     "2019"),
    ("source_month",    "01"),
    ("ingested_at_utc", datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")),
)
df_raw = df_fixed
for meta_name, meta_value in lineage_columns:
    # Every row gets the same constant value for each lineage column.
    df_raw = df_raw.withColumn(meta_name, lit(meta_value))

# 3) Overwrite the RAW table for this month, then read it back to verify.
target_table = "RAW.TRIPS_YELLOW_2019_01"

raw_writer = (
    df_raw.write
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
)
raw_writer.mode("overwrite").save()

raw_reader = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
)
df_back = raw_reader.load()

print("Filas en RAW:", df_back.count())
lineage_preview = df_back.select("run_id","service_type","source_year","source_month","ingested_at_utc")
lineage_preview.show(5, truncate=False)


Filas en RAW: 7696617
+-----------------------+------------+-----------+------------+--------------------+
|run_id                 |service_type|source_year|source_month|ingested_at_utc     |
+-----------------------+------------+-----------+------------+--------------------+
|manual_20251016T021100Z|yellow      |2019       |01          |2025-10-16T02:11:00Z|
|manual_20251016T021100Z|yellow      |2019       |01          |2025-10-16T02:11:00Z|
|manual_20251016T021100Z|yellow      |2019       |01          |2025-10-16T02:11:00Z|
|manual_20251016T021100Z|yellow      |2019       |01          |2025-10-16T02:11:00Z|
|manual_20251016T021100Z|yellow      |2019       |01          |2025-10-16T02:11:00Z|
+-----------------------+------------+-----------+------------+--------------------+
only showing top 5 rows



In [14]:
import os
from datetime import datetime, timezone

# `lit` is used below; importing it here keeps the cell runnable on a fresh
# kernel instead of relying on the name leaking from an earlier cell.
from pyspark.sql.functions import col, dayofmonth, lit
from pyspark.sql.types import TimestampType

target_table = "RAW.TRIPS_GREEN_2019_01"

# 0) Base DataFrame: timestamps cast to Spark `timestamp`, plus constant lineage columns.
run_id = os.getenv("RUN_ID") or f"manual_{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}"
df_base = (
    spark.read.parquet("/home/jovyan/work/datasets/trip-data/green_tripdata_2019-01.parquet")
    .withColumn("lpep_pickup_datetime",  col("lpep_pickup_datetime").cast(TimestampType()))
    .withColumn("lpep_dropoff_datetime", col("lpep_dropoff_datetime").cast(TimestampType()))
    .withColumn("run_id",          lit(run_id))
    .withColumn("service_type",    lit("green"))
    .withColumn("source_year",     lit("2019"))
    .withColumn("source_month",    lit("01"))
    .withColumn("ingested_at_utc", lit(datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")))
)

# 1) Reset the month table to an empty state:
#    overwrite with limit(0) so only the schema is written, with no rows.
(
    df_base.limit(0)
    .write.format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
    .mode("overwrite")
    .save()
)

# 2) Append in per-day micro-batches with conservative parallelism.
#    NOTE: the filter keys on day-of-month only, so rows whose pickup timestamp
#    is NULL match no day and are not written.
dias = list(range(1, 32))  # 1..31
total_insertadas = 0

for d in dias:
    df_day = (
        df_base
        .filter(dayofmonth(col("lpep_pickup_datetime")) == d)
        .coalesce(2)                       # few output files per day
    )
    cnt = df_day.count()
    if cnt == 0:
        continue

    (
        df_day.write.format("snowflake")
        .options(**sfOptions)
        .option("dbtable", target_table)
        .option("parallelism", "2")        # author's note: few connections
        .option("usestagingtable", "off")  # author's note: fewer extra steps
        .mode("append")
        .save()
    )

    total_insertadas += cnt
    print(f"Dia {d:02d} -> inserted: {cnt}")

# 3) Verify the final row count in Snowflake against the loop total.
df_back = (
    spark.read.format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
    .load()
)
print("Tabla RAW destino:", target_table)
print("Filas en RAW (GREEN 2019-01):", df_back.count())
print("Total insertadas por loop:", total_insertadas)


Dia 01 -> inserted: 15469
Dia 02 -> inserted: 19900
Dia 03 -> inserted: 21931
Dia 04 -> inserted: 23133
Dia 05 -> inserted: 20832
Dia 06 -> inserted: 18745
Dia 07 -> inserted: 21368
Dia 08 -> inserted: 21082
Dia 09 -> inserted: 22977
Dia 10 -> inserted: 24703
Dia 11 -> inserted: 25666
Dia 12 -> inserted: 21973
Dia 13 -> inserted: 18507
Dia 14 -> inserted: 21534
Dia 15 -> inserted: 22453
Dia 16 -> inserted: 23512
Dia 17 -> inserted: 24640
Dia 18 -> inserted: 23966
Dia 19 -> inserted: 21318
Dia 20 -> inserted: 16122
Dia 21 -> inserted: 14341
Dia 22 -> inserted: 21585
Dia 23 -> inserted: 23074
Dia 24 -> inserted: 23776
Dia 25 -> inserted: 26062
Dia 26 -> inserted: 23570
Dia 27 -> inserted: 19355
Dia 28 -> inserted: 21998
Dia 29 -> inserted: 23165
Dia 30 -> inserted: 22780
Dia 31 -> inserted: 22568
Tabla RAW destino: RAW.TRIPS_GREEN_2019_01
Filas en RAW (GREEN 2019-01): 672105
Total insertadas por loop: 672105


In [25]:
from pyspark.sql.functions import col, lit
from pyspark.sql.types import TimestampType
from datetime import datetime, timezone, timedelta
import os, time

# Month being loaded. The file name, column names and table name below are all
# derived from these values so they cannot drift apart.
svc = "yellow"
YEAR, mm = "2019", "01"
pcol = "tpep_pickup_datetime"
dcol = "tpep_dropoff_datetime"
parquet_name = f"{svc}_tripdata_{YEAR}-{mm}.parquet"
target_table = f"RAW.TRIPS_{svc.upper()}_{YEAR}_{mm}"

# 0) Read the local parquet and cast both trip timestamps to Spark `timestamp`.
df_month = (
    spark.read.parquet(f"/home/jovyan/work/datasets/trip-data/{parquet_name}")
    .withColumn(pcol, col(pcol).cast(TimestampType()))
    .withColumn(dcol, col(dcol).cast(TimestampType()))
)

expected = df_month.count()
print("Esperadas (parquet):", expected)

# 1) Consistent lineage metadata (includes source_path, the remote URL of the file).
run_id = os.getenv("RUN_ID") or f"manual_{datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')}"
remote_month_path = f"{os.getenv('DATA_BASE_URL')}/{parquet_name}"

df_base = (
    df_month
    .withColumn("run_id",          lit(run_id))
    .withColumn("service_type",    lit(svc))
    .withColumn("source_year",     lit(YEAR))
    .withColumn("source_month",    lit(mm))
    .withColumn("source_path",     lit(remote_month_path))
    .withColumn("ingested_at_utc", lit(datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")))
)

# 2) Start from an empty table: TRUNCATE as a preaction, then overwrite with a
#    zero-row frame so the schema is fixed before the daily appends.
#    NOTE(review): the TRUNCATE preaction presumably requires the table to
#    exist already (an earlier cell creates it) — confirm on a fresh schema.
(
    df_base.limit(0).write.format("snowflake")
    .options(**sfOptions)
    .option("dbtable", target_table)
    .option("preactions", f"TRUNCATE TABLE {target_table}")
    .mode("overwrite")
    .save()
)

def write_day_range(df_day, lo, hi, tries=3):
    """Delete the pickup range [lo, hi) from the target table, then append ``df_day``.

    The DELETE is passed to the connector as the ``preactions`` option of every
    attempt, so a retry clears the range again before appending. Both bounds
    are wrapped in ``TO_TIMESTAMP_NTZ``.

    Reads the globals ``target_table``, ``pcol`` and ``sfOptions``.

    Args:
        df_day: DataFrame holding the rows of the range to append.
        lo: Inclusive lower bound, formatted as a timestamp string.
        hi: Exclusive upper bound, formatted as a timestamp string.
        tries: Maximum number of write attempts (must be >= 1).

    Raises:
        ValueError: If ``tries`` is smaller than 1.
        Exception: The error of the last attempt when every attempt failed.
    """
    if tries < 1:
        # Previously this fell through to `raise None` (a TypeError) without writing.
        raise ValueError("tries must be >= 1")
    pre_sql = (
        f"DELETE FROM {target_table} "
        f"WHERE {pcol} >= TO_TIMESTAMP_NTZ('{lo}') "
        f"  AND {pcol} <  TO_TIMESTAMP_NTZ('{hi}')"
    )
    last = None
    for a in range(1, tries+1):
        try:
            (df_day.write.format("snowflake")
             .options(**sfOptions)
             .option("dbtable", target_table)
             .option("parallelism", "1")
             .option("usestagingtable", "off")
             .option("support_share_connection", "false")
             .option("preactions", pre_sql)
             .mode("append")
             .save())
            return
        except Exception as e:
            last = e
            print(f"[WARN] intento {a}/{tries} falló (rango {lo}..{hi}): {e}")
            # Back off only when another attempt follows; no pointless wait
            # before re-raising the final error.
            if a < tries:
                time.sleep(5)
    raise last

# 3) Load exact day ranges one at a time (Spark filters on the same [lo, hi)).
#    Days are walked with date arithmetic until the month changes, so shorter
#    months work too (a fixed 1..31 range would fail to parse e.g. "02-30").
inserted_total = 0
day_dt = datetime.strptime(f"{YEAR}-{mm}-01 00:00:00", "%Y-%m-%d %H:%M:%S")
target_month = day_dt.month
while day_dt.month == target_month:
    next_dt = day_dt + timedelta(days=1)
    lo = day_dt.strftime("%Y-%m-%d %H:%M:%S")
    hi = next_dt.strftime("%Y-%m-%d %H:%M:%S")

    part = df_base.filter((col(pcol) >= lo) & (col(pcol) < hi)).coalesce(1)
    cnt = part.count()
    if cnt > 0:
        write_day_range(part, lo, hi)
        inserted_total += cnt
        print(f"[{svc} {YEAR}-{mm}] {lo}..{hi} -> inserted: {cnt}")

    day_dt = next_dt

# 4) Final verification: Snowflake count versus parquet count versus loop total.
df_back = (spark.read.format("snowflake").options(**sfOptions).option("dbtable", target_table).load())
final_cnt = df_back.count()
print("Tabla RAW destino:", target_table)
print(f"Filas en RAW ({svc.upper()} {YEAR}-{mm}):", final_cnt)
print("Esperadas (parquet):", expected)
print("Insertadas por loop:", inserted_total)
if inserted_total != expected:
    # The day ranges cover only pickups inside the month; rows with a NULL or
    # out-of-month pickup match no range and are not written. Say so explicitly.
    print(
        f"[WARN] {expected - inserted_total} filas del parquet no se cargaron "
        f"(pickup NULL o fuera de {YEAR}-{mm})"
    )


Esperadas (parquet): 7696617
[yellow 2019-01] 2019-01-01 00:00:00..2019-01-02 00:00:00 -> inserted: 189432
[yellow 2019-01] 2019-01-02 00:00:00..2019-01-03 00:00:00 -> inserted: 198737
[WARN] intento 1/3 falló (rango 2019-01-03 00:00:00..2019-01-04 00:00:00): An error occurred while calling o5230.save.
: net.snowflake.client.jdbc.SnowflakeSQLLoggedException: !-1!
	at net.snowflake.client.core.SFSession.getQueryMetadata(SFSession.java:246)
	at net.snowflake.client.core.SFSession.getQueryStatus(SFSession.java:305)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getStatus(SFAsyncResultSet.java:107)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:183)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getMetaData(SFAsyncResultSet.java:298)
	at net.snowflake.spark.snowflake.io.StageWriter$.executeCopyIntoTable(StageWriter.scala:575)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToTableWithStagingTable(StageWriter.scala:452)
	at net.snowflake.spark.sno

In [28]:
# === Backfill robusto 2015–2025 (yellow+green) RAW.TRIPS_<SVC>_<YYYY>_<MM> ===
import calendar
import os, pathlib, urllib.request, time
from datetime import datetime, timezone, timedelta
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit
from pyspark.sql.types import TimestampType

spark = SparkSession.builder.getOrCreate()

# Local folder for downloaded parquet files; created on demand.
LOCAL_DIR = "/home/jovyan/work/datasets/trip-data"
local_root = pathlib.Path(LOCAL_DIR)
local_root.mkdir(parents=True, exist_ok=True)

# Base URL of the remote dataset; the backfill cannot run without it.
BASE_URL = os.getenv("DATA_BASE_URL")
assert BASE_URL, "Falta DATA_BASE_URL en .env"

# Parse env ranges
def parse_years(s):
    """Expand a year specification into a list of year strings.

    The specification is a comma-separated list whose items are either a
    single value (kept as written, stripped) or an inclusive range ``A-B``
    (expanded to every year from A to B). Mixed forms such as
    ``"2015-2017,2020"`` are accepted; empty items are ignored.

    Args:
        s: Specification string, e.g. ``"2019-2019"`` or ``"2015,2016"``.

    Returns:
        List of year strings in the order given.

    Raises:
        ValueError: If a range bound is not an integer.
    """
    years = []
    for token in s.split(","):
        token = token.strip()
        if not token:
            continue
        if "-" in token:
            first, last = token.split("-", 1)
            years.extend(str(y) for y in range(int(first), int(last) + 1))
        else:
            years.append(token)
    return years

def parse_csv(s):
    """Split ``s`` on commas and return the stripped, non-empty items in order."""
    items = []
    for piece in s.split(","):
        trimmed = piece.strip()
        if trimmed:
            items.append(trimmed)
    return items

# Backfill scope from the environment; an unset or empty variable falls back to the default.
services_spec = os.getenv("SERVICES")
if not services_spec:
    services_spec = "yellow,green"
SERVICES = parse_csv(services_spec)

years_spec = os.getenv("YEARS")
if not years_spec:
    years_spec = "2019-2019"
YEARS = parse_years(years_spec)

months_spec = os.getenv("MONTHS")
if not months_spec:
    months_spec = "01,02,03,04,05,06,07,08,09,10,11,12"
MONTHS = parse_csv(months_spec)

def pickup_col(svc: str) -> str:
    """Return the pickup timestamp column: ``tpep_`` for "yellow", ``lpep_`` for anything else."""
    if svc == "yellow":
        return "tpep_pickup_datetime"
    return "lpep_pickup_datetime"

def dropoff_col(svc: str) -> str:
    """Return the dropoff timestamp column: ``tpep_`` for "yellow", ``lpep_`` for anything else."""
    if svc == "yellow":
        return "tpep_dropoff_datetime"
    return "lpep_dropoff_datetime"

def filename(svc: str, y: str, m: str) -> str:
    """Build the parquet file name for a service, year and month."""
    period = f"{y}-{m}"
    return f"{svc}_tripdata_{period}.parquet"

def ensure_local(remote_url: str, local_path: str, tries=3):
    """Download ``remote_url`` to ``local_path`` unless that file already exists.

    The transfer goes to ``<local_path>.part`` and is renamed onto
    ``local_path`` only after it completes. A failed or interrupted download
    therefore never leaves a truncated file at ``local_path``, which the
    existence check would otherwise accept as complete on a later run.

    Args:
        remote_url: URL to fetch.
        local_path: Destination path on local disk.
        tries: Maximum number of download attempts (must be >= 1).

    Raises:
        ValueError: If a download is needed and ``tries`` is smaller than 1.
        Exception: The error of the last attempt when every attempt failed.
    """
    if os.path.exists(local_path):
        return
    if tries < 1:
        # Previously this fell through to `raise None` (a TypeError).
        raise ValueError("tries must be >= 1")
    partial_path = f"{local_path}.part"
    last = None
    for a in range(1, tries+1):
        try:
            print(f"Descargando: {remote_url}")
            urllib.request.urlretrieve(remote_url, partial_path)
            os.replace(partial_path, local_path)
            return
        except Exception as e:
            last = e
            print(f"[WARN] descarga intento {a}/{tries} falló: {e}")
            # Drop whatever was written so far before retrying or giving up.
            if os.path.exists(partial_path):
                os.remove(partial_path)
            # Wait only when another attempt follows.
            if a < tries:
                time.sleep(3)
    raise last

def write_with_retry(df_day, target_table, pre_sql, tries=3):
    """Append ``df_day`` to ``target_table`` with ``pre_sql`` as a preaction, retrying on failure.

    ``pre_sql`` (the caller's DELETE) is passed as the connector's
    ``preactions`` option on every attempt, so a retry runs it again before
    appending. Uses the global ``sfOptions`` for the connection.

    Args:
        df_day: DataFrame to append.
        target_table: Destination table name.
        pre_sql: SQL statement to run before the append.
        tries: Maximum number of write attempts (must be >= 1).

    Raises:
        ValueError: If ``tries`` is smaller than 1.
        Exception: The error of the last attempt when every attempt failed.
    """
    if tries < 1:
        # Previously this fell through to `raise None` (a TypeError) without writing.
        raise ValueError("tries must be >= 1")
    last = None
    for a in range(1, tries+1):
        try:
            (df_day.write.format("snowflake")
             .options(**sfOptions)
             .option("dbtable", target_table)
             .option("parallelism", "1")
             .option("usestagingtable", "off")
             .option("support_share_connection", "false")
             .option("preactions", pre_sql)
             .mode("append")
             .save())
            return
        except Exception as e:
            last = e
            print(f"[WARN] write intento {a}/{tries} falló: {e}")
            # Back off only when another attempt follows.
            if a < tries:
                time.sleep(5)
    raise last
def sf_count_rows(table: str, tries: int = 5, wait_s: int = 5) -> int:
    """Count the rows of ``table`` with ``SELECT COUNT(*)``, retrying on failure.

    The query runs through the Snowflake connector with
    ``support_share_connection`` set to "false". Per the original author's
    note, this is meant to avoid the metadata path that triggers the
    'response is null' error. Uses the globals ``spark`` and ``sfOptions``.

    Args:
        table: Table name to count.
        tries: Maximum number of attempts (must be >= 1).
        wait_s: Seconds to wait between attempts.

    Returns:
        The row count as an int.

    Raises:
        ValueError: If ``tries`` is smaller than 1.
        Exception: The error of the last attempt when every attempt failed.
    """
    if tries < 1:
        # Previously this fell through to `raise None` (a TypeError).
        raise ValueError("tries must be >= 1")
    last = None
    for a in range(1, tries + 1):
        try:
            df = (spark.read.format("snowflake")
                  .options(**sfOptions)
                  .option("support_share_connection", "false")
                  .option("query", f"SELECT COUNT(*) AS C FROM {table}")
                  .load())
            return int(df.collect()[0]["C"])
        except Exception as e:
            last = e
            print(f"[WARN] count intento {a}/{tries} falló: {e}")
            # Wait only when another attempt follows.
            if a < tries:
                time.sleep(wait_s)
    raise last

def sf_try_count_rows(table: str, tries: int = 5, wait_s: int = 5):
    """Lenient wrapper around ``sf_count_rows``.

    Returns ``(True, count)`` on success. If counting still fails after its
    retries, prints a warning and returns ``(False, None)`` instead of
    raising, so a failed verification does not stop the backfill.
    """
    try:
        return True, sf_count_rows(table, tries=tries, wait_s=wait_s)
    except Exception as err:
        print(f"[WARN] verificación final omitida por error persistente: {err}")
    return False, None
        
# One run id for the whole backfill: RUN_ID from the environment, else a UTC-stamped manual id.
manual_stamp = datetime.now(timezone.utc).strftime('%Y%m%dT%H%M%SZ')
run_id_global = os.getenv("RUN_ID") or f"manual_{manual_stamp}"

# One entry per processed month:
# (svc, y, m, expected, final_cnt, inserted_total, nulls_cnt, ok)
coverage = list()

# Backfill driver: for every service/year/month, download the parquet, load it
# into RAW.TRIPS_<SVC>_<YYYY>_<MM> in per-day batches, then verify the row count.
for svc in SERVICES:
    pcol = pickup_col(svc)
    dcol = dropoff_col(svc)

    for y in YEARS:
        for m in MONTHS:
            remote = f"{BASE_URL}/{filename(svc, y, m)}"
            local  = f"{LOCAL_DIR}/{filename(svc, y, m)}"
            target_table = f"RAW.TRIPS_{svc.upper()}_{y}_{m}"

            # 0) Download the file if it is missing; skip the month when it cannot be fetched.
            try:
                ensure_local(remote, local)
            except Exception as e:
                print(f"[SKIP] No se pudo obtener {remote}: {e}")
                continue

            # 1) Read the parquet and cast the trip timestamps to Spark `timestamp`.
            df = spark.read.parquet(local)
            if pcol not in df.columns or dcol not in df.columns:
                print(f"[SKIP] {svc} {y}-{m} sin columnas {pcol}/{dcol}")
                continue

            df = df.withColumn(pcol, col(pcol).cast(TimestampType())) \
                   .withColumn(dcol, col(dcol).cast(TimestampType()))

            expected = df.count()

            # 2) Consistent lineage metadata.
            run_id = run_id_global  # same run_id for the whole backfill
            df_base = (df
                .withColumn("run_id",          lit(run_id))
                .withColumn("service_type",    lit(svc))
                .withColumn("source_year",     lit(y))
                .withColumn("source_month",    lit(m))
                .withColumn("source_path",     lit(remote))
                .withColumn("ingested_at_utc", lit(datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ")))
            )

            # 3) Make sure the table (schema) exists via a zero-row append.
            #    Best effort: a failure is only logged; later writes surface real problems.
            try:
                (df_base.limit(0).write.format("snowflake")
                 .options(**sfOptions)
                 .option("dbtable", target_table)
                 .option("usestagingtable", "off")
                 .option("support_share_connection", "false")
                 .mode("append")
                 .save())
            except Exception as e:
                print(f"[WARN] ensure table {target_table}: {e}")

            # 4) TRUNCATE the month up front so re-running a month is idempotent.
            try:
                (df_base.limit(0).write.format("snowflake")
                 .options(**sfOptions)
                 .option("dbtable", target_table)
                 .option("preactions", f"TRUNCATE TABLE {target_table}")
                 .option("usestagingtable", "off")
                 .option("support_share_connection", "false")
                 .mode("overwrite")
                 .save())
            except Exception as e:
                print(f"[WARN] truncate {target_table}: {e}")

            # 5) Load day by day, with a DELETE of [lo, hi) before each attempt.
            inserted_total = 0
            last_day = calendar.monthrange(int(y), int(m))[1]  # 28/29/30/31 depending on month/year
            for d in range(1, last_day + 1):
                day_dt  = datetime.strptime(f"{y}-{m}-{d:02d} 00:00:00", "%Y-%m-%d %H:%M:%S")
                next_dt = day_dt + timedelta(days=1)
                lo = day_dt.strftime("%Y-%m-%d %H:%M:%S")
                hi = next_dt.strftime("%Y-%m-%d %H:%M:%S")

                part = df_base.filter((col(pcol) >= lo) & (col(pcol) < hi)).coalesce(1)
                cnt  = part.count()
                if cnt == 0:
                    continue

                pre_sql = (
                    f"DELETE FROM {target_table} "
                    f"WHERE {pcol} >= TO_TIMESTAMP_NTZ('{lo}') "
                    f"  AND {pcol} <  TO_TIMESTAMP_NTZ('{hi}')"
                )
                write_with_retry(part, target_table, pre_sql, tries=3)
                inserted_total += cnt
                print(f"[{svc} {y}-{m}] {lo}..{hi} -> inserted: {cnt}")

            # 5b) Out-of-month bucket: rows whose pickup timestamp lies outside
            #     [first day of month, first day of next month). The day ranges
            #     above never match them, so without this step they would be
            #     missing from RAW and the month would report MISMATCH.
            #     Their count is added to inserted_total so that
            #     expected == inserted_total + cnt_nulls still holds.
            month_lo_dt = datetime.strptime(f"{y}-{m}-01 00:00:00", "%Y-%m-%d %H:%M:%S")
            month_hi_dt = month_lo_dt + timedelta(days=last_day)
            month_lo = month_lo_dt.strftime("%Y-%m-%d %H:%M:%S")
            month_hi = month_hi_dt.strftime("%Y-%m-%d %H:%M:%S")

            df_out = df_base.filter((col(pcol) < month_lo) | (col(pcol) >= month_hi)).coalesce(1)
            cnt_out = df_out.count()
            if cnt_out > 0:
                pre_sql_out = (
                    f"DELETE FROM {target_table} "
                    f"WHERE {pcol} <  TO_TIMESTAMP_NTZ('{month_lo}') "
                    f"   OR {pcol} >= TO_TIMESTAMP_NTZ('{month_hi}')"
                )
                write_with_retry(df_out, target_table, pre_sql_out, tries=3)
                inserted_total += cnt_out
                print(f"[{svc} {y}-{m}] fuera de mes -> inserted: {cnt_out}")

            # 6) NULL bucket: rows without a pickup timestamp (idempotent via DELETE ... IS NULL).
            df_nulls = df_base.filter(col(pcol).isNull()).coalesce(1)
            cnt_nulls = df_nulls.count()
            if cnt_nulls > 0:
                pre_sql_null = (
                    f"DELETE FROM {target_table} "
                    f"WHERE {pcol} IS NULL AND source_year='{y}' AND source_month='{m}'"
                )
                write_with_retry(df_nulls, target_table, pre_sql_null, tries=3)
                print(f"[{svc} {y}-{m}] NULL bucket -> inserted: {cnt_nulls}")

            # 7) Final verification for the month (lenient: a failed count does not stop the backfill).
            ok_count, final_cnt = sf_try_count_rows(target_table, tries=5, wait_s=5)

            if ok_count:
                ok = "OK" if final_cnt == expected else f"MISMATCH ({final_cnt} vs {expected})"
            else:
                final_cnt = -1  # sentinel: count could not be obtained
                ok = f"MISMATCH (count_failed vs expected={expected})"

            coverage.append((svc, y, m, expected, final_cnt, inserted_total, cnt_nulls, ok))
            print(f"\n=== RESUMEN {svc.upper()} {y}-{m} ===")
            print(f"Tabla RAW destino: {target_table}")
            print(f"Esperadas (parquet): {expected}")
            print(f"Insertadas por loop (sin NULL): {inserted_total}")
            print(f"  de las cuales fuera de mes: {cnt_out}")
            print(f"Insertadas NULL: {cnt_nulls}")
            print(f"Filas en RAW: {final_cnt if final_cnt>=0 else 'COUNT_FAILED'}  --> {ok}\n")

# 8) Global summary: one line per processed month, framed by banner lines.
print("\n================= COBERTURA GLOBAL =================")
for svc, y, m, expc, finalc, ins, nulls, ok in coverage:
    summary_line = f"{svc:6s} {y}-{m}: final={finalc} expected={expc} inserted={ins} nulls={nulls} -> {ok}"
    print(summary_line)
print("====================================================")


[yellow 2015-01] 2015-01-01 00:00:00..2015-01-02 00:00:00 -> inserted: 382014
[WARN] write intento 1/3 falló: An error occurred while calling o18187.save.
: net.snowflake.client.jdbc.SnowflakeSQLLoggedException: !-1!
	at net.snowflake.client.core.SFSession.getQueryMetadata(SFSession.java:246)
	at net.snowflake.client.core.SFSession.getQueryStatus(SFSession.java:305)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getStatus(SFAsyncResultSet.java:107)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getRealResults(SFAsyncResultSet.java:183)
	at net.snowflake.client.jdbc.SFAsyncResultSet.getMetaData(SFAsyncResultSet.java:298)
	at net.snowflake.spark.snowflake.io.StageWriter$.executeCopyIntoTable(StageWriter.scala:575)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToTableWithStagingTable(StageWriter.scala:452)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToTable(StageWriter.scala:288)
	at net.snowflake.spark.snowflake.io.StageWriter$.writeToStage(StageWriter.scala:233)
	at 

ERROR:root:KeyboardInterrupt while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 511, in send_command
    answer = smart_decode(self.stream.readline()[:-1])
                          ^^^^^^^^^^^^^^^^^^^^^^
  File "/opt/conda/lib/python3.11/socket.py", line 706, in readinto
    return self._sock.recv_into(b)
           ^^^^^^^^^^^^^^^^^^^^^^^
KeyboardInterrupt


KeyboardInterrupt: 