In [1]:
from pyspark.sql import SparkSession

# Build (or reuse) the Spark session. The Snowflake JDBC driver and the
# Spark-Snowflake connector jars are registered through "spark.jars".
spark = (
    SparkSession.builder
    .appName("IngestNYCTaxiData")
    .config(
        "spark.jars",
        "/home/jovyan/jars/snowflake-jdbc-3.13.30.jar,/home/jovyan/jars/spark-snowflake_2.12-2.12.0-spark_3.4.jar",
    )
    .getOrCreate()
)

In [2]:
# Collect the Snowflake connection settings from environment variables so
# that no credential is hard-coded in the notebook.
import os

sfOptions = {
    # The account identifier is embedded in the account URL.
    "sfURL": f"https://{os.getenv('SF_ACCOUNT')}.snowflakecomputing.com",
    # Remaining connector options map one-to-one onto environment variables.
    **{
        option_key: os.getenv(env_name)
        for option_key, env_name in {
            "sfUser": "SF_USER",
            "sfPassword": "SF_PASSWORD",
            "sfWarehouse": "SF_WAREHOUSE",
            "sfDatabase": "SF_DATABASE",
            "sfSchema": "SF_SCHEMA",
            "sfRole": "SF_ROLE",
        }.items()
    },
}

In [4]:
# Load the RAW.NYC_YELLOW_TAXIS table from Snowflake into a DataFrame.
df = (
    spark.read
    .format("snowflake")
    .options(**sfOptions)
    .option("dbtable", "RAW.NYC_YELLOW_TAXIS")
    .load()
)

# Peek at the first five rows.
df.show(5)

+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+------------------+----------------+------------+-----------+------------+--------------------+--------------------+
|VENDORID|TPEP_PICKUP_DATETIME|TPEP_DROPOFF_DATETIME|PASSENGER_COUNT|TRIP_DISTANCE|RATECODEID|STORE_AND_FWD_FLAG|PULOCATIONID|DOLOCATIONID|PAYMENT_TYPE|FARE_AMOUNT|EXTRA|MTA_TAX|TIP_AMOUNT|TOLLS_AMOUNT|IMPROVEMENT_SURCHARGE|TOTAL_AMOUNT|CONGESTION_SURCHARGE|AIRPORT_FEE|CBD_CONGESTION_FEE|          RUN_ID|SERVICE_TYPE|SOURCE_YEAR|SOURCE_MONTH|     INGESTED_AT_UTC|         SOURCE_PATH|
+--------+--------------------+---------------------+---------------+-------------+----------+------------------+------------+------------+------------+-----------+-----+-------+----------+------------+---------------------+--

In [5]:
# =======================
# Yellow Taxi ingestion -> Snowflake in chunks (written through the Spark-Snowflake connector)
# =======================
from pyspark.sql import DataFrame
from pyspark.sql.functions import (
    col, lit, current_timestamp, row_number, monotonically_increasing_id
)
from pyspark.sql.window import Window
import math

# =======================
# Configuration
# =======================
SERVICE_TYPE = "YELLOW"   # value stored in the SERVICE_TYPE metadata column
CHUNK_SIZE = 1_000_000    # default number of rows per chunk

# =======================
# Target columns in Snowflake (RAW.NYC_YELLOW_TAXIS), in the order the
# aligned DataFrame is selected before being written.
# =======================
SNOWFLAKE_FINAL_COLS = [
    # trip fields
    "VendorID",
    "tpep_pickup_datetime",
    "tpep_dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "RatecodeID",
    "store_and_fwd_flag",
    "PULocationID",
    "DOLocationID",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "airport_fee",
    "cbd_congestion_fee",
    # ingestion metadata
    "RUN_ID",
    "SERVICE_TYPE",
    "SOURCE_YEAR",
    "SOURCE_MONTH",
    "INGESTED_AT_UTC",
    "SOURCE_PATH",
]

# Column order of the audit table (RAW.INGEST_AUDIT).
AUDIT_COLS_ORDER = [
    "RUN_ID",
    "SERVICE_TYPE",
    "SOURCE_YEAR",
    "SOURCE_MONTH",
    "INGESTED_AT_UTC",
    "SOURCE_PATH",
    "ROW_COUNT",
]

# =======================
# Utilidades de esquema
# =======================
def _ensure_column_present(df: DataFrame, candidates, target, spark_type) -> DataFrame:
    """
    Guarantee that ``df`` exposes a column named ``target``.

    The first name in ``candidates`` that appears in ``df.columns`` wins: it
    is renamed to ``target``, or ``df`` is returned untouched when it is
    already called ``target``. When none of the candidates is present,
    ``target`` is added as an all-NULL column cast to ``spark_type``.
    """
    available = set(df.columns)
    match = next((name for name in candidates if name in available), None)

    # No candidate found: create the column as typed NULLs.
    if match is None:
        return df.withColumn(target, lit(None).cast(spark_type))

    # Already under the target name: nothing to do.
    if match == target:
        return df

    return df.withColumnRenamed(match, target)

