# In [None]:
# Catalog prefix for every table name; an empty string means "use the
# session's current catalog" (names are left unqualified).
CATALOG = ""

# Database (schema) names used by this job.
DB_STG = "stg_scada"
DB_REF = "ref"
DB_CUR = "cur"
DB_FACT5 = "fact5m"
DB_MART = "mart"
DB_QA = "qa"
DB_DIM = "dim"

# Input tables, as "<database>.<table>".
TBL_READ = f"{DB_STG}.readings_5m"
TBL_ASSET = f"{DB_STG}.asset_master"
TBL_PC = f"{DB_REF}.power_curve_wind"
TBL_POA = f"{DB_REF}.irradiance_ref"
TBL_SET = f"{DB_REF}.dispatch_limits"
TBL_DT = f"{DB_REF}.downtimes"


def qt(t):
    """Return the table name *t*, prefixed with ``CATALOG.`` when a catalog is configured."""
    if not CATALOG:
        return t
    return f"{CATALOG}.{t}"

# Create every target database (schema) this job writes to. The names go
# through qt() so the schemas are created in the same catalog that the tables
# below are saved into (qt(f"{DB}.<table>")); unqualified, they would land in
# the session's current catalog whenever CATALOG is set.
for _db in (DB_FACT5, DB_MART, DB_QA, DB_DIM):
    spark.sql(f"CREATE DATABASE IF NOT EXISTS {qt(_db)}")

from pyspark.sql import functions as F, Window as W
from pyspark.sql.types import TimestampType, DoubleType, IntegerType

# -------------------------------
# 0) Signal parameters
# -------------------------------
# Canonical signal group -> list of source signal names that feed it.
# Matching against the readings is done case-insensitively by pick().
SIG = dict(
    power_kw=["ActivePower", "AC_Power_kW", "WTG_ActivePower"],
    energy_kwh=["Energy_kWh", "AC_Energy_kWh"],
    availability_pct=["Availability", "TechAvailability"],
    wind_speed_ms=["WindSpeed_ms", "NacelleWindSpeed"],
    poa_wm2=["POA_Wm2"],
    temp_c=["TempC", "ModuleTemp_C"],
    status=["Status", "OperatingState"],
    reactive_kvar=["ReactivePower_kVAr"],
    freq_hz=["Frequency_Hz"],
    voltage_kv=["Voltage_kV"],
)

# Length of one 5-minute slot expressed in hours (kW * h -> kWh).
FIVE_MIN_HOURS = 5 / 60

# -------------------------------
# 1) Base readings and asset master
# -------------------------------
# Readings (presumably long format: asset_id, ts_utc, signal, value — TODO
# confirm) with normalised types and a lower-cased copy of the signal name
# that pick() matches against.
df = spark.table(qt(TBL_READ))
df = df.withColumn("ts_utc", F.col("ts_utc").cast(TimestampType()))
df = df.withColumn("value", F.col("value").cast(DoubleType()))
df = df.withColumn("signal_lc", F.lower(F.col("signal")))

# Asset master with the rated power cast to double.
assets = spark.table(qt(TBL_ASSET))
# Renames AssetId to itself; kept from the original, effectively a no-op.
assets = assets.withColumnRenamed("AssetId", "AssetId")
assets = assets.withColumn("RatedPower_kW", F.col("RatedPower_kW").cast(DoubleType()))

# helper: keep only the readings that belong to one signal group
def pick(df_in, names):
    """Return the rows of *df_in* whose ``signal_lc`` equals any of *names*, lower-cased.

    Assumes ``signal_lc`` already holds the lower-cased signal name.
    """
    wanted = list(map(str.lower, names))
    return df_in.filter(F.col("signal_lc").isin(wanted))

# -------------------------------
# 2) Resampling / completeness on the 5-min grid
# -------------------------------
# Overall time span of the readings; both bounds are None when the table is empty.
min_ts, max_ts = df.agg(F.min("ts_utc"), F.max("ts_utc")).first()
if min_ts is None or max_ts is None:
    raise ValueError("No hay datos en stg_scada.readings_5m")


