# Etape 1.3 : Agregations avancees Spark

**Livrables** :
- Ce notebook `03_agregations_spark.ipynb`
- Table agregee `03_consommations_agregees.parquet`
- Requetes Spark SQL demonstrant l'utilisation de la vue 

---
---

## Imports

In [1]:
import sys
import os
import platform
from pathlib import Path
import time
from datetime import datetime
import psutil
from pyspark.sql import SparkSession, functions as F, Window

---

## (optionnel) Enregistrement de la date de la dernière execution de ce notebook

In [2]:
print(f"- Date de la dernière execution de ce notebook : {datetime.now().strftime('%d/%m/%Y %H:%M:%S')} (FR)")

- Date de la dernière execution de ce notebook : 20/02/2026 20:32:09 (FR)


---

## (optionnel) Mesure du temps de traitement global pour ce script - enregistrement de l'heure de début + estimation instantanée des ressources machine libres

In [3]:
## Heure de début
start_time_03 = time.time()

## Machine: current available RAM (in GB)
ram_available_03 = psutil.virtual_memory().available / (1024**3)

## Machine: current available CPU
logical = psutil.cpu_count()
physical = psutil.cpu_count(logical=False) or logical

cpu_used = psutil.cpu_percent(interval=2)
cpu_available_pct_03 = 100 - cpu_used

available_logical_03 = logical * cpu_available_pct_03 / 100
available_physical_03 = physical * cpu_available_pct_03 / 100

## Show available resources
print(f"- Current machine RAM available : {ram_available_03:.2f} GB")
print(f"- Current machine CPU available : {cpu_available_pct_03:.1f}%")
print(f"    Approx logical cores free  : {available_logical_03:.2f}")
print(f"    Approx physical cores free : {available_physical_03:.2f}")

- Current machine RAM available : 8.31 GB
- Current machine CPU available : 71.9%
    Approx logical cores free  : 11.50
    Approx physical cores free : 5.75


---

## Chemins des données

In [4]:
# ==============================================================================================================
#                                                   OUTPUTS
# ==============================================================================================================
OUT_DIR = (Path.cwd() / ".." / "output").resolve()
OUT_CONSO_AGG_PARQUET =  os.path.join(OUT_DIR, "03_consommations_agregees.parquet")

# ==============================================================================================================
#                                                   INPUTS
# ==============================================================================================================
IN_DIR = (Path.cwd() / ".." / "data").resolve()
IN_BAT_CSV = os.path.join(IN_DIR, "batiments.csv")
IN_CONSO_CLEAN_PARQUET = os.path.join(OUT_DIR, "02_consommations_clean") # parquet partitionné

# ==============================================================================================================
#                                                   OTHERS
# ==============================================================================================================
TMP_DIR = (Path.cwd() / ".." / "my_tmp").resolve()
TMP_FILE_TXT = TMP_DIR / "tmp_03_resources.txt" # Enregistrer les metrics pour ce script

---

## Création d'une session Spark locale

In [5]:
# --- Creer une session Spark locale
spark = SparkSession.builder \
    .appName("ECF 2 - ENERGIE - Etape 1.3 : Agregations avancees Spark") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Reduire les logs
spark.sparkContext.setLogLevel("WARN")

# Affichage de l'actuelle version de Spark & de l'url de Spark UI ainsi que python & java
print(f"Spark version  : {spark.version}")
print(f"Spark UI       : {spark.sparkContext.uiWebUrl}")
print()
print(f"Hadoop version : {spark._jvm.org.apache.hadoop.util.VersionInfo.getVersion()}")
print(f"Hadoop home    : {spark._jvm.java.lang.System.getenv('HADOOP_HOME')}")
print()
print(f"Python version : {platform.python_version()}")
print(f"Python path    : { sys.executable}")
print()
print(f"Java version   : {spark._jvm.java.lang.System.getProperty('java.version')}")
print(f"Java home      : { spark._jvm.java.lang.System.getProperty('java.home')}")

