In [0]:
# =====================================================
# NOTEBOOK : Nettoyage des données météo
# ETAPE : Data Cleaning
# PROJET : Big Data - Spark & Climat Sénégal
# =====================================================

## Contrôle qualité et nettoyage des données météorologiques
1.1 Évaluation de la qualité des données

Avant toute analyse, un contrôle qualité des bases de données météorologiques a été réalisé afin d’identifier les problèmes susceptibles d’affecter la fiabilité des résultats. Cette étape s’est concentrée principalement sur :

- la présence de valeurs manquantes (missing values),
- la couverture régionale des observations,
- la cohérence des variables météorologiques clés.

Le nombre de valeurs manquantes par variable a été calculé à l’aide de fonctions d’agrégation sous Apache Spark, ce qui a permis d’identifier les variables les plus affectées par des données incomplètes, notamment les précipitations (prcp), la vitesse du vent (wspd) et la pression atmosphérique (pres).

1.2 Gestion des valeurs manquantes

Une stratégie de nettoyage différenciée a été adoptée en fonction du rôle des variables dans l’analyse.

a) Traitement des variables critiques

Les variables suivantes ont été considérées comme indispensables à l’analyse :

- la date d’observation (time),
- la région (region),
- la température moyenne (tavg),
- la température minimale (tmin),
- la température maximale (tmax).

Toute observation comportant des valeurs manquantes sur ces variables a été supprimée, car leur absence compromet l’interprétation météorologique et économique des résultats.

b) Imputation des variables secondaires

Pour les variables présentant des valeurs manquantes mais jugées importantes pour l’analyse climatique, une imputation statistique a été appliquée :

la vitesse du vent (wspd) et la pression atmosphérique (pres) ont été imputées par la moyenne régionale correspondante ;

les précipitations (prcp) ont été remplacées par zéro lorsque la valeur était manquante, ce qui correspond à l’hypothèse d’absence de pluie les jours non observés.

Le calcul des moyennes régionales a été effectué avant toute suppression d’observations afin de conserver un maximum d’information.

1.3 Vérification post-nettoyage

Après l’imputation, un contrôle systématique a été réalisé pour vérifier la persistance éventuelle de valeurs manquantes. Les observations contenant encore des valeurs manquantes sur les variables critiques ont été éliminées afin de garantir une base finale sans valeurs manquantes sur les indicateurs essentiels.

La base de données nettoyée a ensuite été sauvegardée au format Delta Lake, permettant des performances optimales pour les analyses ultérieures sur Databricks.

1.4 Limites de la stratégie d’imputation

Bien que l’imputation par moyenne régionale permette de limiter la perte d’information, elle présente certaines limites :

- elle réduit la variabilité intra-régionale des données climatiques ;
- elle peut lisser des événements météorologiques extrêmes ;
- elle suppose une homogénéité climatique à l’intérieur de chaque région.

Ces limites sont prises en compte dans l’interprétation des résultats économiques et climatiques, et seront discutées dans la section dédiée à l’analyse et aux perspectives.

1️⃣ Chargement du volume brute

In [0]:
df_weather_raw = spark.read.table("weather_senegal_raw")

print("Volume brut :", df_weather_raw.count())
df_weather_raw.printSchema()

Volume brut : 65731
root
 |-- time: timestamp (nullable = true)
 |-- tavg: double (nullable = true)
 |-- tmin: double (nullable = true)
 |-- tmax: double (nullable = true)
 |-- prcp: double (nullable = true)
 |-- snow: double (nullable = true)
 |-- wdir: double (nullable = true)
 |-- wspd: double (nullable = true)
 |-- wpgt: double (nullable = true)
 |-- pres: double (nullable = true)
 |-- tsun: double (nullable = true)
 |-- region: string (nullable = true)



Le volume brut comprend 65731 éléments

In [0]:
# 3.3️⃣ Imputation par moyenne régionale (wspd, pres)
# Imports nécessaires pour cela
from pyspark.sql.functions import avg, when, col, sum as spark_sum

In [0]:
# 0) On lit proprement la table brute
df_weather_raw = spark.read.table("weather_senegal_raw")

