In [None]:
# IMPORTS NECESARIOS PARA PROCESAMIENTO DE DATOS
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime
import os

print("✅ Imports para procesamiento de datos cargados")

In [1]:
# DIAGNÓSTICO COMPLETO - SPARK + SNOWFLAKE
# Vamos a diagnosticar si son las credenciales o la conexión Spark

import os
import sys
import time
from pyspark.sql import SparkSession
from pyspark import SparkContext

print("=== DIAGNÓSTICO PASO A PASO ===")

# PASO 1: Limpiar entorno Spark completamente
print("\n1️⃣ LIMPIANDO ENTORNO SPARK...")

# Terminar procesos existentes
try:
    os.system("pkill -f 'java.*spark' 2>/dev/null || true")
    os.system("pkill -f pyspark 2>/dev/null || true")
    time.sleep(2)
    print("✅ Procesos Spark terminados")
except:
    print("⚠️ No se pudieron terminar procesos (normal)")

# Limpiar variables de entorno
env_vars = ['PYSPARK_GATEWAY_PORT', 'PYSPARK_GATEWAY_SECRET', 'SPARK_LOCAL_IP']
for var in env_vars:
    if var in os.environ:
        del os.environ[var]
        print(f"✅ Variable {var} eliminada")

# Limpiar contextos Spark
try:
    if 'spark' in globals():
        spark.stop()
        del spark
    if 'sc' in globals():
        sc.stop()
        del sc
    SparkSession._instantiatedSession = None
    SparkContext._active_spark_context = None
    print("✅ Contextos Spark limpiados")
except Exception as e:
    print(f"⚠️ Limpieza contexto: {e}")

time.sleep(3)

# PASO 2: Configurar JARs
print("\n2️⃣ CONFIGURANDO JARS...")
jars_dir = '/home/jovyan/work/jars'
spark_jars = f"{jars_dir}/spark-snowflake_2.12-2.12.0-spark_3.4.jar,{jars_dir}/snowflake-jdbc-3.14.4.jar"

# Verificar que los JARs existen
jar_files = [
    f"{jars_dir}/spark-snowflake_2.12-2.12.0-spark_3.4.jar",
    f"{jars_dir}/snowflake-jdbc-3.14.4.jar"
]

for jar in jar_files:
    if os.path.exists(jar):
        print(f"✅ JAR encontrado: {os.path.basename(jar)}")
    else:
        print(f"❌ JAR FALTANTE: {jar}")

# PASO 3: Crear Spark con configuración ultra-minimalista
print("\n3️⃣ CREANDO SPARK (CONFIGURACIÓN MÍNIMA)...")

try:
    # Configuración ultra-simple para evitar ConnectionRefusedError
    spark_test = SparkSession.builder \
        .appName(f"DiagnosticoSimple_{int(time.time())}") \
        .master("local[1]") \
        .config("spark.jars", spark_jars) \
        .config("spark.driver.memory", "512m") \
        .config("spark.executor.memory", "512m") \
        .config("spark.driver.host", "localhost") \
        .config("spark.driver.bindAddress", "127.0.0.1") \
        .config("spark.ui.enabled", "false") \
        .config("spark.sql.warehouse.dir", "/tmp/spark-warehouse") \
        .getOrCreate()
    
    print("✅ SPARK CREADO EXITOSAMENTE!")
    
    # Test básico de funcionamiento
    test_df = spark_test.range(3)
    test_count = test_df.count()
    print(f"✅ Test Spark: {test_count} registros")
    
except Exception as e:
    print(f"❌ ERROR CREANDO SPARK: {e}")
    print(f"   Tipo: {type(e).__name__}")
    print("\n🔧 RECOMENDACIONES:")
    print("   - Reinicia el kernel: Kernel -> Restart")
    print("   - Reinicia el contenedor Docker")
    print("   - Verifica memoria disponible")
    
    # Intentar diagnóstico adicional
    try:
        import subprocess
        result = subprocess.run(['ps', 'aux'], capture_output=True, text=True)
        java_processes = [line for line in result.stdout.split('\n') if 'java' in line.lower()]
        if java_processes:
            print(f"\n🔍 Procesos Java detectados: {len(java_processes)}")
    except:
        pass
    
    raise Exception("No se pudo crear Spark")

# PASO 4: Configuraciones Snowflake (múltiples para probar)
print("\n4️⃣ CONFIGURANDO CREDENCIALES SNOWFLAKE...")

# Configuración 1: Credenciales originales
sf_config_original = {
    "sfURL": "YKFGMFI-GRC01155.snowflakecomputing.com",
    "sfUser": "MARTIN",
    "sfPassword": "P7kh2nUSu727FKZ",
    "sfDatabase": "NY_TAXI",
    "sfSchema": "RAW",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ACCOUNTADMIN"
}

# Configuración 2: Con diferentes opciones de formato URL
sf_config_alt1 = sf_config_original.copy()
sf_config_alt1["sfURL"] = "https://YKFGMFI-GRC01155.snowflakecomputing.com"

# Configuración 3: Con puerto explícito
sf_config_alt2 = sf_config_original.copy()
sf_config_alt2["sfURL"] = "YKFGMFI-GRC01155.snowflakecomputing.com:443"

print("✅ Configuraciones Snowflake preparadas")

# PASO 5: Probar conexiones progresivamente
print("\n5️⃣ PROBANDO CONEXIONES SNOWFLAKE...")

configs_to_test = [
    ("Original", sf_config_original),
    ("Con HTTPS", sf_config_alt1),
    ("Con Puerto", sf_config_alt2)
]

conexion_exitosa = False

for config_name, sf_config in configs_to_test:
    print(f"\n🔍 Probando configuración: {config_name}")
    print(f"   URL: {sf_config['sfURL']}")
    print(f"   Usuario: {sf_config['sfUser']}")
    print(f"   Base de datos: {sf_config['sfDatabase']}")
    
    try:
        # Test de conexión básica (sin leer datos)
        print("   Creando DataFrame de prueba...")
        
        test_data = [(1, f"test_{config_name}"), (2, "conexion")]
        test_df = spark_test.createDataFrame(test_data, ["id", "descripcion"])
        print(f"   ✅ DataFrame: {test_df.count()} registros")
        
        # Intentar leer metadatos de Snowflake (más ligero que leer datos)
        print("   Intentando conectar a Snowflake...")
        
        # Usar una consulta muy simple para verificar conectividad
        simple_query = "SELECT 1 as test_conexion"
        
        snow_test = spark_test.read \
            .format("net.snowflake.spark.snowflake") \
            .options(**sf_config) \
            .option("query", simple_query) \
            .load()
        
        resultado = snow_test.collect()
        print(f"   🎉 CONEXIÓN EXITOSA con {config_name}!")
        print(f"   📊 Resultado: {resultado[0]['TEST_CONEXION']}")
        
        conexion_exitosa = True
        config_exitosa = sf_config
        break
        
    except Exception as e:
        error_str = str(e)
        print(f"   ❌ Error con {config_name}: {type(e).__name__}")
        
        # Análisis específico del error
        if "authentication" in error_str.lower() or "credential" in error_str.lower():
            print("   🔍 PROBLEMA DE CREDENCIALES detectado")
        elif "network" in error_str.lower() or "connection" in error_str.lower():
            print("   🔍 PROBLEMA DE RED detectado")
        elif "snowflake" in error_str.lower():
            print("   🔍 PROBLEMA ESPECÍFICO DE SNOWFLAKE")
        elif "connectionrefused" in error_str.lower():
            print("   🔍 PROBLEMA DE SPARK (no Snowflake)")
        else:
            print(f"   🔍 Error desconocido: {error_str[:100]}...")
        
        continue

# PASO 6: Si hay conexión exitosa, probar tabla real
if conexion_exitosa:
    print(f"\n6️⃣ PROBANDO LECTURA DE TABLA REAL...")
    
    try:
        print("Intentando leer TAXI_ZONE_LOOKUP...")
        
        zone_df = spark_test.read \
            .format("net.snowflake.spark.snowflake") \
            .options(**config_exitosa) \
            .option("dbtable", "TAXI_ZONE_LOOKUP") \
            .load()
        
        zone_count = zone_df.count()
        print(f"🎉 TABLA LEÍDA EXITOSAMENTE!")
        print(f"📊 TAXI_ZONE_LOOKUP: {zone_count} registros")
        
        # Mostrar muestra pequeña
        print("\n📋 Muestra de datos:")
        zone_df.show(2, truncate=False)
        
    except Exception as e:
        print(f"❌ Error leyendo tabla: {e}")
        
else:
    print("\n❌ NINGUNA CONFIGURACIÓN FUNCIONÓ")
    print("\n🔧 ACCIONES RECOMENDADAS:")
    print("1. Verificar credenciales Snowflake en interfaz web")
    print("2. Comprobar que el warehouse COMPUTE_WH está activo")
    print("3. Verificar permisos del usuario MARTIN")
    print("4. Confirmar que la base de datos NY_TAXI existe")

# PASO 7: Limpiar recursos
print(f"\n7️⃣ LIMPIANDO RECURSOS...")
try:
    if 'spark_test' in locals():
        spark_test.stop()
        print("✅ Sesión Spark cerrada")
except Exception as e:
    print(f"⚠️ Error cerrando Spark: {e}")

print(f"\n🏁 DIAGNÓSTICO COMPLETADO")

if conexion_exitosa:
    print("✅ Resultado: CONEXIÓN EXITOSA - Puedes proceder con el código principal")
else:
    print("❌ Resultado: PROBLEMAS DE CONEXIÓN - Revisar credenciales/configuración")

=== DIAGNÓSTICO PASO A PASO ===

1️⃣ LIMPIANDO ENTORNO SPARK...
✅ Procesos Spark terminados
✅ Variable SPARK_LOCAL_IP eliminada
✅ Contextos Spark limpiados

2️⃣ CONFIGURANDO JARS...
✅ JAR encontrado: spark-snowflake_2.12-2.12.0-spark_3.4.jar
✅ JAR encontrado: snowflake-jdbc-3.14.4.jar

3️⃣ CREANDO SPARK (CONFIGURACIÓN MÍNIMA)...
✅ SPARK CREADO EXITOSAMENTE!
✅ Test Spark: 3 registros

4️⃣ CONFIGURANDO CREDENCIALES SNOWFLAKE...
✅ Configuraciones Snowflake preparadas

5️⃣ PROBANDO CONEXIONES SNOWFLAKE...

🔍 Probando configuración: Original
   URL: YKFGMFI-GRC01155.snowflakecomputing.com
   Usuario: MARTIN
   Base de datos: NY_TAXI
   Creando DataFrame de prueba...
   ✅ DataFrame: 2 registros
   Intentando conectar a Snowflake...
   🎉 CONEXIÓN EXITOSA con Original!
   📊 Resultado: 1

