In [0]:
# Drop widgets
# NOTE(review): Databricks' widget docs state that a widget cannot be created in
# the same cell that removes one — confirm removeAll() and the text() calls below
# really execute in separate cells.
dbutils.widgets.removeAll()

# Databricks notebook source
# =========================
# Widgets
# =========================
# (widget name, default value) pairs, registered in this order.
for _widget_name, _widget_default in (
    ("catalogo", "catalog_final_project"),
    ("esquema_source", "bronze"),
    ("esquema_sink", "silver"),
):
    dbutils.widgets.text(_widget_name, _widget_default)

# COMMAND ----------

from pyspark.sql import functions as F, Window as W

# Read the three widget values: catalog, source schema, sink schema.
catalogo, esq_src, esq_sink = (
    dbutils.widgets.get(_widget)
    for _widget in ("catalogo", "esquema_source", "esquema_sink")
)

# Fully qualified <catalog>.<schema>.<table> names for the two sources and the sink.
tabla_gen_bronze = ".".join((catalogo, esq_src, "hydro_generation_raw"))
tabla_hyd_bronze = ".".join((catalogo, esq_src, "hydrology_raw"))
tabla_silver = ".".join((catalogo, esq_sink, "plant_operational_data"))

# Echo the resolved names so a run's output shows what was read and written.
for _label, _table in (
    ("GEN (source):", tabla_gen_bronze),
    ("HYD (source):", tabla_hyd_bronze),
    ("SILVER (sink):", tabla_silver),
):
    print(_label, _table)

# COMMAND ----------

# Load both source tables and parse the raw "datetime" column into a timestamp
# column named "dt".
df_gen = spark.table(tabla_gen_bronze).withColumn("dt", F.to_timestamp(F.col("datetime")))
df_hyd = spark.table(tabla_hyd_bronze).withColumn("dt", F.to_timestamp(F.col("datetime")))


def _keep_first_ranked(df, window):
    """Return ``df`` reduced to the row ranked first within each ``window`` partition."""
    ranked = df.withColumn("_rn", F.row_number().over(window))
    return ranked.where(F.col("_rn") == 1).drop("_rn")


# Deduplicate on (plant_id, dt), keeping the most recently ingested row.
# Rows with a NULL _ingestion_ts sort last, so they only win when nothing else exists.
w_gen = W.partitionBy("plant_id", "dt").orderBy(F.col("_ingestion_ts").desc_nulls_last())
df_gen = _keep_first_ranked(df_gen, w_gen)

w_hyd = W.partitionBy("plant_id", "dt").orderBy(F.col("_ingestion_ts").desc_nulls_last())
df_hyd = _keep_first_ranked(df_hyd, w_hyd)

# Light cleaning + data-quality flags for the generation feed (kept deliberately simple).
#
# dq_* columns are 1 when the raw value breaks the rule and 0 otherwise. A NULL raw
# value makes the comparison NULL, which lands on the otherwise(0) branch; only
# dq_bad_capacity flags NULL explicitly.
# *_clean columns hold the value to publish; the raw columns are left untouched.
df_gen = (
    df_gen
    .withColumn("dq_bad_capacity", F.when((F.col("installed_capacity_mw").isNull()) | (F.col("installed_capacity_mw") <= 0), 1).otherwise(0))
    .withColumn("dq_bad_eff", F.when((F.col("turbine_efficiency") < 0) | (F.col("turbine_efficiency") > 1), 1).otherwise(0))
    .withColumn("dq_bad_gen", F.when((F.col("actual_generation_mw") < 0) | (F.col("actual_generation_mw") > F.col("installed_capacity_mw")), 1).otherwise(0))
    .withColumn("dq_bad_flow", F.when(F.col("water_flow_m3s") < 0, 1).otherwise(0))
    # Simple normalisation: out-of-range efficiency / flow become NULL
    # (no otherwise branch), in-range values pass through.
    .withColumn("turbine_efficiency_clean",
                F.when((F.col("turbine_efficiency") >= 0) & (F.col("turbine_efficiency") <= 1), F.col("turbine_efficiency")))
    .withColumn("water_flow_m3s_clean",
                F.when(F.col("water_flow_m3s") >= 0, F.col("water_flow_m3s")))
    # Generation is floored at 0 and capped at installed capacity. The cap is only
    # applied when the capacity itself is valid (> 0): clamping to a zero or
    # negative capacity would publish a zero/negative "clean" value, so in that
    # case the measured value is kept and the row stays flagged by
    # dq_bad_capacity / dq_bad_gen.
    .withColumn("actual_generation_mw_clean",
                F.when(F.col("actual_generation_mw") < 0, F.lit(0.0))
                 .when((F.col("installed_capacity_mw") > 0)
                       & (F.col("actual_generation_mw") > F.col("installed_capacity_mw")),
                       F.col("installed_capacity_mw"))
                 .otherwise(F.col("actual_generation_mw")))
    # Ratio of cleaned generation to capacity; NULL when capacity is NULL or <= 0.
    .withColumn("capacity_factor",
                F.when(F.col("installed_capacity_mw") > 0, F.col("actual_generation_mw_clean") / F.col("installed_capacity_mw")))
)

