# Copia de Datos de Clientes del Lakehouse Landing a Lakehouse Bronze

In [None]:
from pyspark.sql.functions import current_timestamp, sha2, concat_ws, col
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable

# Configuración
src_catalog = "Landing"
dst_catalog = "Bronze"
tables_to_process = ["Clientes", "Productos", "Tiendas"]

# Mapa de claves por tabla (ajusta según tu modelo)
key_map = {
    "Clientes": ["ClienteID"],
    "Productos": ["ProductoID"],
    "Tiendas": ["TiendaID"]
}

# Helpers
def get_table_columns(table_full_name):
    try:
        schema = spark.table(table_full_name).schema
        return [(f.name, f.dataType.simpleString()) for f in schema]
    except AnalysisException:
        return []

def add_ingestion_col_if_missing(table_full_name, dry_run=True):
    existing = get_table_columns(table_full_name)
    existing_names = [n for n, _ in existing]
    if "ingestion_ts" in existing_names:
        print(f"[ALTER] ingestion_ts ya existe en {table_full_name}")
        return True
    sql = f"ALTER TABLE {table_full_name} ADD COLUMNS (ingestion_ts timestamp)"
    if dry_run:
        print(f"[ALTER dry_run] {sql}")
        return True
    try:
        spark.sql(sql)
        print(f"[ALTER] ingestion_ts agregado a {table_full_name}")
        return True
    except Exception as e:
        print(f"[ALTER] fallo al agregar ingestion_ts a {table_full_name}: {e}")
        return False

def build_diff_condition_sql(cols):
    """
    Construye una condición SQL que detecta diferencias entre t.col y s.col,
    manejando NULLs correctamente.
    Devuelve una cadena con ORs entre comparaciones por columna.
    """
    conds = []
    for c in cols:
        conds.append(
            f"(t.`{c}` <> s.`{c}` OR (t.`{c}` IS NULL AND s.`{c}` IS NOT NULL) OR (t.`{c}` IS NOT NULL AND s.`{c}` IS NULL))"
        )
    return " OR ".join(conds)