from datetime import datetime, timedelta
def daterange(start, end, step_minutes=5):
    """Yield timestamps from *start* to *end* (inclusive) every *step_minutes* minutes.

    Works with any values supporting ``+ timedelta`` and ``<=`` (e.g. ``datetime``).
    Yields nothing when *start* is after *end*.

    Raises:
        ValueError: if *step_minutes* is not positive; otherwise the loop never
            moves past *end* and the generator would run forever. Because this is
            a generator, the error surfaces on the first ``next()``.
    """
    if step_minutes <= 0:
        raise ValueError(f"step_minutes must be positive, got {step_minutes!r}")
    step = timedelta(minutes=step_minutes)
    while start <= end:
        yield start
        start += step


# Complete 5-min grid: one row per (asset_id, ts_utc) for every asset seen in
# the readings, over the global [min(ts_utc), max(ts_utc)] span with a 5-minute
# step starting at the earliest reading.
# The slots are generated inside Spark (sequence + explode) and cross-joined
# with the asset list. Materialising every (asset, timestamp) tuple on the
# driver and shipping it back with createDataFrame does not scale with the
# number of assets or the length of the history.
# Rows with a null asset_id can never match a reading, so they are excluded.
_assets_seen = (df.select("asset_id")
                  .where(F.col("asset_id").isNotNull())
                  .distinct())
_slots = (df.agg(F.min("ts_utc").alias("t0"), F.max("ts_utc").alias("t1"))
            .select(F.explode(F.sequence(F.col("t0"), F.col("t1"),
                                         F.expr("INTERVAL 5 MINUTES"))).alias("ts_utc")))
grid = _assets_seen.crossJoin(_slots)

# Active power per slot: mean of all power-signal readings at that (asset, ts).
pwr = (pick(df, SIG["power_kw"])
       .groupBy("asset_id", "ts_utc")
       .agg(F.avg("value").alias("P_kW")))

# Left join keeps empty slots (P_kW null) so they can be flagged as missing.
base = grid.join(pwr, ["asset_id", "ts_utc"], "left")

# -------------------------------
# 3) Quality flags and light gap handling
# -------------------------------
# Quality flags:
#   - missing: no power value for the slot
#   - zero_flat: anomalous runs of zeros while producing (not at night, for solar)
#     NOTE(review): no zero_flat flag is computed in the code below
#   - outlier: above the asset's 99.9th percentile of power
#   - clipped: at the dispatch limit
#   - maintenance: inside a downtime window

# a) Missing
base = base.withColumn("q_missing", F.when(F.col("P_kW").isNull(), F.lit(1)).otherwise(F.lit(0)))

# b) Outliers, per asset. The threshold is each asset's own p99.9. A single
# global quantile would flag most high readings of the largest assets and none
# of the small ones. percentile_approx ignores nulls, and its default accuracy
# is far tighter than approxQuantile(..., 0.01), whose 1% rank error makes a
# 0.999 quantile unreliable. It also needs no separate count/quantile jobs.
w_asset = W.partitionBy("asset_id")
base = (base
        .withColumn("_p999", F.expr("percentile_approx(P_kW, 0.999)").over(w_asset))
        .withColumn("q_outlier",
                    F.when(F.col("P_kW").isNotNull() & (F.col("P_kW") > F.col("_p999")), 1)
                     .otherwise(0))
        .drop("_p999"))