# Check du module winutils.exe
print()
if os.path.exists(os.path.join(os.environ["HADOOP_HOME"], "bin", "winutils.exe")):
        print("[ok]: Hadoop module 'winutils.exe' exists at path:", os.path.join(os.environ["HADOOP_HOME"], "bin", "winutils.exe"))
else:
        print("[ko]: Required hadoop module 'winutils.exe' do not exists")
        sys.exit(1)

Spark version  : 3.5.7
Spark UI       : http://joel:4041

Hadoop version : 3.3.4
Hadoop home    : C:\Users\joel\data-info\dev-tools\data-tools\hadoop

Python version : 3.11.9
Python path    : C:\Users\joel\Downloads\ecf_energie\.venv\Scripts\python.exe

Java version   : 11.0.28
Java home      : C:\Users\joel\data-info\dev-tools\languages\java\jdk-11

[ok]: Hadoop module 'winutils.exe' exists at path: C:\Users\joel\data-info\dev-tools\data-tools\hadoop\bin\winutils.exe


---

## Chargement des données

In [6]:
# --- Chargement du référentil bâtiments
df_bat = (spark.read
          .option("header", True)
          .option("inferSchema", True)
          .csv(IN_BAT_CSV))

# Information sur le nombre de lignes et de colonnes
print("df_bat :")
print(f"- Nombre de lignes : {df_bat.count():,}")
print("- Nombre de colonnes :", len(df_bat.columns))
print("- Schema :")
df_bat.printSchema()
print("- Apperçu :")
df_bat.show(5, truncate=False)

df_bat :


- Nombre de lignes : 146
- Nombre de colonnes : 8
- Schema :
root
 |-- batiment_id: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- type: string (nullable = true)
 |-- commune: string (nullable = true)
 |-- surface_m2: integer (nullable = true)
 |-- annee_construction: integer (nullable = true)
 |-- classe_energetique: string (nullable = true)
 |-- nb_occupants_moyen: integer (nullable = true)

- Apperçu :
+-----------+-------------------+-----------+-------+----------+------------------+------------------+------------------+
|batiment_id|nom                |type       |commune|surface_m2|annee_construction|classe_energetique|nb_occupants_moyen|
+-----------+-------------------+-----------+-------+----------+------------------+------------------+------------------+
|BAT0001    |Ecole Paris 1      |ecole      |Paris  |1926      |1978              |E                 |225               |
|BAT0002    |Ecole Paris 2      |ecole      |Paris  |1156      |2004              |C 

In [7]:
# --- Chargement des données de consommation clean (à partir du parquet consommations_clean)
df_conso = spark.read.parquet(IN_CONSO_CLEAN_PARQUET)

# Information sur le schema, nombre de lignes et de colonnes
print("df_conso :")
print(f"- Nombre de lignes : {df_conso.count():,}")
print("- Nombre de colonnes :", len(df_conso.columns))
print("- Schema :")
df_conso.printSchema()
print("- Apperçu :")
df_conso.show(5, truncate=False)


df_conso :


- Nombre de lignes : 991
- Nombre de colonnes : 6
- Schema :
root
 |-- batiment_id: string (nullable = true)
 |-- heure: integer (nullable = true)
 |-- consommation_moyenne: double (nullable = true)
 |-- unite: string (nullable = true)
 |-- date: date (nullable = true)
 |-- type_energie: string (nullable = true)

- Apperçu :


+-----------+-----+--------------------+-----+----------+------------+
|batiment_id|heure|consommation_moyenne|unite|date      |type_energie|
+-----------+-----+--------------------+-----+----------+------------+
|BAT0097    |21   |250.98              |kWh  |2024-09-17|gaz         |
|BAT0106    |21   |178.72              |kWh  |2024-09-17|gaz         |
|BAT0141    |21   |199.88              |kWh  |2024-09-17|gaz         |
|BAT0081    |0    |30.93               |kWh  |2023-11-12|gaz         |
|BAT0081    |10   |60.31               |kWh  |2023-11-12|gaz         |
+-----------+-----+--------------------+-----+----------+------------+
only showing top 5 rows