6️⃣ PROBANDO LECTURA DE TABLA REAL...
Intentando leer TAXI_ZONE_LOOKUP...
🎉 TABLA LEÍDA EXITOSAMENTE!
📊 TAXI_ZONE_LOOKUP: 265 registros

📋 Muestra de datos:
+----------+-------+--------------+------------+
|LO

In [37]:
import pyspark 
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

In [None]:
import os
from pathlib import Path

# Descarga datos yellow y green + taxi_zone_lookup para todos los meses de 2015 a 2025
start_year = 2019
end_year = 2025
months = range(1, 13)
base_url = 'https://d37ci6vzurychx.cloudfront.net/trip-data'
zone_url = 'https://d37ci6vzurychx.cloudfront.net/misc/taxi_zone_lookup.csv'
data_dir = '/home/jovyan/work/data'  # Carpeta montada por Docker, accesible desde local
os.makedirs(data_dir, exist_ok=True)
missing_files = []
for year in range(start_year, end_year + 1):
    for color in ['yellow', 'green']:
        for m in months:
            fname = f'{color}_tripdata_{year}-{m:02d}.parquet'
            url = f'{base_url}/{fname}'
            dest = f'{data_dir}/{fname}'
            exit_code = os.system(f'wget -O {dest} {url}')
            if exit_code != 0 or not Path(dest).is_file():
                missing_files.append(fname)
# Descarga taxi_zone_lookup
zone_dest = f'{data_dir}/taxi_zone_lookup.csv'
exit_code = os.system(f'wget -O {zone_dest} {zone_url}')
if exit_code != 0 or not Path(zone_dest).is_file():
    missing_files.append('taxi_zone_lookup.csv')

# Resumen
if missing_files:
    print('Faltan los siguientes archivos:')
    for f in missing_files:
        print('-', f)
else:
    print('Todos los archivos descargados correctamente.')

In [None]:
# Instalación de dependencias y descarga de JARs para Spark-Snowflake
!pip install snowflake-snowpark-python snowflake-connector-python

# Crear directorio para JARs si no existe
import os
jars_dir = '/home/jovyan/work/jars'
os.makedirs(jars_dir, exist_ok=True)

# Descargar JARs necesarios para Spark 3.x con Scala 2.12
# Snowflake Spark Connector compatible con Spark 3.x y Scala 2.12
snowflake_jar_url = "https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.12.0-spark_3.4/spark-snowflake_2.12-2.12.0-spark_3.4.jar"
snowflake_jdbc_url = "https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.4/snowflake-jdbc-3.14.4.jar"

# Descargar JARs
!wget -O {jars_dir}/spark-snowflake_2.12-2.12.0-spark_3.4.jar {snowflake_jar_url}
!wget -O {jars_dir}/snowflake-jdbc-3.14.4.jar {snowflake_jdbc_url}

print("JARs descargados exitosamente")

--2025-10-18 23:06:46--  https://repo1.maven.org/maven2/net/snowflake/spark-snowflake_2.12/2.12.0-spark_3.4/spark-snowflake_2.12-2.12.0-spark_3.4.jar
Resolving repo1.maven.org (repo1.maven.org)... 104.18.18.12, 104.18.19.12, 2606:4700::6812:130c, ...
Connecting to repo1.maven.org (repo1.maven.org)|104.18.18.12|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 772490 (754K) [application/java-archive]
Saving to: ‘/home/jovyan/work/jars/spark-snowflake_2.12-2.12.0-spark_3.4.jar’


2025-10-18 23:06:47 (2.77 MB/s) - ‘/home/jovyan/work/jars/spark-snowflake_2.12-2.12.0-spark_3.4.jar’ saved [772490/772490]

--2025-10-18 23:06:47--  https://repo1.maven.org/maven2/net/snowflake/snowflake-jdbc/3.14.4/snowflake-jdbc-3.14.4.jar
Resolving repo1.maven.org (repo1.maven.org)... 104.18.18.12, 104.18.19.12, 2606:4700::6812:130c, ...
Connecting to repo1.maven.org (repo1.maven.org)|104.18.18.12|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 70552650 (67M) 

In [9]:
# Reinicio completo de Spark para resolver problemas de conexión
import os
import sys
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T
from datetime import datetime

# 1. Limpiar completamente cualquier sesión Spark existente
try:
    # Intentar cerrar sesión existente
    if 'spark' in locals():
        spark.stop()
    
    # Limpiar contexto de Spark
    if 'sc' in locals():
        sc.stop()
        
    # Limpiar variables de entorno problemáticas
    os.environ.pop('PYSPARK_GATEWAY_PORT', None)
    os.environ.pop('PYSPARK_GATEWAY_SECRET', None)
    
    print("Sesiones Spark anteriores cerradas")
except Exception as e:
    print(f"Error cerrando sesiones anteriores (normal): {e}")

# 2. Configuración de JARs
jars_dir = '/home/jovyan/work/jars'
spark_jars = f"{jars_dir}/spark-snowflake_2.12-2.12.0-spark_3.4.jar,{jars_dir}/snowflake-jdbc-3.14.4.jar"

# 3. Crear nueva sesión Spark con configuración simplificada
print("Creando nueva sesión Spark...")

spark = SparkSession.builder \
    .appName("NYC_Taxi_Ingesta_Raw_v2") \
    .config("spark.jars", spark_jars) \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.driver.maxResultSize", "1g") \
    .config("spark.sql.adaptive.enabled", "true") \
    .config("spark.sql.adaptive.coalescePartitions.enabled", "true") \
    .config("spark.serializer", "org.apache.spark.serializer.KryoSerializer") \
    .config("spark.driver.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .config("spark.executor.extraJavaOptions", "-Dio.netty.tryReflectionSetAccessible=true") \
    .getOrCreate()

# 4. Verificar que Spark funciona correctamente
try:
    test_df = spark.range(10)
    count = test_df.count()
    print(f"Spark funcionando correctamente. Test count: {count}")
except Exception as e:
    print(f"Error en test de Spark: {e}")
    raise

# 5. Configuración Snowflake
sfOptions = {
    "sfURL": "YKFGMFI-GRC01155.snowflakecomputing.com",
    "sfUser": "MARTIN",
    "sfPassword": "P7kh2nUSu727FKZ",
    "sfDatabase": "NY_TAXI",
    "sfSchema": "RAW",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ACCOUNTADMIN"
}

# 6. Variables de configuración
service_types = ['green']
start_year = 2015
end_year = 2015
months = range(2)
data_dir = '/home/jovyan/work/data'
run_id = f"run_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
ingested_at_utc = datetime.utcnow()
auditoria = []

# ================================
# ESTRATEGIA DE TABLAS ESPEJO Y PARTICIONADO
# ================================
# DECISIÓN ARQUITECTÓNICA: Partición lógica por año/mes en mismo esquema
# 
# JUSTIFICACIÓN:
# - Una tabla por servicio (GREEN_TAXI, YELLOW_TAXI) para mantener esquemas específicos
# - Partición lógica por año/mes usando metadatos (SOURCE_YEAR, SOURCE_MONTH)
# - Evita proliferación de tablas físicas (sería 24 tablas por año por servicio)
# - Facilita consultas cross-período y mantenimiento
# - Snowflake optimiza automáticamente con clustering en columnas temporales
#
# ESTRUCTURA:
# - GREEN_TAXI: contiene todos los años/meses de green taxi
# - YELLOW_TAXI: contiene todos los años/meses de yellow taxi  
# - TAXI_ZONE_LOOKUP: tabla de referencia
# - RAW_AUDITORIA: metadatos completos de todas las cargas
#
# METADATOS OBLIGATORIOS POR REGISTRO:
# - run_id: identificador único de ejecución
# - service_type: 'green' o 'yellow'
# - source_year, source_month: partición lógica temporal
# - ingested_at_utc: timestamp UTC de ingesta
# - source_path: ruta del archivo origen
# - batch_number: número de lote dentro del archivo
# - natural_key: clave de negocio para idempotencia
#
# IDEMPOTENCIA:
# - Clave natural: pickup_datetime + pulocationid + dolocationid + vendorid
# - Upsert strategy: DELETE + INSERT por (service_type, source_year, source_month)
# ================================

print(f"Configuración completada - Run ID: {run_id}")
print(f"Estrategia: Partición lógica por año/mes en tablas por servicio")