# c) Dispatch limit.
# Limits are reduced to one value per (asset_id, ts_utc), the most restrictive
# (minimum). Duplicate limit rows therefore cannot multiply the 5-min grid.
# A slot is flagged as clipped when power sits at or above the limit (minus a
# small tolerance). Exact float equality almost never holds for averaged
# telemetry.
CLIP_TOL_KW = 1e-6
if spark.catalog.tableExists(qt(TBL_SET)):
    setp = (spark.table(qt(TBL_SET))
            .select("asset_id",
                    F.col("ts_utc").cast(TimestampType()).alias("ts_utc"),
                    F.col("Limit_kW").cast(DoubleType()).alias("Limit_kW"))
            .groupBy("asset_id", "ts_utc")
            .agg(F.min("Limit_kW").alias("Limit_kW")))
    base = base.join(setp, ["asset_id", "ts_utc"], "left")
    # Null power or null limit -> comparison is null -> flag 0.
    base = base.withColumn(
        "q_clipped",
        F.when(F.col("Limit_kW").isNotNull() &
               (F.col("P_kW") >= F.col("Limit_kW") - F.lit(CLIP_TOL_KW)), 1).otherwise(0))
else:
    base = base.withColumn("Limit_kW", F.lit(None).cast(DoubleType())).withColumn("q_clipped", F.lit(0))

# d) Maintenance.
# A slot is flagged when it falls inside at least one downtime window
# [start_utc, end_utc] of its asset. Windows are matched with a range join and
# reduced to distinct (asset_id, ts_utc) slots before joining back onto the
# grid. A plain left join on asset_id alone would emit one copy of every grid
# row per downtime window, duplicating energy and every downstream KPI.
if spark.catalog.tableExists(qt(TBL_DT)):
    dt = (spark.table(qt(TBL_DT))
          .select(F.col("asset_id").alias("dt_asset_id"),
                  F.col("start_utc").cast(TimestampType()).alias("start_utc"),
                  F.col("end_utc").cast(TimestampType()).alias("end_utc")))

    in_maint = (base.select("asset_id", "ts_utc")
                    .join(dt,
                          (F.col("asset_id") == F.col("dt_asset_id")) &
                          (F.col("ts_utc") >= F.col("start_utc")) &
                          (F.col("ts_utc") <= F.col("end_utc")),
                          "inner")
                    .select("asset_id", "ts_utc")
                    .distinct()
                    .withColumn("q_maint", F.lit(1)))

    base = (base.join(in_maint, ["asset_id", "ts_utc"], "left")
                .withColumn("q_maint", F.coalesce(F.col("q_maint"), F.lit(0))))
else:
    base = base.withColumn("q_maint", F.lit(0))


# e) Light gap filling.
# A missing slot takes the most recent usable power reading from the two
# previous rows of the same asset (frame rows -2..0 in ts_utc order, i.e. up to
# 10 minutes back on the complete 5-min grid).
# F.last(ignorenulls=True) returns the NEWEST non-null value in the frame;
# F.first would return the oldest one. Values flagged as outliers are not
# propagated into filled slots.
w_ff = W.partitionBy("asset_id").orderBy("ts_utc").rowsBetween(-2, 0)
_p_usable = F.when(F.col("q_outlier") == 0, F.col("P_kW"))
base = base.withColumn("P_kW_ff", F.last(_p_usable, ignorenulls=True).over(w_ff))
base = base.withColumn("P_kW_clean", F.when(F.col("q_missing") == 1, F.col("P_kW_ff")).otherwise(F.col("P_kW")))
# Outlier slots are blanked rather than corrected.
base = base.withColumn("P_kW_clean", F.when(F.col("q_outlier") == 1, None).otherwise(F.col("P_kW_clean")))

# -------------------------------
# 4) Additional signals for the KPIs
# -------------------------------
def _avg_by_slot(names, alias):
    """Mean of the readings of the given signal names per (asset_id, ts_utc), named *alias*."""
    return (pick(df, names)
            .groupBy("asset_id", "ts_utc")
            .agg(F.avg("value").alias(alias)))


avail = _avg_by_slot(SIG["availability_pct"], "Availability_%")
wind = _avg_by_slot(SIG["wind_speed_ms"], "WindSpeed_ms")