---

## Joindre consommations avec referentiel batiments

In [8]:
df_join = df_conso.join(df_bat, on="batiment_id", how="left").select(
    "batiment_id", 
    "nom", 
    "type", 
    "surface_m2", 
    "nb_occupants_moyen", 
    "commune", 
    "annee_construction", 
    "classe_energetique", 
    "date", 
    "heure",
    "type_energie", 
    "consommation_moyenne", 
    "unite")

# Information sur le schema, nombre de lignes et de colonnes
print("df_join :")
print(f"- Nombre de lignes : {df_join.count():,}")
print("- Nombre de colonnes :", len(df_join.columns))
print("- Schema :")
df_join.printSchema()
print("- Apperçu :")
df_join.show(5, truncate=False)

# Contrôle qualité jointure
print()
print("Contrôle qualité jointure :")
unmatched = df_join.filter(F.col("nom").isNull()).select("batiment_id").distinct().count()
if unmatched == 0:
    print("[ok]: Batiments non matchés (id inconnus) :", unmatched)
else:
    print("[ko]: Batiments non matchés (id inconnus) :", unmatched)

df_join :


- Nombre de lignes : 991
- Nombre de colonnes : 13
- Schema :
root
 |-- batiment_id: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- type: string (nullable = true)
 |-- surface_m2: integer (nullable = true)
 |-- nb_occupants_moyen: integer (nullable = true)
 |-- commune: string (nullable = true)
 |-- annee_construction: integer (nullable = true)
 |-- classe_energetique: string (nullable = true)
 |-- date: date (nullable = true)
 |-- heure: integer (nullable = true)
 |-- type_energie: string (nullable = true)
 |-- consommation_moyenne: double (nullable = true)
 |-- unite: string (nullable = true)

- Apperçu :


+-----------+----------------------+-----------+----------+------------------+-----------+------------------+------------------+----------+-----+------------+--------------------+-----+
|batiment_id|nom                   |type       |surface_m2|nb_occupants_moyen|commune    |annee_construction|classe_energetique|date      |heure|type_energie|consommation_moyenne|unite|
+-----------+----------------------+-----------+----------+------------------+-----------+------------------+------------------+----------+-----+------------+--------------------+-----+
|BAT0097    |Mediatheque Nice 97   |mediatheque|1186      |41                |Nice       |1980              |G                 |2024-09-17|21   |gaz         |250.98              |kWh  |
|BAT0106    |Mairie Rennes 106     |mairie     |1272      |77                |Rennes     |1983              |G                 |2024-09-17|21   |gaz         |178.72              |kWh  |
|BAT0141    |Mediatheque Toulon 141|mediatheque|891       |40         

[ok]: Batiments non matchés (id inconnus) : 0


---

## Calculer l'intensite energetique (kWh/m2)
``kWh/m²`` n’a du sens que pour l’électricité + gaz (pas pour l’eau en m³).
=> Donc on calcule ``intensite_kwh_m2`` uniquement sur electricite et gaz.

In [9]:
df = df_join.withColumn(
    "intensite_kwh_m2",
    F.when(F.col("type_energie").isin("electricite", "gaz"),
        F.round(F.col("consommation_moyenne") / F.col("surface_m2"), 3),
    )
    .otherwise(F.lit(None))
)

# Information sur le schema, nombre de lignes et de colonnes
print("df :")
print(f"- Nombre de lignes : {df.count():,}")
print("- Nombre de colonnes :", len(df.columns))
print("- Schema :")
df.printSchema()
print("- Apperçu :")
df.show(500, truncate=False)

df :


