# Parámetros

In [0]:
# catalog = "sales"
# bronze_schema = "sales_bronze"
# bronze_table = "sales_raw"
# silver_schema = "sales_silver"
# silver_table = "sales_curated"

In [0]:
# Read the notebook parameters from the Databricks widgets.
# Each value names part of a three-level table identifier
# (catalog.schema.table) used by the cells below.
try:
    catalog = dbutils.widgets.get("catalog")
    bronze_schema = dbutils.widgets.get("bronze_schema")
    bronze_table = dbutils.widgets.get("bronze_table")
    silver_schema = dbutils.widgets.get("silver_schema")
    silver_table = dbutils.widgets.get("silver_table")
    print("Parámetros cargados exitosamente.")

except Exception as e:
    print("Error fatal: No se pudieron obtener los parámetros")
    print(f"Detalle del error: {e}")
    # Abort the notebook when any parameter is missing. The original error is
    # attached as the explicit cause so the traceback shows what really failed.
    raise Exception("Error al obtener parámetros") from e


# Cargar datos Sales desde Bronze

In [0]:
from pyspark.sql.functions import (
    col, when, current_timestamp, floor, round, to_date
)
from pyspark.sql.types import (
    IntegerType, DecimalType, DoubleType
)

try:
    # Load the raw sales rows from the Bronze layer table named by the
    # notebook parameters.
    df_sales_raw = spark.table(f"{catalog}.{bronze_schema}.{bronze_table}")
    print("Lectura de capa Bronze exitosa.")

except Exception as e:
    print("Error fatal: No se pudo leer la tabla de origen Bronze.")
    print(f"Detalle del error: {e}")
    # Re-raise with the original error as the explicit cause so the root
    # problem stays visible in the traceback.
    raise Exception("Error al leer de Bronze.") from e

# Clean, type and enrich the Bronze sales rows, then expose the result as the
# temporary view 'tmp_sales_final' for the load step.
try:
    # These columns were identified as essential for analysing a record; a row
    # with a null in any of them is dropped.
    filter_cols = [
        "invoice_line_no", "date", "store", "county_number", "itemno",
        "vendor_no", "sale_bottles", "category", "state_bottle_retail",
        "state_bottle_cost",
    ]

    df_cleaned_nulls = df_sales_raw.dropna(subset=filter_cols)
    print(f"Se eliminaron {df_sales_raw.count() - df_cleaned_nulls.count()} registros por tener valores nulos en las columnas cruciales")

    # Fill the remaining, non-disqualifying gaps with placeholder values.
    # The fill is applied to the null-filtered frame and its result feeds the
    # rest of the pipeline; previously it was computed from the raw frame and
    # then discarded, so the placeholders never reached the Silver table.
    # NOTE(review): category_name, vendor_name, im_desc and county are not
    # filled here and are not part of filter_cols either - confirm whether
    # nulls in them are acceptable downstream.
    df_silver = df_cleaned_nulls.na.fill({
        "address": "UNKNOWN",
        "city": "UNKNOWN",
        "zipcode": "00000",
        "name": "UNKNOWN",
        "pack": "0",
    })

    # Cast the numeric and date columns to their target types. 'pack' and
    # 'bottle_volume_ml' go through Double first and are then cast to Integer.
    # 'sale_gallons' is cast here but is not part of the final projection.
    df_silver_enriched = (
        df_silver
        .withColumn("date", to_date(col("date"), "yyyy-MM-dd"))
        .withColumn("store", col("store").cast(IntegerType()))
        .withColumn("county_number", col("county_number").cast(IntegerType()))
        .withColumn("category", col("category").cast(IntegerType()))
        .withColumn("vendor_no", col("vendor_no").cast(IntegerType()))
        .withColumn("itemno", col("itemno").cast(IntegerType()))
        .withColumn("pack", col("pack").cast(DoubleType()).cast(IntegerType()))
        .withColumn("bottle_volume_ml", col("bottle_volume_ml").cast(DoubleType()).cast(IntegerType()))
        .withColumn("sale_bottles", col("sale_bottles").cast(IntegerType()))
        .withColumn("state_bottle_cost", col("state_bottle_cost").cast(DecimalType(10, 2)))
        .withColumn("state_bottle_retail", col("state_bottle_retail").cast(DecimalType(10, 2)))
        .withColumn("sale_dollars", col("sale_dollars").cast(DecimalType(10, 2)))
        .withColumn("sale_liters", col("sale_liters").cast(DecimalType(10, 3)))
        .withColumn("sale_gallons", col("sale_gallons").cast(DecimalType(10, 3)))
    )

    # Drop rows whose product has no positive unit cost, unit retail price or
    # bottle volume.
    df_silver_enriched = df_silver_enriched.filter(
        (col("state_bottle_cost") > 0)
        & (col("state_bottle_retail") > 0)
        & (col("bottle_volume_ml") > 0)
    )

    # Derived measures. `round` is the pyspark.sql.functions version imported
    # above, not the Python builtin.
    df_silver_enriched = (
        df_silver_enriched
        # Recompute 'sale_dollars' as bottles sold x unit retail price.
        .withColumn("sale_dollars", round(col("sale_bottles") * col("state_bottle_retail"), 2))
        # 'total_cost' = bottles sold x unit cost.
        .withColumn("total_cost", round(col("sale_bottles") * col("state_bottle_cost"), 2))
        # Recompute 'sale_liters' as bottles sold x bottle volume (ml -> l).
        .withColumn("sale_liters", round(col("sale_bottles") * col("bottle_volume_ml") / 1000, 3))
        # 'sale_cases' = whole packs sold; 0 when the pack size is not
        # positive (this includes the "0" placeholder filled above).
        .withColumn(
            "sale_cases",
            when(col("pack") > 0, floor(col("sale_bottles") / col("pack"))).otherwise(0),
        )
        # Load timestamp.
        .withColumn("StoreDay", current_timestamp())
    )

    # 'sale_margin' = sale_dollars - total_cost.
    df_silver_enriched = df_silver_enriched.withColumn(
        "sale_margin", round(col("sale_dollars") - col("total_cost"), 2)
    )

    # Cast the calculated columns to the types declared for the target table.
    df_silver_enriched = (
        df_silver_enriched
        .withColumn("total_cost", col("total_cost").cast(DecimalType(10, 2)))
        .withColumn("sale_margin", col("sale_margin").cast(DecimalType(10, 2)))
        .withColumn("sale_cases", col("sale_cases").cast(IntegerType()))
    )

    # Final projection: fixes the column order and renames 'store' / 'name'
    # to 'store_number' / 'store_name'.
    df_sales_final = df_silver_enriched.select(
        "invoice_line_no",
        "date",
        col("store").alias("store_number"),
        col("name").alias("store_name"),
        "address",
        "city",
        "zipcode",
        "county_number",
        "county",
        "category",
        "category_name",
        "vendor_no",
        "vendor_name",
        "itemno",
        "im_desc",
        "pack",
        "bottle_volume_ml",
        "state_bottle_cost",
        "state_bottle_retail",
        "sale_bottles",
        "sale_dollars",
        "sale_liters",
        "total_cost",        # new derived column
        "sale_margin",       # new derived column
        "sale_cases",        # new derived column
        "StoreDay",
    )

    # Bridge between the DataFrame and the SQL load step.
    df_sales_final.createOrReplaceTempView("tmp_sales_final")
    print("Transformaciones y creación de vista temporal 'tmp_sales_final' exitosa.")