# Status is a code, so one value is taken per slot instead of averaging.
status = (pick(df, SIG["status"])
          .groupBy("asset_id", "ts_utc")
          .agg(F.first("value", ignorenulls=True).alias("StatusCode")))

# Irradiance / temperature: prefer the reference table, else the SCADA signals.
if spark.catalog.tableExists(qt(TBL_POA)):
    poa = spark.table(qt(TBL_POA))
    poa_sel = poa.select("asset_id", "ts_utc",
                         F.col("POA_Wm2").cast(DoubleType()).alias("POA_Wm2"),
                         F.col("TempC").cast(DoubleType()).alias("TempC"))
else:
    poa_sel = _avg_by_slot(SIG["poa_wm2"], "POA_Wm2")
    # Temperature is optional; the left join keeps POA rows without it.
    temp = _avg_by_slot(SIG["temp_c"], "TempC")
    poa_sel = poa_sel.join(temp, ["asset_id", "ts_utc"], "left")

# Attach every signal to the quality-flagged power grid, in this order.
_slot_keys = ["asset_id", "ts_utc"]
wide = base
for _extra in (avail, wind, poa_sel, status):
    wide = wide.join(_extra, _slot_keys, "left")

# Energy per slot = clean power * slot length in hours (null stays null).
wide = wide.withColumn("E_kWh_5m", F.col("P_kW_clean") * FIVE_MIN_HOURS)

# -------------------------------
# 5) KPIs per technology
# -------------------------------
# Asset attributes (type, plant, rating) keyed by asset_id.
cur_asset = assets.select("AssetId", "AssetType", "PlantId", "RatedPower_kW")\
                  .withColumnRenamed("AssetId", "asset_id")

wide = wide.join(cur_asset, "asset_id", "left")

# a) Capacity Factor (CF) = P / RatedPower per slot; null when the rating is missing or <= 0.
wide = wide.withColumn("CF",
                       F.when(F.col("RatedPower_kW") > 0,
                              F.col("P_kW_clean") / F.col("RatedPower_kW")).otherwise(None))

# b) Curtailment (kW) = max(Limit_kW - P, 0) when a dispatch limit exists.
wide = wide.withColumn("Curtailment_kW",
                       F.when((F.col("Limit_kW").isNotNull()) & (F.col("P_kW_clean").isNotNull()),
                              F.greatest(F.col("Limit_kW") - F.col("P_kW_clean"), F.lit(0.0))).otherwise(None))

# c) Performance Ratio (PR), solar only:
#    PR_5m = E_meas / (POA_Wm2 * Area * eta_ref * 5min); simplified with RatedPower as proxy:
#    PR_5m ~= P_meas / (POA_Wm2/1000 * RatedPower_kW)
#    PR is null unless POA_Wm2 is strictly positive. Zero irradiance (night) would
#    divide by zero, which raises under ANSI SQL mode. Negative irradiance (sensor
#    offset) would yield negative, meaningless ratios.
wide = wide.withColumn("PR",
                       F.when((F.col("AssetType").isin("PV", "Solar")) &
                              (F.col("POA_Wm2") > 0) &
                              (F.col("RatedPower_kW") > 0),
                              F.col("P_kW_clean") / (F.col("POA_Wm2") / 1000.0 * F.col("RatedPower_kW")))
                        .otherwise(None))

