# Etape 1.3 : Agregations avancees Spark

## Joindre consommations avec référentiel bâtiments

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os

# Création Sesssion Spark 

spark = SparkSession.builder \
    .appName("ECF2 - 03 Agregations Spark") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .config("spark.hadoop.io.native.lib.available", "false") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

DATA_DIR = "../data_ecf"
OUTPUT_DIR = "../output"

CLEAN_PATH = "../output/consommations_clean"  # parquet partitionné
BUILDINGS_PATH = "../data_ecf/batiments.csv"

# Chargement des consommations propres et des bâtiments 

df_conso = spark.read.parquet(CLEAN_PATH)
df_buildings = spark.read.option("header", True).option("inferSchema", True).csv(BUILDINGS_PATH)

df_conso.printSchema()
df_buildings.printSchema()

print(df_conso.count(), df_buildings.count())


# Jointure consommations et bâtiments 

df_join = (
    df_conso
    .join(
        df_buildings.select(
            "batiment_id", "nom", "type", "commune",
            "surface_m2", "annee_construction", "classe_energetique", "nb_occupants_moyen"
        ),
        on="batiment_id",
        how="left"
    )
)

# Audit rapide de jointure (doit être quasi 0)
missing_ref = df_join.filter(F.col("commune").isNull()).count()
print("Lignes sans correspondance batiment:", missing_ref)


root
 |-- batiment_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- conso_clean: double (nullable = true)
 |-- unite: string (nullable = true)
 |-- date: date (nullable = true)
 |-- type_energie: string (nullable = true)

root
 |-- batiment_id: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- type: string (nullable = true)
 |-- commune: string (nullable = true)
 |-- surface_m2: integer (nullable = true)
 |-- annee_construction: integer (nullable = true)
 |-- classe_energetique: string (nullable = true)
 |-- nb_occupants_moyen: integer (nullable = true)

100000 146
Lignes sans correspondance batiment: 0


## Calculer l'intensité energetique (kWh/m2)

In [3]:
df_enriched = (
    df_join
    .withColumn("surface_m2", F.col("surface_m2").cast("double"))
    .withColumn("conso_clean", F.col("conso_clean").cast("double"))
    .withColumn(
        "intensite_par_m2",
        F.when((F.col("surface_m2") > 0) & (F.col("type_energie").isin("electricite", "gaz")),
               F.col("conso_clean") / F.col("surface_m2"))
         .when((F.col("surface_m2") > 0) & (F.col("type_energie") == "eau"),
               F.col("conso_clean") / F.col("surface_m2"))
         .otherwise(None)
    )
    .withColumn(
        "unite_intensite",
        F.when(F.col("type_energie").isin("electricite", "gaz"), F.lit("kWh/m2"))
         .when(F.col("type_energie") == "eau", F.lit("m3/m2"))
         .otherwise(F.lit(None))
    )
)

df_enriched.select("batiment_id","type_energie","conso_clean","surface_m2","intensite_par_m2","unite_intensite").show(5)


+-----------+------------+-----------+----------+--------------------+---------------+
|batiment_id|type_energie|conso_clean|surface_m2|    intensite_par_m2|unite_intensite|
+-----------+------------+-----------+----------+--------------------+---------------+
|    BAT0001| electricite|      27.26|    1926.0|0.014153686396677051|         kWh/m2|
|    BAT0001| electricite|     411.17|    1926.0|  0.2134839044652129|         kWh/m2|
|    BAT0002| electricite|      13.67|    1156.0|0.011825259515570934|         kWh/m2|
|    BAT0002| electricite|      16.38|    1156.0| 0.01416955017301038|         kWh/m2|
|    BAT0002| electricite|      16.07|    1156.0|0.013901384083044983|         kWh/m2|
+-----------+------------+-----------+----------+--------------------+---------------+
only showing top 5 rows



## Identifier les batiments hors norme (>3x la mediane de leur categorie)

In [None]:
# Agréger mensuel par bâtiment & énergie