# 7. Función MEJORADA para carga idempotente a Snowflake con metadatos completos
def load_to_snowflake_idempotent(sdf, table_name, service_type, source_year, source_month, sfOptions, batch_size=1000000):
    """
    Carga idempotente a Snowflake con estrategia de reemplazo por partición lógica
    
    ESTRATEGIA DE IDEMPOTENCIA:
    1. Eliminar registros existentes de la misma partición (service_type + year + month)
    2. Insertar nuevos registros con metadatos completos
    3. Registrar auditoría detallada por lote
    """
    try:
        # PASO 1: Preparar DataFrame con metadatos obligatorios
        print(f"🔧 Preparando metadatos para {table_name} - {service_type} {source_year}-{source_month:02d}")
        
        sdf_cached = sdf.cache()
        total_rows = sdf_cached.count()
        
        print(f"📊 Registros a procesar: {total_rows:,}")
        
        if total_rows == 0:
            print("⚠️ No hay registros para procesar")
            return 0, []
        
        # PASO 2: Implementar estrategia de idempotencia
        print(f"🗑️ Eliminando registros existentes para {service_type} {source_year}-{source_month:02d}")
        
        delete_query = f"""
        DELETE FROM {table_name} 
        WHERE SERVICE_TYPE = '{service_type}' 
        AND SOURCE_YEAR = {source_year} 
        AND SOURCE_MONTH = {source_month}
        """
        
        try:
            # Ejecutar DELETE para idempotencia
            delete_df = spark.read \
                .format("net.snowflake.spark.snowflake") \
                .options(**sfOptions) \
                .option("query", delete_query) \
                .load()
            
            # Trigger execution
            delete_count = delete_df.count()
            print(f"✅ Limpieza completada (query ejecutada)")
            
        except Exception as delete_error:
            print(f"⚠️ Error en DELETE (puede ser normal si no hay datos previos): {delete_error}")
        
        # PASO 3: Procesar en lotes con metadatos por lote
        if total_rows <= batch_size:
            print(f"📦 Procesando como lote único...")
            
            # Agregar metadatos del lote
            sdf_with_metadata = sdf_cached \
                .withColumn('BATCH_NUMBER', F.lit(1)) \
                .withColumn('BATCH_SIZE', F.lit(total_rows)) \
                .withColumn('TOTAL_FILE_ROWS', F.lit(total_rows))
            
            # Cargar
            sdf_with_metadata.write \
                .format("net.snowflake.spark.snowflake") \
                .options(**sfOptions) \
                .option("dbtable", table_name) \
                .option("truncate_table", "off") \
                .option("usestagingtable", "on") \
                .option("continue_on_error", "false") \
                .mode("append") \
                .save()
            
            batch_audit = [{
                'batch_number': 1,
                'batch_rows': total_rows,
                'batch_start_time': datetime.utcnow(),
                'batch_end_time': datetime.utcnow()
            }]
            
            sdf_cached.unpersist()
            print(f"✅ {total_rows:,} filas cargadas en 1 lote")
            return total_rows, batch_audit
            
        else:
            # Procesamiento en múltiples lotes
            print(f"📦 Procesando en lotes de {batch_size:,} filas...")
            
            num_batches = (total_rows + batch_size - 1) // batch_size
            print(f"Total de lotes: {num_batches}")
            
            # Repartir para distribución uniforme
            sdf_repartitioned = sdf_cached.repartition(num_batches)
            
            total_loaded = 0
            batch_audit = []
            
            # Procesar particiones como lotes
            partitions = sdf_repartitioned.rdd.glom().collect()
            
            for batch_num, partition_data in enumerate(partitions, 1):
                if not partition_data:
                    continue
                
                batch_start = datetime.utcnow()
                batch_rows = len(partition_data)
                
                print(f"  📦 Lote {batch_num}/{len(partitions)}: {batch_rows:,} filas")
                
                # Crear DataFrame con metadatos de lote
                partition_df = spark.createDataFrame(partition_data, sdf_cached.schema)
                
                partition_with_metadata = partition_df \
                    .withColumn('BATCH_NUMBER', F.lit(batch_num)) \
                    .withColumn('BATCH_SIZE', F.lit(batch_rows)) \
                    .withColumn('TOTAL_FILE_ROWS', F.lit(total_rows))
                
                # Cargar lote
                partition_with_metadata.write \
                    .format("net.snowflake.spark.snowflake") \
                    .options(**sfOptions) \
                    .option("dbtable", table_name) \
                    .option("truncate_table", "off") \
                    .option("usestagingtable", "on") \
                    .option("continue_on_error", "false") \
                    .mode("append") \
                    .save()
                
                batch_end = datetime.utcnow()
                total_loaded += batch_rows
                
                # Registrar auditoría del lote
                batch_audit.append({
                    'batch_number': batch_num,
                    'batch_rows': batch_rows,
                    'batch_start_time': batch_start,
                    'batch_end_time': batch_end
                })
                
                print(f"    ✅ Lote {batch_num} completado en {(batch_end - batch_start).total_seconds():.1f}s")
            
            sdf_cached.unpersist()
            print(f"🎉 CARGA COMPLETA: {total_loaded:,} filas en {len(partitions)} lotes")
            return total_loaded, batch_audit
            
    except Exception as e:
        error_str = str(e)
        print(f"❌ Error cargando a {table_name}: {error_str}")
        
        # Limpiar cache en caso de error
        try:
            sdf.unpersist()
        except:
            pass
        
        return 0, []

# 8. Procesamiento por servicio, año, mes
total_files_processed = 0
total_rows_ingested = 0

