# Silver Layer — Transformaciones y Cálculo de KPIs
**Fuente:** `catalog_termoplanta.bronze.historian_signals`  
**Destino:** `catalog_termoplanta.silver.historian_kpis`  
**Descripción:** Conversión de unidades, cálculo de KPIs de performance, 
detección de outliers por IQR y clasificación de condición operativa.

In [0]:
%python
# ============================================================
# PARÁMETROS
# ============================================================
CATALOG = "catalog_termoplanta"
PCI_GN  = 39.79   # Poder calorífico inferior del Gas Natural (MJ/m3)

print(f"Catálogo: {CATALOG}")
print(f"PCI Gas Natural: {PCI_GN} MJ/m3")

In [0]:
%python
from pyspark.sql import functions as F
from pyspark.sql.window import Window

# ============================================================
# LECTURA DESDE BRONZE
# ============================================================
df = spark.table(f"{CATALOG}.bronze.historian_signals")
print(f"Registros desde bronze: {df.count():,}")
df.printSchema()

In [0]:
%python
# ============================================================
# CÁLCULO DE KPIS — Portado de Pandas a PySpark
# ============================================================

# Condición TGs OFF (todas apagadas)
tgs_off = (
    (F.col("G1_DWATT") < 50) &
    (F.col("G2_DWATT") < 50) &
    (F.col("S1_DWATT") < 50)
)

df_kpis = (
    df

    # --- Heat Rate Neto (BTU/kWh) ---
    .withColumn("HR", F.when(
        tgs_off, F.lit(0.0)
    ).otherwise(
        (((F.col("G1_FQG") + F.col("G2_FQG")) * (1000/0.453592) * PCI_GN) /
        ((F.col("G1_DWATT") + F.col("G2_DWATT") + F.col("S1_DWATT")) -
          F.col("10JI8128") - F.col("10JI8129")) / 4.18) * 3.96567
    ))

    # --- Heat Rate Bruto (BTU/kWh) ---
    .withColumn("HR_bruto", F.when(
        tgs_off, F.lit(0.0)
    ).otherwise(
        (((F.col("G1_FQG") + F.col("G2_FQG")) * (1000/0.453592) * PCI_GN) /
        (F.col("G1_DWATT") + F.col("G2_DWATT") + F.col("S1_DWATT")) / 4.18) * 3.96567
    ))

    # --- Eficiencia HRSG11 (%) ---
    # Conversión °F → K: T_K = (T_F - 32)/1.8 + 273.15
    .withColumn("n_HRSG11",
        (((F.col("G1_TTXM") - 32) / 1.8 + 273.15) - ((F.col("`11TI1870`") - 32) / 1.8 + 273.15)) /
        (((F.col("G1_TTXM") - 32) / 1.8 + 273.15) - ((F.col("`00TI8002`") - 32) / 1.8 + 273.15)) * 100
    )

    # --- Eficiencia HRSG12 (%) ---
    .withColumn("n_HRSG12",
        (((F.col("G2_TTXM") - 32) / 1.8 + 273.15) - ((F.col("`12TI1870`") - 32) / 1.8 + 273.15)) /
        (((F.col("G2_TTXM") - 32) / 1.8 + 273.15) - ((F.col("`00TI8002`") - 32) / 1.8 + 273.15)) * 100
    )

    # --- Efectividad Condensador Box 1 (%) ---
    .withColumn("n_Box1",
        (((F.col("10TI6591A") - 32) * (5/9)) - ((F.col("10TI6595A") - 32) * (5/9))) /
        (((F.col("10TI6591A") - 32) * (5/9)) - ((F.col("10TI3005") - 32) * (5/9))) * 100
    )

    # --- Efectividad Condensador Box 2 (%) ---
    .withColumn("n_Box2",
        (((F.col("10TI6591B") - 32) * (5/9)) - ((F.col("10TI6595B") - 32) * (5/9))) /
        (((F.col("10TI6591B") - 32) * (5/9)) - ((F.col("10TI3005") - 32) * (5/9))) * 100
    )

    # --- Eficiencia Turbina TG11 (%) ---
    .withColumn("n_TG11", F.when(
        F.col("G1_DWATT") < 10, F.lit(0.0)
    ).otherwise(
        (3600 * 100) / ((((F.col("G1_FQG") * 0.45359237 / 0.74) * 3600) * 35885.5) /
        F.col("G1_DWATT") / 1000)
    ))

    # --- Eficiencia Turbina TG12 (%) ---
    .withColumn("n_TG12", F.when(
        F.col("G2_DWATT") < 10, F.lit(0.0)
    ).otherwise(
        (3600 * 100) / ((((F.col("G2_FQG") * 0.45359237 / 0.74) * 3600) * 35885.5) /
        F.col("G2_DWATT") / 1000)
    ))

    # --- Eficiencia Compresor TG11 (%) ---
    .withColumn("n_c_TG11",
        (((F.col("G1_CTIM") - 32) * (5/9) + 273.15) *
         (F.col("G1_CPR") ** (1 - 1/1.4)) -
         ((F.col("G1_CTIM") - 32) * (5/9) + 273.15)) /
        (((F.col("G1_CTD") - 32) * (5/9) + 273.15) -
         ((F.col("G1_CTIM") - 32) * (5/9) + 273.15)) * 100
    )

    # --- Eficiencia Compresor TG12 (%) ---
    .withColumn("n_c_TG12",
        (((F.col("G2_CTIM") - 32) * (5/9) + 273.15) *
         (F.col("G2_CPR") ** (1 - 1/1.4)) -
         ((F.col("G2_CTIM") - 32) * (5/9) + 273.15)) /
        (((F.col("G2_CTD") - 32) * (5/9) + 273.15) -
         ((F.col("G2_CTIM") - 32) * (5/9) + 273.15)) * 100
    )

    # --- Potencia total (MW) ---
    .withColumn("potencia_total_mw",
        F.col("G1_DWATT") + F.col("G2_DWATT") + F.col("S1_DWATT")
    )

    # --- Columnas de tiempo ---
    .withColumn("fecha",   F.to_date(F.col("timestamp")))
    .withColumn("hora",    F.hour(F.col("timestamp")))
    .withColumn("semana",  F.weekofyear(F.col("timestamp")))
    .withColumn("year",    F.year(F.col("timestamp")))

    # --- Metadata ---
    .withColumn("processing_date", F.current_timestamp())
)