def _align_and_enrich_schema(base_df: DataFrame, run_id: str, year: int, month: int, source_url: str) -> DataFrame:
    """
    Shape a raw Yellow Taxi DataFrame into the layout listed in
    SNOWFLAKE_FINAL_COLS.

    Steps:
    - cast the pickup/dropoff columns to Spark ``timestamp`` when present;
    - make sure ``airport_fee`` and ``cbd_congestion_fee`` exist (renamed from
      an alternate spelling, or created as NULL floats);
    - add the ingestion metadata columns (RUN_ID, SERVICE_TYPE, SOURCE_YEAR,
      SOURCE_MONTH, INGESTED_AT_UTC, SOURCE_PATH);
    - select the columns in the order given by SNOWFLAKE_FINAL_COLS.
    """
    aligned = base_df

    # Timestamp columns are cast only if the source file carries them.
    for ts_col in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
        if ts_col in aligned.columns:
            aligned = aligned.withColumn(ts_col, col(ts_col).cast("timestamp"))

    # airport_fee may arrive as "airport_fee" or "Airport_fee".
    aligned = _ensure_column_present(aligned, ["airport_fee", "Airport_fee"], "airport_fee", "float")

    # cbd_congestion_fee may be absent from the file; it is then filled with NULLs.
    aligned = _ensure_column_present(
        aligned, ["cbd_congestion_fee", "CBD_CONGESTION_FEE"], "cbd_congestion_fee", "float"
    )

    # Ingestion metadata, added in a fixed order.
    metadata_columns = (
        ("RUN_ID", lit(run_id)),
        ("SERVICE_TYPE", lit(SERVICE_TYPE)),
        ("SOURCE_YEAR", lit(year)),
        ("SOURCE_MONTH", lit(month)),
        ("INGESTED_AT_UTC", current_timestamp()),
        ("SOURCE_PATH", lit(source_url)),
    )
    for column_name, column_value in metadata_columns:
        aligned = aligned.withColumn(column_name, column_value)

    # Final projection: exact column order expected by the target table.
    return aligned.select(SNOWFLAKE_FINAL_COLS)

# =======================
# Idempotencia por RUN_ID
# =======================
def _delete_previous_chunk_sql(run_id: str) -> str:
    """SQL que limpia datos previos de ese RUN_ID en ambas tablas antes de cargar."""
    return f"""
        DELETE FROM RAW.NYC_YELLOW_TAXIS WHERE RUN_ID = '{run_id}';
        DELETE FROM RAW.INGEST_AUDIT     WHERE RUN_ID = '{run_id}';
    """

# =======================
# Writes JDBC puros
# =======================
def _write_chunk_to_snowflake(df_chunk: DataFrame, sfOptions: dict, run_id: str):
    """
    Append one chunk to RAW.NYC_YELLOW_TAXIS through the Spark-Snowflake
    connector.

    The ``preactions`` SQL deletes any rows previously stored under the same
    RUN_ID (main table and audit table), so re-running a chunk does not
    duplicate it.

    Args:
        df_chunk: DataFrame already aligned to the target column order.
        sfOptions: connector connection options (URL, user, password, ...).
        run_id: identifier of the chunk, used by the clean-up SQL.
    """
    pre_sql = _delete_previous_chunk_sql(run_id)
    (
        df_chunk.write
        .format("snowflake")
        .options(**sfOptions)
        .option("dbtable", "RAW.NYC_YELLOW_TAXIS")
        .option("preactions", pre_sql)
        # The documented connector option is "usestagingtable". The earlier
        # key "usestagetable" is not a documented option, so the intent of
        # skipping the staging table was presumably never applied.
        .option("usestagingtable", "off")
        # NOTE(review): "use_copy_unload" is documented as a read-side setting
        # (COPY UNLOAD vs. Arrow for SELECT). It is kept for parity with the
        # original call but does not appear to influence writes - confirm.
        .option("use_copy_unload", "false")
        .mode("append")
        .save()
    )

def _write_audit_row(sfOptions: dict, run_id: str, row_count: int, spark_session, year: int, month: int, source_url: str):
    """
    Append the audit record of one chunk to RAW.INGEST_AUDIT.

    A single-row Spark DataFrame is built (no pandas involved) and written
    through the Spark-Snowflake connector. The ``preactions`` SQL first
    removes any audit row already stored for the same RUN_ID.

    Args:
        sfOptions: connector connection options.
        run_id: identifier of the chunk being audited.
        row_count: number of rows in the chunk; stored as a long.
        spark_session: SparkSession used to build the one-row DataFrame.
        year: source year of the ingested file.
        month: source month of the ingested file.
        source_url: value stored in SOURCE_PATH.
    """
    audit_df = spark_session.range(1).select(
        lit(run_id).alias("RUN_ID"),
        lit(SERVICE_TYPE).alias("SERVICE_TYPE"),
        lit(year).alias("SOURCE_YEAR"),
        lit(month).alias("SOURCE_MONTH"),
        current_timestamp().alias("INGESTED_AT_UTC"),
        lit(source_url).alias("SOURCE_PATH"),
        lit(row_count).cast("long").alias("ROW_COUNT")
    )

    pre_sql = f"DELETE FROM RAW.INGEST_AUDIT WHERE RUN_ID = '{run_id}';"
    (
        audit_df.write
        .format("snowflake")
        .options(**sfOptions)
        .option("dbtable", "RAW.INGEST_AUDIT")
        .option("preactions", pre_sql)
        # The documented connector option is "usestagingtable"; the earlier
        # key "usestagetable" is not a documented option.
        .option("usestagingtable", "off")
        # NOTE(review): "use_copy_unload" is documented as a read-side
        # setting; kept unchanged, but it likely has no effect here - confirm.
        .option("use_copy_unload", "false")
        .mode("append")
        .save()
    )