for service_type in service_types:
    print(f"\nProcesando servicio: {service_type}")
    
    for year in range(start_year, end_year + 1):
        for month in months:
            fname = f'{service_type}_tripdata_{year}-{month+1:02d}.parquet'
            fpath = os.path.join(data_dir, fname)
            
            if not os.path.isfile(fpath):
                print(f"Archivo no encontrado: {fname}")
                continue
            
            print(f"Procesando {fname}")
            
            try:
                # Leer Parquet
                sdf = spark.read.parquet(fpath)
                
                # DIAGNÓSTICO: Verificar datos originales
                original_count = sdf.count()
                original_columns = len(sdf.columns)
                print(f"   📊 ARCHIVO ORIGINAL: {original_count:,} filas, {original_columns} columnas")
                
                # Mostrar esquema para debug
                print(f"   📋 Columnas originales: {sdf.columns[:10]}...")  # Primeras 10
                
                # Cache para múltiples operaciones
                sdf = sdf.cache()
                
                # Normalizar columnas timestamp CON VALIDACIÓN MEJORADA Y PRESERVACIÓN DE DATOS
                print(f"   🔧 Normalizando timestamps con preservación de datos...")
                
                # CORRECCIÓN CRÍTICA: Preservar los valores originales antes de transformar
                timestamp_mapping = {
                    'tpep_pickup_datetime': 'TPEP_PICKUP_DATETIME',
                    'tpep_dropoff_datetime': 'TPEP_DROPOFF_DATETIME', 
                    'lpep_pickup_datetime': 'LPEP_PICKUP_DATETIME',
                    'lpep_dropoff_datetime': 'LPEP_DROPOFF_DATETIME'
                }
                
                for old_col, new_col in timestamp_mapping.items():
                    if old_col in sdf.columns:
                        current_type = sdf.schema[old_col].dataType
                        print(f"   🕐 Procesando {old_col} (tipo: {current_type})")
                        
                        # Mostrar muestra de datos ANTES de transformar
                        sample_data = sdf.select(old_col).limit(3).collect()
                        print(f"   📊 Muestra ANTES: {[row[old_col] for row in sample_data]}")
                        
                        # Solo convertir si hay datos válidos
                        if not isinstance(current_type, T.NullType):
                            if isinstance(current_type, T.LongType):
                                # Epoch en milisegundos a timestamp
                                print(f"   🔄 Convirtiendo desde epoch milisegundos")
                                sdf = sdf.withColumn(new_col, 
                                    F.when(F.col(old_col).isNull(), None)
                                     .when(F.col(old_col) == 0, None)
                                     .otherwise(F.from_unixtime(F.col(old_col) / 1000).cast(T.TimestampType())))
                                     
                            elif isinstance(current_type, T.StringType):
                                # String a timestamp con múltiples formatos
                                print(f"   🔄 Convirtiendo desde string")
                                sdf = sdf.withColumn(new_col,
                                    F.when(F.col(old_col).isNull(), None)
                                     .when(F.col(old_col) == "", None)
                                     .otherwise(F.to_timestamp(F.col(old_col))))
                                     
                            elif isinstance(current_type, T.TimestampType):
                                # Ya es timestamp, solo renombrar si es necesario
                                print(f"   ✅ Ya es timestamp, renombrando")
                                if old_col != new_col:
                                    sdf = sdf.withColumnRenamed(old_col, new_col)
                                else:
                                    # No hacer nada, ya está correcto
                                    pass
                            else:
                                # Conversión directa para otros tipos
                                print(f"   🔄 Conversión directa desde {current_type}")
                                sdf = sdf.withColumn(new_col, F.col(old_col).cast(T.TimestampType()))
                        else:
                            # NullType - crear columna timestamp vacía
                            print(f"   ⚠️ Columna {old_col} es NullType, creando timestamp vacío")
                            sdf = sdf.withColumn(new_col, F.lit(None).cast(T.TimestampType()))
                        
                        # Verificar conversión
                        if new_col in sdf.columns:
                            sample_converted = sdf.select(new_col).limit(3).collect()
                            print(f"   📊 Muestra DESPUÉS: {[row[new_col] for row in sample_converted]}")
                        
                        # Eliminar columna original si es diferente
                        if old_col != new_col and old_col in sdf.columns:
                            sdf = sdf.drop(old_col)
                
                # Normalizar VendorID
                if 'VendorID' in sdf.columns:
                    sdf = sdf.withColumnRenamed('VendorID', 'VENDORID')
                if 'VENDORID' in sdf.columns:
                    sdf = sdf.withColumn('VENDORID', F.col('VENDORID').cast(T.IntegerType()))
                
                # Agregar metadatos OBLIGATORIOS COMPLETOS
                print(f"   📋 Agregando metadatos obligatorios...")
                sdf = sdf.withColumn('RUN_ID', F.lit(run_id)) \
                         .withColumn('SERVICE_TYPE', F.lit(service_type)) \
                         .withColumn('SOURCE_YEAR', F.lit(year)) \
                         .withColumn('SOURCE_MONTH', F.lit(month+1)) \
                         .withColumn('INGESTED_AT_UTC', F.lit(ingested_at_utc)) \
                         .withColumn('SOURCE_PATH', F.lit(fpath)) \
                         .withColumn('FILE_SIZE_BYTES', F.lit(os.path.getsize(fpath))) \
                         .withColumn('ORIGINAL_FILE_ROWS', F.lit(original_count))
                
                # Crear CLAVE NATURAL ROBUSTA para idempotencia
                print(f"   🔑 Creando clave natural robusta...")
                
                # Identificar columna pickup principal
                pickup_col = None
                dropoff_col = None
                
                if service_type == 'green':
                    pickup_col = 'LPEP_PICKUP_DATETIME'
                    dropoff_col = 'LPEP_DROPOFF_DATETIME'
                elif service_type == 'yellow':
                    pickup_col = 'TPEP_PICKUP_DATETIME'
                    dropoff_col = 'TPEP_DROPOFF_DATETIME'
                
                # Construir clave natural con componentes disponibles
                natural_key_components = []
                
                # 1. Timestamp pickup (obligatorio para clave válida)
                if pickup_col and pickup_col in sdf.columns:
                    natural_key_components.append(F.coalesce(F.col(pickup_col).cast('string'), F.lit('NULL_PICKUP')))
                else:
                    natural_key_components.append(F.lit('NO_PICKUP'))
                
                # 2. Ubicaciones (si están disponibles)
                if 'PULOCATIONID' in sdf.columns:
                    natural_key_components.append(F.coalesce(F.col('PULOCATIONID').cast('string'), F.lit('NULL_PU')))
                else:
                    natural_key_components.append(F.lit('NO_PU'))
                    
                if 'DOLOCATIONID' in sdf.columns:
                    natural_key_components.append(F.coalesce(F.col('DOLOCATIONID').cast('string'), F.lit('NULL_DO')))
                else:
                    natural_key_components.append(F.lit('NO_DO'))
                
                # 3. VendorID (si está disponible)
                if 'VENDORID' in sdf.columns:
                    natural_key_components.append(F.coalesce(F.col('VENDORID').cast('string'), F.lit('NULL_VENDOR')))
                else:
                    natural_key_components.append(F.lit('NO_VENDOR'))
                
                # 4. Distancia del viaje (como diferenciador adicional)
                if 'TRIP_DISTANCE' in sdf.columns:
                    natural_key_components.append(F.coalesce(F.col('TRIP_DISTANCE').cast('string'), F.lit('NULL_DIST')))
                else:
                    natural_key_components.append(F.lit('NO_DIST'))
                
                # Crear clave natural concatenada
                sdf = sdf.withColumn('NATURAL_KEY', F.concat_ws('|', *natural_key_components))
                
                # Verificar calidad de la clave natural
                sample_keys = sdf.select('NATURAL_KEY').limit(3).collect()
                print(f"   🔍 Ejemplos clave natural: {[row['NATURAL_KEY'] for row in sample_keys]}")
                
                # Contar claves nulas o problemáticas
                null_keys = sdf.filter(F.col('NATURAL_KEY').isNull() | (F.col('NATURAL_KEY') == '')).count()
                print(f"   📊 Claves naturales problemáticas: {null_keys:,}")
                
                if null_keys > 0:
                    print(f"   ⚠️ Hay {null_keys:,} registros con claves naturales problemáticas")
                
                # Eliminar duplicados
                if 'NATURAL_KEY' in sdf.columns:
                    # DIAGNÓSTICO: Contar antes de eliminar duplicados
                    count_before_dedup = sdf.count()
                    print(f"   📊 ANTES de eliminar duplicados: {count_before_dedup:,} filas")
                    
                    sdf = sdf.dropDuplicates(['NATURAL_KEY'])
                    
                    # DIAGNÓSTICO: Contar después de eliminar duplicados
                    count_after_dedup = sdf.count()
                    duplicates_removed = count_before_dedup - count_after_dedup
                    print(f"   📊 DESPUÉS de eliminar duplicados: {count_after_dedup:,} filas")
                    print(f"   🗑️ Duplicados eliminados: {duplicates_removed:,} filas")
                    
                    if duplicates_removed > (count_before_dedup * 0.8):  # Si se eliminan más del 80%
                        print(f"   ⚠️ ADVERTENCIA: Se eliminaron demasiados duplicados ({duplicates_removed/count_before_dedup*100:.1f}%)")
                        
                        # Verificar calidad de NATURAL_KEY
                        null_keys = sdf.filter(F.col('NATURAL_KEY').isNull()).count()
                        print(f"   🔍 NATURAL_KEY nulos: {null_keys:,}")
                        
                        # Mostrar algunos ejemplos de NATURAL_KEY
                        sample_keys = sdf.select('NATURAL_KEY').limit(5).collect()
                        print(f"   🔍 Ejemplos NATURAL_KEY: {[row['NATURAL_KEY'] for row in sample_keys]}")
                else:
                    print(f"   ⚠️ Sin columna NATURAL_KEY para deduplicar")
                
                # Normalizar nombres a mayúsculas
                for old_col in sdf.columns:
                    new_col = old_col.upper()
                    if old_col != new_col:
                        sdf = sdf.withColumnRenamed(old_col, new_col)
                
                # AGREGAR COLUMNAS FALTANTES PARA COMPATIBILIDAD - SCHEMA MEJORADO
                print(f"   📋 Normalizando esquema con metadatos completos...")
                
                # Esquemas mejorados con metadatos obligatorios
                expected_columns_green = [
                    # Metadatos obligatorios (primero para mejor organización)
                    'RUN_ID', 'SERVICE_TYPE', 'SOURCE_YEAR', 'SOURCE_MONTH', 
                    'INGESTED_AT_UTC', 'SOURCE_PATH', 'NATURAL_KEY',
                    'FILE_SIZE_BYTES', 'ORIGINAL_FILE_ROWS', 'BATCH_NUMBER', 'BATCH_SIZE', 'TOTAL_FILE_ROWS',
                    
                    # Datos de negocio Green Taxi
                    'VENDORID', 'LPEP_PICKUP_DATETIME', 'LPEP_DROPOFF_DATETIME',
                    'PASSENGER_COUNT', 'TRIP_DISTANCE', 'RATECODEID', 
                    'STORE_AND_FWD_FLAG', 'PULOCATIONID', 'DOLOCATIONID',
                    'PAYMENT_TYPE', 'FARE_AMOUNT', 'EXTRA', 'MTA_TAX',
                    'TIP_AMOUNT', 'TOLLS_AMOUNT', 'IMPROVEMENT_SURCHARGE',
                    'TOTAL_AMOUNT', 'CONGESTION_SURCHARGE', 'AIRPORT_FEE', 'CBD_CONGESTION_FEE'
                ]
                
                expected_columns_yellow = [
                    # Metadatos obligatorios (primero para mejor organización)
                    'RUN_ID', 'SERVICE_TYPE', 'SOURCE_YEAR', 'SOURCE_MONTH',
                    'INGESTED_AT_UTC', 'SOURCE_PATH', 'NATURAL_KEY',
                    'FILE_SIZE_BYTES', 'ORIGINAL_FILE_ROWS', 'BATCH_NUMBER', 'BATCH_SIZE', 'TOTAL_FILE_ROWS',
                    
                    # Datos de negocio Yellow Taxi
                    'VENDORID', 'TPEP_PICKUP_DATETIME', 'TPEP_DROPOFF_DATETIME',
                    'PASSENGER_COUNT', 'TRIP_DISTANCE', 'RATECODEID',
                    'STORE_AND_FWD_FLAG', 'PULOCATIONID', 'DOLOCATIONID',
                    'PAYMENT_TYPE', 'FARE_AMOUNT', 'EXTRA', 'MTA_TAX',
                    'TIP_AMOUNT', 'TOLLS_AMOUNT', 'IMPROVEMENT_SURCHARGE',
                    'TOTAL_AMOUNT', 'CONGESTION_SURCHARGE', 'AIRPORT_FEE'
                ]
                
                # Seleccionar columnas esperadas según el tipo de servicio
                expected_columns = expected_columns_green if service_type == 'green' else expected_columns_yellow
                
                # Agregar columnas faltantes con valores apropiados por tipo
                current_columns = set(sdf.columns)
                for col in expected_columns:
                    if col not in current_columns:
                        print(f"   ➕ Agregando columna faltante: {col}")
                        
                        # Valores por defecto apropiados según el tipo de columna
                        if col in ['BATCH_NUMBER', 'BATCH_SIZE', 'TOTAL_FILE_ROWS']:
                            # Estas se agregarán en la función de carga
                            sdf = sdf.withColumn(col, F.lit(None).cast(T.IntegerType()))
                        elif col in ['FILE_SIZE_BYTES', 'ORIGINAL_FILE_ROWS']:
                            # Ya se agregaron antes
                            continue
                        elif 'DATETIME' in col or 'TIMESTAMP' in col:
                            sdf = sdf.withColumn(col, F.lit(None).cast(T.TimestampType()))
                        elif col in ['FARE_AMOUNT', 'TRIP_DISTANCE', 'TIP_AMOUNT', 'TOTAL_AMOUNT', 
                                   'EXTRA', 'MTA_TAX', 'TOLLS_AMOUNT', 'IMPROVEMENT_SURCHARGE',
                                   'CONGESTION_SURCHARGE', 'AIRPORT_FEE', 'CBD_CONGESTION_FEE']:
                            sdf = sdf.withColumn(col, F.lit(None).cast(T.DoubleType()))
                        elif col in ['VENDORID', 'PASSENGER_COUNT', 'RATECODEID', 'PULOCATIONID', 
                                   'DOLOCATIONID', 'PAYMENT_TYPE']:
                            sdf = sdf.withColumn(col, F.lit(None).cast(T.IntegerType()))
                        else:
                            sdf = sdf.withColumn(col, F.lit(None).cast(T.StringType()))
                
                # Reordenar columnas según el orden esperado y verificar existencia
                existing_columns = [col for col in expected_columns if col in sdf.columns]
                missing_columns = [col for col in expected_columns if col not in sdf.columns]
                
                if missing_columns:
                    print(f"   ⚠️ Columnas aún faltantes: {missing_columns}")
                    # Agregar columnas faltantes como string por defecto
                    for col in missing_columns:
                        sdf = sdf.withColumn(col, F.lit(None).cast(T.StringType()))
                
                # Seleccionar en el orden correcto
                sdf = sdf.select(*expected_columns)
                
                # DIAGNÓSTICO: Verificar después de reordenar columnas
                count_after_reorder = sdf.count()
                print(f"   📊 DESPUÉS de reordenar columnas: {count_after_reorder:,} filas, {len(sdf.columns)} columnas")
                
                if count_after_reorder == 0:
                    print(f"   ❌ ERROR: Se perdieron todas las filas en reordenamiento")
                    print(f"   🔍 Columnas esperadas: {expected_columns[:5]}...")
                    print(f"   🔍 Columnas actuales antes: {current_columns}")
                    continue
                
                # VALIDAR Y CORREGIR TIPOS DE DATOS PROBLEMÁTICOS
                print(f"   Validando tipos de datos...")
                
                # Definir tipos esperados para columnas críticas
                column_types = {
                    'VENDORID': T.IntegerType(),
                    'LPEP_PICKUP_DATETIME': T.TimestampType(),
                    'LPEP_DROPOFF_DATETIME': T.TimestampType(),
                    'TPEP_PICKUP_DATETIME': T.TimestampType(),
                    'TPEP_DROPOFF_DATETIME': T.TimestampType(),
                    'PASSENGER_COUNT': T.IntegerType(),
                    'TRIP_DISTANCE': T.DoubleType(),
                    'RATECODEID': T.IntegerType(),
                    'PULOCATIONID': T.IntegerType(),
                    'DOLOCATIONID': T.IntegerType(),
                    'PAYMENT_TYPE': T.IntegerType(),
                    'FARE_AMOUNT': T.DoubleType(),
                    'EXTRA': T.DoubleType(),
                    'MTA_TAX': T.DoubleType(),
                    'TIP_AMOUNT': T.DoubleType(),
                    'TOLLS_AMOUNT': T.DoubleType(),
                    'IMPROVEMENT_SURCHARGE': T.DoubleType(),
                    'TOTAL_AMOUNT': T.DoubleType(),
                    'CONGESTION_SURCHARGE': T.DoubleType(),
                    'AIRPORT_FEE': T.DoubleType(),
                    'CBD_CONGESTION_FEE': T.DoubleType()
                }
                
                # Corregir tipos de datos problemáticos
                for col_name, expected_type in column_types.items():
                    if col_name in sdf.columns:
                        current_type = sdf.schema[col_name].dataType
                        
                        # Si es NullType o tipo incorrecto, convertir
                        if isinstance(current_type, T.NullType) or current_type != expected_type:
                            print(f"   Corrigiendo tipo {col_name}: {current_type} -> {expected_type}")
                            
                            # Para timestamps, manejar casos especiales
                            if isinstance(expected_type, T.TimestampType):
                                sdf = sdf.withColumn(col_name, 
                                    F.when(F.col(col_name).isNull(), None)
                                     .otherwise(F.col(col_name).cast(expected_type)))
                            else:
                                sdf = sdf.withColumn(col_name, F.col(col_name).cast(expected_type))
                
                print(f"   DataFrame ajustado: {len(sdf.columns)} columnas, tipos validados")
                
                # VERIFICAR TIMESTAMPS ANTES DE CARGAR (OPTIMIZADO)
                print(f"   Verificando conversión de timestamps...")
                
                # DIAGNÓSTICO ESPECIAL: Si hay muy pocas filas, mostrar todo
                final_count = sdf.count()
                print(f"   📊 CONTEO FINAL antes de cargar: {final_count:,} filas")
                
                if final_count <= 10:
                    print(f"   🔍 ARCHIVO PEQUEÑO - Mostrando todas las filas:")
                    sdf.show(truncate=False)
                    
                    print(f"   🔍 Esquema final:")
                    sdf.printSchema()
                
                # Solo verificar si hay columnas timestamp para debug ligero
                ts_columns = [col for col in ['LPEP_PICKUP_DATETIME', 'LPEP_DROPOFF_DATETIME', 'TPEP_PICKUP_DATETIME', 'TPEP_DROPOFF_DATETIME'] if col in sdf.columns]
                if ts_columns:
                    print(f"   Columnas timestamp presentes: {ts_columns}")
                    # Verificación rápida: solo contar registros no nulos (más eficiente que collect)
                    for ts_col in ts_columns[:2]:  # Solo verificar las primeras 2
                        non_null_count = sdf.filter(F.col(ts_col).isNotNull()).count()
                        print(f"   {ts_col}: {non_null_count:,} valores no nulos")
                else:
                    print(f"   Sin columnas timestamp válidas")
                
                # Tabla destino
                table_name = "YELLOW_TAXI" if service_type == 'yellow' else "GREEN_TAXI"
                
                # CARGA IDEMPOTENTE con metadatos completos
                print(f"   🚀 Iniciando carga idempotente a {table_name}")
                rows_loaded, batch_audit = load_to_snowflake_idempotent(
                    sdf, table_name, service_type, year, month+1, sfOptions
                )
                
                if rows_loaded > 0:
                    total_files_processed += 1
                    total_rows_ingested += rows_loaded
                    
                    # Auditoría COMPLETA con metadatos de lotes
                    for batch_info in batch_audit:
                        auditoria.append({
                            'table_name': table_name,
                            'rows_ingested': batch_info['batch_rows'],
                            'batch_number': batch_info['batch_number'],
                            'batch_start_time': batch_info['batch_start_time'],
                            'batch_end_time': batch_info['batch_end_time'],
                            'run_id': run_id,
                            'service_type': service_type,
                            'source_year': year,
                            'source_month': month+1,
                            'ingested_at_utc': ingested_at_utc,
                            'source_path': fpath,
                            'file_size_bytes': os.path.getsize(fpath),
                            'original_file_rows': original_count,
                            'natural_key_strategy': f'{pickup_col}|PULOCATIONID|DOLOCATIONID|VENDORID|TRIP_DISTANCE'
                        })
                    
                    print(f"   ✅ {rows_loaded:,} filas cargadas con {len(batch_audit)} lotes auditados")
                else:
                    print(f"   ❌ No se cargaron filas para {fname}")
                
            except Exception as e:
                print(f"Error procesando {fname}: {str(e)}")
                # Limpiar cache en caso de error
                try:
                    sdf.unpersist()
                except:
                    pass
                continue
            
            finally:
                # OPTIMIZACIÓN: Siempre liberar memoria del cache
                try:
                    sdf.unpersist()
                except:
                    pass

