In [0]:
# =========================
# transform_defects (SILVER)
# Source: catalog_project.bronze.bronze_manufacturing_defects
# Target: catalog_project.silver.silver_manufacturing_defects
# =========================

from pyspark.sql import functions as F


#WIDGETS

In [0]:
# Notebook parameters: catalog plus the bronze (source) and silver (target)
# schema names, each exposed as a text widget with a default value.
#
# NOTE: the former `dbutils.widgets.removeAll()` call was removed on purpose.
# Databricks documents that widgets must not be created in the same cell that
# removes them, and removing them here reset every parameter to its default
# on each run, discarding any value chosen in the UI.
dbutils.widgets.text("catalog", "catalog_project")
dbutils.widgets.text("bronze_schema", "bronze")
dbutils.widgets.text("silver_schema", "silver")

# Read the effective values (widgets always return strings).
catalog       = dbutils.widgets.get("catalog")
bronze_schema = dbutils.widgets.get("bronze_schema")
silver_schema = dbutils.widgets.get("silver_schema")

In [0]:
# Select the working catalog, then make sure the silver schema exists in it.
# Order matters: the CREATE SCHEMA statement is unqualified, so it resolves
# against the catalog chosen by the first statement.
for statement in (
    f"USE CATALOG {catalog}",
    f"CREATE SCHEMA IF NOT EXISTS {silver_schema}",
):
    spark.sql(statement)

In [0]:
# Three-part (catalog.schema.table) names for the bronze input and silver output.
source_table = ".".join([catalog, bronze_schema, "bronze_manufacturing_defects"])
target_table = ".".join([catalog, silver_schema, "silver_manufacturing_defects"])


#LOAD BRONZE

In [0]:
# Read the bronze table as a DataFrame; nothing is filtered or renamed here.
df = spark.read.table(source_table)


#Limpieza y guards

In [0]:
# Guards / cleanup
# =========================
# Keep typical ranges so out-of-range readings do not pollute the KPIs:
# - DefectRate: expected 0..1
# - DowntimePercentage: expected 0..100
# Values outside the range (and nulls) become null; rows are never dropped.
# NOTE(review): confirm the bronze DefectRate really is a 0..1 fraction,
# since anything above 1 is silently nulled here.
defect_rate = F.col("DefectRate")
downtime_pct = F.col("DowntimePercentage")

# `F.when` without `.otherwise` yields null for rows that fail the condition.
defect_rate_in_range = (defect_rate >= 0) & (defect_rate <= 1)
downtime_in_range = (downtime_pct >= 0) & (downtime_pct <= 100)

df_clean = (
    df
    .withColumn("DefectRate", F.when(defect_rate_in_range, defect_rate))
    .withColumn("DowntimePercentage", F.when(downtime_in_range, downtime_pct))
)


#COLUMNAS KPI

In [0]:
# Derived KPI columns on top of the cleaned frame.
# Every per-unit metric is guarded by ProductionVolume > 0, so a null, zero or
# negative volume produces null instead of a division error or a bogus ratio.
volume = F.col("ProductionVolume")
rate = F.col("DefectRate")
has_output = volume > 0

df_silver = (
    df_clean
    # Boolean flag; stays null when DefectStatus is null.
    .withColumn("is_defective", F.col("DefectStatus") == 1)

    # Cost / energy per produced unit.
    .withColumn(
        "cost_per_unit",
        F.when(has_output, F.col("ProductionCost") / volume),
    )
    .withColumn(
        "energy_per_unit",
        F.when(has_output, F.col("EnergyConsumption") / volume),
    )

    # Expected defect count, only when both volume and rate are present.
    .withColumn(
        "expected_defects",
        F.when(volume.isNotNull() & rate.isNotNull(), volume * rate),
    )

    # Metrics scaled to 1,000 units.
    .withColumn(
        "defects_per_1k_units",
        F.when(rate.isNotNull(), rate * 1000.0),
    )
    .withColumn(
        "incident_rate_per_1k_units",
        F.when(has_output, (F.col("SafetyIncidents") / volume) * 1000.0),
    )
)

#SELECCIÓN FINAL DE COLUMNAS

In [0]:
# Final column order of the silver table: each derived KPI sits next to the
# raw measure it is computed from, with the ingestion metadata at the end.
silver_columns = [
    # production / cost
    "ProductionVolume", "ProductionCost", "cost_per_unit",
    # supply chain
    "SupplierQuality", "DeliveryDelay",
    # quality
    "DefectRate", "defects_per_1k_units", "expected_defects", "QualityScore",
    # maintenance / inventory
    "MaintenanceHours", "DowntimePercentage", "InventoryTurnover", "StockoutRate",
    # workforce / safety
    "WorkerProductivity", "SafetyIncidents", "incident_rate_per_1k_units",
    # energy
    "EnergyConsumption", "energy_per_unit", "EnergyEfficiency",
    # additive manufacturing
    "AdditiveProcessTime", "AdditiveMaterialCost",
    # label
    "DefectStatus", "is_defective",
    # ingestion metadata
    "_ingestion_ts", "_source_file",
]

df_silver = df_silver.select(*silver_columns)


Verificación de calidad de los datos

#SAVE SILVER

In [0]:
# Persist the silver frame twice: once as a catalog Delta table and once as
# Delta files at an explicit storage path.
# Both writes replace data AND schema. Without `overwriteSchema` on the second
# write, a change to the silver columns would replace the table but then fail
# on the path copy with a schema mismatch, leaving the two copies out of sync.
(df_silver.write
 .format("delta")
 .mode("overwrite")
 .option("overwriteSchema", "true")
 .saveAsTable(target_table)
)

# NOTE(review): the storage account and container are hard-coded; confirm
# whether this should follow the catalog/schema parameters instead.
target_path = "abfss://silver@storageaccountcf9603.dfs.core.windows.net/transform_defects"

(df_silver.write
  .format("delta")
  .mode("overwrite")   # or "append"
  .option("overwriteSchema", "true")
  .save(target_path)
)


#Validación

In [0]:
# Smoke check: report the table that was written and show a 10-row sample
# read back from the catalog (not from the in-memory DataFrame).
print(f"OK: {target_table}")
_preview = spark.table(target_table).limit(10)
display(_preview)