# Exploration initiale

In [23]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
import os


DATA_DIR = "../data_ecf"
CONSO_RAW_PATH = os.path.join(DATA_DIR, "consommations_raw.csv")
METEO_PATH = os.path.join(DATA_DIR, "meteo_raw.csv")
BATIMENTS_PATH = os.path.join(DATA_DIR, "batiments.csv")
TARIF_ENERGIE_PATH = os.path.join(DATA_DIR, "tarifs_energie.csv")


## Session Spark

In [5]:
spark = SparkSession.builder \
    .appName("ECF2") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

# Reduire les logs
spark.sparkContext.setLogLevel("WARN")

print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Spark version: 3.5.7
Spark UI: http://host.docker.internal:4041


## Chargement des données

In [6]:
df_conso_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(CONSO_RAW_PATH)

print(f"Nombre de lignes: {df_conso_raw.count():,}")
print(f"Nombre de colonnes: {len(df_conso_raw.columns)}")

Nombre de lignes: 7,758,868
Nombre de colonnes: 5


## Schema infere

In [9]:
df_conso_raw.printSchema()

root
 |-- batiment_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- type_energie: string (nullable = true)
 |-- consommation: string (nullable = true)
 |-- unite: string (nullable = true)



## Problèmes de typage

In [10]:
# Differents formats de timestamp
print("Exemples de formats de timestamp:")
df_conso_raw.select("timestamp").distinct().show(20, truncate=False)

Exemples de formats de timestamp:
+-------------------+
|timestamp          |
+-------------------+
|2023-12-21 13:00:00|
|11/29/2024 04:00:00|
|09/15/2024 18:00:00|
|20/12/2023 07:00   |
|29/04/2024 13:00   |
|2024-06-15T14:00:00|
|22/03/2024 11:00   |
|2023-04-07 13:00:00|
|2024-03-11T11:00:00|
|08/21/2024 17:00:00|
|2024-05-27 04:00:00|
|08/02/2023 16:00   |
|2024-11-23 17:00:00|
|2023-07-06T05:00:00|
|2024-04-30 02:00:00|
|08/12/2023 06:00   |
|16/11/2024 21:00   |
|12/08/2024 04:00:00|
|2024-05-27 10:00:00|
|11/03/2023 09:00:00|
+-------------------+
only showing top 20 rows



In [11]:
# La colonne 'consommation' est en string car elle contient des virgules et des valeurs textuelles
# Examinons les valeurs non numeriques

# Valeurs qui ne peuvent pas etre converties en nombre
df_non_numeric = df_conso_raw.filter(
    ~F.col("consommation").rlike("^-?[0-9]+[.,]?[0-9]*$")
)

print(f"Nombre de valeurs non numeriques: {df_non_numeric.count():,}")
df_non_numeric.select("consommation").distinct().show()

Nombre de valeurs non numeriques: 38,975
+------------+
|consommation|
+------------+
|        null|
|         N/A|
|      erreur|
|         ---|
+------------+



In [12]:
# Valeurs avec virgule comme separateur decimal
df_with_comma = df_conso_raw.filter(F.col("consommation").contains(","))
print(f"Nombre de valeurs avec virgule: {df_with_comma.count():,}")
df_with_comma.select("consommation").show(5)

Nombre de valeurs avec virgule: 925,392
+------------+
|consommation|
+------------+
|       10,10|
|       72,29|
|       17,82|
|        0,92|
|      290,65|
+------------+
only showing top 5 rows



## Statistiques descriptives par type d'énergie

In [13]:
# Convertir value en double (en remplacant la virgule par un point)
df_conso_numeric = df_conso_raw.withColumn(
    "conso_clean",
    F.regexp_replace(F.col("consommation"), ",", ".").cast("double")
)

# Statistiques par polluant (en ignorant les valeurs nulles)
stats_by_energy = df_conso_numeric.filter(F.col("conso_clean").isNotNull()) \
    .groupBy("type_energie") \
    .agg(
        F.count("*").alias("count"),
        F.round(F.mean("conso_clean"), 2).alias("mean"),
        F.round(F.stddev("conso_clean"), 2).alias("stddev"),
        F.round(F.min("conso_clean"), 2).alias("min"),
        F.round(F.max("conso_clean"), 2).alias("max"),
        F.round(F.expr("percentile(conso_clean, 0.5)"), 2).alias("median")
    ) \
    .orderBy("type_energie")

print("Statistiques par type d'énergie:")
stats_by_energy.show()

Statistiques par type d'énergie:
+------------+-------+------+-------+--------+--------+------+
|type_energie|  count|  mean| stddev|     min|     max|median|
+------------+-------+------+-------+--------+--------+------+
|         eau|2573156|204.36|2398.57| -657.01|49999.23|  7.52|
| electricite|2573364|430.64|2429.51|-4003.35|49999.13|108.56|
|         gaz|2573373|560.94|2465.81|-5963.49|49999.49|160.57|
+------------+-------+------+-------+--------+--------+------+



