#### Etape 1.3 : Agregations avancees Spark
- Joindre consommations avec referentiel batiments

In [1]:
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import functions as F
from pathlib import Path

DATA_DIR = Path("../data_ecf").resolve()
OUTPUT_DIR = Path("../output").resolve()
CONSOMMATIONS_CLEAN_PATH = OUTPUT_DIR / "consommation_clean"
BATIMENTS_PATH = DATA_DIR / "batiments.csv"
CONSOMMATIONS_AGREGEES_PATH = OUTPUT_DIR / "consommations_agregees"

In [2]:
builder: SparkSession.Builder = SparkSession.builder
spark = (
    builder
    .appName("03_agregations_spark")
    .master("local[*]")
    .config("spark.driver.memory", "2g")
    .config("spark.sql.shuffle.partitions", "8")
    .config("spark.hadoop.fs.hdfs.impl", "org.apache.hadoop.fs.LocalFileSystem")
    .config("spark.hadoop.native.library.disable", "true")
    .getOrCreate()
)

spark.sparkContext.setLogLevel("WARN")

print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Spark version: 3.5.7
Spark UI: http://host.docker.internal:4040


In [3]:
df_pandas = pd.read_parquet(CONSOMMATIONS_CLEAN_PATH.as_posix())
df_consommations = spark.createDataFrame(df_pandas)

print(f"Nombre de lignes dans consommations_clean : {df_consommations.count()}")
print(f"Nombre de colonnes dans consommations_clean : {len(df_consommations.columns)}")

Nombre de lignes dans consommations_clean : 7492584
Nombre de colonnes dans consommations_clean : 9


In [4]:
df_batiments = (
    spark.read.option("header", "true")
    .option("inferSchema", "true")
    .csv(BATIMENTS_PATH.as_posix())
)

print(f"Nombre de lignes dans batiments : {df_batiments.count()}")
print(f"Nombre de colonnes dans batiments : {len(df_batiments.columns)}")

Nombre de lignes dans batiments : 146
Nombre de colonnes dans batiments : 8


In [5]:
# Jointure
df_join = df_consommations.join(df_batiments, on="batiment_id", how="left")

print(f"Nombre de colonnes après jointure : {len(df_join.columns)}")

Nombre de colonnes après jointure : 16


- Calculer l'intensite energetique (kWh/m2)

In [6]:
df_with_intensity = df_join.withColumn(
    "intensite_energetique",
    F.col("consommation") / F.col("surface_m2"),
)

df_with_intensity.show(5)

+-----------+-------------------+------------+-----+----+----+-----+----------+------------+--------------------+-----------+-------+----------+------------------+------------------+------------------+---------------------+
|batiment_id|          timestamp|consommation|unite|hour|year|month|      date|type_energie|                 nom|       type|commune|surface_m2|annee_construction|classe_energetique|nb_occupants_moyen|intensite_energetique|
+-----------+-------------------+------------+-----+----+----+-----+----------+------------+--------------------+-----------+-------+----------+------------------+------------------+------------------+---------------------+
|    BAT0001|2023-01-01 02:00:00|         0.2|   m3|   2|2023|    1|2023-01-01|         eau|       Ecole Paris 1|      ecole|  Paris|      1926|              1978|                 E|               225| 1.038421599169262...|
|    BAT0111|2023-01-01 05:00:00|        1.15|   m3|   5|2023|    1|2023-01-01|         eau|Mediatheque 

In [7]:
# Sauvegarde en parquet
df_final = df_with_intensity.toPandas()
df_final.to_parquet(CONSOMMATIONS_AGREGEES_PATH, index=False)

----------------------------------------
Exception occurred during processing of request from ('127.0.0.1', 49244)
Traceback (most recent call last):
  File "c:\Users\Administrateur\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 317, in _handle_request_noblock
    self.process_request(request, client_address)
  File "c:\Users\Administrateur\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 348, in process_request
    self.finish_request(request, client_address)
  File "c:\Users\Administrateur\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 361, in finish_request
    self.RequestHandlerClass(request, client_address, self)
  File "c:\Users\Administrateur\AppData\Local\Programs\Python\Python311\Lib\socketserver.py", line 755, in __init__
    self.handle()
  File "c:\Users\Administrateur\AppData\Local\Programs\Python\Python311\Lib\site-packages\pyspark\accumulators.py", line 295, in handle
    poll(accum_updates)
  File "c:\Users\Admi

ConnectionRefusedError: [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée

- Identifier les batiments hors norme (>3x la mediane de leur categorie)

In [7]:
median = df_with_intensity.groupBy("type").agg(F.median("intensite_energetique").alias("intensite_energetique_mediane"))

df_with_median = df_with_intensity.join(median, on="type", how="left")

hors_norme = df_with_median.filter(F.col("intensite_energetique") > 3 * F.col("intensite_energetique_mediane"))

print(f"Nombre de batiment hors norme : {hors_norme.count()}")
hors_norme.show(5)

Nombre de batiment hors norme : 2725373
+-----------+-----------+-------------------+------------+-----+----+----+-----+----------+------------+--------------------+-------+----------+------------------+------------------+------------------+---------------------+-----------------------------+
|       type|batiment_id|          timestamp|consommation|unite|hour|year|month|      date|type_energie|                 nom|commune|surface_m2|annee_construction|classe_energetique|nb_occupants_moyen|intensite_energetique|intensite_energetique_mediane|
+-----------+-----------+-------------------+------------+-----+----+----+-----+----------+------------+--------------------+-------+----------+------------------+------------------+------------------+---------------------+-----------------------------+
|mediatheque|    BAT0004|2023-01-01 19:00:00|        79.9|  kWh|  19|2023|    1|2023-01-01| electricite| Mediatheque Paris 4|  Paris|       907|              2019|                 C|               1

- Calculer les totaux par commune et par type de batiment

In [8]:
df_totaux = (
    df_join
    .groupBy(["commune", "type"])
    .agg(F.sum("df_with_intensity").alias("total_df_with_intensity"))
    .orderBy(F.col("total_df_with_intensity").desc())
)

df_totaux.show(5)

ConnectionRefusedError: [WinError 10061] Aucune connexion n’a pu être établie car l’ordinateur cible l’a expressément refusée

- Creer une vue SQL exploitable

In [10]:
# Création d'une vue temporaire
df_with_intensity.createOrReplaceTempView("consommations_batiments")

**Livrables** :
- Notebook `03_agregations_spark.ipynb`
- Table agregee `consommations_agregees.parquet`
- Requetes Spark SQL demonstrant l'utilisation de la vue