# d) Wind Performance Index (WPI) = measured power / expected power from the
# reference power curve, matched on wind speed rounded to 0 decimals (1 m/s bins).
# - The curve is averaged per bin first. A curve sampled finer than 1 m/s would
#   otherwise match several rows and duplicate slots.
# - WPI is computed only for wind assets. All other assets keep their rows
#   (WPI = null) instead of being filtered out of `wide`, which would empty the
#   solar/transmission outputs and the marts.
if spark.catalog.tableExists(qt(TBL_PC)):
    pc = spark.table(qt(TBL_PC)).select(
        F.col("WindSpeed").cast(DoubleType()).alias("WindSpeed_ms"),
        F.col("ExpectedPower_kW").cast(DoubleType()).alias("Pexp_kW"),
        "AssetTypeCanonical"
    ).where(F.col("AssetTypeCanonical").isin("Wind"))
    pc_bin = (pc.groupBy(F.round(F.col("WindSpeed_ms"), 0).alias("ws_bin"))
                .agg(F.avg("Pexp_kW").alias("Pexp_kW")))
    wide = (wide.withColumn("ws_bin", F.round(F.col("WindSpeed_ms"), 0))
                .join(pc_bin, ["ws_bin"], "left")
                .withColumn("WPI",
                            F.when(F.col("AssetType").isin("WTG", "Wind") & (F.col("Pexp_kW") > 0),
                                   F.col("P_kW_clean") / F.col("Pexp_kW"))
                             .otherwise(None))
                .drop("ws_bin", "Pexp_kW"))
else:
    wide = wide.withColumn("WPI", F.lit(None).cast(DoubleType()))

# e) Technical availability adjusted for maintenance: slots flagged q_maint == 1
# get a null value, so avg() over the period skips them.
_maint_slot = F.col("q_maint") == 1
wide = wide.withColumn(
    "AvailabilityAdj_%",
    F.when(_maint_slot, F.lit(None)).otherwise(F.col("Availability_%")),
)

# -------------------------------
# 6) Persist the 5-min FACT table
# -------------------------------
_FACT_COLS = [
    "asset_id", "PlantId", "ts_utc", "AssetType", "RatedPower_kW",
    "P_kW_clean", "E_kWh_5m", "Availability_%", "AvailabilityAdj_%", "WindSpeed_ms",
    "POA_Wm2", "TempC", "Limit_kW", "Curtailment_kW", "PR", "CF", "WPI", "StatusCode",
    "q_missing", "q_outlier", "q_clipped", "q_maint",
]
_fact_tbl = qt(f"{DB_FACT5}.ElectricalEnergyGeneration_5m")
# Drop first so a schema change does not collide with the existing table.
spark.sql(f"DROP TABLE IF EXISTS {_fact_tbl}")
wide.select(*_FACT_COLS).write.mode("overwrite").saveAsTable(_fact_tbl)

def save_tech(df_in, tech_list, name):
    """Write the rows of *df_in* whose ``AssetType`` is in *tech_list* to ``<DB_FACT5>.<name>``.

    The target table is dropped and rewritten. When no row matches, nothing is
    executed, so an existing table from a previous run is left as it was.
    """
    subset = df_in.filter(F.col("AssetType").isin(*tech_list))
    if subset.limit(1).count() == 0:
        return
    target = qt(f"{DB_FACT5}.{name}")
    spark.sql(f"DROP TABLE IF EXISTS {target}")
    subset.write.mode("overwrite").saveAsTable(target)

# One FACT table per technology, keyed by the AssetType values that map to it.
for _techs, _tbl_name in (
    (["WTG", "Wind"], "WindGeneration_5m"),
    (["PV", "Solar"], "SolarGeneration_5m"),
    (["TRANS", "Transmission"], "Transmission_5m"),
):
    save_tech(wide, _techs, _tbl_name)

# -------------------------------
# 7) KPI MART
# -------------------------------
# 5-minute grain: KPI columns plus the quality flags.
kpi_cols = [
    "asset_id", "PlantId", "ts_utc", "P_kW_clean", "E_kWh_5m", "CF", "PR", "WPI",
    "Availability_%", "AvailabilityAdj_%", "Curtailment_kW", "Limit_kW",
    "WindSpeed_ms", "POA_Wm2", "TempC", "StatusCode",
    "q_missing", "q_outlier", "q_clipped", "q_maint",
]
kpi5 = wide.select(*kpi_cols)
_kpi5_tbl = qt(f"{DB_MART}.KPI_5m")
spark.sql(f"DROP TABLE IF EXISTS {_kpi5_tbl}")
kpi5.write.mode("overwrite").saveAsTable(_kpi5_tbl)