# =======================
# Funci√≥n principal
# =======================
def ingest_yellow_month_chunked(
    spark,
    sfOptions: dict,
    file_path: str,
    year: int,
    month: int,
    chunk_size: int = CHUNK_SIZE
):
    """
    Ingest one month of Yellow Taxi data into Snowflake, chunk by chunk.

    The Parquet file at ``file_path`` is read, every row receives a 1-based
    row number, and the rows are written in slices of ``chunk_size``. Each
    slice is aligned to the target schema, written with
    RUN_ID = YEL_YYYY_MM_XXXX (XXXX is the 1-based chunk index) and followed
    by one audit row.

    Args:
        spark: active SparkSession.
        sfOptions: Spark-Snowflake connector connection options.
        file_path: Parquet file to read.
        year: source year (used in RUN_ID, metadata and SOURCE_PATH).
        month: source month (used in RUN_ID, metadata and SOURCE_PATH).
        chunk_size: maximum number of rows per chunk; must be positive.

    Raises:
        ValueError: if ``chunk_size`` is not positive.

    NOTE(review): RUN_IDs are positional, and the clean-up SQL only deletes
    rows with the same RUN_ID. Re-ingesting a month with a larger
    ``chunk_size`` (fewer chunks) therefore leaves the higher-numbered
    chunks of the earlier run in place.
    """
    # A zero size used to fail with ZeroDivisionError and a negative size
    # silently ingested nothing; reject both explicitly.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    source_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/yellow_tripdata_{year}-{month:02d}.parquet"

    print(f"üì• Leyendo Parquet local: {file_path}")
    base_df = spark.read.parquet(file_path)

    # Row numbering for chunking. The indexed frame is cached so the global
    # window is evaluated once: without the cache every chunk would re-read
    # and re-sort the whole file, and the row numbers (derived from the
    # non-deterministic monotonically_increasing_id) would have to come out
    # identical on every re-evaluation for the slices to be disjoint.
    w = Window.orderBy(monotonically_increasing_id())
    df_indexed = base_df.withColumn("_rn", row_number().over(w)).cache()

    try:
        total_rows = df_indexed.count()  # also materialises the cache
        num_chunks = math.ceil(total_rows / chunk_size)
        print(f"üìä Filas totales: {total_rows} ‚Üí Chunks de {chunk_size}: {num_chunks}")

        base_run_prefix = f"YEL_{year}_{month:02d}_"

        for i in range(num_chunks):
            # Inclusive 1-based row-number bounds of this chunk.
            start = i * chunk_size + 1
            end = min((i + 1) * chunk_size, total_rows)
            run_id = f"{base_run_prefix}{(i+1):04d}"

            print(f"üß© Chunk {i+1}/{num_chunks} ‚Üí filas [{start}, {end}] ‚Üí RUN_ID={run_id}")

            # Slice the chunk and drop the helper column.
            chunk_base = df_indexed.filter((col("_rn") >= start) & (col("_rn") <= end)).drop("_rn")

            # Align, enrich and reorder columns to the Snowflake layout.
            chunk_df_for_sf = _align_and_enrich_schema(chunk_base, run_id, year, month, source_url)

            # Write the chunk (preactions delete earlier rows of this RUN_ID).
            _write_chunk_to_snowflake(chunk_df_for_sf, sfOptions, run_id)

            # Record the audit row of the chunk.
            _write_audit_row(sfOptions, run_id, row_count=(end - start + 1), spark_session=spark,
                             year=year, month=month, source_url=source_url)
    finally:
        # Release the cached month whether or not the ingestion succeeded.
        df_indexed.unpersist()

    print("‚úÖ Ingesta mensual chunked COMPLETADA con auditor√≠a por chunk.")


In [5]:
# One-off run: ingest February 2015 from the local Parquet file.
file_path = "/home/work/yellow_2015_02.parquet"

ingest_yellow_month_chunked(
    spark=spark, sfOptions=sfOptions,  # sfOptions holds the credentials defined earlier
    file_path=file_path,
    year=2015, month=2,
    chunk_size=1_000_000,
)

