# In [None]:  (notebook cell prompt left over from the export; kept as a comment)
import sys
import logging
from pyspark.context import SparkContext
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from awsglue.context import GlueContext
from awsglue.job import Job  # A√±adido: Manejo formal del Job
from awsglue.utils import getResolvedOptions

# CONFIGURATION AND PARAMETERS
# Resolve the three arguments this script expects on the command line:
# JOB_NAME, DATABASE and OUTPUT_PATH.
args = getResolvedOptions(sys.argv,
                          ['JOB_NAME',
                           'DATABASE',
                           'OUTPUT_PATH'])

# Build the Spark context, wrap it in a Glue context, and take the Spark
# session from the latter. The statement order matters: each object is
# constructed from the previous one.
sc = SparkContext()
glueContext = GlueContext(sc)
spark = glueContext.spark_session
job = Job(glueContext)
job.init(args['JOB_NAME'], args) # Required initialisation so AWS can track the run (per the original author)

database = args['DATABASE']
output_path = args['OUTPUT_PATH']

# Request INFO-level root logging (basicConfig does nothing if the root
# logger already has handlers) and create the module-level logger.
logging.basicConfig(level=logging.INFO)
logger = logging.getLogger(__name__)

logger.info("Iniciando proceso ETL basado en Contratos de Datos")

# DATA CONTRACTS (metadata driven)
# Each entry maps a table name to the columns expected in that table,
# grouped under the keys "int", "float", "string" and "date".
def _contract(*, ints, floats, strings, dates):
    """Build one contract entry with the int/float/string/date key order."""
    return {"int": ints, "float": floats, "string": strings, "date": dates}


DATA_CONTRACTS = {
    "Precios_compras_2017": _contract(
        ints=["Brand", "Classification", "VendorNumber"],
        floats=["Price", "PurchasePrice"],
        strings=["Description", "Size", "Volume", "VendorName"],
        dates=[],
    ),
    "Inicio_inventario": _contract(
        ints=["Store", "Brand", "onHand"],
        floats=["Price"],
        strings=["InventoryId", "City", "Description", "Size"],
        dates=["startDate"],
    ),
    "Final_inventario": _contract(
        ints=["Store", "Brand", "onHand"],
        floats=["Price"],
        strings=["InventoryId", "City", "Description", "Size"],
        dates=["endDate"],
    ),
    "Facturas_compras": _contract(
        ints=["VendorNumber", "PONumber", "Quantity"],
        floats=["Dollars", "Freight"],
        strings=["VendorName", "Approval"],
        dates=["InvoiceDate", "PODate", "PayDate"],
    ),
    "Compra_final": _contract(
        ints=[
            "Store",
            "Brand",
            "VendorNumber",
            "PONumber",
            "Quantity",
            "Classification",
        ],
        floats=["PurchasePrice", "Dollars"],
        strings=["InventoryId", "Description", "Size", "VendorName"],
        dates=["PODate", "ReceivingDate", "InvoiceDate", "PayDate"],
    ),
    "Venta_final": _contract(
        ints=[
            "Store",
            "Brand",
            "SalesQuantity",
            "Volume",
            "Classification",
            "VendorNo",
        ],
        floats=["SalesDollars", "SalesPrice", "ExciseTax"],
        strings=["InventoryId", "Description", "Size", "VendorName"],
        dates=["SalesDate"],
    ),
}

# CLEANING AND TRANSFORMATION FUNCTIONS

def cast_columns(df, contract):
    """Apply the contract's strong typing to numeric and date columns.

    Columns listed under ``contract["int"]`` are cast to Spark ``int``,
    those under ``contract["float"]`` to ``double``, and those under
    ``contract["date"]`` are parsed with ``to_date`` using the pattern
    ``yyyy-MM-dd``. Columns under ``contract["string"]`` are not handled
    here.

    Returns the DataFrame with the converted columns.
    """
    # Contract key -> Spark type name. "int" and "double" are the string
    # spellings of IntegerType and DoubleType accepted by Column.cast.
    numeric_targets = (("int", "int"), ("float", "double"))
    for contract_key, spark_type in numeric_targets:
        for name in contract[contract_key]:
            df = df.withColumn(name, col(name).cast(spark_type))

    # A single fixed input format is assumed. The original author relies on
    # Spark coercing unparseable values rather than failing -- presumably to
    # null; confirm against the cluster's ANSI / time-parser settings.
    date_pattern = "yyyy-MM-dd"
    for name in contract["date"]:
        parsed = to_date(col(name), date_pattern)
        df = df.withColumn(name, parsed)
    return df

def clean_strings(df, contract):
    """Standardise text columns: upper-cased and stripped of outer spaces.

    Every column listed under ``contract["string"]`` is replaced by its
    upper-cased, trimmed value. Returns the resulting DataFrame.
    """
    for name in contract["string"]:
        uppercased = upper(col(name))
        df = df.withColumn(name, trim(uppercased))
    return df