In [14]:
# Identifier les valeurs aberrantes
print("Valeurs negatives:")
df_conso_numeric.filter(F.col("conso_clean") < 0).groupBy("type_energie").count().show()

print("\nValeurs > 15000 :")
df_conso_numeric.filter(F.col("conso_clean") > 15000).groupBy("type_energie").count().show()

Valeurs negatives:
+------------+-----+
|type_energie|count|
+------------+-----+
|         eau|12945|
|         gaz|12997|
| electricite|12968|
+------------+-----+


Valeurs > 15000 :
+------------+-----+
|type_energie|count|
+------------+-----+
|         eau|12810|
|         gaz|12764|
| electricite|12986|
+------------+-----+



## Bâtiment avec le plus de mesure

In [15]:
# Chargement des bâtiments
df_batiments = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(BATIMENTS_PATH)

df_batiments.show(10)

+-----------+-------------------+-----------+-------+----------+------------------+------------------+------------------+
|batiment_id|                nom|       type|commune|surface_m2|annee_construction|classe_energetique|nb_occupants_moyen|
+-----------+-------------------+-----------+-------+----------+------------------+------------------+------------------+
|    BAT0001|      Ecole Paris 1|      ecole|  Paris|      1926|              1978|                 E|               225|
|    BAT0002|      Ecole Paris 2|      ecole|  Paris|      1156|              2004|                 C|               402|
|    BAT0003|      Ecole Paris 3|      ecole|  Paris|      1695|              2014|                 D|               219|
|    BAT0004|Mediatheque Paris 4|mediatheque|  Paris|       907|              2019|                 C|               121|
|    BAT0005|    Piscine Paris 5|    piscine|  Paris|      3913|              1950|                 G|               242|
|    BAT0006|     Mairie

In [None]:
# Nombre d'enregistrements par bâtiments
records_by_bat = df_conso_raw.groupBy("batiment_id") \
    .count() \
    .orderBy(F.desc("count"))

# Joindre avec les infos des stations
records_with_info = records_by_bat.join(
    df_batiments,
    on="batiment_id",
    how="left"
).select(
    "batiment_id", "nom", "commune", "type", "count"
)

print("Top 1 des bâtiments avec le plus d'enregistrements:")
records_with_info.show(1)

Top 10 des bâtiments avec le plus d'enregistrements:
+-----------+--------------------+----------+------+-----+
|batiment_id|                 nom|   commune|  type|count|
+-----------+--------------------+----------+------+-----+
|    BAT0073|Mairie Strasbourg 73|Strasbourg|mairie|53156|
+-----------+--------------------+----------+------+-----+
only showing top 1 row



## Audit de qualité des données

In [21]:
# Resume des problemes
total = df_conso_raw.count()

# Valeurs non numeriques
non_numeric = df_conso_raw.filter(
    ~F.col("consommation").rlike("^-?[0-9]+[.,]?[0-9]*$")
).count()

# Valeurs avec virgule
with_comma = df_conso_raw.filter(F.col("consommation").contains(",")).count()

# Valeurs negatives (apres conversion)
negative = df_conso_numeric.filter(F.col("conso_clean") < 0).count()

# Valeurs aberrantes > 15000
outliers = df_conso_numeric.filter(F.col("conso_clean") > 15000).count()

# Doublons
duplicates = total - df_conso_raw.dropDuplicates(["batiment_id", "timestamp", "type_energie"]).count()


print(f"Total enregistrements: {total:,}")
print()
print(f"Problemes identifies:")
print(f"  - Valeurs non numeriques: {non_numeric:,} ({non_numeric/total*100:.2f}%)")
print(f"  - Valeurs avec virgule decimale: {with_comma:,} ({with_comma/total*100:.2f}%)")
print(f"  - Valeurs negatives: {negative:,} ({negative/total*100:.2f}%)")
print(f"  - Valeurs aberrantes (>15000): {outliers:,} ({outliers/total*100:.2f}%)")
print(f"  - Doublons: {duplicates:,} ({duplicates/total*100:.2f}%)")
print(f"  - Formats de dates multiples: 4 formats differents detectes")

Total enregistrements: 7,758,868

Problemes identifies:
  - Valeurs non numeriques: 38,975 (0.50%)
  - Valeurs avec virgule decimale: 925,392 (11.93%)
  - Valeurs negatives: 38,910 (0.50%)
  - Valeurs aberrantes (>15000): 38,560 (0.50%)
  - Doublons: 152,134 (1.96%)
  - Formats de dates multiples: 4 formats differents detectes


In [22]:
spark.stop()