üì• Leyendo Parquet local: /home/work/yellow_2015_02.parquet
üìä Filas totales: 12442394 ‚Üí Chunks de 1000000: 13
üß© Chunk 1/13 ‚Üí filas [1, 1000000] ‚Üí RUN_ID=YEL_2015_02_0001
üß© Chunk 2/13 ‚Üí filas [1000001, 2000000] ‚Üí RUN_ID=YEL_2015_02_0002
üß© Chunk 3/13 ‚Üí filas [2000001, 3000000] ‚Üí RUN_ID=YEL_2015_02_0003
üß© Chunk 4/13 ‚Üí filas [3000001, 4000000] ‚Üí RUN_ID=YEL_2015_02_0004
üß© Chunk 5/13 ‚Üí filas [4000001, 5000000] ‚Üí RUN_ID=YEL_2015_02_0005
üß© Chunk 6/13 ‚Üí filas [5000001, 6000000] ‚Üí RUN_ID=YEL_2015_02_0006
üß© Chunk 7/13 ‚Üí filas [6000001, 7000000] ‚Üí RUN_ID=YEL_2015_02_0007
üß© Chunk 8/13 ‚Üí filas [7000001, 8000000] ‚Üí RUN_ID=YEL_2015_02_0008
üß© Chunk 9/13 ‚Üí filas [8000001, 9000000] ‚Üí RUN_ID=YEL_2015_02_0009
üß© Chunk 10/13 ‚Üí filas [9000001, 10000000] ‚Üí RUN_ID=YEL_2015_02_0010
üß© Chunk 11/13 ‚Üí filas [10000001, 11000000] ‚Üí RUN_ID=YEL_2015_02_0011
üß© Chunk 12/13 ‚Üí filas [11000001, 12000000] ‚Üí RUN_ID=YEL_2015_02_0012
üß© 

In [6]:
# =======================
# Multi-month runner: 2015-01 to 2025-08 (range set by START_* / END_* below)
# =======================
import os
from datetime import date