- Nombre de lignes : 991
- Nombre de colonnes : 14
- Schema :
root
 |-- batiment_id: string (nullable = true)
 |-- nom: string (nullable = true)
 |-- type: string (nullable = true)
 |-- surface_m2: integer (nullable = true)
 |-- nb_occupants_moyen: integer (nullable = true)
 |-- commune: string (nullable = true)
 |-- annee_construction: integer (nullable = true)
 |-- classe_energetique: string (nullable = true)
 |-- date: date (nullable = true)
 |-- heure: integer (nullable = true)
 |-- type_energie: string (nullable = true)
 |-- consommation_moyenne: double (nullable = true)
 |-- unite: string (nullable = true)
 |-- intensite_kwh_m2: double (nullable = true)

- Apperçu :


+-----------+-----------------------------+-----------+----------+------------------+-------------+------------------+------------------+----------+-----+------------+--------------------+-----+----------------+
|batiment_id|nom                          |type       |surface_m2|nb_occupants_moyen|commune      |annee_construction|classe_energetique|date      |heure|type_energie|consommation_moyenne|unite|intensite_kwh_m2|
+-----------+-----------------------------+-----------+----------+------------------+-------------+------------------+------------------+----------+-----+------------+--------------------+-----+----------------+
|BAT0097    |Mediatheque Nice 97          |mediatheque|1186      |41                |Nice         |1980              |G                 |2024-09-17|21   |gaz         |250.98              |kWh  |0.212           |
|BAT0106    |Mairie Rennes 106            |mairie     |1272      |77                |Rennes       |1983              |G                 |2024-09-17|21  

---

## Identifier les batiments hors norme (>3x la mediane de leur categorie)
'Catégorie' => type de bâtiment (ecole, mairie, etc.).
On calcule la médiane de l’intensité par type et type_energie, puis on marque hors norme.

=> En Spark, médiane = percentile_approx(col, 0.5) (standard).

In [10]:
# --- Calcule de la médiane par catégorie
df_median = (
    df
    .filter(F.col("intensite_kwh_m2").isNotNull())
    .groupBy("type") # catégorie
    .agg(
        F.expr("percentile_approx(intensite_kwh_m2, 0.5)").alias("mediane_intensite")
    )
)

# --- Joindre la médiane au dataframe
df_with_mediane = df.join(df_median, on="type", how="left")

# --- Identification des bâtiments hors norme
df_outliers = df_with_mediane.filter(
    F.col("intensite_kwh_m2") > 3 * F.col("mediane_intensite")
)

# --- Affichage des résultats
print("- Nombre de batiments hors norme (>3x la mediane de leur categorie) :", df_outliers.count())
print("- Apperçu de ces batiments hors normes :")
df_outliers.show()

- Nombre de batiments hors norme (>3x la mediane de leur categorie) : 97
- Apperçu de ces batiments hors normes :


+-----------+-----------+--------------------+----------+------------------+----------+------------------+------------------+----------+-----+------------+--------------------+-----+----------------+-----------------+
|       type|batiment_id|                 nom|surface_m2|nb_occupants_moyen|   commune|annee_construction|classe_energetique|      date|heure|type_energie|consommation_moyenne|unite|intensite_kwh_m2|mediane_intensite|
+-----------+-----------+--------------------+----------+------------------+----------+------------------+------------------+----------+-----+------------+--------------------+-----+----------------+-----------------+
|mediatheque|    BAT0021|Mediatheque Marse...|      1317|                76| Marseille|              1978|                 G|2024-07-11|    8| electricite|               427.5|  kWh|           0.325|            0.106|
|    piscine|    BAT0043| Piscine Bordeaux 43|      2279|               148|  Bordeaux|              1962|                 G|202

---

## Calculer les totaux par commune et par type de batiment

In [11]:
df_totaux = (
    df
    .groupBy("commune", "type", "type_energie", "unite")
    .agg(
        F.round(F.sum("consommation_moyenne"), 3).alias("consommation_totale"),
        F.countDistinct("batiment_id").alias("nb_batiments")
    )
).select(
    "commune",
    "type",
    "nb_batiments",
    "type_energie",
    "consommation_totale",
    "unite"
)