def impute_nulls(df, contract):
    """Fill nulls: approximate median for numbers, constants for the rest.

    * ``contract["int"]`` + ``contract["float"]``: each column present in
      ``df`` is filled with its approximate median (``approxQuantile`` at
      0.5 with relative error 0.01). A column for which no median can be
      computed (empty quantile result) is left as is.
    * ``contract["string"]``: filled with ``"UNKNOWN"``.
    * ``contract["date"]``: filled with the value ``"1900-01-01"``.

    All replacements are collected first and applied with a single
    ``fillna`` call, so the plan gains one projection instead of one per
    column. The medians are the same as with column-by-column fills,
    because filling one column does not change another column's values.

    Returns the filled DataFrame (``df`` itself when there is nothing to
    fill).
    """
    # NOTE(review): the contracts list identifier-like columns (e.g. Store,
    # VendorNumber) as "int", so they receive a median too -- confirm that
    # this is intended.
    fill_values = {}

    for col_name in contract["int"] + contract["float"]:
        if col_name in df.columns:
            # Approximate median keeps the cost of each pass bounded.
            median_val = df.approxQuantile(col_name, [0.5], 0.01)
            if median_val:
                fill_values[col_name] = median_val[0]

    for col_name in contract["string"]:
        fill_values[col_name] = "UNKNOWN"
    for col_name in contract["date"]:
        fill_values[col_name] = "1900-01-01"

    if not fill_values:
        return df
    return df.fillna(fill_values)

def treat_outliers(df, contract):
    """Cap outliers in numeric columns using the interquartile range (IQR).

    For every column in ``contract["int"]`` + ``contract["float"]`` that is
    present in ``df``, the approximate quartiles Q1 and Q3 are computed
    (relative error 0.01). Values below ``Q1 - 1.5 * IQR`` are replaced by
    that bound and values above ``Q3 + 1.5 * IQR`` by the upper bound.

    Columns whose quartiles cannot be computed are skipped instead of
    raising ``IndexError``. Each capped column is cast back to the type it
    had on entry, so an integer column is not turned into a double by the
    floating-point bounds.

    Returns the DataFrame with the capped columns.
    """
    # NOTE(review): when Q1 == Q3 the IQR is 0 and every value is clamped to
    # that single number; identifier-like "int" columns are capped as well.
    # Both follow from the contract as written -- confirm they are intended.
    numeric_cols = contract["int"] + contract["float"]
    for col_name in numeric_cols:
        if col_name not in df.columns:
            continue

        quantiles = df.approxQuantile(col_name, [0.25, 0.75], 0.01)
        if len(quantiles) < 2:
            # approxQuantile yields an empty list for a column that holds
            # only nulls; there is nothing to cap in that case.
            continue

        q1, q3 = quantiles[0], quantiles[1]
        iqr = q3 - q1
        lower, upper_bound = q1 - 1.5 * iqr, q3 + 1.5 * iqr

        # Remember the incoming type: mixing the column with float bounds
        # inside when/otherwise widens integer columns to double.
        original_type = df.schema[col_name].dataType
        value = col(col_name)
        capped = (
            when(value < lower, lower)
            .when(value > upper_bound, upper_bound)
            .otherwise(value)
        )
        df = df.withColumn(col_name, capped.cast(original_type))
    return df

# PROCESS EXECUTION
#
# Every table named in DATA_CONTRACTS is read from the Glue Data Catalog,
# passed through cast -> clean -> impute -> outlier capping, and written as
# Parquet under <OUTPUT_PATH>/<table>. A failure in one table is logged and
# does not stop the remaining tables, but it is recorded so that the run is
# not reported (or committed) as successful.

tables_to_process = list(DATA_CONTRACTS.keys())
failed_tables = []

# Accept an OUTPUT_PATH given with a trailing slash; otherwise the target
# path would contain "//", i.e. an empty-named path segment.
output_root = output_path.rstrip("/")

for table_name in tables_to_process:
    try:
        logger.info(f"Procesando tabla: {table_name}")

        # Read from the Data Catalog.
        dynamic_frame = glueContext.create_dynamic_frame.from_catalog(
            database=database,
            table_name=table_name
        )
        df = dynamic_frame.toDF()

        # take(1) needs a single row to decide emptiness, whereas count()
        # has to scan the whole table just to be compared against zero.
        if not df.take(1):
            logger.warning(f"La tabla {table_name} está vacía. Saltando...")
            continue

        # Chain the transformations defined by the table's contract.
        contract = DATA_CONTRACTS[table_name]
        df = cast_columns(df, contract)
        df = clean_strings(df, contract)
        df = impute_nulls(df, contract)
        df = treat_outliers(df, contract)

        # Write to the output layer as Parquet. 'overwrite' keeps the job
        # re-runnable without duplicating data.
        target_path = f"{output_root}/{table_name}"
        df.write.mode("overwrite").format("parquet").save(target_path)

        logger.info(f"✅ {table_name} procesada y guardada en {target_path}")

    except Exception as e:
        # Top-level boundary: keep going with the other tables, but record
        # the failure and keep the traceback in the log.
        logger.exception(f"❌ Error procesando {table_name}: {str(e)}")
        failed_tables.append(table_name)

if failed_tables:
    # Do not commit or announce success when some tables failed; raising
    # makes the run end in error instead of looking like a clean finish.
    failed_list = ", ".join(failed_tables)
    logger.error(f"❌ ETL finalizado con errores en: {failed_list}")
    raise RuntimeError(f"ETL failed for tables: {failed_list}")

# Formal end of the job.
job.commit()
logger.info("🏁 ETL COMPLETO FINALIZADO CON ÉXITO")