# Date dimension (Date -> DateId). Reuse dim.Date when it exists; otherwise
# derive it from the KPI timestamps with DateId = yyyyMMdd as an integer.
_dim_date_tbl = qt(f"{DB_DIM}.Date")
if spark.catalog.tableExists(_dim_date_tbl):
    dim_date = spark.table(_dim_date_tbl).select("Date", "DateId")
else:
    dim_date = (kpi5.select(F.to_date("ts_utc").alias("Date"))
                    .distinct()
                    .withColumn("DateId", F.date_format("Date", "yyyyMMdd").cast(IntegerType())))

# Attach the calendar date and its DateId to every 5-min KPI row.
kpi5 = kpi5.withColumn("Date", F.to_date("ts_utc")).join(dim_date, "Date", "left")

def _kpi_aggs():
    """Aggregation expressions shared by the daily and monthly KPI marts.

    Averages are plain (unweighted) means over the 5-min slots with a non-null
    value. ``Curtailment_kW_sum`` keeps the original sum of 5-min kW values.
    ``Curtailment_kWh`` converts that sum to energy (each slot lasts
    FIVE_MIN_HOURS hours), since a sum of kW is not itself an energy.
    """
    return [
        F.sum("E_kWh_5m").alias("Energy_kWh"),
        F.avg("P_kW_clean").alias("Pavg_kW"),
        F.max("P_kW_clean").alias("Pmax_kW"),
        F.avg("CF").alias("CF_avg"),
        F.avg("PR").alias("PR_avg"),
        F.avg("WPI").alias("WPI_avg"),
        F.avg("Availability_%").alias("Availability_avg"),
        F.avg("AvailabilityAdj_%").alias("AvailabilityAdj_avg"),
        F.sum("Curtailment_kW").alias("Curtailment_kW_sum"),
        (F.sum("Curtailment_kW") * FIVE_MIN_HOURS).alias("Curtailment_kWh"),
    ]


# Daily KPIs per asset/plant/DateId.
agg_daily = kpi5.groupBy("asset_id", "PlantId", "DateId").agg(*_kpi_aggs())

spark.sql(f"DROP TABLE IF EXISTS {qt(DB_MART+'.KPI_Daily')}")
agg_daily.write.mode("overwrite").saveAsTable(qt(f"{DB_MART}.KPI_Daily"))

# Monthly KPIs per asset/plant/YearMonth (yyyyMM as an integer).
kpi5m = kpi5.withColumn("YearMonth", F.date_format("Date", "yyyyMM").cast(IntegerType()))
agg_month = kpi5m.groupBy("asset_id", "PlantId", "YearMonth").agg(*_kpi_aggs())

spark.sql(f"DROP TABLE IF EXISTS {qt(DB_MART+'.KPI_Monthly')}")
agg_month.write.mode("overwrite").saveAsTable(qt(f"{DB_MART}.KPI_Monthly"))

# -------------------------------
# 8) Quality QA (share of affected slots)
# -------------------------------
# Each 0/1 flag averaged per asset gives the fraction of flagged 5-min slots.
_flag_to_pct = {
    "q_missing": "pct_missing",
    "q_outlier": "pct_outlier",
    "q_clipped": "pct_clipped",
    "q_maint": "pct_maint",
}
qa = wide.groupBy("asset_id").agg(
    *[F.avg(F.col(flag).cast("double")).alias(pct) for flag, pct in _flag_to_pct.items()]
)
_qa_tbl = qt(f"{DB_QA}.KPI_Quality_5m")
spark.sql(f"DROP TABLE IF EXISTS {_qa_tbl}")
qa.write.mode("overwrite").saveAsTable(_qa_tbl)

print(" FACT 5-min y MART de KPIs generados.")
