In [0]:
%pip install omegaconf
%restart_python

## LOG DE ERRORES

In [0]:


import logging, os, shutil
from pyspark.sql import functions as F
import os, re
from datetime import datetime

FINAL_LOG_DIR  = "/Volumes/workspace/global_mobility/log"
FINAL_LOG_PATH = os.path.join(FINAL_LOG_DIR, f"etl_run_{datetime.now():%Y%m%d_%H%M%S}.log")

os.makedirs(FINAL_LOG_DIR, exist_ok=True)

logger = logging.getLogger("etl_logger")
logger.setLevel(logging.INFO)
logger.propagate = False  # evita duplicado en el root

for h in list(logger.handlers):
    logger.removeHandler(h)
    try:
        h.flush()
        h.close()
    except Exception:
        pass

fmt = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")

file_handler = logging.FileHandler(FINAL_LOG_PATH, mode="a", encoding="utf-8")
file_handler.setFormatter(fmt)
logger.addHandler(file_handler)

stream_handler = logging.StreamHandler()
stream_handler.setFormatter(fmt)
logger.addHandler(stream_handler)

def log_info(msg: str):
    logger.info(msg)

def log_error(msg: str):
    logger.error(f" {msg}")

def cerrar_log():
    for h in list(logger.handlers):
        try:
            h.flush()
            h.close()
        except Exception:
            pass
        logger.removeHandler(h)



## VALIDACION DEL ARCHIVO CONFIG

In [0]:
from omegaconf import OmegaConf

REPO_ROOT = os.path.dirname(os.getcwd())  
CFG_PATH = os.path.join(REPO_ROOT, "config", "config.yaml")

try:
    # --- Verificar existencia
    if not os.path.exists(CFG_PATH):
        log_error(f"Archivo de configuración no encontrado en: {CFG_PATH}")
        raise FileNotFoundError(f"No existe el archivo {CFG_PATH}")

    config = OmegaConf.load(CFG_PATH)
    log_info("config.yaml cargado correctamente.")

    # --- Estructura básica
    required_keys = ["paths", "params", "delivery_types", "unit_factors"]
    for key in required_keys:
        if key not in config:
            log_error(f"Falta la sección '{key}' en config.yaml")
            raise ValueError(f"Falta la sección '{key}' en config.yaml")

    # --- Validar params
    for i, param in enumerate(config.params):
        for field in ["country", "start_date", "end_date", "proccess"]:
            if field not in param:
                log_error(f"Falta '{field}' en bloque params[{i}] del config.yaml")
                raise ValueError(f"Falta '{field}' en bloque params[{i}]")

        if param.proccess not in ["YES", "NO"]:
            log_error(f"Valor inválido en 'proccess' ({param.proccess}) — debe ser YES o NO")
            raise ValueError(f"Valor inválido en 'proccess' ({param.proccess})")

    log_info(f"params OK ({len(config.params)} bloques validados).")

    # --- Validar delivery_types
    if "routine" not in config.delivery_types or "bonus" not in config.delivery_types:
        log_error("Faltan listas 'routine' y/o 'bonus' en delivery_types.")
        raise ValueError("delivery_types incompleto.")

    valid_rutina = [s.strip().upper() for s in config.delivery_types.routine or []]
    valid_bonif  = [s.strip().upper() for s in config.delivery_types.bonus or []]

    solapados = set(valid_rutina) & set(valid_bonif)
    if solapados:
        log_error(f"delivery_types contiene códigos duplicados entre routine y bonus: {sorted(solapados)}")
        raise ValueError(f"Códigos duplicados en delivery_types: {sorted(solapados)}")

    log_info(f"delivery_types OK. rutina={valid_rutina}, bonif={valid_bonif}")

    # --- Validar unit_factors
    uf_cfg = getattr(config, "unit_factors", None)
    uf = OmegaConf.to_container(uf_cfg, resolve=True) if uf_cfg is not None else None
    if not isinstance(uf, dict) or len(uf) == 0:
        log_error("unit_factors debe ser un diccionario no vacío {unidad: factor}.")
        raise ValueError("unit_factors inválido o vacío.")

    norm_factors = {}
    for k, v in uf.items():
        key = str(k).strip().upper()
        if not key:
            log_error("unit_factors tiene una clave vacía.")
            raise ValueError("Clave vacía en unit_factors.")
        try:
            val = float(v)
        except Exception:
            log_error(f"unit_factors: valor no numérico para la clave '{k}': {v}")
            raise ValueError(f"unit_factors inválido en '{k}'")
        if val <= 0:
            log_error(f"unit_factors: factor debe ser > 0 para '{key}', actual={val}")
            raise ValueError(f"unit_factors con valor no válido en '{key}'")
        norm_factors[key] = val

    keys = [F.lit(k) for k in norm_factors.keys()]
    vals = [F.lit(v) for v in norm_factors.values()]
    factor_map = F.map_from_arrays(F.array(*keys), F.array(*vals))
    log_info(f"unit_factors OK. {norm_factors}")

    # --- Finalización
    log_info("Configuración validada correctamente.")

