In [0]:
%run ./00_config

In [0]:
import boto3
import os
from botocore.client import Config
import pyarrow.parquet as pq
from pyspark.sql.functions import current_timestamp, expr


REGION = "us-east-2"
NOMBRE_TABLA = BRONCE_TAXI_TABLE
FILE_KEY = FILE_KEY_TAXI_TABLE

# Ruta temporal local
local_path = f"/tmp/{FILE_KEY.split('/')[-1]}"

try:
    # --- AJUSTE DE CONEXI√ìN---
    
    try:
        print(f"1. Descargando a {local_path} ...")
        
        s3 = boto3.client(
            's3',
            aws_access_key_id=ACCESS_KEY.strip(),
            aws_secret_access_key=SECRET_KEY.strip(),
            region_name="us-east-2", # regi√≥n del bucket
            config=Config(signature_version='s3v4') # Se fuerza protocolo de firma v4
        )

        # Verificamos si el archivo ya existe para evitar descargar basura de intentos fallidos
        if os.path.exists(local_path):
            os.remove(local_path)

        s3.download_file(BUCKET_NAME, FILE_KEY, local_path)
        print("‚úÖ Descarga completada con √©xito.")

    except Exception as e:
        print(f"‚ùå Error persistente: {str(e)}")
        raise e # Detenemos la ejecuci√≥n si falla la descarga

    # --- Procesamiento por Lotes (Batching) ---
    print(f"2. Escribiendo en la tabla administrada: '{NOMBRE_TABLA}'...")
    
    parquet_file = pq.ParquetFile(local_path)
    total_batches = parquet_file.num_row_groups
    print(f"   Total de grupos a procesar: {total_batches}")

    for i in range(total_batches):
        # 1. Leer lote con Pandas
        batch = parquet_file.read_row_group(i)
        pdf = batch.to_pandas()
        
        # 2. Convertir a Spark y agregar columna de auditor√≠a y id
        df_chunk = spark.createDataFrame(pdf) \
            .withColumn("ingestion_timestamp", current_timestamp()) \
            .withColumn("id", expr("uuid()"))
        
        # 3. GUARDAR COMO TABLA DELTA
        if i == 0:
            # Sobreescribimos la primera vez para asegurar el esquema correcto
            df_chunk.write.mode("overwrite") \
                .option("overwriteSchema", "true") \
                .saveAsTable(NOMBRE_TABLA)
        else:
            # Append para los siguientes lotes
            df_chunk.write.mode("append") \
                .option("mergeSchema", "true") \
                .saveAsTable(NOMBRE_TABLA)
        
        if i % 10 == 0 or i == total_batches - 1:
            print(f"   ‚úÖ Lote {i+1}/{total_batches} guardado.")

    print(f"\nüéâ Datos en capa Bronze: '{NOMBRE_TABLA}'")
    
    # --- PASO 3: Verificar Resultado ---
    df_final = spark.table(NOMBRE_TABLA)
    print(f"   Total de filas: {df_final.count()}")
    display(df_final.limit(5))

    # Limpieza
    if os.path.exists(local_path):
        os.remove(local_path)

except Exception as e:
    print(f"‚ùå Error: {str(e)}")