df_monthly_bat = (
    df_enriched
    .withColumn("month", F.date_trunc("month", F.col("timestamp")))
    .groupBy("batiment_id", "nom", "type", "commune", "surface_m2", "type_energie", "month")
    .agg(
        F.sum("conso_clean").alias("conso_month_sum"),
        F.avg("intensite_par_m2").alias("intensite_moy_m2")  # moyenne des intensités horaires
    )
)

df_monthly_bat = df_monthly_bat.withColumn(
    "intensite_month_m2",
    F.when(F.col("surface_m2") > 0, F.col("conso_month_sum") / F.col("surface_m2")).otherwise(None)
)

# Médiane par catégorie et par énergie

df_median_cat = (
    df_monthly_bat
    .groupBy("type", "type_energie")
    .agg(F.expr("percentile_approx(intensite_month_m2, 0.5)").alias("median_intensite"))
)

# Flag hors norme

df_outliers = (
    df_monthly_bat
    .join(df_median_cat, on=["type", "type_energie"], how="left")
    .withColumn(
        "hors_norme",
        F.when(
            (F.col("median_intensite").isNotNull()) &
            (F.col("intensite_month_m2") > 3 * F.col("median_intensite")),
            F.lit(True)
        ).otherwise(F.lit(False))
    )
)

df_outliers.filter("hors_norme = true").orderBy(F.desc("intensite_month_m2")).show(20, truncate=False)

## Calculer les totaux par commune et par type de bâtiment

In [None]:
df_totaux_commune_type = (
    df_monthly_bat
    .groupBy("commune", "type", "type_energie", "month")
    .agg(
        F.sum("conso_month_sum").alias("conso_total_month"),
        F.avg("intensite_month_m2").alias("intensite_moy_month_m2")
    )
    .orderBy("month", "commune", "type")
)

df_totaux_commune_type.show(10)

## Créer une vue SQL exploitable

In [18]:
# Créer une table agrégée “finale” (celle à exporter)

df_agregee = df_outliers.select(
    "batiment_id", "nom", "type", "commune", "type_energie", "month",
    "conso_month_sum", "intensite_month_m2",
    "median_intensite", "hors_norme"
)

df_agregee.createOrReplaceTempView("vw_consommations_agregees")

# Top 10 bâtiments les plus énergivores (mensuel, électricité)
spark.sql("""
SELECT month, batiment_id, nom, commune, type, conso_month_sum, intensite_month_m2
FROM vw_consommations_agregees
WHERE type_energie = 'electricite'
ORDER BY conso_month_sum DESC
LIMIT 10
""").show(truncate=False)

# Liste des bâtiments hors norme (flag)
spark.sql("""
SELECT month, batiment_id, nom, commune, type, type_energie,
       intensite_month_m2, median_intensite
FROM vw_consommations_agregees
WHERE hors_norme = true
ORDER BY intensite_month_m2 DESC
""").show(50, truncate=False)

# Totaux par commune (tous types de bâtiments)
spark.sql("""
SELECT month, commune, type_energie,
       SUM(conso_month_sum) AS conso_total_month
FROM vw_consommations_agregees
GROUP BY month, commune, type_energie
ORDER BY month, conso_total_month DESC
""").show(20, truncate=False)


+-------------------+-----------+---------------+-------+-------+------------------+------------------+
|month              |batiment_id|nom            |commune|type   |conso_month_sum   |intensite_month_m2|
+-------------------+-----------+---------------+-------+-------+------------------+------------------+
|2024-12-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine|190623.21000000002|48.71536161512906 |
|2023-11-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine|183726.38999999998|46.952821364681824|
|2024-01-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine|164811.71         |42.11901610017889 |
|2024-02-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine|162138.65000000002|41.43589317659086 |
|2023-12-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine|160107.26999999996|40.91675696396626 |
|2024-11-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine|148129.07         |37.855627395859955|
|2023-02-01 00:00:00|BAT0005    |Piscine Paris 5|Paris  |piscine

In [None]:
## Sauvegarder consommations_agregees.parquet

In [23]:
AGG_PATH = os.path.join(OUTPUT_DIR, "consommations_agregees.parquet")

(df_agregee.write
 .mode("overwrite")
 .parquet(AGG_PATH)
)

print("Table agrégée écrite :", AGG_PATH)

Table agrégée écrite : ../output/consommations_agregees.parquet