except Exception as e:
    log_error(f" Error durante la validación del config.yaml: {e}")
    raise


## ESQUEMAS USADOS

In [0]:

%sql
CREATE SCHEMA IF NOT EXISTS RDV;
CREATE SCHEMA IF NOT EXISTS UDV;

In [0]:

%sql
CREATE TABLE IF NOT EXISTS RDV.data_ventas (
  pais VARCHAR(2) ,
  fecha_proceso DATE  ,
  transporte STRING,
  ruta STRING ,
  tipo_entrega STRING,
  material STRING,
  precio DECIMAL(21,2) ,
  cantidad DECIMAL(21,2) COMMENT 'Cantidad segun la unidad' ,
  unidad VARCHAR(2) COMMENT 'Tipo de unidad (caja , unidad)'
)
USING DELTA
PARTITIONED BY (fecha_proceso);

In [0]:

%sql
CREATE TABLE IF NOT EXISTS UDV.data_ventas_depurado (
  cod_pais VARCHAR(2) ,
  fec_proceso DATE  ,
  cod_transporte STRING,
  cod_ruta STRING ,
  cod_tipo_entrega STRING,
  cod_material STRING,
  precio_unitario_unidades DECIMAL(21,3) COMMENT 'Precio unitario por unidad' ,
  mto_venta DECIMAL(21,2) ,
  cant_uni_medida DECIMAL(21,2) COMMENT 'Cantidad segun la unidad de medida',
  cod_uni_medida VARCHAR(2) COMMENT 'Tipo de unidad (caja , unidad)',
  cant_unidades DECIMAL(21,2) COMMENT 'Cantidad en unidades',
  ind_rutina BOOLEAN COMMENT 'Indicador de rutina',
  ind_bonificacion BOOLEAN COMMENT 'Indicador de bonificación',
  origen_datos STRING COMMENT 'Archivo origen',
  fec_actualizacion_registro	STRING	COMMENT 'Fecha de actualización del registro (en formato string)'
)
USING DELTA
PARTITIONED BY (fec_proceso)

;

In [0]:

%sql
CREATE TABLE IF NOT EXISTS UDV.data_ventas_obs (
  cod_pais STRING ,
  fec_proceso STRING  ,
  cod_transporte STRING,
  cod_ruta STRING ,
  cod_tipo_entrega STRING,
  cod_material STRING,
  mto_venta STRING ,
  cant_uni_medida STRING,
  cod_uni_medida STRING,
  motivo_obs STRING COMMENT 'Motivo de la observación',
  origen_datos STRING COMMENT 'Archivo origen',
  fec_actualizacion_registro	STRING	COMMENT 'Fecha de actualización del registro (en formato string)'
)
USING DELTA
PARTITIONED BY (fec_proceso);

In [0]:

from pyspark.sql.types import StructType, StructField, StringType, DoubleType, DateType

schema_csv = StructType([
    StructField("pais", StringType(), True),
    StructField("fecha_proceso", StringType(), True),
    StructField("transporte", StringType(), True),
    StructField("ruta", StringType(), True),
    StructField("tipo_entrega", StringType(), True),
    StructField("material", StringType(), True),
    StructField("precio", DoubleType(), True),
    StructField("cantidad", DoubleType(), True),
    StructField("unidad", StringType(), True),
])


### CAPA BRONZE:

In [0]:

from pyspark.sql import SparkSession
    