print(f"Registros con KPIs calculados: {df_kpis.count():,}")

In [0]:
%python
# ============================================================
# CLASIFICACIÓN DE CONDICIÓN OPERATIVA
# Igual que el notebook original
# ============================================================
df_kpis = df_kpis.withColumn("condicion_operativa",
    F.when(
        (F.col("G1_DWATT") > 170) & (F.col("G2_DWATT") > 170) & (F.col("S1_DWATT") > 150),
        F.lit("Carga_Base_CC2x1")
    ).when(
        (F.col("G1_DWATT") > 170) & (F.col("G2_DWATT") < 25) & (F.col("S1_DWATT") > 70),
        F.lit("BL_1x1_TG11")
    ).when(
        (F.col("G1_DWATT") < 25) & (F.col("G2_DWATT") > 170) & (F.col("S1_DWATT") > 70),
        F.lit("BL_1x1_TG12")
    ).when(
        (F.col("G1_DWATT") > 90) & (F.col("G1_DWATT") < 140) &
        (F.col("G2_DWATT") > 90) & (F.col("G2_DWATT") < 140) &
        (F.col("S1_DWATT") > 45),
        F.lit("MT_CC2x1")
    ).otherwise(F.lit("Otro"))
)

print("=== Distribución de condiciones operativas ===")
df_kpis.groupBy("condicion_operativa").count().orderBy("count", ascending=False).show()

In [0]:
%python
# ============================================================
# DETECCIÓN DE OUTLIERS — Método IQR por condición operativa
# Se marca con flag, NO se elimina (conservar dato original)
# ============================================================
kpi_cols = ["HR", "n_TG11", "n_TG12"]
window_cond = Window.partitionBy("condicion_operativa", "semana", "year")

for col in kpi_cols:
    q1_col = f"{col}_q1"
    q3_col = f"{col}_q3"
    flag_col = f"{col}_outlier"

    df_kpis = (
        df_kpis
        .withColumn(q1_col, F.percentile_approx(F.col(col), 0.25).over(window_cond))
        .withColumn(q3_col, F.percentile_approx(F.col(col), 0.75).over(window_cond))
        .withColumn(flag_col,
            (F.col(col) < (F.col(q1_col) - 1.5 * (F.col(q3_col) - F.col(q1_col)))) |
            (F.col(col) > (F.col(q3_col) + 1.5 * (F.col(q3_col) - F.col(q1_col))))
        )
        .drop(q1_col, q3_col)
    )

# Flag combinado: outlier en cualquier KPI clave
df_kpis = df_kpis.withColumn("es_outlier",
    F.col("HR_outlier") | F.col("n_TG11_outlier") | F.col("n_TG12_outlier")
)

print("Outliers detectados:")
df_kpis.groupBy("es_outlier").count().show()

In [0]:
%python
# ============================================================
# ESCRITURA EN DELTA — SILVER LAYER
# ============================================================
(
    df_kpis
    .write
    .format("delta")
    .mode("overwrite")
    .option("overwriteSchema", "true")
    .partitionBy("year", "condicion_operativa")
    .saveAsTable(f"{CATALOG}.silver.historian_kpis")
)

print(f"✅ Tabla silver escrita: {CATALOG}.silver.historian_kpis")

In [0]:
%python
# ============================================================
# VERIFICACIÓN
# ============================================================
df_check = spark.table(f"{CATALOG}.silver.historian_kpis")
print(f"Registros en silver: {df_check.count():,}")

print("\n=== Estadísticas KPIs principales ===")
df_check.filter(
    (F.col("condicion_operativa") == "Carga_Base_CC2x1") &
    (~F.col("es_outlier"))
).select("HR", "HR_bruto", "n_TG11", "n_TG12", "n_HRSG11", "n_HRSG12").describe().show()