def month_iter(start_year: int, start_month: int, end_year: int, end_month: int):
    """
    Iterate over (year, month) pairs from the start to the end, both inclusive.

    Args:
        start_year, start_month: first pair to produce (month in 1..12).
        end_year, end_month: last pair to produce (month in 1..12).

    Returns:
        An iterator of ``(year, month)`` tuples in chronological order; it is
        empty when the start lies after the end.

    Raises:
        ValueError: if either month is outside 1..12. A start month above 12
            used to skip the year rollover and iterate without end.
    """
    for label, value in (("start_month", start_month), ("end_month", end_month)):
        if not 1 <= value <= 12:
            raise ValueError(f"{label} must be between 1 and 12, got {value!r}")

    # Work on a linear month index (year * 12 + zero-based month) so the year
    # rollover needs no special case.
    first = start_year * 12 + (start_month - 1)
    last = end_year * 12 + (end_month - 1)
    return ((index // 12, index % 12 + 1) for index in range(first, last + 1))

# Requested range (both ends inclusive)
START_YEAR = 2015
START_MONTH = 1
END_YEAR = 2025
END_MONTH = 8

base_dir = "/home/work"  # folder holding the local Parquet files

# Per-month outcome counters
ok = 0
skipped = 0
failed = 0

print(f"üöÄ Iniciando ingesta mensual: {START_YEAR}-{START_MONTH:02d} ‚Üí {END_YEAR}-{END_MONTH:02d}")
for yy, mm in month_iter(START_YEAR, START_MONTH, END_YEAR, END_MONTH):
    file_path = os.path.join(base_dir, f"yellow_{yy}_{mm:02d}.parquet")
    print("\n" + "="*80)
    print(f"üìÖ Procesando {yy}-{mm:02d} ‚Üí {file_path}")

    if os.path.exists(file_path):
        # A failing month is counted and reported; the loop moves on.
        try:
            ingest_yellow_month_chunked(
                spark=spark,
                sfOptions=sfOptions,  # must already be defined in the notebook
                file_path=file_path,
                year=yy,
                month=mm,
                chunk_size=1_000_000
            )
        except Exception as e:
            failed += 1
            print(f"‚ùå Error en {yy}-{mm:02d}: {e}")
        else:
            ok += 1
    else:
        # Months without a local file are skipped, not treated as errors.
        print(f"‚ö†Ô∏è  Archivo NO encontrado, se omite: {file_path}")
        skipped += 1

# Final summary
print("\n" + "="*80)
print(f"üèÅ Resumen:")
print(f"   ‚úÖ Meses OK     : {ok}")
print(f"   ‚è≠  Meses omitidos (sin archivo): {skipped}")
print(f"   ‚ùå Meses con error             : {failed}")
print("üéâ Terminado.")


üöÄ Iniciando ingesta mensual: 2017-08 ‚Üí 2017-09

üìÖ Procesando 2017-08 ‚Üí /home/work/yellow_2017_08.parquet
üì• Leyendo Parquet local: /home/work/yellow_2017_08.parquet
‚ùå Error en 2017-08: An error occurred while calling o50.parquet.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 1.0 failed 1 times, most recent failure: Lost task 0.0 in stage 1.0 (TID 1) (5f892c4bd431 executor driver): org.apache.spark.SparkException: Exception thrown in awaitResult: 
	at org.apache.spark.util.SparkThreadUtils$.awaitResult(SparkThreadUtils.scala:56)
	at org.apache.spark.util.ThreadUtils$.awaitResult(ThreadUtils.scala:310)
	at org.apache.spark.util.ThreadUtils$.parmap(ThreadUtils.scala:383)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.readParquetFootersInParallel(ParquetFileFormat.scala:443)
	at org.apache.spark.sql.execution.datasources.parquet.ParquetFileFormat$.$anonfun$mergeSchemasInParallel$1(ParquetFileFormat.scala:493)
	

In [5]:
# ======================================================================================
# GREEN Taxi ingestion -> Snowflake in chunks (written through the Spark-Snowflake connector) [same architecture as Yellow]
# ======================================================================================

# -----------------------
# GREEN-specific configuration
# -----------------------
SERVICE_TYPE_GREEN = "GREEN"    # value stored in the SERVICE_TYPE metadata column
CHUNK_SIZE_GREEN = 500_000      # default rows per chunk; adjust as needed

# -----------------------
# Target columns in Snowflake (RAW.NYC_GREEN_TAXIS), in the order the aligned
# DataFrame is selected before being written.
# -----------------------
_GREEN_TRIP_COLS = [
    "VendorID",
    "lpep_pickup_datetime",
    "lpep_dropoff_datetime",
    "store_and_fwd_flag",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "ehail_fee",
    "improvement_surcharge",
    "total_amount",
    "payment_type",
    "trip_type",
    "congestion_surcharge",
    "cbd_congestion_fee",
]
_GREEN_METADATA_COLS = [
    "RUN_ID",
    "SERVICE_TYPE",
    "SOURCE_YEAR",
    "SOURCE_MONTH",
    "INGESTED_AT_UTC",
    "SOURCE_PATH",
]
SNOWFLAKE_FINAL_COLS_GREEN = _GREEN_TRIP_COLS + _GREEN_METADATA_COLS

# -----------------------
# Audit table (RAW.INGEST_AUDIT) column order - same as the Yellow pipeline
# -----------------------
AUDIT_COLS_ORDER_GREEN = _GREEN_METADATA_COLS + ["ROW_COUNT"]

# -----------------------
# Utilidad: asegurar/renombrar columna (reusa tu _ensure_column_present si ya existe)
# -----------------------
# Define the helper only when an earlier cell has not already provided it.
if "_ensure_column_present" not in globals():
    from pyspark.sql.functions import lit
    from pyspark.sql import DataFrame

    def _ensure_column_present(df: DataFrame, candidates, target, spark_type) -> DataFrame:
        """
        Return ``df`` with a column named ``target``.

        The first entry of ``candidates`` found in ``df.columns`` is renamed
        to ``target`` (no change when it already has that name); when none is
        found, ``target`` is added as NULL cast to ``spark_type``.
        """
        present = set(df.columns)
        found = [name for name in candidates if name in present]
        if not found:
            return df.withColumn(target, lit(None).cast(spark_type))
        first = found[0]
        if first == target:
            return df
        return df.withColumnRenamed(first, target)

# -----------------------
# Alinear esquema para GREEN
# -----------------------
from pyspark.sql.functions import col, current_timestamp, lit

def _align_and_enrich_schema_green(base_df, run_id: str, year: int, month: int, source_url: str):
    """
    Shape a raw Green Taxi DataFrame into the layout listed in
    SNOWFLAKE_FINAL_COLS_GREEN.

    Steps:
    - cast the lpep pickup/dropoff columns to Spark ``timestamp`` when present;
    - make sure ehail_fee, trip_type, congestion_surcharge and
      cbd_congestion_fee exist (renamed from an alternate spelling, or created
      as NULL floats);
    - add the ingestion metadata columns;
    - select the columns in the order given by SNOWFLAKE_FINAL_COLS_GREEN.
    """
    aligned = base_df

    # Timestamp columns are cast only if the source file carries them.
    for ts_col in ("lpep_pickup_datetime", "lpep_dropoff_datetime"):
        if ts_col in aligned.columns:
            aligned = aligned.withColumn(ts_col, col(ts_col).cast("timestamp"))

    # Columns that may be missing in some files: (accepted spellings, target name).
    optional_columns = (
        (["ehail_fee", "Ehail_fee", "E HAIL FEE"], "ehail_fee"),
        (["trip_type", "Trip_type"], "trip_type"),
        (["congestion_surcharge", "Congestion_Surcharge"], "congestion_surcharge"),
        (["cbd_congestion_fee", "CBD_CONGESTION_FEE"], "cbd_congestion_fee"),
    )
    for spellings, target_name in optional_columns:
        aligned = _ensure_column_present(aligned, spellings, target_name, "float")

    # Ingestion metadata, added in a fixed order.
    metadata_columns = (
        ("RUN_ID", lit(run_id)),
        ("SERVICE_TYPE", lit(SERVICE_TYPE_GREEN)),
        ("SOURCE_YEAR", lit(year)),
        ("SOURCE_MONTH", lit(month)),
        ("INGESTED_AT_UTC", current_timestamp()),
        ("SOURCE_PATH", lit(source_url)),
    )
    for column_name, column_value in metadata_columns:
        aligned = aligned.withColumn(column_name, column_value)

    # Final projection: exact column order expected by the target table.
    return aligned.select(SNOWFLAKE_FINAL_COLS_GREEN)

# -----------------------
# SQL de limpieza idempotente para GREEN
# -----------------------
def _delete_previous_chunk_sql_green(run_id: str) -> str:
    return f"""
        DELETE FROM RAW.NYC_GREEN_TAXIS WHERE RUN_ID = '{run_id}';
        DELETE FROM RAW.INGEST_AUDIT     WHERE RUN_ID = '{run_id}';
    """

# -----------------------
# Writes JDBC puros (sin COPY/Stage) para GREEN
# -----------------------
def _write_chunk_to_snowflake_green(df_chunk, sfOptions: dict, run_id: str):
    """
    Append one chunk to RAW.NYC_GREEN_TAXIS through the Spark-Snowflake
    connector.

    The ``preactions`` SQL deletes any rows previously stored under the same
    RUN_ID (main table and audit table), so re-running a chunk does not
    duplicate it.

    Args:
        df_chunk: DataFrame already aligned to the target column order.
        sfOptions: connector connection options (URL, user, password, ...).
        run_id: identifier of the chunk, used by the clean-up SQL.
    """
    pre_sql = _delete_previous_chunk_sql_green(run_id)
    (
        df_chunk.write
        .format("snowflake")
        .options(**sfOptions)
        .option("dbtable", "RAW.NYC_GREEN_TAXIS")
        .option("preactions", pre_sql)
        # The documented connector option is "usestagingtable". The earlier
        # key "usestagetable" is not a documented option, so the intent of
        # skipping the staging table was presumably never applied.
        .option("usestagingtable", "off")
        # NOTE(review): "use_copy_unload" is documented as a read-side setting
        # (COPY UNLOAD vs. Arrow for SELECT). It is kept for parity with the
        # original call but does not appear to influence writes - confirm.
        .option("use_copy_unload", "false")
        .mode("append")
        .save()
    )

def _write_audit_row_green(sfOptions: dict, run_id: str, row_count: int, spark_session, year: int, month: int, source_url: str):
    """
    Append the audit record of one GREEN chunk to RAW.INGEST_AUDIT.

    A single-row Spark DataFrame is built and written through the
    Spark-Snowflake connector. The ``preactions`` SQL first removes any audit
    row already stored for the same RUN_ID.

    Args:
        sfOptions: connector connection options.
        run_id: identifier of the chunk being audited.
        row_count: number of rows in the chunk; stored as a long.
        spark_session: SparkSession used to build the one-row DataFrame.
        year: source year of the ingested file.
        month: source month of the ingested file.
        source_url: value stored in SOURCE_PATH.
    """
    audit_df = spark_session.range(1).select(
        lit(run_id).alias("RUN_ID"),
        lit(SERVICE_TYPE_GREEN).alias("SERVICE_TYPE"),
        lit(year).alias("SOURCE_YEAR"),
        lit(month).alias("SOURCE_MONTH"),
        current_timestamp().alias("INGESTED_AT_UTC"),
        lit(source_url).alias("SOURCE_PATH"),
        lit(row_count).cast("long").alias("ROW_COUNT")
    )

    pre_sql = f"DELETE FROM RAW.INGEST_AUDIT WHERE RUN_ID = '{run_id}';"
    (
        audit_df.write
        .format("snowflake")
        .options(**sfOptions)
        .option("dbtable", "RAW.INGEST_AUDIT")
        .option("preactions", pre_sql)
        # The documented connector option is "usestagingtable"; the earlier
        # key "usestagetable" is not a documented option.
        .option("usestagingtable", "off")
        # NOTE(review): "use_copy_unload" is documented as a read-side
        # setting; kept unchanged, but it likely has no effect here - confirm.
        .option("use_copy_unload", "false")
        .mode("append")
        .save()
    )

# -----------------------
# Funci√≥n principal GREEN
# -----------------------
from pyspark.sql.window import Window
from pyspark.sql.functions import row_number, monotonically_increasing_id
import math

def ingest_green_month_chunked(
    spark,
    sfOptions: dict,
    file_path: str,
    year: int,
    month: int,
    chunk_size: int = CHUNK_SIZE_GREEN
):
    """
    Ingest one month of Green Taxi data into Snowflake, chunk by chunk.

    The Parquet file at ``file_path`` is read, every row receives a 1-based
    row number, and the rows are written to RAW.NYC_GREEN_TAXIS in slices of
    ``chunk_size``. Each slice is aligned to the GREEN schema, written with
    RUN_ID = GRE_YYYY_MM_XXXX (XXXX is the 1-based chunk index) and followed
    by one audit row in RAW.INGEST_AUDIT.

    Args:
        spark: active SparkSession.
        sfOptions: Spark-Snowflake connector connection options.
        file_path: Parquet file to read.
        year: source year (used in RUN_ID, metadata and SOURCE_PATH).
        month: source month (used in RUN_ID, metadata and SOURCE_PATH).
        chunk_size: maximum number of rows per chunk; must be positive.

    Raises:
        ValueError: if ``chunk_size`` is not positive.

    NOTE(review): RUN_IDs are positional, and the clean-up SQL only deletes
    rows with the same RUN_ID. Re-ingesting a month with a larger
    ``chunk_size`` (fewer chunks) therefore leaves the higher-numbered
    chunks of the earlier run in place.
    """
    # A zero size used to fail with ZeroDivisionError and a negative size
    # silently ingested nothing; reject both explicitly.
    if chunk_size <= 0:
        raise ValueError(f"chunk_size must be a positive integer, got {chunk_size!r}")

    source_url = f"https://d37ci6vzurychx.cloudfront.net/trip-data/green_tripdata_{year}-{month:02d}.parquet"

    print(f"üì• [GREEN] Leyendo Parquet local: {file_path}")
    base_df = spark.read.parquet(file_path)

    # Row numbering for chunking. The indexed frame is cached so the global
    # window is evaluated once: without the cache every chunk would re-read
    # and re-sort the whole file, and the row numbers (derived from the
    # non-deterministic monotonically_increasing_id) would have to come out
    # identical on every re-evaluation for the slices to be disjoint.
    w = Window.orderBy(monotonically_increasing_id())
    df_indexed = base_df.withColumn("_rn", row_number().over(w)).cache()

    try:
        total_rows = df_indexed.count()  # also materialises the cache
        num_chunks  = math.ceil(total_rows / chunk_size)
        print(f"üìä [GREEN] Filas totales: {total_rows} ‚Üí Chunks de {chunk_size}: {num_chunks}")

        base_run_prefix = f"GRE_{year}_{month:02d}_"

        for i in range(num_chunks):
            # Inclusive 1-based row-number bounds of this chunk.
            start = i * chunk_size + 1
            end   = min((i + 1) * chunk_size, total_rows)
            run_id = f"{base_run_prefix}{(i+1):04d}"

            print(f"üß© [GREEN] Chunk {i+1}/{num_chunks} ‚Üí filas [{start}, {end}] ‚Üí RUN_ID={run_id}")

            # Slice the chunk and drop the helper column.
            chunk_base = df_indexed.filter((col("_rn") >= start) & (col("_rn") <= end)).drop("_rn")

            # Align and enrich to the GREEN target layout.
            chunk_df_for_sf = _align_and_enrich_schema_green(chunk_base, run_id, year, month, source_url)

            # Write the data, then its audit row.
            _write_chunk_to_snowflake_green(chunk_df_for_sf, sfOptions, run_id)
            _write_audit_row_green(sfOptions, run_id, row_count=(end - start + 1),
                                   spark_session=spark, year=year, month=month, source_url=source_url)
    finally:
        # Release the cached month whether or not the ingestion succeeded.
        df_indexed.unpersist()

    print("‚úÖ [GREEN] Ingesta mensual chunked COMPLETADA con auditor√≠a por chunk.")
# -----------------------
# Prueba r√°pida de lectura (opcional)
# -----------------------
# Best-effort smoke test: read the target table back and show a few rows.
# Any failure is reported as a warning rather than raised.
try:
    df_green = spark.read.format("snowflake").options(**sfOptions).option("dbtable", "RAW.NYC_GREEN_TAXIS").load()
    print("üîé Muestra de RAW.NYC_GREEN_TAXIS:")
    df_green.show(5)
except Exception as e:
    print(f"‚ö†Ô∏è No se pudo leer RAW.NYC_GREEN_TAXIS todav√≠a (quiz√° no existe o no hay permisos): {e}")

üîé Muestra de RAW.NYC_GREEN_TAXIS:
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+------------+---------+--------------------+------------------+------+------------+-----------+------------+---------------+-----------+
|VENDORID|LPEP_PICKUP_DATETIME|LPEP_DROPOFF_DATETIME|STORE_AND_FWD_FLAG|RATECODEID|PULOCATIONID|DOLOCATIONID|PASSENGER_COUNT|TRIP_DISTANCE|FARE_AMOUNT|EXTRA|MTA_TAX|TIP_AMOUNT|TOLLS_AMOUNT|EHAIL_FEE|IMPROVEMENT_SURCHARGE|TOTAL_AMOUNT|PAYMENT_TYPE|TRIP_TYPE|CONGESTION_SURCHARGE|CBD_CONGESTION_FEE|RUN_ID|SERVICE_TYPE|SOURCE_YEAR|SOURCE_MONTH|INGESTED_AT_UTC|SOURCE_PATH|
+--------+--------------------+---------------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------+---------------------+

In [7]:
# -----------------------
# Runner multi-mes para GREEN (ajusta rango/base_dir seg√∫n tus archivos locales)
# -----------------------
from datetime import date
import os

# Local definition of month_iter so this cell also runs on its own (it replaces any month_iter defined by an earlier cell):
def month_iter(start_year: int, start_month: int, end_year: int, end_month: int):
    """
    Iterate over (year, month) pairs from the start to the end, both inclusive.

    Args:
        start_year, start_month: first pair to produce (month in 1..12).
        end_year, end_month: last pair to produce (month in 1..12).

    Returns:
        An iterator of ``(year, month)`` tuples in chronological order; it is
        empty when the start lies after the end.

    Raises:
        ValueError: if either month is outside 1..12. A start month above 12
            used to skip the year rollover and iterate without end.
    """
    for label, value in (("start_month", start_month), ("end_month", end_month)):
        if not 1 <= value <= 12:
            raise ValueError(f"{label} must be between 1 and 12, got {value!r}")

    # Work on a linear month index (year * 12 + zero-based month) so the year
    # rollover needs no special case.
    first = start_year * 12 + (start_month - 1)
    last = end_year * 12 + (end_month - 1)
    return ((index // 12, index % 12 + 1) for index in range(first, last + 1))

# Requested range for GREEN (both ends inclusive)
START_YEAR_G = 2016
START_MONTH_G = 2
END_YEAR_G = 2025
END_MONTH_G = 8

base_dir_green = "/home/work"  # folder holding the local Parquet files

# Per-month outcome counters
ok_g = 0
skipped_g = 0
failed_g = 0

print(f"üöÄ [GREEN] Iniciando ingesta mensual: {START_YEAR_G}-{START_MONTH_G:02d} ‚Üí {END_YEAR_G}-{END_MONTH_G:02d}")
for yy, mm in month_iter(START_YEAR_G, START_MONTH_G, END_YEAR_G, END_MONTH_G):
    file_path = os.path.join(base_dir_green, f"green_{yy}_{mm:02d}.parquet")  # e.g. green_2017_08.parquet
    print("\n" + "="*80)
    print(f"üìÖ [GREEN] Procesando {yy}-{mm:02d} ‚Üí {file_path}")

    if os.path.exists(file_path):
        # A failing month is counted and reported; the loop moves on.
        try:
            ingest_green_month_chunked(
                spark=spark,
                sfOptions=sfOptions,
                file_path=file_path,
                year=yy,
                month=mm,
                chunk_size=CHUNK_SIZE_GREEN
            )
        except Exception as e:
            failed_g += 1
            print(f"‚ùå [GREEN] Error en {yy}-{mm:02d}: {e}")
        else:
            ok_g += 1
    else:
        # Months without a local file are skipped, not treated as errors.
        print(f"‚è≠  [GREEN] Archivo NO encontrado, se omite: {file_path}")
        skipped_g += 1

# Final summary
print("\n" + "="*80)
print(f"üèÅ [GREEN] Resumen:")
print(f"   ‚úÖ Meses OK     : {ok_g}")
print(f"   ‚è≠  Meses omitidos (sin archivo): {skipped_g}")
print(f"   ‚ùå Meses con error             : {failed_g}")
print("üéâ [GREEN] Terminado.")

üöÄ [GREEN] Iniciando ingesta mensual: 2016-02 ‚Üí 2025-08

üìÖ [GREEN] Procesando 2016-02 ‚Üí /home/work/green_2016_02.parquet
üì• [GREEN] Leyendo Parquet local: /home/work/green_2016_02.parquet
üìä [GREEN] Filas totales: 1510722 ‚Üí Chunks de 500000: 4
üß© [GREEN] Chunk 1/4 ‚Üí filas [1, 500000] ‚Üí RUN_ID=GRE_2016_02_0001
üß© [GREEN] Chunk 2/4 ‚Üí filas [500001, 1000000] ‚Üí RUN_ID=GRE_2016_02_0002
üß© [GREEN] Chunk 3/4 ‚Üí filas [1000001, 1500000] ‚Üí RUN_ID=GRE_2016_02_0003
üß© [GREEN] Chunk 4/4 ‚Üí filas [1500001, 1510722] ‚Üí RUN_ID=GRE_2016_02_0004
‚úÖ [GREEN] Ingesta mensual chunked COMPLETADA con auditor√≠a por chunk.

üìÖ [GREEN] Procesando 2016-03 ‚Üí /home/work/green_2016_03.parquet
üì• [GREEN] Leyendo Parquet local: /home/work/green_2016_03.parquet
üìä [GREEN] Filas totales: 1576393 ‚Üí Chunks de 500000: 4
üß© [GREEN] Chunk 1/4 ‚Üí filas [1, 500000] ‚Üí RUN_ID=GRE_2016_03_0001
üß© [GREEN] Chunk 2/4 ‚Üí filas [500001, 1000000] ‚Üí RUN_ID=GRE_2016_03_0002
üß© 