# Loop principal: MERGE condicional (actualiza solo si hay diferencias reales)
for tbl in tables_to_process:
    src_full = f"{src_catalog}.{tbl}"
    dst_full = f"{dst_catalog}.{tbl}"
    key_cols = key_map.get(tbl)

    print(f"\n=== Procesando {src_full} -> {dst_full} (upsert condicional) ===")

    # Leer origen y añadir ingestion_ts (se actualizará solo si la fila cambia)
    try:
        df = spark.table(src_full).withColumn("ingestion_ts", current_timestamp())
    except AnalysisException as e:
        print(f"[ERROR] no se pudo leer {src_full}: {e}")
        continue

    if df.rdd.isEmpty():
        print(f"[INFO] {src_full} está vacío. Omitiendo.")
        continue

    # Deduplicar por clave negocio en el DF (mantener la última fila por clave si hay duplicados)
    if key_cols:
        df = df.dropDuplicates(key_cols)
    else:
        print(f"[WARN] No hay key_cols definidas para {tbl}; se procederá sin dedupe por clave.")

    # Asegurar ingestion_ts en la tabla destino (dry_run + aplicar)
    existing = get_table_columns(dst_full)
    if existing:
        add_ingestion_col_if_missing(dst_full, dry_run=True)
        add_ingestion_col_if_missing(dst_full, dry_run=False)
    else:
        print(f"[INFO] {dst_full} no existe; se creará con el primer write.")

    # Columnas a comparar: todas menos ingestion_ts
    compare_cols = [c for c in df.columns if c != "ingestion_ts"]
    if not compare_cols:
        print(f"[WARN] No hay columnas para comparar en {tbl} (solo ingestion_ts). Se hará MERGE normal.")
    
    # Preparar conteos reales: n_total, n_new, n_changed (usando hashes)
    n_total = df.count()
    n_new = None
    n_changed = 0
    dst_exists = True if existing else False

    if dst_exists and key_cols and compare_cols:
        try:
            dst_df = spark.table(dst_full).select(*key_cols, *compare_cols)
            # hashes para origen y destino (excluyendo ingestion_ts)
            df_h = df.select(*key_cols, *compare_cols).withColumn("_row_hash", sha2(concat_ws("||", *compare_cols), 256))
            dst_h = dst_df.withColumn("_row_hash", sha2(concat_ws("||", *compare_cols), 256)).select(*key_cols, "_row_hash")
            # n_new: claves en origen que no están en destino
            existing_keys = dst_df.select(*key_cols).distinct()
            n_new = df.join(existing_keys, on=key_cols, how="left_anti").count()
            # n_changed: claves comunes con hash distinto
            joined = df_h.alias("s").join(dst_h.alias("t"), on=key_cols, how="inner")
            n_changed = joined.filter(col("s._row_hash") != col("t._row_hash")).count()
        except Exception as e:
            print(f"[WARN] No se pudo calcular n_new/n_changed para {dst_full}: {e}")
            n_new = None
            n_changed = None
    else:
        # Si destino no existe, todo es nuevo
        if not dst_exists:
            n_new = n_total
            n_changed = 0

    print(f"[INFO] Filas en batch: {n_total}; estimado nuevas={n_new}; estimado cambios_reales={n_changed}")

    # Ejecutar MERGE condicional: actualizar solo si hay diferencias reales
    try:
        # Si la tabla no existe, crearla con append
        try:
            DeltaTable.forName(spark, dst_full)
            table_exists = True
        except Exception:
            table_exists = False

        if not table_exists:
            df.write.format("delta").mode("append").saveAsTable(dst_full)
            print(f"[OK] Tabla {dst_full} creada y datos escritos (primer load). Filas: {n_total}")
            continue

        # Verificar key_cols
        if not key_cols:
            raise ValueError(f"No hay key_cols definidas para MERGE en {tbl}.")

        delta_tbl = DeltaTable.forName(spark, dst_full)
        join_cond = " AND ".join([f"t.{k}=s.{k}" for k in key_cols])

        # Si no hay columnas para comparar (solo ingestion_ts), usamos whenMatchedUpdateAll()
        if not compare_cols:
            (delta_tbl.alias("t")
              .merge(df.alias("s"), join_cond)
              .whenMatchedUpdateAll()
              .whenNotMatchedInsertAll()
              .execute()
            )
            print(f"[OK] MERGE ejecutado en {dst_full} (no había columnas para comparar).")
        else:
            # Construir condición SQL que detecta diferencias reales
            cond_sql = build_diff_condition_sql(compare_cols)
            # Preparar mapping set para la actualización (actualizar todas las columnas del DF)
            set_map = {c: f"s.`{c}`" for c in df.columns}
            # Ejecutar MERGE condicional: update solo si cond_sql es True
            (delta_tbl.alias("t")
              .merge(df.alias("s"), join_cond)
              .whenMatchedUpdate(condition=cond_sql, set=set_map)
              .whenNotMatchedInsertAll()
              .execute()
            )
            print(f"[OK] MERGE condicional ejecutado en {dst_full}. Nuevas={n_new}, cambios_reales={n_changed}")

    except Exception as e:
        print(f"[ERROR] MERGE falló para {dst_full}: {e}")



StatementMeta(, , -1, SessionError, , SessionError)

InvalidHttpRequestToLivy: [TooManyRequestsForCapacity] This spark job can't be run because you have hit a spark compute or API rate limit. To run this spark job, cancel an active Spark job through the Monitoring hub, choose a larger capacity SKU, or try again later. HTTP status code: 430 {Learn more} HTTP status code: 430.

In [2]:
from pyspark.sql.functions import (
    current_timestamp, lit, sha2, concat_ws, col, to_date, date_format, regexp_replace
)
from pyspark.sql.utils import AnalysisException
from delta.tables import DeltaTable
import uuid, datetime, os

# Configuración
landing_path = "Files/Ventas/"            # <-- debe ser la carpeta que contiene los CSV
dst_table = "Bronze.Ventas"
audit_table = "Auditoria.audit_venta"
key_cols = ["VentaID"]
date_col = "FechaVenta"

# Helper: listar archivos .csv (soporta Databricks dbutils, Hadoop FS o local)
def list_csv_files(path):
    try:
        files = [f.name for f in dbutils.fs.ls(path) if f.name.endswith(".csv")]
        return sorted(files)
    except NameError:
        pass
    except Exception:
        pass
    try:
        hadoop_conf = spark._jsc.hadoopConfiguration()
        fs = spark._jvm.org.apache.hadoop.fs.FileSystem.get(hadoop_conf)
        path_obj = spark._jvm.org.apache.hadoop.fs.Path(path)
        if not fs.exists(path_obj):
            return []
        status = fs.listStatus(path_obj)
        files = []
        for s in status:
            if s.isFile():
                name = s.getPath().getName()
                if name.endswith(".csv"):
                    files.append(name)
        return sorted(files)
    except Exception:
        pass
    try:
        local_path = path.replace("file://", "")
        files = [f for f in os.listdir(local_path) if f.endswith(".csv")]
        return sorted(files)
    except Exception:
        pass
    print(f"[WARN] No se pudo listar archivos en {path}")
    return []