print(f"\nRESUMEN:")
print(f"Archivos procesados: {total_files_processed}")
print(f"Total filas: {total_rows_ingested:,}")

# 9. Cargar taxi_zone_lookup
print("\nVerificando taxi_zone_lookup...")
try:
    zone_check = spark.read \
        .format("net.snowflake.spark.snowflake") \
        .options(**sfOptions) \
        .option("query", "SELECT COUNT(*) as cnt FROM TAXI_ZONE_LOOKUP") \
        .load()
    
    zone_count = zone_check.collect()[0]['cnt']
    print(f"TAXI_ZONE_LOOKUP tiene {zone_count} registros")
        
except Exception as e:
    print(f"Error verificando taxi_zone_lookup: {str(e)}")

# 10. Registrar auditoría COMPLETA con metadatos de lotes
if auditoria:
    print(f"\n📋 Registrando auditoría completa ({len(auditoria)} entradas de lotes)...")
    try:
        audit_df = spark.createDataFrame(auditoria)
        
        # Renombrar columnas para tabla de auditoría
        column_mapping = {
            'table_name': 'TABLE_NAME',
            'rows_ingested': 'ROWS_INGESTED', 
            'batch_number': 'BATCH_NUMBER',
            'batch_start_time': 'BATCH_START_TIME',
            'batch_end_time': 'BATCH_END_TIME',
            'run_id': 'RUN_ID',
            'service_type': 'SERVICE_TYPE',
            'source_year': 'SOURCE_YEAR',
            'source_month': 'SOURCE_MONTH',
            'ingested_at_utc': 'INGESTED_AT_UTC',
            'source_path': 'SOURCE_PATH',
            'file_size_bytes': 'FILE_SIZE_BYTES',
            'original_file_rows': 'ORIGINAL_FILE_ROWS',
            'natural_key_strategy': 'NATURAL_KEY_STRATEGY'
        }
        
        for old_col, new_col in column_mapping.items():
            if old_col in audit_df.columns:
                audit_df = audit_df.withColumnRenamed(old_col, new_col)
        
        # Agregar timestamp de auditoría
        audit_df = audit_df.withColumn('AUDIT_TIMESTAMP_UTC', F.lit(datetime.utcnow()))
        
        # Cargar auditoría
        audit_df.write \
            .format("net.snowflake.spark.snowflake") \
            .options(**sfOptions) \
            .option("dbtable", "RAW_AUDITORIA") \
            .mode("append") \
            .save()
        
        print(f"✅ Auditoría registrada con {len(auditoria)} entradas de lotes")
        
        # Mostrar resumen de auditoría
        print(f"\n📊 RESUMEN DE AUDITORÍA:")
        audit_summary = audit_df.groupBy('TABLE_NAME', 'SERVICE_TYPE', 'SOURCE_YEAR', 'SOURCE_MONTH') \
            .agg(
                F.sum('ROWS_INGESTED').alias('TOTAL_ROWS'),
                F.max('BATCH_NUMBER').alias('MAX_BATCH'),
                F.min('BATCH_START_TIME').alias('START_TIME'),
                F.max('BATCH_END_TIME').alias('END_TIME')
            ).collect()
        
        for row in audit_summary:
            print(f"   {row['TABLE_NAME']}: {row['TOTAL_ROWS']:,} filas en {row['MAX_BATCH']} lotes")
            
    except Exception as e:
        print(f"❌ Error registrando auditoría: {str(e)}")

print(f"\n🎉 INGESTA COMPLETADA - RESUMEN FINAL")
print(f"═══════════════════════════════════════")
print(f"📋 ESTRATEGIA IMPLEMENTADA:")
print(f"   • Partición lógica por año/mes en tablas por servicio")
print(f"   • Idempotencia con DELETE + INSERT por partición")
print(f"   • Clave natural robusta: pickup_datetime|pu|do|vendor|distance")
print(f"   • Metadatos completos por registro y lote")
print(f"")
print(f"📊 ESTADÍSTICAS:")
print(f"   • Run ID: {run_id}")
print(f"   • Archivos procesados: {total_files_processed}")
print(f"   • Total filas ingested: {total_rows_ingested:,}")
print(f"   • Timestamp conversión mejorada: ✅")
print(f"   • Auditoría completa: ✅")