# Hydrology data-quality rules: (raw column, flag column, cleaned column).
_hyd_rules = (
    ("inflow_m3s", "dq_bad_inflow", "inflow_m3s_clean"),
    ("rainfall_mm", "dq_bad_rain", "rainfall_mm_clean"),
)

# Flags: 1 when the reading is negative, 0 otherwise (a NULL reading compares as
# NULL and therefore lands on 0).
for _raw_col, _flag_col, _clean_col in _hyd_rules:
    df_hyd = df_hyd.withColumn(_flag_col, F.when(F.col(_raw_col) < 0, 1).otherwise(0))

# Cleaned copies: the reading when it is >= 0, NULL otherwise.
for _raw_col, _flag_col, _clean_col in _hyd_rules:
    df_hyd = df_hyd.withColumn(_clean_col, F.when(F.col(_raw_col) >= 0, F.col(_raw_col)))

# Left-join hydrology onto generation: every generation row is kept, and the
# hydrology columns (including their DQ flags) are NULL where no hydrology row
# shares the same plant_id and dt.
_same_reading = (
    (F.col("g.plant_id") == F.col("h.plant_id"))
    & (F.col("g.dt") == F.col("h.dt"))
)

# Generation side: cleaned values are published under the original column names.
_gen_cols = [
    F.col("g.plant_id"),
    F.col("g.plant_name"),
    F.col("g.dt").alias("datetime"),
    F.col("g.installed_capacity_mw"),
    F.col("g.actual_generation_mw_clean").alias("actual_generation_mw"),
    F.col("g.water_flow_m3s_clean").alias("water_flow_m3s"),
    F.col("g.turbine_efficiency_clean").alias("turbine_efficiency"),
    F.col("g.outage_flag"),
    F.col("g.capacity_factor"),
]

# Hydrology side, same renaming convention.
_hyd_cols = [
    F.col("h.river_basin"),
    F.col("h.reservoir_level_m"),
    F.col("h.inflow_m3s_clean").alias("inflow_m3s"),
    F.col("h.rainfall_mm_clean").alias("rainfall_mm"),
    F.col("h.temperature_c"),
]

# DQ flags from both sides.
_dq_cols = [
    F.col("g.dq_bad_capacity"),
    F.col("g.dq_bad_eff"),
    F.col("g.dq_bad_gen"),
    F.col("g.dq_bad_flow"),
    F.col("h.dq_bad_inflow"),
    F.col("h.dq_bad_rain"),
]

_joined = df_gen.alias("g").join(df_hyd.alias("h"), on=_same_reading, how="left")
df_silver = _joined.select(
    *_gen_cols,
    *_hyd_cols,
    *_dq_cols,
    F.current_timestamp().alias("_silver_ts"),
)

# COMMAND ----------

# Persist the result as a Delta table partitioned by plant_id; "overwrite" mode
# replaces whatever the table held before.
df_silver.write.saveAsTable(
    tabla_silver,
    format="delta",
    mode="overwrite",
    partitionBy="plant_id",
)

print(f"OK: {tabla_silver}")