# Asegurar esquema Auditoria y tabla audit_venta
spark.sql("CREATE DATABASE IF NOT EXISTS Auditoria")
spark.sql(f"""
CREATE TABLE IF NOT EXISTS {audit_table} (
  filename STRING,
  file_path STRING,
  batch_id STRING,
  ingestion_ts TIMESTAMP,
  year_month STRING,
  row_count LONG,
  new_count LONG,
  updated_count LONG,
  status STRING,
  error_message STRING
) USING DELTA
""")

# Obtener archivos nuevos
files = list_csv_files(landing_path)
processed = []
if spark._jsparkSession.catalog().tableExists(audit_table):
    processed = [r.filename for r in spark.table(audit_table).select("filename").distinct().collect()]
new_files = sorted([f for f in files if f not in processed])

if not new_files:
    print("No hay archivos nuevos para procesar.")
else:
    print("Archivos nuevos:", new_files)

for fname in new_files:
    path = landing_path + fname
    batch_id = str(uuid.uuid4())
    ingestion_ts = datetime.datetime.now()
    print(f"\nProcesando archivo: {fname} (batch_id={batch_id})")

    try:
        # ---------- LECTURA COMO STRING (evita inferSchema problemático) ----------
        df_raw = (spark.read
                  .option("header", "true")
                  .option("inferSchema", "false")   # leer todo como string
                  .option("encoding", "utf-8")
                  .csv(path)
                 )

        # ---------- NORMALIZACIÓN BÁSICA ----------
        # Reemplazar valores comunes que rompen el casteo
        df_norm = df_raw.replace(["N/A", "NA", "-", "—", ""], [None, None, None, None, None])

        # Limpiar separadores de miles/coma decimal si aplica (ejemplo)
        # Aplica regexp_replace solo a columnas numéricas que vienen como string
        if "PrecioUnitario" in df_norm.columns:
            df_norm = df_norm.withColumn("PrecioUnitario", regexp_replace(col("PrecioUnitario"), r"[\\s]", ""))
            df_norm = df_norm.withColumn("PrecioUnitario", regexp_replace(col("PrecioUnitario"), ",", "."))

        # ---------- CASTEO EXPLÍCITO Y VALIDADO ----------
        # Castear columnas críticas con control
        df = (df_norm
              .withColumn("VentaID", col("VentaID").cast("long"))
              .withColumn(date_col, to_date(col(date_col), "yyyy-MM-dd"))
              .withColumn("ProductoID", col("ProductoID").cast("long")) if "ProductoID" in df_norm.columns else df_norm
             )

        # Si la línea anterior devolvió un DataFrame distinto, reasignar df correctamente:
        if isinstance(df, type(df_norm)):  # si no se aplicó ProductoID, df es df_norm
            df = df_norm.withColumn("VentaID", col("VentaID").cast("long")).withColumn(date_col, to_date(col(date_col), "yyyy-MM-dd"))
        else:
            # ya tiene VentaID y FechaVenta casteados; aseguramos columnas adicionales si existen
            df = df.withColumn("VentaID", col("VentaID").cast("long")).withColumn(date_col, to_date(col(date_col), "yyyy-MM-dd"))

        # Añadir ingestion_ts, batch_id y year_month
        df = (df.withColumn("ingestion_ts", current_timestamp())
                .withColumn("batch_id", lit(batch_id))
                .withColumn("year_month", date_format(col(date_col), "yyyy-MM"))
               )

        # Deduplicate por clave dentro del batch
        df = df.dropDuplicates(key_cols).cache()

        # Validaciones rápidas: detectar casts fallidos en columnas críticas
        bad_ventaid = df.filter(col("VentaID").isNull()).count()
        bad_fecha = df.filter(col(date_col).isNull()).count()
        if bad_ventaid > 0 or bad_fecha > 0:
            raise ValueError(f"Casteo fallido: VentaID null={bad_ventaid}, {date_col} null={bad_fecha}. Revisa formato del CSV.")

        row_count = df.count()
        year_months = [r["year_month"] for r in df.select("year_month").distinct().collect()]
        print(f"Filas leídas: {row_count}; particiones en batch: {year_months}")

        # ---------- EXISTENCIA DESTINO ----------
        try:
            DeltaTable.forName(spark, dst_table)
            dst_exists = True
        except Exception:
            dst_exists = False

        total_new = 0
        total_updated = 0
        status = "processed"
        error_message = None

        if not dst_exists:
            # Primer load: crear tabla destino (particionar por year_month si quieres)
            df.write.format("delta").mode("append").saveAsTable(dst_table)
            total_new = row_count
            print(f"Tabla {dst_table} creada con {row_count} filas.")
        else:
            # Procesar por partición year_month para limitar scope del MERGE
            for ym in year_months:
                df_part = df.filter(col("year_month") == ym).cache()
                compare_cols = [c for c in df_part.columns if c not in ("ingestion_ts","batch_id","year_month")]

                # Si no hay columnas para comparar (raro), saltar MERGE condicional
                if not compare_cols:
                    print(f"[WARN] No hay columnas para comparar en partition {ym}; se hará MERGE simple.")
                    delta_tbl = DeltaTable.forName(spark, dst_table)
                    join_cond = " AND ".join([f"t.{k}=s.{k}" for k in key_cols])
                    (delta_tbl.alias("t")
                      .merge(df_part.alias("s"), join_cond)
                      .whenMatchedUpdateAll()
                      .whenNotMatchedInsertAll()
                      .execute()
                    )
                    df_part.unpersist()
                    continue

                # Preparar hashes para conteos precisos
                df_h = df_part.select(*key_cols, *compare_cols).withColumn("_row_hash", sha2(concat_ws("||", *compare_cols), 256))
                dst_part = spark.table(dst_table).filter(col("year_month") == ym).select(*key_cols, *compare_cols)
                dst_h = dst_part.withColumn("_row_hash", sha2(concat_ws("||", *compare_cols), 256)).select(*key_cols, "_row_hash")

                existing_keys = dst_part.select(*key_cols).distinct()
                n_new_part = df_part.join(existing_keys, on=key_cols, how="left_anti").count()
                joined = df_h.alias("s").join(dst_h.alias("t"), on=key_cols, how="inner")
                n_changed_part = joined.filter(col("s._row_hash") != col("t._row_hash")).count()

                print(f"Partition {ym}: filas_batch={df_part.count()}, nuevas={n_new_part}, cambios_reales={n_changed_part}")

                # Ejecutar MERGE condicional limitado a la partición
                delta_tbl = DeltaTable.forName(spark, dst_table)
                join_cond = " AND ".join([f"t.{k}=s.{k}" for k in key_cols])
                conds = " OR ".join([f"(t.`{c}` <> s.`{c}` OR (t.`{c}` IS NULL AND s.`{c}` IS NOT NULL) OR (t.`{c}` IS NOT NULL AND s.`{c}` IS NULL))" for c in compare_cols])
                set_map = {c: f"s.`{c}`" for c in df_part.columns}

                (delta_tbl.alias("t")
                  .merge(df_part.alias("s"), join_cond)
                  .whenMatchedUpdate(condition=conds, set=set_map)
                  .whenNotMatchedInsertAll()
                  .execute()
                )

                total_new += n_new_part
                total_updated += n_changed_part
                df_part.unpersist()

        # Registrar resultado en Auditoria.audit_venta
        audit_row = [(fname, path, batch_id, ingestion_ts, ",".join(year_months), row_count, total_new, total_updated, status, error_message)]
        spark.createDataFrame(audit_row, schema=["filename","file_path","batch_id","ingestion_ts","year_month","row_count","new_count","updated_count","status","error_message"]) \
             .write.format("delta").mode("append").saveAsTable(audit_table)

        print(f"Archivo {fname} procesado. nuevas={total_new}, actualizadas={total_updated}")

    except Exception as e:
        status = "error"
        error_message = str(e)
        print(f"[ERROR] al procesar {fname}: {error_message}")
        # registrar error en auditoría (intenta incluir year_months si existe)
        ym_val = ",".join(year_months) if 'year_months' in locals() else None
        audit_row = [(fname, path, batch_id, ingestion_ts, ym_val, 0, 0, 0, status, error_message)]
        spark.createDataFrame(audit_row, schema=["filename","file_path","batch_id","ingestion_ts","year_month","row_count","new_count","updated_count","status","error_message"]) \
             .write.format("delta").mode("append").saveAsTable(audit_table)

    finally:
        try:
            df.unpersist()
        except Exception:
            pass



StatementMeta(, 4522f64b-382b-42bc-a437-94239f739695, 4, Finished, Available, Finished)

Archivos nuevos: ['ventas_2024-09.csv']

Procesando archivo: ventas_2024-09.csv (batch_id=5d69b53b-f49e-4031-9779-87396149219b)
Filas leídas: 10; particiones en batch: ['2024-09']
Partition 2024-09: filas_batch=10, nuevas=10, cambios_reales=0
[ERROR] al procesar ventas_2024-09.csv: [CANNOT_DETERMINE_TYPE] Some of types cannot be determined after inferring.