# 1) Sélection des colonnes utiles cela évite les "void" et l'index
COLUMNS_KEEP = ["time","region","tavg","tmin","tmax","prcp","wspd","pres"]
df_weather_sel = df_weather_raw.select([c for c in COLUMNS_KEEP if c in df_weather_raw.columns])

In [0]:
# 2) On calcule des moyennes régionales AVANT suppression de lignes critiques
regional_means = (
    df_weather_sel
    .groupBy("region")
    .agg(
        avg("wspd").alias("mean_wspd"),
        avg("pres").alias("mean_pres")
    )
)

# On affiche les moyennes pour contrôle
display(regional_means.limit(50))

region,mean_wspd,mean_pres
Dakar,15.145636464702594,1012.2986111111122
Diourbel,9.512413793103438,1011.28908839779
Ziguinchor,7.915902891434776,1011.9252761257438
Tambacounda,10.798035660320355,1010.7027090144796
Saint-Louis,12.199503448275864,1012.0026280018124
Matam,9.616464237516867,1010.5124913733592
Kolda,7.617667356797793,1010.3424033149178
Kédougou,8.938233264320235,1010.6020041465088
Kaolack,11.837954701441294,1011.419461697724


In [0]:
# 3) Suppression des lignes critiques (time, region, tavg, tmin, tmax)
df_weather_nokeynull = df_weather_sel.dropna(subset=["time","region","tavg","tmin","tmax"])

# 4) Jointure avec les moyennes régionales
df_joined = df_weather_nokeynull.join(regional_means, on="region", how="left")

In [0]:
# 5) Imputation : remplacer NULLs de wspd et pres par la moyenne régionale
df_imputed = (
    df_joined
    .withColumn("wspd", when(col("wspd").isNull(), col("mean_wspd")).otherwise(col("wspd")))
    .withColumn("pres", when(col("pres").isNull(), col("mean_pres")).otherwise(col("pres")))
    # remplacer prcp manquante par 0 (si tu souhaites cette stratégie)
    .withColumn("prcp", when(col("prcp").isNull(), 0.0).otherwise(col("prcp")))
    .drop("mean_wspd", "mean_pres")  # on n'a plus besoin des colonnes intermédiaires
)

In [0]:
# 6) Vérification post-imputation : compte des valeurs manquantes restantes
missing_check_after = df_imputed.select([
    spark_sum(col(c).isNull().cast("int")).alias(c)
    for c in df_imputed.columns
])

print("Nombre de lignes après nettoyage :", df_imputed.count())
display(missing_check_after)

# 7) Supprimer toute ligne restante avec NULLs sur variables critiques (si tu veux garantir 0 NA)
critical_cols = ["time","region","tavg","tmin","tmax","prcp","wspd","pres"]
df_weather_clean = df_imputed.dropna(subset=critical_cols)

print("Nombre de lignes après suppression des restants NA sur colonnes critiques :", df_weather_clean.count())

Nombre de lignes après nettoyage : 43110


region,time,tavg,tmin,tmax,prcp,wspd,pres
0,0,0,0,0,0,0,0


Nombre de lignes après suppression des restants NA sur colonnes critiques : 43110


Après traitement des valeurs manquantes, le volume a 43110 observations

In [0]:
# 8) On sauvegarde la base finale (Delta)
(df_weather_clean
 .write
 .mode("overwrite")
 .format("delta")
 .saveAsTable("weather_senegal_clean")
)

# Affichage rapide
spark.read.table("weather_senegal_clean").limit(5).show()

+------+-------------------+----+----+----+----+----+------------------+
|region|               time|tavg|tmin|tmax|prcp|wspd|              pres|
+------+-------------------+----+----+----+----+----+------------------+
| Dakar|2005-01-01 00:00:00|22.6|18.5|28.0| 0.0|16.4|1012.2986111111122|
| Dakar|2005-01-02 00:00:00|21.5|18.8|24.0| 0.0|17.0|1012.2986111111122|
| Dakar|2005-01-05 00:00:00|20.1|18.0|23.0| 0.0|16.2|1012.2986111111122|
| Dakar|2005-01-08 00:00:00|22.1|19.9|25.0| 0.0|21.2|1012.2986111111122|
| Dakar|2005-01-09 00:00:00|21.6|19.0|25.0| 0.0|19.0|1012.2986111111122|
+------+-------------------+----+----+----+----+----+------------------+