print("- Apperçu des totaux par commune et type de bâtiment :")
df_totaux.show()
print(f"(Nombre total de lignes : {df_totaux.count()})")

- Apperçu des totaux par commune et type de bâtiment :


+-------------+-----------+------------+------------+-------------------+-----+
|      commune|       type|nb_batiments|type_energie|consommation_totale|unite|
+-------------+-----------+------------+------------+-------------------+-----+
|       Nantes|     mairie|           4| electricite|             602.88|  kWh|
|        Reims|      ecole|           1|         gaz|            1123.17|  kWh|
|         Lyon|      ecole|           4| electricite|            2540.14|  kWh|
|       Nantes|     mairie|           3|         gaz|             737.64|  kWh|
|         Lyon|      ecole|           4|         gaz|            1907.55|  kWh|
|        Lille|    piscine|           3|         eau|            1822.75|   m3|
|     Le Havre|      ecole|           2| electricite|             583.75|  kWh|
|       Toulon|    piscine|           6|         eau|            2144.23|   m3|
|         Nice|     mairie|           2|         gaz|             211.25|  kWh|
|   Strasbourg|    gymnase|           3|

(Nombre total de lignes : 193)


---

## Creer une vue SQL exploitable

In [12]:
# Création d’une vue SQL permettant l’exploration des consommations agrégées par commune et type de bâtiment.
df_totaux.createOrReplaceTempView("vw_totaux_commune_type")

# démo :
spark.sql("""
SELECT *
FROM vw_totaux_commune_type
ORDER BY consommation_totale DESC
LIMIT 10;
""").show(truncate=False)

+-------------+-------+------------+------------+-------------------+-----+
|commune      |type   |nb_batiments|type_energie|consommation_totale|unite|
+-------------+-------+------------+------------+-------------------+-----+
|Toulon       |piscine|6           |gaz         |15555.22           |kWh  |
|Toulon       |piscine|6           |electricite |14789.04           |kWh  |
|Lille        |piscine|3           |gaz         |11984.68           |kWh  |
|Le Havre     |piscine|4           |electricite |9974.18            |kWh  |
|Le Havre     |piscine|4           |gaz         |9792.95            |kWh  |
|Bordeaux     |piscine|1           |gaz         |8305.15            |kWh  |
|Paris        |piscine|1           |gaz         |6228.5             |kWh  |
|Saint-Etienne|piscine|2           |gaz         |5468.66            |kWh  |
|Lyon         |gymnase|3           |gaz         |4254.1             |kWh  |
|Nice         |piscine|2           |gaz         |4131.8             |kWh  |
+-----------

---

## Table agregee consommations_agregees.parquet

In [13]:
df_totaux.coalesce(1).write.mode("overwrite").parquet(OUT_CONSO_AGG_PARQUET)
print(f"[ok]: Creation avec succès de la table agregee consommations_agregees.parquet : {OUT_CONSO_AGG_PARQUET}")

[ok]: Creation avec succès de la table agregee consommations_agregees.parquet : C:\Users\joel\Downloads\ecf_energie\output\03_consommations_agregees.parquet


---

In [14]:
# --- Enregistrement du temps d'execution de ce script
temps_execution_03 = time.time() - start_time_03

---

## Fermer la session Spark

In [15]:
spark.stop()

---

## Optionel : enregistrement dans un fichier temporaire du temps d'execution + ressources pour utilisation ultérieure (dans le script run_pipeline_hybride.py ou autres)

In [16]:
temps_resources = f"""
    Date : {datetime.now().strftime("%d/%m/%Y %H:%M:%S")} (FR)

    temps_exec_sec={temps_execution_03:.2f}
    ram_gb={ram_available_03:.2f}
    cpu_pct={cpu_available_pct_03:.2f}
    logi_cores={available_logical_03:.1f}
    physi_cores={available_physical_03:.1f}
"""

# Ecrire des données du temps d'execution + ressources dans le fichier TMP_FILE_TXT
TMP_FILE_TXT.write_text(temps_resources, encoding="utf-8")

137