# Notebook 5: Gold 1 - Business Aggregations (PySpark)

**Duration**: 10 minutes  
**Lakehouse**: Lakehouse_gold  
**Goal**: Create business-ready tables for Power BI

## Cell 1: Imports and loading Silver data

In [None]:
from pyspark.sql import functions as F

# Load the enriched Silver table
df_enriched = spark.table("Lakehouse_silver.silver.consumption_enriched")

# Load the predictions
df_predictions = spark.table("Lakehouse_silver.silver.consumption_predictions")

# Left join on (hour, site_id), then compute the absolute prediction error
df = df_enriched.join(
    df_predictions.select("hour", "site_id", F.col("prediction").alias("predicted_consumption_mw")),
    on=["hour", "site_id"],
    how="left"
).withColumn(
    "prediction_error_mw",
    F.abs(F.col("avg_consumption_mw") - F.col("predicted_consumption_mw"))
)

print(f"📊 Combined Silver data: {df.count()} rows")
df.show(5)
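
The left join above keeps every Silver row, including hours that have no matching prediction. For those rows `predicted_consumption_mw` is null, and so is `prediction_error_mw`. The plain-Python sketch below mimics that behaviour without Spark. The sample rows and the `left_join_with_error` helper are invented for illustration and are not part of the notebook.

```python
# Plain-Python sketch of the left join + absolute error logic (no Spark needed).
# All rows and values below are invented for illustration.

def left_join_with_error(enriched, predictions):
    """Mimic a left join on (hour, site_id) and derive the absolute error."""
    # Index predictions by the join key for direct lookups.
    by_key = {(p["hour"], p["site_id"]): p["prediction"] for p in predictions}
    joined = []
    for row in enriched:
        # None when no prediction exists for this (hour, site_id).
        predicted = by_key.get((row["hour"], row["site_id"]))
        error = None if predicted is None else abs(row["avg_consumption_mw"] - predicted)
        joined.append({
            **row,
            "predicted_consumption_mw": predicted,
            "prediction_error_mw": error,
        })
    return joined


enriched = [
    {"hour": "2024-01-01 00:00", "site_id": "S1", "avg_consumption_mw": 2.5},
    {"hour": "2024-01-01 01:00", "site_id": "S1", "avg_consumption_mw": 3.0},
]
predictions = [
    {"hour": "2024-01-01 00:00", "site_id": "S1", "prediction": 2.0},
]

result = left_join_with_error(enriched, predictions)
for row in result:
    print(row["hour"], row["predicted_consumption_mw"], row["prediction_error_mw"])
```

One difference to keep in mind: if several prediction rows share the same `(hour, site_id)` key, a Spark join duplicates the Silver row, whereas this sketch keeps a single prediction per key.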

## Cell 2: Daily aggregation per site

In [None]:
df_daily = df.groupBy(
    F.to_date("hour").alias("date"),
    "site_id",
    "site_type",
    "capacity_mw"
).agg(
    # Sum of hourly average MW; this equals MWh only if each row covers exactly one hour
    F.sum("avg_consumption_mw").alias("total_consumption_mwh"),
    F.avg("avg_consumption_mw").alias("avg_consumption_mw"),
    F.max("max_consumption_mw").alias("peak_consumption_mw"),
    F.avg("baseline_7d_mw").alias("avg_baseline_mw"),
    F.avg("predicted_consumption_mw").alias("avg_predicted_mw"),
    F.avg("prediction_error_mw").alias("avg_prediction_error_mw"),
    F.avg("price_eur_mwh").alias("avg_price_eur_mwh"),
    F.max("price_eur_mwh").alias("peak_price_eur_mwh"),
    F.avg("temperature_c").alias("avg_temperature_c"),
    F.sum(F.when(F.col("anomaly") == "⚠️ ANOMALIE", 1).otherwise(0)).alias("nb_anomalies"),
    F.count("*").alias("nb_measurements")
).orderBy(F.desc("date"), "site_id")

df_daily.write.mode("overwrite").format("delta").saveAsTable("gold.daily_consumption_by_site")
print(f"✅ Table gold.daily_consumption_by_site created: {df_daily.count()} rows")
df_daily.show(10)
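
The daily roll-up above can be pictured on a handful of rows. The sketch below is plain Python with invented values. It assumes one row per site per hour, which is what makes the sum of hourly average MW readable as MWh. The `daily_rollup` helper is hypothetical and covers only four of the aggregated columns.

```python
from collections import defaultdict

# Invented hourly rows: (hour, site_id, avg_consumption_mw, max_consumption_mw).
hourly = [
    ("2024-01-01 00:00", "S1", 2.0, 2.5),
    ("2024-01-01 01:00", "S1", 4.0, 4.5),
    ("2024-01-02 00:00", "S1", 3.0, 3.5),
]


def daily_rollup(rows):
    """Group hourly rows by (date, site_id) and aggregate a few columns."""
    groups = defaultdict(list)
    for hour, site_id, avg_mw, max_mw in rows:
        date = hour[:10]  # "YYYY-MM-DD" prefix of the timestamp string
        groups[(date, site_id)].append((avg_mw, max_mw))

    out = {}
    for key, values in groups.items():
        avgs = [avg for avg, _ in values]
        out[key] = {
            # Assumption: each row spans one hour, so MW x 1 h = MWh.
            "total_consumption_mwh": sum(avgs),
            "avg_consumption_mw": sum(avgs) / len(avgs),
            "peak_consumption_mw": max(peak for _, peak in values),
            "nb_measurements": len(values),
        }
    return out


daily = daily_rollup(hourly)
for key, agg in sorted(daily.items()):
    print(key, agg)
```

If the Silver table ever holds rows at another granularity (for example 15 minutes), the sum would need to be scaled by the row duration before it can be labelled MWh.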

## Cell 3: KPIs per site

In [None]:
df_kpis = df_daily.groupBy(
    "site_id",
    "site_type",
    "capacity_mw"
).agg(
    F.sum("total_consumption_mwh").alias("total_consumption_month_mwh"),
    F.avg("avg_consumption_mw").alias("avg_consumption_mw"),
    F.round(F.avg("avg_consumption_mw") / F.col("capacity_mw") * 100, 2).alias("load_factor_pct"),
    F.round(F.sum("nb_anomalies") * 100.0 / F.sum("nb_measurements"), 2).alias("anomaly_rate_pct"),
    F.round(F.avg("avg_prediction_error_mw"), 3).alias("avg_prediction_error_mw"),
    F.round(F.avg("avg_prediction_error_mw") / F.avg("avg_consumption_mw") * 100, 1).alias("prediction_error_pct"),
    F.round(F.avg("avg_price_eur_mwh"), 2).alias("avg_electricity_price_eur_mwh"),
    F.countDistinct("date").alias("nb_days")
).orderBy(F.desc("total_consumption_month_mwh"))

# Add region and flexible from the sites reference table
df_sites_ref = spark.table("Lakehouse_bronze.bronze.sites_reference")
df_kpis = df_kpis.join(df_sites_ref.select("site_id", "region", "flexible"), on="site_id", how="left")

df_kpis.write.mode("overwrite").format("delta").saveAsTable("gold.site_kpis")
print(f"✅ Table gold.site_kpis created: {df_kpis.count()} rows")
df_kpis.show()
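
As a worked example of the three percentage KPIs computed above, here is a plain-Python sketch on two invented daily rows. The `site_kpis` helper is hypothetical. It assumes no null values, whereas `F.avg` in Spark skips nulls. Python's `round` and Spark's `F.round` can also differ on exact ties, so the sample values are chosen to avoid that case.

```python
def site_kpis(daily_rows, capacity_mw):
    """Compute load factor, anomaly rate and relative prediction error for one site."""
    n = len(daily_rows)
    avg_mw = sum(d["avg_consumption_mw"] for d in daily_rows) / n
    avg_err = sum(d["avg_prediction_error_mw"] for d in daily_rows) / n
    anomalies = sum(d["nb_anomalies"] for d in daily_rows)
    measurements = sum(d["nb_measurements"] for d in daily_rows)
    return {
        # Mean consumption as a share of installed capacity.
        "load_factor_pct": round(avg_mw / capacity_mw * 100, 2),
        # Share of hourly measurements flagged as anomalies.
        "anomaly_rate_pct": round(anomalies * 100.0 / measurements, 2),
        # Mean absolute error relative to mean consumption.
        "prediction_error_pct": round(avg_err / avg_mw * 100, 1),
    }


# Invented daily rows for a single site with 10 MW of capacity.
days = [
    {"avg_consumption_mw": 4.0, "avg_prediction_error_mw": 0.5,
     "nb_anomalies": 1, "nb_measurements": 24},
    {"avg_consumption_mw": 6.0, "avg_prediction_error_mw": 0.5,
     "nb_anomalies": 2, "nb_measurements": 24},
]

kpis = site_kpis(days, capacity_mw=10.0)
print(kpis)
```

Note that the averages are means of daily means, so every day carries the same weight even when it has fewer measurements.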

## Cell 4: Aggregation by site type

In [None]:
df_by_type = df.groupBy(
    F.to_date("hour").alias("date"),
    "site_type"
).agg(
    F.sum("avg_consumption_mw").alias("total_consumption_mwh"),
    F.avg("avg_consumption_mw").alias("avg_consumption_mw"),
    # Note: summed over every hourly row, so this is capacity x hours rather than installed capacity
    F.sum("capacity_mw").alias("total_capacity_mw"),
    F.round(F.sum("avg_consumption_mw") / F.sum("capacity_mw") * 100, 2).alias("load_factor_pct"),
    F.avg("price_eur_mwh").alias("avg_price_eur_mwh"),
    F.countDistinct("site_id").alias("nb_sites")
).orderBy(F.desc("date"), "site_type")

df_by_type.write.mode("overwrite").format("delta").saveAsTable("gold.consumption_by_site_type")
print(f"✅ Table gold.consumption_by_site_type created: {df_by_type.count()} rows")
df_by_type.show(10)
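
In the aggregation above, `F.sum("capacity_mw")` runs over every hourly row of the group, so `total_capacity_mw` grows with the number of hours rather than giving the installed capacity of the site type. The ratio used for `load_factor_pct` remains meaningful because numerator and denominator are summed over the same rows. A small plain-Python check with two invented sites, each assumed to report 24 rows per day:

```python
# Two invented sites of the same type, each with 24 hourly rows in one day.
sites = {
    "S1": {"capacity_mw": 10.0, "avg_mw": 5.0},
    "S2": {"capacity_mw": 30.0, "avg_mw": 6.0},
}
HOURS = 24

# One (site_id, avg_consumption_mw, capacity_mw) tuple per site and hour.
rows = [
    (site_id, site["avg_mw"], site["capacity_mw"])
    for site_id, site in sites.items()
    for _ in range(HOURS)
]

sum_consumption = sum(avg for _, avg, _ in rows)   # 24*5 + 24*6 = 264
sum_capacity = sum(cap for _, _, cap in rows)      # 24*10 + 24*30 = 960
installed_capacity = sum(site["capacity_mw"] for site in sites.values())  # 10 + 30 = 40

load_factor_pct = round(sum_consumption / sum_capacity * 100, 2)

print("sum of capacity over hourly rows:", sum_capacity)
print("installed capacity:", installed_capacity)
print("load factor (%):", load_factor_pct)
```

If the installed capacity per site type is needed in Power BI, one option is to aggregate `capacity_mw` once per site before summing. That variant is a suggestion and not something the notebook does.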

## Cell 5: Curtailment opportunities (Demand Response)

In [None]:
df_curtailment = df.filter(F.col("flexible") == True).filter(F.col("avg_consumption_mw") > 0.5)

df_curtailment = df_curtailment.withColumn(
    "ratio_vs_baseline",
    F.round(F.col("avg_consumption_mw") / F.col("baseline_7d_mw"), 2)
).withColumn(
    "curtailment_potential_mw",
    F.round(F.col("avg_consumption_mw") * 0.3, 3)
).withColumn(
    "potential_gain_eur",
    F.when(
        F.col("price_eur_mwh") > F.col("curtailment_price_eur_mwh"),
        F.round((F.col("avg_consumption_mw") * 0.3) * (F.col("price_eur_mwh") - F.col("curtailment_price_eur_mwh")), 2)
    ).otherwise(0)
).withColumn(
    "action_signal",
    F.when((F.col("price_eur_mwh") > 300) & (F.col("flexible") == True), "🔴 EFFACEMENT MAX")
     .when((F.col("price_eur_mwh") > 200) & (F.col("flexible") == True), "🟠 EFFACEMENT PARTIEL")
     .when((F.col("price_eur_mwh") < 50) & (F.col("flexible") == True), "🟢 CONSOMMER")
     .otherwise("Aucune action")
)

df_curtailment = df_curtailment.select(
    "hour", "site_id", "site_type", "flexible", "curtailment_price_eur_mwh",
    "avg_consumption_mw", "baseline_7d_mw", "ratio_vs_baseline",
    "price_eur_mwh", "curtailment_potential_mw", "potential_gain_eur", "action_signal"
).orderBy(F.desc("potential_gain_eur"))

df_curtailment.write.mode("overwrite").format("delta").saveAsTable("gold.curtailment_opportunities")
print(f"✅ Table gold.curtailment_opportunities created: {df_curtailment.count()} rows")
df_curtailment.filter("potential_gain_eur > 0").show(10)
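
The gain and signal rules above can be traced for a single row. The sketch below is plain Python with invented inputs and assumes non-null prices. The `curtailment_row` helper is hypothetical, and the short labels `MAX`, `PARTIAL`, `CONSUME` and `NONE` stand in for the emoji labels written by the notebook.

```python
CURTAILABLE_SHARE = 0.3  # same 30% factor as in the cell above


def curtailment_row(avg_consumption_mw, price_eur_mwh, curtailment_price_eur_mwh):
    """Apply the curtailment potential, gain and signal rules to one flexible site-hour."""
    potential_mw = round(avg_consumption_mw * CURTAILABLE_SHARE, 3)

    # A gain exists only when the spot price exceeds the site's curtailment price.
    spread = price_eur_mwh - curtailment_price_eur_mwh
    if spread > 0:
        gain_eur = round(avg_consumption_mw * CURTAILABLE_SHARE * spread, 2)
    else:
        gain_eur = 0

    # First matching threshold wins, mirroring the chained when() calls.
    if price_eur_mwh > 300:
        signal = "MAX"
    elif price_eur_mwh > 200:
        signal = "PARTIAL"
    elif price_eur_mwh < 50:
        signal = "CONSUME"
    else:
        signal = "NONE"

    return {
        "curtailment_potential_mw": potential_mw,
        "potential_gain_eur": gain_eur,
        "action_signal": signal,
    }


# Invented example: 10 MW site-hour, spot price 350, curtailment price 150 (EUR/MWh).
example = curtailment_row(10.0, 350.0, 150.0)
print(example)
```

Because each row is assumed to cover one hour, the gain reads as euros for that hour. The thresholds are strict, so a price of exactly 300 EUR/MWh falls into the partial band.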

## Cell 6: Monthly pivot for Power BI

In [None]:
df_monthly = df_daily.groupBy(
    "site_id",
    "site_type",
    F.year("date").alias("year"),
    F.month("date").alias("month")
).agg(
    F.sum("total_consumption_mwh").alias("total_mwh"),
    F.avg(F.col("avg_consumption_mw") / F.col("capacity_mw") * 100).alias("avg_load_factor_pct"),
    F.avg("avg_price_eur_mwh").alias("avg_price_eur_mwh"),
    F.sum("nb_anomalies").alias("total_anomalies")
).orderBy(F.desc("year"), F.desc("month"), "site_id")

df_monthly.write.mode("overwrite").format("delta").saveAsTable("gold.monthly_consumption_pivot")
print(f"✅ Table gold.monthly_consumption_pivot created: {df_monthly.count()} rows")
df_monthly.show()
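
The monthly view above buckets daily rows by `(year, month)` and averages the daily load factor, so each day weighs the same regardless of how many measurements it contains. A plain-Python sketch of that bucketing, using invented daily rows for one site and a hypothetical `monthly_load_factor` helper:

```python
from collections import defaultdict
from datetime import date

# Invented daily rows for one site: (date, avg_consumption_mw, capacity_mw).
daily = [
    (date(2024, 1, 30), 4.0, 10.0),
    (date(2024, 1, 31), 6.0, 10.0),
    (date(2024, 2, 1), 5.0, 10.0),
]


def monthly_load_factor(rows):
    """Average the daily load factor (%) within each (year, month) bucket."""
    buckets = defaultdict(list)
    for day, avg_mw, capacity_mw in rows:
        buckets[(day.year, day.month)].append(avg_mw / capacity_mw * 100)
    # Unweighted mean: every day counts equally.
    return {key: sum(values) / len(values) for key, values in buckets.items()}


monthly = monthly_load_factor(daily)
for (year, month), load_factor in sorted(monthly.items()):
    print(year, month, round(load_factor, 2))
```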

## Cell 7: Active alerts table

In [None]:
df_alerts = df.filter(
    (F.col("anomaly") == "⚠️ ANOMALIE") |
    (F.col("price_eur_mwh") > 300) |
    (F.col("prediction_error_mw") > 1.0)
)

df_alerts = df_alerts.withColumn(
    "alert_type",
    F.when(F.col("anomaly") == "⚠️ ANOMALIE", "Consommation anormale")
     .when(F.col("price_eur_mwh") > 300, "Prix spot très élevé")
     .when(F.col("prediction_error_mw") > 1.0, "Erreur prédiction élevée")
     .otherwise("Autre")
).withColumn(
    "priority",
    F.when((F.col("anomaly") == "⚠️ ANOMALIE") & (F.col("price_eur_mwh") > 300), "CRITIQUE")
     .when((F.col("anomaly") == "⚠️ ANOMALIE") | (F.col("price_eur_mwh") > 300), "HAUTE")
     .when(F.col("prediction_error_mw") > 1.0, "MOYENNE")
     .otherwise("BASSE")
)

df_alerts = df_alerts.select(
    F.col("hour").alias("alert_time"),
    "site_id", "site_type", "alert_type",
    "avg_consumption_mw", "baseline_7d_mw", "predicted_consumption_mw",
    "price_eur_mwh", "priority"
).orderBy(
    F.when(F.col("priority") == "CRITIQUE", 1)
     .when(F.col("priority") == "HAUTE", 2)
     .when(F.col("priority") == "MOYENNE", 3)
     .otherwise(4),
    F.desc("alert_time")
)

df_alerts.write.mode("overwrite").format("delta").saveAsTable("gold.active_alerts")
print(f"✅ Table gold.active_alerts created: {df_alerts.count()} rows")
df_alerts.filter("priority IN ('CRITIQUE', 'HAUTE')").show(10)
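
The alert rules above are two independent "first match wins" cascades, which has a consequence worth spelling out: a row that is both anomalous and high-priced is labelled only as an abnormal-consumption alert, yet it receives the top priority. The plain-Python sketch below reproduces the cascades for one row. The `classify_alert` helper is hypothetical, the anomaly flag is passed as a boolean instead of the string label, and the price is assumed to be non-null.

```python
def classify_alert(is_anomaly, price_eur_mwh, prediction_error_mw):
    """Return (alert_type, priority) for one row, or None if the row is not an alert."""
    high_price = price_eur_mwh > 300
    # A missing prediction (None) never triggers the error condition.
    high_error = prediction_error_mw is not None and prediction_error_mw > 1.0

    # Same condition as the initial filter: keep the row if any rule fires.
    if not (is_anomaly or high_price or high_error):
        return None

    # alert_type cascade: only the first matching cause is reported.
    if is_anomaly:
        alert_type = "Consommation anormale"
    elif high_price:
        alert_type = "Prix spot très élevé"
    else:
        alert_type = "Erreur prédiction élevée"

    # priority cascade: both main causes together escalate to the top level.
    if is_anomaly and high_price:
        priority = "CRITIQUE"
    elif is_anomaly or high_price:
        priority = "HAUTE"
    else:
        priority = "MOYENNE"

    return alert_type, priority


# Invented rows: (is_anomaly, price_eur_mwh, prediction_error_mw).
samples = [(True, 350.0, 0.2), (False, 350.0, None), (False, 100.0, 1.5), (False, 100.0, 0.2)]
for sample in samples:
    print(sample, "->", classify_alert(*sample))
```

Since every retained row satisfies at least one rule, the `otherwise` branches of the notebook's cascades are never reached, which is why the sketch omits them.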

## Cell 8: Gold tables summary

In [None]:
tables = [
    "gold.daily_consumption_by_site",
    "gold.site_kpis",
    "gold.consumption_by_site_type",
    "gold.curtailment_opportunities",
    "gold.monthly_consumption_pivot",
    "gold.active_alerts"
]

print("\n📊 GOLD TABLES CREATED:")
print("="*50)
for table in tables:
    count = spark.table(f"Lakehouse_gold.{table}").count()
    print(f"{table}: {count:,} rows")
print("="*50)

## Cell 9: Summary

### ✅ Gold aggregations complete

**Business tables created**:
1. **daily_consumption_by_site**: Daily consumption per site
2. **site_kpis**: KPIs per site (load factor, anomalies, predictability)
3. **consumption_by_site_type**: Aggregation by site type (Industrial, Commercial, Residential)
4. **curtailment_opportunities**: Curtailment opportunities with potential gains
5. **monthly_consumption_pivot**: Monthly view for Power BI
6. **active_alerts**: Active alerts (anomalies, high prices, high prediction errors)

**💡 Why PySpark for Gold?**
- ✅ PySpark keeps the pipeline consistent and allows cross-lakehouse access
- ✅ Same capabilities as SQL, with more flexibility
- ✅ Avoids the TEMP VIEW and three-part naming issues

**Ready for**:
- 📊 Power BI connection (Direct Lake on the Gold tables)
- 📈 Business dashboards
- 📧 Automated alerting

➡️ **Next step**: Final visualization (Notebook 6)