spark = SparkSession.builder.getOrCreate()

def procesar_pais_rdv(country, start_date, end_date):
    try:

        df_csv = (spark.read.schema(schema_csv)
                .option("header", True)
                .option("inferSchema", False)
                .csv(config.paths.raw_csv)) 
        
        df_ventas = df_csv.select(
        F.col("pais"),
        F.to_date(F.regexp_replace(F.col("fecha_proceso"), r"\s+", ""), "yyyyMMdd").alias("fecha_proceso"),
        F.col("transporte"),
        F.col("ruta"),
        F.col("tipo_entrega"),
        F.col("material"),
        F.col("precio").cast("decimal(21,2)"),
        F.col("cantidad").cast("decimal(21,2)"),
        F.col("unidad")
        )

        # FILTRADO DE LOS DATOS SEGUN EL ARCHIVO YAML

        df_ventas = df_ventas.filter(
            (F.col("pais") == country) &
            (F.col("fecha_proceso") >= start_date) &
            (F.col("fecha_proceso") <= end_date)
        )

        (df_ventas.write
        .format("delta")
        .mode("overwrite")
        .option("replaceWhere", f"pais = '{country}' AND fecha_proceso >= DATE '{start_date}' AND fecha_proceso <= DATE '{end_date}'")
        .partitionBy("fecha_proceso")
        .saveAsTable("RDV.data_ventas"))

    except Exception as e:
        log_error(f"{country}: Error durante el proceso  rdv {str(e)}")



### CAPA SILVER

In [0]:

def actualizar_config(country):
    cfg_copy = OmegaConf.to_container(config, resolve=True)
    for p in cfg_copy["params"]:
        if p["country"] == country:
            p["proccess"] = "YES"

    OmegaConf.save(OmegaConf.create(cfg_copy), CFG_PATH)
    print(f"Config actualizado: {country} procesado")

factor_expr = F.element_at(
    factor_map, 
    F.upper(F.trim(F.col("cod_uni_medida")))
)

errores_concat = F.concat_ws(
    "|",
    F.when(F.col("cant_uni_medida").isNull(), F.lit("ERR_CANT_NULL")),
    F.when(F.col("cant_uni_medida").isNotNull() & (F.col("cant_uni_medida") <= 0),
           F.lit("ERR_CANT_NO_POSITIVA")),
    F.when(F.col("cod_material").isNull(), F.lit("ERR_SIN_MATERIAL_CONOCIDO")),
    F.when(factor_expr.isNull(), F.lit("ERR_UNIDAD_DESCONOCIDA")),
    F.when(F.col("mto_venta").isNotNull() & (F.col("mto_venta") <= 0),
           F.lit("ERR_VENTA_NO_POSITIVA")),
    F.when(~F.upper(F.trim(F.col("cod_tipo_entrega"))).isin(valid_rutina + valid_bonif),
           F.lit("ERR_TIPO_ENTREGA_NO_CONSIDERADA"))
)