# NO cerrar Spark aquí para evitar problemas de conexión

ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent call last):
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1038, in send_command
    response = connection.send_command(command)
               ^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/clientserver.py", line 539, in send_command
    raise Py4JNetworkError(
py4j.protocol.Py4JNetworkError: Error while sending or receiving


Error cerrando sesiones anteriores (normal): [Errno 111] Connection refused
Creando nueva sesión Spark...


ConnectionRefusedError: [Errno 111] Connection refused

In [34]:
# INGESTA SIMPLE A SNOWFLAKE RAW - VERSIÓN FUNCIONAL
# 
# ESTRATEGIA ELEGIDA: 
# - Una tabla por servicio (GREEN_TAXI, YELLOW_TAXI) con partición lógica por año/mes
# - Metadatos obligatorios en cada registro
# - Clave natural para idempotencia: pickup_datetime + PULocationID + DOLocationID + VendorID
# - Sin upserts, solo INSERT directo a capa RAW
#
# JUSTIFICACIÓN:
# - Mantiene esquemas específicos por servicio
# - Permite consultas eficientes con filtros por año/mes
# - Auditoría completa con metadatos en cada registro

import os
import pyspark 
from datetime import datetime
from pyspark.sql import functions as F
from pyspark.sql import types as T

# 1. Configuración básica
print("=== CONFIGURACIÓN BÁSICA ===")

# Variables de configuración
service_types = ['green']  # Empezamos solo con green
start_year = 2015
end_year = 2015
months = [1, 2]  # Solo enero y febrero para prueba
data_dir = '/home/jovyan/work/data'

# Generar run_id único
run_id = f"run_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
ingested_at_utc = datetime.utcnow().isoformat()

print(f"Run ID: {run_id}")
print(f"Timestamp: {ingested_at_utc}")

# 2. Configuración Snowflake
sfOptions = {
    "sfURL": "YKFGMFI-GRC01155.snowflakecomputing.com",
    "sfUser": "MARTIN",
    "sfPassword": "P7kh2nUSu727FKZ",
    "sfDatabase": "NY_TAXI",
    "sfSchema": "RAW",
    "sfWarehouse": "COMPUTE_WH",
    "sfRole": "ACCOUNTADMIN"
}

print("✅ Configuración lista")

=== CONFIGURACIÓN BÁSICA ===
Run ID: run_20251019_010247
Timestamp: 2025-10-19T01:02:47.729833
✅ Configuración lista


In [35]:
# 3. Función para convertir timestamps de epoch milisegundos a timestamp
def convert_epoch_to_timestamp(df, column_name):
    """
    Convierte columnas de epoch en milisegundos a timestamp
    Ejemplo: 1420072270000 -> 2015-01-01 01:57:50
    Verifica el tipo de datos antes de convertir
    """
    if column_name in df.columns:
        # Verificar el tipo de datos de la columna
        current_type = df.schema[column_name].dataType
        print(f"🔍 Tipo de datos de {column_name}: {current_type}")
        
        # Si ya es timestamp, no hacer nada
        if isinstance(current_type, T.TimestampType):
            print(f"✅ {column_name} ya es timestamp, no necesita conversión")
            return df
        
        # Si es LongType (epoch), convertir
        elif isinstance(current_type, T.LongType):
            print(f"🔄 Convirtiendo {column_name} de epoch a timestamp")
            df = df.withColumn(
                column_name, 
                F.when(F.col(column_name).isNull(), None)
                 .when(F.col(column_name) == 0, None)
                 .otherwise(F.from_unixtime(F.col(column_name) / 1000).cast(T.TimestampType()))
            )
            print(f"✅ Convertido {column_name} de epoch a timestamp")
        
        # Si es StringType, intentar conversión directa
        elif isinstance(current_type, T.StringType):
            print(f"🔄 Convirtiendo {column_name} de string a timestamp")
            df = df.withColumn(
                column_name,
                F.when(F.col(column_name).isNull(), None)
                 .when(F.col(column_name) == "", None)
                 .otherwise(F.to_timestamp(F.col(column_name)))
            )
            print(f"✅ Convertido {column_name} de string a timestamp")
        
        else:
            print(f"⚠️ Tipo no soportado para {column_name}: {current_type}")
    else:
        print(f"⚠️ Columna {column_name} no encontrada")
    
    return df

# 4. Función para agregar metadatos obligatorios
def add_metadata(df, service_type, source_year, source_month, source_path):
    """
    Agrega metadatos obligatorios a cada registro
    """
    df = df.withColumn('run_id', F.lit(run_id)) \
           .withColumn('service_type', F.lit(service_type)) \
           .withColumn('source_year', F.lit(source_year)) \
           .withColumn('source_month', F.lit(source_month)) \
           .withColumn('ingested_at_utc', F.lit(ingested_at_utc)) \
           .withColumn('source_path', F.lit(source_path))
    
    print(f"✅ Metadatos agregados para {service_type} {source_year}-{source_month:02d}")
    return df

print("✅ Funciones auxiliares definidas")

✅ Funciones auxiliares definidas


In [19]:
# 5. Función para crear clave natural
def create_natural_key(df, service_type):
    """
    Crea clave natural para idempotencia
    Formato: pickup_datetime|PULocationID|DOLocationID|VendorID
    """
    # Determinar columna de pickup según el servicio
    pickup_col = 'lpep_pickup_datetime' if service_type == 'green' else 'tpep_pickup_datetime'
    
    # Crear componentes de la clave natural
    components = [
        F.coalesce(F.col(pickup_col).cast('string'), F.lit('NULL')),
        F.coalesce(F.col('PULocationID').cast('string'), F.lit('NULL')),
        F.coalesce(F.col('DOLocationID').cast('string'), F.lit('NULL')),
        F.coalesce(F.col('VendorID').cast('string'), F.lit('NULL'))
    ]
    
    # Concatenar con separador
    df = df.withColumn('natural_key', F.concat_ws('|', *components))
    
    print(f"✅ Clave natural creada para {service_type}")
    return df

# 6. Función simple de carga a Snowflake
def load_to_snowflake(df, table_name):
    """
    Carga simple a Snowflake usando mode append
    """
    try:
        print(f"🚀 Cargando {df.count():,} registros a {table_name}")
        
        df.write \
            .format("net.snowflake.spark.snowflake") \
            .options(**sfOptions) \
            .option("dbtable", table_name) \
            .mode("append") \
            .save()
        
        print(f"✅ Carga exitosa a {table_name}")
        return True
        
    except Exception as e:
        print(f"❌ Error cargando a {table_name}: {e}")
        return False

print("✅ Funciones de carga definidas")

✅ Funciones de carga definidas


In [36]:
# INICIALIZAR SPARK SESSION
print("=== INICIALIZANDO SPARK ===")

from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

# Configuración de JARs para Snowflake
jars_dir = '/home/jovyan/work/jars'
spark_jars = f"{jars_dir}/spark-snowflake_2.12-2.12.0-spark_3.4.jar,{jars_dir}/snowflake-jdbc-3.14.4.jar"

# Crear sesión Spark simple
print("🚀 Creando sesión Spark...")

spark = SparkSession.builder \
    .appName("NYC_Taxi_Ingesta_Simple") \
    .config("spark.jars", spark_jars) \
    .config("spark.driver.memory", "2g") \
    .config("spark.executor.memory", "2g") \
    .getOrCreate()

# Verificar que funciona
test_count = spark.range(5).count()
print(f"✅ Spark funcionando correctamente - Test: {test_count} registros")
print(f"✅ Versión Spark: {spark.version}")

# Mostrar configuración activa
print(f"✅ JARs configurados: {len(spark_jars.split(','))} archivos")

=== INICIALIZANDO SPARK ===
🚀 Creando sesión Spark...
✅ Spark funcionando correctamente - Test: 5 registros
✅ Versión Spark: 3.5.0
✅ JARs configurados: 2 archivos


In [None]:
# 7. PROCESAMIENTO PRINCIPAL - PASO A PASO
print("=== INICIANDO PROCESAMIENTO ===")

# Contadores para auditoría
total_files_processed = 0
total_rows_ingested = 0
audit_records = []

for service_type in service_types:
    print(f"\n📋 Procesando servicio: {service_type.upper()}")
    
    for year in range(start_year, end_year + 1):
        for month in months:
            # Construir nombre de archivo
            fname = f'{service_type}_tripdata_{year}-{month:02d}.parquet'
            fpath = os.path.join(data_dir, fname)
            
            print(f"\n📁 Procesando: {fname}")
            
            # Verificar si existe el archivo
            if not os.path.isfile(fpath):
                print(f"⚠️ Archivo no encontrado: {fname}")
                continue
            
            try:
                # PASO 1: Leer archivo Parquet
                print("   📖 Leyendo archivo Parquet...")
                df = spark.read.parquet(fpath)
                original_count = df.count()
                print(f"   📊 Registros originales: {original_count:,}")
                
                if original_count == 0:
                    print("   ⚠️ Archivo vacío, saltando...")
                    continue
                
                # PASO 2: Convertir timestamps de epoch a timestamp
                print("   🕐 Convirtiendo timestamps...")
                if service_type == 'green':
                    df = convert_epoch_to_timestamp(df, 'lpep_pickup_datetime')
                    df = convert_epoch_to_timestamp(df, 'lpep_dropoff_datetime')
                else:  # yellow
                    df = convert_epoch_to_timestamp(df, 'tpep_pickup_datetime')
                    df = convert_epoch_to_timestamp(df, 'tpep_dropoff_datetime')
                
                # PASO 3: Agregar metadatos obligatorios
                print("   📋 Agregando metadatos...")
                df = add_metadata(df, service_type, year, month, fpath)
                
                # PASO 4: Crear clave natural
                print("   🔑 Creando clave natural...")
                df = create_natural_key(df, service_type)
                
                # PASO 5: Eliminar duplicados usando clave natural
                print("   🧹 Eliminando duplicados...")
                count_before = df.count()
                df = df.dropDuplicates(['natural_key'])
                count_after = df.count()
                duplicates_removed = count_before - count_after
                print(f"   📊 Duplicados eliminados: {duplicates_removed:,}")
                
                # PASO 6: Cargar a Snowflake
                table_name = "GREEN_TAXI" if service_type == 'green' else "YELLOW_TAXI"
                success = load_to_snowflake(df, table_name)
                
                if success:
                    total_files_processed += 1
                    total_rows_ingested += count_after
                    
                    # Registrar para auditoría
                    audit_records.append({
                        'table': table_name,
                        'rows_ingested': count_after,
                        'run_id': run_id,
                        'service_type': service_type,
                        'source_year': year,
                        'source_month': month,
                        'ingested_at_utc': ingested_at_utc,
                        'source_path': fpath
                    })
                    
                    print(f"   ✅ {count_after:,} registros cargados exitosamente")
                else:
                    print(f"   ❌ Error en la carga")
                
            except Exception as e:
                print(f"   ❌ Error procesando {fname}: {e}")
                continue

print(f"\n🎉 PROCESAMIENTO COMPLETADO")
print(f"📊 Archivos procesados: {total_files_processed}")
print(f"📊 Total registros ingested: {total_rows_ingested:,}")

=== INICIANDO PROCESAMIENTO ===

📋 Procesando servicio: GREEN

📁 Procesando: green_tripdata_2015-01.parquet
   📖 Leyendo archivo Parquet...
   📊 Registros originales: 1,508,493
   🕐 Convirtiendo timestamps...
🔍 Tipo de datos de lpep_pickup_datetime: TimestampNTZType()
⚠️ Tipo no soportado para lpep_pickup_datetime: TimestampNTZType()
🔍 Tipo de datos de lpep_dropoff_datetime: TimestampNTZType()
⚠️ Tipo no soportado para lpep_dropoff_datetime: TimestampNTZType()
   📋 Agregando metadatos...
✅ Metadatos agregados para green 2015-01
   🔑 Creando clave natural...
✅ Clave natural creada para green
   🧹 Eliminando duplicados...
   📊 Duplicados eliminados: 3,278


In [14]:
# 8. REGISTRAR AUDITORÍA
print("\n=== REGISTRANDO AUDITORÍA ===")

if audit_records:
    try:
        # Crear DataFrame de auditoría
        print(f"📝 Creando registro de auditoría con {len(audit_records)} entradas...")
        
        audit_df = spark.createDataFrame(audit_records)
        
        # Cargar a tabla de auditoría
        audit_df.write \
            .format("net.snowflake.spark.snowflake") \
            .options(**sfOptions) \
            .option("dbtable", "RAW_AUDITORIA") \
            .mode("append") \
            .save()
        
        print("✅ Auditoría registrada exitosamente")
        
        # Mostrar resumen de auditoría
        print("\n📊 RESUMEN DE AUDITORÍA:")
        for record in audit_records:
            print(f"   {record['table']}: {record['rows_ingested']:,} registros ({record['source_year']}-{record['source_month']:02d})")
        
    except Exception as e:
        print(f"❌ Error registrando auditoría: {e}")
else:
    print("⚠️ No hay registros para auditoría")

print(f"\n🏁 PROCESO COMPLETADO")
print(f"Run ID: {run_id}")
print(f"Archivos procesados: {total_files_processed}")
print(f"Total registros: {total_rows_ingested:,}")


=== REGISTRANDO AUDITORÍA ===
⚠️ No hay registros para auditoría

🏁 PROCESO COMPLETADO
Run ID: run_20251019_003137
Archivos procesados: 0
Total registros: 0


In [31]:
# INGESTA RAW PURA - SIN TRANSFORMACIONES (CON COMPATIBILIDAD SNOWFLAKE)
# 
# PRINCIPIO: Datos tal como vienen + metadatos mínimos obligatorios
# - Preservar valores originales (no transformar contenido)
# - Solo ajustar tipos para compatibilidad con Snowflake 
# - Agregar metadatos de ingesta obligatorios
# - Eliminar duplicados ANTES de cargar (para idempotencia)

print("=== INGESTA RAW PURA (COMPATIBLE SNOWFLAKE) ===")

# 1. Configuración mínima
service_types = ['green']
start_year = 2015
end_year = 2015  
months = [1]
data_dir = '/home/jovyan/work/data'

# Metadatos de run
run_id = f"raw_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
ingested_at_utc = datetime.utcnow().isoformat()

print(f"Raw Run ID: {run_id}")

# 2. Función para hacer tipos compatibles con Snowflake (SIN cambiar valores)
def make_snowflake_compatible(df):
    """
    Convierte solo tipos problemáticos para Snowflake
    PRESERVA los valores originales - solo cambia el tipo de dato
    """
    print("   🔧 Ajustando tipos para Snowflake (preservando valores)...")
    
    for field in df.schema.fields:
        col_name = field.name
        current_type = field.dataType
        
        # Convertir TimestampNTZType a String para preservar valores exactos
        if isinstance(current_type, T.TimestampType):
            print(f"     📅 {col_name}: TimestampType → String (preservando valores)")
            df = df.withColumn(col_name, F.col(col_name).cast(T.StringType()))
        
        # Otros tipos problemáticos se pueden agregar aquí si aparecen
        
    return df

# 3. Función RAW - sin transformaciones de contenido
def add_raw_metadata_only(df, service_type, source_year, source_month, source_path):
    """
    Solo agrega metadatos obligatorios - NO transforma datos originales
    """
    return df.withColumn('raw_run_id', F.lit(run_id)) \
             .withColumn('raw_service_type', F.lit(service_type)) \
             .withColumn('raw_source_year', F.lit(source_year)) \
             .withColumn('raw_source_month', F.lit(source_month)) \
             .withColumn('raw_ingested_at_utc', F.lit(ingested_at_utc)) \
             .withColumn('raw_source_path', F.lit(source_path))

# 4. Clave natural RAW - usando datos originales
def create_raw_natural_key(df, service_type):
    """
    Crea clave natural con datos originales (sin transformar)
    """
    pickup_col = 'lpep_pickup_datetime' if service_type == 'green' else 'tpep_pickup_datetime'
    
    components = [
        F.coalesce(F.col(pickup_col).cast('string'), F.lit('NULL')),
        F.coalesce(F.col('PULocationID').cast('string'), F.lit('NULL')),  
        F.coalesce(F.col('DOLocationID').cast('string'), F.lit('NULL')),
        F.coalesce(F.col('VendorID').cast('string'), F.lit('NULL'))
    ]
    
    return df.withColumn('raw_natural_key', F.concat_ws('|', *components))

print("✅ Funciones RAW compatibles definidas")

=== INGESTA RAW PURA (COMPATIBLE SNOWFLAKE) ===
Raw Run ID: raw_20251019_005946
✅ Funciones RAW compatibles definidas


In [38]:
# 2.5. Función para convertir época Unix a timestamp legible (solo si es necesario)
def convert_epoch_columns_to_timestamp(df):
    """
    Convierte columnas con tiempo en formato Unix epoch (milisegundos) a timestamp legible
    SOLO si la columna es numérica (Long/Integer). Si ya es timestamp, la deja tal como está.
    """
    print("   🕐 Verificando y convirtiendo columnas de época Unix...")
    
    # Columnas que potencialmente contienen época Unix en milisegundos 
    potential_epoch_columns = ['lpep_pickup_datetime', 'lpep_dropoff_datetime', 'tpep_pickup_datetime', 'tpep_dropoff_datetime']
    
    for col_name in potential_epoch_columns:
        if col_name in df.columns:
            # Obtener el tipo de dato de la columna
            col_type = None
            for field in df.schema.fields:
                if field.name == col_name:
                    col_type = field.dataType
                    break
            
            # Solo convertir si es un tipo numérico (LongType, IntegerType)
            if isinstance(col_type, (T.LongType, T.IntegerType)):
                print(f"     📅 {col_name} es {type(col_type).__name__} - Convirtiendo de época Unix (ms) a timestamp")
                df = df.withColumn(
                    col_name, 
                    F.from_unixtime(F.col(col_name) / 1000).cast(T.TimestampType())
                )
            elif isinstance(col_type, T.TimestampType):
                print(f"     ✅ {col_name} ya es TimestampType - No requiere conversión")
            else:
                print(f"     ⚠️ {col_name} es {type(col_type).__name__} - Tipo no reconocido para fecha")
    
    return df

In [44]:
df = spark.read.parquet("/home/jovyan/work/data/green_tripdata_2015-01.parquet")
df.printSchema()
df.select("lpep_dropoff_datetime").show()
df = df.withColumn(
    "lpep_dropoff_datetime",
    to_timestamp(col("lpep_dropoff_datetime"), "yyyy-MM-dd HH:mm:ss")
)

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp_ntz (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)

+---------------------+
|lpep_dropoff_datetime|
+---------------------+
|  2015-01-01 00:50:4

root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: timestamp_ntz (nullable = true)
 |-- lpep_dropoff_datetime: timestamp (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)



In [46]:
df.select("lpep_dropoff_datetime").show()

+---------------------+
|lpep_dropoff_datetime|
+---------------------+
|  2015-01-01 00:50:41|
|  2015-01-01 00:03:30|
|  2015-01-01 00:33:26|
|  2015-01-01 00:27:07|
|  2015-01-01 00:40:32|
|  2015-01-01 01:01:00|
|  2015-01-01 01:15:52|
|  2015-01-01 00:10:52|
|  2015-01-01 00:18:55|
|  2015-01-01 00:46:26|
|  2015-01-01 01:10:09|
|  2015-01-01 01:20:19|
|  2015-01-01 00:39:42|
|  2015-01-01 01:14:53|
|  2015-01-01 00:47:57|
|  2015-01-01 01:29:36|
|  2015-01-01 00:19:41|
|  2015-01-01 00:28:00|
|  2015-01-01 00:49:47|
|  2015-01-01 00:55:29|
+---------------------+
only showing top 20 rows



In [None]:
df.

In [37]:
# 2.6. Función de diagnóstico para mostrar esquema
def show_dataframe_schema(df, label="DataFrame"):
    """
    Muestra el esquema del DataFrame para diagnóstico
    """
    print(f"   📋 Esquema de {label}:")
    for field in df.schema.fields:
        print(f"     - {field.name}: {type(field.dataType).__name__}")
    return df

In [None]:
# 4. PROCESAMIENTO RAW PURO
print("\n=== PROCESAMIENTO RAW (SIN TRANSFORMACIONES) ===")

total_files_processed = 0
total_rows_ingested = 0  
audit_records = []

for service_type in service_types:
    print(f"\n📋 Servicio: {service_type.upper()}")
    
    for year in range(start_year, end_year + 1):
        for month in months:
            fname = f'{service_type}_tripdata_{year}-{month:02d}.parquet'
            fpath = os.path.join(data_dir, fname)
            
            print(f"\n📁 Procesando: {fname}")
            
            if not os.path.isfile(fpath):
                print(f"⚠️ Archivo no encontrado")
                continue
            
            try:
                # PASO 1: Leer tal como viene
                print("   📖 Leyendo archivo RAW...")
                df = spark.read.parquet(fpath)
                original_count = df.count()
                print(f"   📊 Registros: {original_count:,}")
                
                # PASO 1.1: Mostrar esquema original para diagnóstico
                df = show_dataframe_schema(df, "archivo original")
                
                if original_count == 0:
                    continue
                
                # PASO 1.5: Convertir época Unix a timestamp legible
                print("   🕐 Convirtiendo época Unix a timestamp...")
                df = convert_epoch_columns_to_timestamp(df)
                
                # PASO 2: Hacer tipos compatibles con Snowflake
                print("   🔧 Ajustando tipos para Snowflake...")
                df = make_snowflake_compatible(df)
                
                # PASO 3: Solo agregar metadatos RAW (sin transformar datos)
                print("   📋 Agregando metadatos RAW...")
                df = add_raw_metadata_only(df, service_type, year, month, fpath)
                
                # PASO 4: Clave natural con datos originales
                print("   🔑 Clave natural RAW...")
                df = create_raw_natural_key(df, service_type)
                
                # PASO 5: Eliminar duplicados (única "transformación" permitida)
                print("   🧹 Eliminando duplicados...")
                count_before = df.count()
                df = df.dropDuplicates(['raw_natural_key'])
                count_after = df.count()
                print(f"   📊 {count_before - count_after:,} duplicados eliminados")
                
                # PASO 6: Mostrar esquema original preservado
                print("   📋 Esquema preservado:")
                original_columns = [col for col in df.columns if not col.startswith('raw_')]
                print(f"   📊 Columnas originales: {len(original_columns)}")
                print(f"   📊 Metadatos agregados: {len([col for col in df.columns if col.startswith('raw_')])}")
                
                # PASO 7: Cargar a Snowflake RAW
                table_name = "GREEN_TAXI" if service_type == 'green' else "YELLOW_TAXI"
                print(f"   🚀 Cargando {count_after:,} registros RAW a {table_name}")
                
                df.write \
                    .format("net.snowflake.spark.snowflake") \
                    .options(**sfOptions) \
                    .option("dbtable", table_name) \
                    .mode("append") \
                    .save()
                
                print(f"   ✅ Carga RAW exitosa")
                
                # Auditoría
                audit_records.append({
                    'table': table_name,
                    'rows_ingested': count_after,
                    'run_id': run_id,
                    'service_type': service_type,
                    'source_year': year,
                    'source_month': month,
                    'ingested_at_utc': ingested_at_utc,
                    'source_path': fpath
                })
                
                total_files_processed += 1
                total_rows_ingested += count_after
                
            except Exception as e:
                print(f"   ❌ Error: {e}")
                continue

print(f"\n🎉 INGESTA RAW COMPLETADA")
print(f"📊 Archivos: {total_files_processed}")
print(f"📊 Registros: {total_rows_ingested:,}")
print(f"📊 Estrategia: DATOS ORIGINALES + METADATOS MÍNIMOS")


=== PROCESAMIENTO RAW (SIN TRANSFORMACIONES) ===

📋 Servicio: GREEN

📁 Procesando: green_tripdata_2015-01.parquet
   📖 Leyendo archivo RAW...
   📊 Registros: 1,508,493
   🕐 Convirtiendo época Unix a timestamp...
   🕐 Convirtiendo columnas de época Unix a timestamp...
     📅 Convirtiendo lpep_pickup_datetime de época Unix (ms) a timestamp
   ❌ Error: [DATATYPE_MISMATCH.BINARY_OP_DIFF_TYPES] Cannot resolve "(lpep_pickup_datetime / 1000)" due to data type mismatch: the left and right operands of the binary operator have incompatible types ("TIMESTAMP_NTZ" and "INT").;
'Project [VendorID#3664L, cast(from_unixtime((lpep_pickup_datetime#3665 / 1000), yyyy-MM-dd HH:mm:ss, Some(Etc/UTC)) as timestamp) AS lpep_pickup_datetime#3729, lpep_dropoff_datetime#3666, store_and_fwd_flag#3667, RatecodeID#3668L, PULocationID#3669L, DOLocationID#3670L, passenger_count#3671L, trip_distance#3672, fare_amount#3673, extra#3674, mta_tax#3675, tip_amount#3676, tolls_amount#3677, ehail_fee#3678, improvement_surc

In [None]:
# 5. AUDITORÍA RAW
print("\n=== REGISTRANDO AUDITORÍA RAW ===")

if audit_records:
    try:
        audit_df = spark.createDataFrame(audit_records)
        
        audit_df.write \
            .format("net.snowflake.spark.snowflake") \
            .options(**sfOptions) \
            .option("dbtable", "RAW_AUDITORIA") \
            .mode("append") \
            .save()
        
        print("✅ Auditoría RAW registrada")
        
        for record in audit_records:
            print(f"   {record['table']}: {record['rows_ingested']:,} registros")
            
    except Exception as e:
        print(f"❌ Error auditoría: {e}")

print(f"\n🏁 PROCESO RAW COMPLETADO")
print(f"Run ID: {run_id}")

# 6. VERIFICACIÓN RAW
print("\n=== VERIFICACIÓN RAW ===")

try:
    green_count = spark.read \
        .format("net.snowflake.spark.snowflake") \
        .options(**sfOptions) \
        .option("query", f"SELECT COUNT(*) as count FROM GREEN_TAXI WHERE raw_run_id = '{run_id}'") \
        .load() \
        .collect()[0]['count']
    
    print(f"✅ GREEN_TAXI RAW: {green_count:,} registros")
    
    # Mostrar muestra de datos RAW (con timestamps originales)
    if green_count > 0:
        print("\n🔍 Muestra datos RAW (timestamps en formato original):")
        sample = spark.read \
            .format("net.snowflake.spark.snowflake") \
            .options(**sfOptions) \
            .option("query", f"SELECT lpep_pickup_datetime, lpep_dropoff_datetime, raw_run_id FROM GREEN_TAXI WHERE raw_run_id = '{run_id}' LIMIT 3") \
            .load()
        
        sample.show(3, truncate=False)
        
except Exception as e:
    print(f"⚠️ Error verificación: {e}")

print("\n✅ INGESTA RAW PURA COMPLETADA")
print("📋 PRINCIPIOS RESPETADOS:")
print("   • Datos originales preservados")
print("   • Solo metadatos de ingesta agregados")  
print("   • Sin transformaciones de tipos/valores")
print("   • Timestamps en formato original (epoch)")
print("   • Deduplicación antes de carga (idempotencia)")

In [50]:
# SOLUCIÓN DEFINITIVA AL ERROR SNOWFLAKE
# Convertir TODOS los tipos problemáticos a tipos compatibles

print("=== INGESTA RAW CON CONVERSIÓN FORZADA DE TIPOS ===")

# Configuración
service_types = ['green']
start_year = 2015
end_year = 2015  
months = [1]
data_dir = '/home/jovyan/work/data'
run_id = f"raw_fixed_{datetime.utcnow().strftime('%Y%m%d_%H%M%S')}"
ingested_at_utc = datetime.utcnow().isoformat()

print(f"Run ID: {run_id}")

def convert_all_to_snowflake_types(df):
    """
    SOLO convierte pickup_datetime y dropoff_datetime a TIMESTAMP_NTZ(9)
    Mantiene todas las demás columnas tal como están
    """
    print("   🔧 Convirtiendo SOLO pickup/dropoff a TIMESTAMP_NTZ(9)...")
    
    for field in df.schema.fields:
        col_name = field.name
        current_type = field.dataType
        
        # SOLO convertir timestamps de pickup/dropoff a formato TIMESTAMP_NTZ(9)
        if ("pickup_datetime" in col_name.lower() or "dropoff_datetime" in col_name.lower()):
            if "timestamp" in str(current_type).lower():
                print(f"     📅 {col_name}: {current_type} → TIMESTAMP_NTZ(9) format")
                df = df.withColumn(col_name, F.date_format(F.col(col_name), "yyyy-MM-dd HH:mm:ss.SSSSSSSSS"))
            elif isinstance(current_type, T.LongType):
                print(f"     🕐 {col_name}: LongType → TIMESTAMP_NTZ(9) format")
                df = df.withColumn(col_name, F.date_format(F.from_unixtime(F.col(col_name) / 1000), "yyyy-MM-dd HH:mm:ss.SSSSSSSSS"))
        
        # MANTENER todas las demás columnas sin cambios (comentado para claridad)
        # No convertir otros timestamps - mantener tipos originales
    
    print("   ✅ Otros tipos mantenidos sin cambios")
    return df

# Procesamiento simplificado
total_files_processed = 0
total_rows_ingested = 0
audit_records = []

for service_type in service_types:
    print(f"\n📋 Servicio: {service_type.upper()}")
    
    for year in range(start_year, end_year + 1):
        for month in months:
            fname = f'{service_type}_tripdata_{year}-{month:02d}.parquet'
            fpath = os.path.join(data_dir, fname)
            
            print(f"\n📁 Procesando: {fname}")
            
            if not os.path.isfile(fpath):
                print(f"⚠️ Archivo no encontrado")
                continue
            
            try:
                # PASO 1: Leer archivo
                print("   📖 Leyendo archivo...")
                df = spark.read.parquet(fpath)
                original_count = df.count()
                print(f"   📊 Registros: {original_count:,}")
                
                if original_count == 0:
                    continue
                
                # PASO 3: Convertir tipos problemáticos
                df = convert_all_to_snowflake_types(df)
                
                # PASO 4: Agregar metadatos
                print(" Agregando metadatos...")
                df = df.withColumn('run_id', F.lit(run_id)) \
                       .withColumn('service_type', F.lit(service_type)) \
                       .withColumn('source_year', F.lit(year)) \
                       .withColumn('source_month', F.lit(month)) \
                       .withColumn('ingested_at_utc', F.lit(ingested_at_utc)) \
                       .withColumn('source_path', F.lit(fpath))
                
                # PASO 5: Clave natural simple
                pickup_col = 'lpep_pickup_datetime' if service_type == 'green' else 'tpep_pickup_datetime'
                df = df.withColumn('raw_natural_key', 
                    F.concat_ws('|',
                        F.coalesce(F.col(pickup_col).cast('string'), F.lit('NULL')),
                        F.coalesce(F.col('VendorID').cast('string'), F.lit('NULL'))
                    ))
                
                # PASO 6: Eliminar duplicados
                print("   🧹 Eliminando duplicados...")
                count_before = df.count()
                df = df.dropDuplicates(['raw_natural_key'])
                count_after = df.count()
                print(f"   📊 {count_before - count_after:,} duplicados eliminados")
            
                # PASO 8: Cargar a Snowflake con configuración TIMESTAMP_NTZ
                table_name = "GREEN_TAXI" if service_type == 'green' else "YELLOW_TAXI"
                print(f"   🚀 Cargando {count_after:,} registros a {table_name}")
                
                df.write \
                    .format("net.snowflake.spark.snowflake") \
                    .options(**sfOptions) \
                    .option("dbtable", table_name) \
                    .option("keep_column_case", "on") \
                    .mode("append") \
                    .save()
                
                print(f"   ✅ ¡CARGA EXITOSA!")
                
                total_files_processed += 1
                total_rows_ingested += count_after
                
            except Exception as e:
                print(f"   ❌ Error: {e}")
                import traceback
                traceback.print_exc()
                continue

print(f"\n🎉 PROCESO COMPLETADO")
print(f"📊 Archivos procesados: {total_files_processed}")
print(f"📊 Total registros: {total_rows_ingested:,}")
print(f"✅ Timestamps convertidos a formato TIMESTAMP_NTZ(9) compatible")
print(f"")
print(f"📋 SIGUIENTE PASO EN SNOWFLAKE:")
print(f"   ALTER TABLE GREEN_TAXI ALTER COLUMN lpep_pickup_datetime SET DATA TYPE TIMESTAMP_NTZ(9);")
print(f"   ALTER TABLE GREEN_TAXI ALTER COLUMN lpep_dropoff_datetime SET DATA TYPE TIMESTAMP_NTZ(9);")

=== INGESTA RAW CON CONVERSIÓN FORZADA DE TIPOS ===
Run ID: raw_fixed_20251019_020617

📋 Servicio: GREEN

📁 Procesando: green_tripdata_2015-01.parquet
   📖 Leyendo archivo...
   📊 Registros: 1,508,493
   🔧 Convirtiendo SOLO pickup/dropoff a TIMESTAMP_NTZ(9)...
     📅 lpep_pickup_datetime: TimestampNTZType() → TIMESTAMP_NTZ(9) format
     📅 lpep_dropoff_datetime: TimestampNTZType() → TIMESTAMP_NTZ(9) format
   ✅ Otros tipos mantenidos sin cambios
 Agregando metadatos...
   🧹 Eliminando duplicados...
   📊 303,246 duplicados eliminados
   🚀 Cargando 1,205,247 registros a GREEN_TAXI
   ❌ Error: An error occurred while calling o903.save.
: java.sql.SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Numeric value '2015-01-01 00:16:25.000000000' is not recognized
  File 'iHzq9zR9pa/1.CSV.gz', line 1, character 36
  Row 1, column "GREEN_TAXI"["SOURCE_YEAR":3]
  If you would like to continue loading when an error is encountered, use other values such as 'SKIP_FILE' or

Traceback (most recent call last):
  File "/tmp/ipykernel_219/205834128.py", line 109, in <module>
    .save()
     ^^^^^^
  File "/usr/local/spark/python/pyspark/sql/readwriter.py", line 1461, in save
    self._jwrite.save()
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/java_gateway.py", line 1322, in __call__
    return_value = get_return_value(
                   ^^^^^^^^^^^^^^^^^
  File "/usr/local/spark/python/pyspark/errors/exceptions/captured.py", line 179, in deco
    return f(*a, **kw)
           ^^^^^^^^^^^
  File "/usr/local/spark/python/lib/py4j-0.10.9.7-src.zip/py4j/protocol.py", line 326, in get_return_value
    raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o903.save.
: java.sql.SQLException: Status of query associated with resultSet is FAILED_WITH_ERROR. Numeric value '2015-01-01 00:16:25.000000000' is not recognized
  File 'iHzq9zR9pa/1.CSV.gz', line 1, character 36
  Row 1, column "GREEN_TAXI"["SOURCE_YEAR":3]
  If y

In [51]:
#df.select("lpep_dropoff_datetime").show()
df.printSchema()


root
 |-- VendorID: long (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- RatecodeID: long (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: double (nullable = true)
 |-- extra: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- ehail_fee: integer (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- trip_type: double (nullable = true)
 |-- congestion_surcharge: integer (nullable = true)
 |-- raw_run_id: string (nullable = false)
 |-- raw_service_type: string (nullable = false)
 |-- raw_source_