except Exception as e:
    print("Error fatal: Fallo durante la transformación de datos")
    print(f"Detalle del error: {e}")
    # Re-raise with the original error attached as the explicit cause.
    raise Exception("Verifique la lógica de transformación o la calidad de los datos") from e


# Crear tabla Sales Curated si no existe

In [0]:
try:
    # DDL for the Silver sales table (Delta format). IF NOT EXISTS means the
    # statement only creates the table when it is missing; an existing table
    # is left as it is.
    create_table_query = f"""
  CREATE TABLE IF NOT EXISTS {catalog}.{silver_schema}.{silver_table} (
    invoice_line_no STRING,
    date DATE,
    store_number INT,
    store_name STRING,
    address STRING,
    city STRING,
    zipcode STRING,
    county_number INT,
    county STRING,
    category INT,
    category_name STRING,
    vendor_no INT,
    vendor_name STRING,
    itemno INT,
    im_desc STRING,
    pack INT,
    bottle_volume_ml INT,
    state_bottle_cost DECIMAL(10, 2),
    state_bottle_retail DECIMAL(10, 2),
    sale_bottles INT,
    sale_dollars DECIMAL(10, 2),
    sale_liters DECIMAL(10, 3),
    total_cost DECIMAL(10, 2),
    sale_margin DECIMAL(10, 2),
    sale_cases INT,
    StoreDay TIMESTAMP
  )
  USING DELTA
  """

    # Run the DDL so the target table is guaranteed to exist before loading.
    spark.sql(create_table_query)
    print("Tabla de destino asegurada")

except Exception as e:
    print("Error fatal: Fallo durante la creación/verificación de la tabla en Silver.")
    print(f"Detalle del error: {e}")
    # Re-raise with the original error attached as the explicit cause.
    raise Exception("Verifique la sintaxis de CREATE TABLE") from e

# Carga de la tabla Sales Curated

In [0]:
try:
    # Columns of the Silver table, in target order. The same list is used for
    # the INSERT column list and for the SELECT over the temporary view, so
    # the two cannot drift apart.
    silver_columns = [
        "invoice_line_no",
        "date",
        "store_number",
        "store_name",
        "address",
        "city",
        "zipcode",
        "county_number",
        "county",
        "category",
        "category_name",
        "vendor_no",
        "vendor_name",
        "itemno",
        "im_desc",
        "pack",
        "bottle_volume_ml",
        "state_bottle_cost",
        "state_bottle_retail",
        "sale_bottles",
        "sale_dollars",
        "sale_liters",
        "total_cost",
        "sale_margin",
        "sale_cases",
        "StoreDay",
    ]
    column_list = ",\n      ".join(silver_columns)

    # Replace whatever a previous run left in the table and load the new rows
    # in ONE statement. The earlier TRUNCATE followed by a separate INSERT
    # left the table empty whenever the insert failed.
    # NOTE(review): assumes the target is the unpartitioned Delta table
    # created by this notebook - confirm if the table may pre-exist with
    # partitions and dynamic partition overwrite enabled.
    insert_query = f"""
    INSERT OVERWRITE TABLE {catalog}.{silver_schema}.{silver_table} (
      {column_list}
    )
    SELECT
      {column_list}
    FROM tmp_sales_final
    """

    # Run the overwrite-load.
    spark.sql(insert_query)
    print("Inserción de datos en capa Silver completada exitosamente.")

except Exception as e:
    print("Error fatal: Fallo durante la escritura de datos en la capa Silver.")
    print(f"Detalle del error: {e}")
    # Re-raise with the original error attached as the explicit cause.
    raise Exception("Error al TRUNCATE/INSERT. Verifique desfase de esquema.") from e