def procesar_pais_udv(country, start_date, end_date):
    try:
        df_rdv_ventas = spark.read.table("RDV.data_ventas").filter(
            (F.col("pais") == country) &
            (F.col("fecha_proceso") >= start_date) &
            (F.col("fecha_proceso") <= end_date)
        )

        df_rdv_ventas = df_rdv_ventas.select(
            F.col("pais").alias("cod_pais"),
            F.col("fecha_proceso").alias("fec_proceso"),
            F.col("transporte").alias("cod_transporte"),
            F.col("ruta").alias("cod_ruta"),
            F.col("tipo_entrega").alias("cod_tipo_entrega"),
            F.col("material").alias("cod_material"),
            F.col("precio").alias("mto_venta"),
            F.col("cantidad").alias("cant_uni_medida"),
            F.col("unidad").alias("cod_uni_medida")
        )
        
        df_rdv_ventas = df_rdv_ventas.select(
            F.col("cod_pais"),
            F.col("fec_proceso"),
            F.col("cod_transporte"),
            F.col("cod_ruta"),
            F.col("cod_tipo_entrega"),
            F.col("cod_material"),
            F.when(
                factor_expr.isNotNull() &
                F.col("cant_uni_medida").isNotNull() & (F.col("cant_uni_medida") > 0) &
                F.col("mto_venta").isNotNull(),
                F.round(
                    F.col("mto_venta") / (F.col("cant_uni_medida") * factor_expr),
                    3
                )
            ).cast("decimal(21,3)").alias("precio_unitario_unidades"),
            F.round(F.col("mto_venta"), 2).cast("decimal(21,2)").alias("mto_venta"),
            F.round(F.col("cant_uni_medida"), 2).cast("decimal(21,2)").alias("cant_uni_medida"),
            F.col("cod_uni_medida"),
            F.when(
                factor_expr.isNotNull() &
                F.col("cant_uni_medida").isNotNull() & (F.col("cant_uni_medida") > 0),
                F.round(F.col("cant_uni_medida") * factor_expr, 2)
            ).cast("decimal(21,2)").alias("cant_unidades"),
            F.when(F.upper(F.trim(F.col("cod_tipo_entrega"))).isin(valid_rutina), True)
            .when(F.upper(F.trim(F.col("cod_tipo_entrega"))).isin(valid_bonif), False)
            .otherwise(F.lit(None)).alias("ind_rutina"),
            F.when(F.upper(F.trim(F.col("cod_tipo_entrega"))).isin(valid_bonif), True)
            .when(F.upper(F.trim(F.col("cod_tipo_entrega"))).isin(valid_rutina), False)
            .otherwise(F.lit(None)).alias("ind_bonificacion"),
            F.lit(config.paths.raw_csv).alias("origen_datos"),
            F.date_format(F.current_timestamp(), "yyyy-MM-dd HH:mm:ss").alias("fec_actualizacion_registro"),
            F.when(F.length(errores_concat) == 0, F.lit(None)).otherwise(errores_concat).alias("motivo_obs")
        )

        df_depurado = df_rdv_ventas.filter(
            F.col("motivo_obs").isNull()
        ).drop("motivo_obs")

        (df_depurado.write
        .format('delta')
        .mode("overwrite")
        .option("replaceWhere", f"cod_pais = '{country}' AND fec_proceso >= DATE '{start_date}' AND fec_proceso <= DATE '{end_date}'")
        .partitionBy("fec_proceso")
        .saveAsTable("UDV.data_ventas_depurado")
        )

        (df_depurado.write
        .format('delta')
        .mode("overwrite")
        .option("replaceWhere", f"cod_pais = '{country}' AND fec_proceso >= DATE '{start_date}' AND fec_proceso <= DATE '{end_date}'")
        .partitionBy("fec_proceso")
        .save(config.paths.output_root)
        ) 

        df_obs = df_rdv_ventas.select(
            F.col("cod_pais").cast("string"),
            F.col("fec_proceso").cast("string"),
            F.col("cod_transporte").cast("string"),
            F.col("cod_ruta").cast("string"),
            F.col("cod_tipo_entrega").cast("string"),
            F.col("cod_material").cast("string"),
            F.col("mto_venta").cast("string"),
            F.col("cant_uni_medida").cast("string"),
            F.col("cod_uni_medida").cast("string"),
            F.col("motivo_obs").cast("string"),
            F.col("origen_datos").cast("string"),
            F.col("fec_actualizacion_registro").cast("string")
        ).filter(
            F.col("motivo_obs").isNotNull()
        )
        
        (df_obs.write.
        format('delta')
        .mode("overwrite")
        .option("replaceWhere", f"cod_pais = '{country}' AND  fec_proceso >=   '{start_date}' AND  fec_proceso <=   '{end_date}'")
        .partitionBy("fec_proceso")
        .saveAsTable("UDV.data_ventas_obs")
        )
        
        

    except Exception as e:
        log_error(f"{country}: Error durante el proceso  UDV -> {str(e)}")

    



In [0]:

for param in config.params:
    if param.proccess == "YES":
        log_info(f"Saltando país {param.country}: ya procesado previamente.")
        continue

    if param.proccess == "NO":
        country = param.country
        start_date = param.start_date
        end_date = param.end_date
        
        log_info(f"Procesando país  {country} ({start_date} - {end_date})...")
        procesar_pais_rdv(country, start_date, end_date)
        procesar_pais_udv(country, start_date, end_date)
        actualizar_config(country)
    
cerrar_log()

