# Etape 1 - Exploration et chargement Spark

**Objectif** : Charger et explorer les donnees de qualite de l'air avec PySpark

---
---

## Imports

In [1]:
import os
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import functions as F

---

## Chemins des données

In [2]:
DATA_DIR = (Path.cwd() / ".." / "data").resolve()
AIR_QUALITY_PATH = os.path.join(DATA_DIR, "air_quality_raw.csv")
STATIONS_PATH = os.path.join(DATA_DIR, "stations.csv")
WEATHER_PATH = os.path.join(DATA_DIR, "weather_raw.csv")

---

## 1.1 Création d'une session Spark locale

In [3]:
## Creer une session Spark locale
spark = SparkSession.builder \
    .appName("TP Qualite Air - Exploration") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.sql.shuffle.partitions", "8") \
    .getOrCreate()

## Reduire les logs
spark.sparkContext.setLogLevel("WARN")

## Affichage de l'actuelle version de Spark & de l'url de Spark UI
print(f"Spark version: {spark.version}")
print(f"Spark UI: {spark.sparkContext.uiWebUrl}")

Spark version: 3.5.7
Spark UI: http://joel:4040


---

## 1.2 Chargement du fichier air_quality_raw.csv en DataFrame Spark (les données de la qualité de l'air)

In [4]:
## Chargement du CSV avec inference de schema
df_air_raw = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(AIR_QUALITY_PATH)

print("- Fichier de la qualité de l'air 'air_quality_raw.csv' a été chargé avec succès.")

## Information sur le nombre de lignes et de colonnes
print("- Nombre de lignes :", df_air_raw.count())
print("- Nombre de colonnes :", len(df_air_raw.columns))

## Apercu des donnees
print("- Apercu des donnees :")
df_air_raw.show(10, truncate=False)

- Fichier de la qualité de l'air 'air_quality_raw.csv' a été chargé avec succès.
- Nombre de lignes : 1230951
- Nombre de colonnes : 5
- Apercu des donnees :
+----------+-------------------+---------+-----+-----+
|station_id|timestamp          |pollutant|value|unit |
+----------+-------------------+---------+-----+-----+
|ST0040    |2024-01-07T05:00:00|O3       |79.29|ug/m3|
|ST0004    |06/09/2024 18:00:00|O3       |41.58|ug/m3|
|ST0027    |2024-05-23 11:00:00|PM10     |29.20|ug/m3|
|ST0002    |18/03/2024 12:00   |SO2      |7.72 |ug/m3|
|ST0035    |2024-06-11T08:00:00|O3       |29.87|ug/m3|
|ST0023    |19/04/2024 10:00   |O3       |30.07|ug/m3|
|ST0035    |19/03/2024 05:00   |PM2.5    |14.12|ug/m3|
|ST0001    |2024-05-19T13:00:00|PM10     |17,09|ug/m3|
|ST0014    |10/03/2024 20:00   |CO       |0.29 |mg/m3|
|ST0004    |2024-01-28T16:00:00|NO2      |25,69|ug/m3|
+----------+-------------------+---------+-----+-----+
only showing top 10 rows



---

## 1.3 Affichage du schéma inféré et identification des problèmes de typage

### 1.3.1 Affichage du schéma inféré

In [5]:
df_air_raw.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- pollutant: string (nullable = true)
 |-- value: string (nullable = true)
 |-- unit: string (nullable = true)



### 1.3.2 Identification des problèmes de typage
- **timestamp**

la colonne ``timestamp`` est en string car Il y'a differents formats de timestamp :

In [6]:
print("Exemples de formats de timestamp:")
df_air_raw.select("timestamp").distinct().show(20, truncate=False)

Exemples de formats de timestamp:
+-------------------+
|timestamp          |
+-------------------+
|2024-01-07T05:00:00|
|2024-05-19T13:00:00|
|05/26/2024 14:00:00|
|2024-04-12T09:00:00|
|2024-01-31T14:00:00|
|2024-06-02 20:00:00|
|05/18/2024 07:00:00|
|2024-04-26T22:00:00|
|2024-03-31T02:00:00|
|28/04/2024 22:00   |
|2024-03-22 10:00:00|
|2024-04-02 00:00:00|
|2024-03-30 18:00:00|
|16/03/2024 02:00   |
|04/22/2024 22:00:00|
|03/03/2024 02:00:00|
|2024-06-28 11:00:00|
|12/05/2024 12:00   |
|17/01/2024 03:00   |
|2024-03-28T08:00:00|
+-------------------+
only showing top 20 rows



- **value**

La colonne ``value`` est en string également car elle contient des valeurs avec des virgules et des valeurs textuelles :

In [7]:
## Les valeurs qui ne peuvent pas etre converties en nombre
df_non_numeric = df_air_raw.filter(
    ~F.col("value").rlike("^-?[0-9]+[.,]?[0-9]*$")
)

print(f"- Nombre de valeurs non numeriques : {df_non_numeric.count():,}")

print(f"- Differents valeurs non numeriques :")
df_non_numeric.select("value").distinct().show()

- Nombre de valeurs non numeriques : 6,076
- Differents valeurs non numeriques :
+-----+
|value|
+-----+
| null|
|  N/A|
|error|
|  ---|
+-----+



In [8]:
## Valeurs avec virgule comme separateur decimal
df_with_comma = df_air_raw.filter(F.col("value").contains(","))
print(f"- Nombre de valeurs avec virgule : {df_with_comma.count():,}")

## Affichages d'exemples
print(f"- Exemples :")
df_with_comma.select("value").show(5)

- Nombre de valeurs avec virgule : 184,556
- Exemples :
+-----+
|value|
+-----+
|17,09|
|25,69|
|15,42|
|29,14|
|12,82|
+-----+
only showing top 5 rows



---

## 1.4 Calcul des statistiques descriptives par polluant

Initialement on a la colonne ``value`` en string. 

Pour calculer les statistiques descriptives, on va créer une dataFrame 'df_air_numeric' qui contient les mêmes données que 'df_air_raw' plus une colonne 'value_clean' qui contient les valeurs de la colonne 'value_clean' converties en double.

In [9]:
## Nombre des valeurs Nulls initiales dans df_air_raw
df_air_raw.filter(F.col("value").isNull()).count()

## Ajouter de la colonne 'value_clean' dans la dataFrame df_air_raw
## & convertion de value en double (en remplacant la virgule par un point)
df_air_numeric = df_air_raw.withColumn(
    "value_clean",
    F.regexp_replace(F.col("value"), ",", ".").cast("double")
)

## Apercu de la df_air_numeric
print("- Apercu de la df_air_numeric :")
df_air_numeric.show(10)

## Nombre des valeurs Nulls dans df_air_numeric
print("- Nombre des valeurs Nulls dans la colonne 'value_clean' :", df_air_numeric.filter(F.col("value_clean").isNull()).count())

## Calcul des statistiques descriptives par polluant (en ignorant les valeurs nulles de 'value_clean')
stats_by_pollutant = df_air_numeric.filter(F.col("value_clean").isNotNull()) \
    .groupBy("pollutant") \
    .agg(
        F.count("*").alias("count"),
        F.round(F.mean("value_clean"), 2).alias("mean"),
        F.round(F.stddev("value_clean"), 2).alias("stddev"),
        F.round(F.min("value_clean"), 2).alias("min"),
        F.round(F.max("value_clean"), 2).alias("max"),
        F.round(F.expr("percentile(value_clean, 0.5)"), 2).alias("median")
    ) \
    .orderBy("pollutant")

print("\n- Statistiques par polluant (en ignorant les valeurs nulles) :")
stats_by_pollutant.show()

- Apercu de la df_air_numeric :
+----------+-------------------+---------+-----+-----+-----------+
|station_id|          timestamp|pollutant|value| unit|value_clean|
+----------+-------------------+---------+-----+-----+-----------+
|    ST0040|2024-01-07T05:00:00|       O3|79.29|ug/m3|      79.29|
|    ST0004|06/09/2024 18:00:00|       O3|41.58|ug/m3|      41.58|
|    ST0027|2024-05-23 11:00:00|     PM10|29.20|ug/m3|       29.2|
|    ST0002|   18/03/2024 12:00|      SO2| 7.72|ug/m3|       7.72|
|    ST0035|2024-06-11T08:00:00|       O3|29.87|ug/m3|      29.87|
|    ST0023|   19/04/2024 10:00|       O3|30.07|ug/m3|      30.07|
|    ST0035|   19/03/2024 05:00|    PM2.5|14.12|ug/m3|      14.12|
|    ST0001|2024-05-19T13:00:00|     PM10|17,09|ug/m3|      17.09|
|    ST0014|   10/03/2024 20:00|       CO| 0.29|mg/m3|       0.29|
|    ST0004|2024-01-28T16:00:00|      NO2|25,69|ug/m3|      25.69|
+----------+-------------------+---------+-----+-----+-----------+
only showing top 10 rows

- No

In [10]:
## Identification des valeurs aberrantes
print("- Valeurs negatives:")
df_air_numeric.filter(F.col("value_clean") < 0).groupBy("pollutant").count().show()

print("- Valeurs > 1000 ug/m3:")
df_air_numeric.filter(F.col("value_clean") > 1000).groupBy("pollutant").count().show()

- Valeurs negatives:
+---------+-----+
|pollutant|count|
+---------+-----+
|      SO2| 2018|
|       O3| 2062|
|      NO2| 2033|
|     PM10| 2040|
|       CO| 2087|
|    PM2.5| 2070|
+---------+-----+

- Valeurs > 1000 ug/m3:
+---------+-----+
|pollutant|count|
+---------+-----+
|      SO2| 2066|
|       O3| 2080|
|      NO2| 2031|
|     PM10| 2017|
|       CO| 2070|
|    PM2.5| 2063|
+---------+-----+



---

## 1.5 Comptage des valeurs nulles par colonne

In [11]:
null_counts = df_air_raw.select([
    F.count(F.when(F.col(c).isNull() | (F.col(c) == ""), c)).alias(c)
    for c in df_air_raw.columns
])

print("- Nombre de valeurs nulles/vides par colonne:")
null_counts.show()

- Nombre de valeurs nulles/vides par colonne:
+----------+---------+---------+-----+----+
|station_id|timestamp|pollutant|value|unit|
+----------+---------+---------+-----+----+
|         0|        0|        0|    0|   0|
+----------+---------+---------+-----+----+



---

## 1.6 Identification des stations avec le plus d'enregistrements

In [12]:
## station ids avec le plus d'enregistrements (affichage par ordre décroissant du nombre des enregistrements)
print("- Les stations ids avec le plus d'enregistrements :")
df_stations_enregistrements_count = df_air_raw.groupBy('station_id').count().withColumnRenamed("count", "nbre_d_enregistrements").orderBy(F.col("count").desc())
df_stations_enregistrements_count.show(10)

- Les stations ids avec le plus d'enregistrements :
+----------+----------------------+
|station_id|nbre_d_enregistrements|
+----------+----------------------+
|    ST0032|                 26264|
|    ST0012|                 26244|
|    ST0028|                 26241|
|    ST0003|                 26239|
|    ST0029|                 26235|
|    ST0024|                 26235|
|    ST0020|                 26235|
|    ST0037|                 26233|
|    ST0023|                 26233|
|    ST0042|                 26224|
+----------+----------------------+
only showing top 10 rows



In [13]:
## Chargement du CSV des stations avec inference de schema
df_stations = spark.read \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .csv(STATIONS_PATH)

print("- Fichier de des stations 'stations.csv' a été chargé avec succès.")

## Information sur le nombre de lignes et de colonnes
print("- Nombre de lignes :", df_stations.count())
print("- Nombre de colonnes :", len(df_stations.columns))

## Apercu des donnees
print("- Apercu des donnees :")
df_stations.show(10, truncate=False)

- Fichier de des stations 'stations.csv' a été chargé avec succès.
- Nombre de lignes : 47
- Nombre de colonnes : 6
- Apercu des donnees :
+----------+------------------------+---------+---------+--------+------------+
|station_id|station_name            |city     |lat      |lon     |station_type|
+----------+------------------------+---------+---------+--------+------------+
|ST0001    |Paris-urbaine-1         |Paris    |48.809101|2.329703|urbaine     |
|ST0002    |Paris-periurbaine-2     |Paris    |48.828921|2.375847|periurbaine |
|ST0003    |Paris-industrielle-3    |Paris    |48.87427 |2.391418|industrielle|
|ST0004    |Lyon-urbaine-1          |Lyon     |45.773049|4.788878|urbaine     |
|ST0005    |Lyon-periurbaine-2      |Lyon     |45.72337 |4.808966|periurbaine |
|ST0006    |Lyon-industrielle-3     |Lyon     |45.774202|4.841825|industrielle|
|ST0007    |Marseille-urbaine-1     |Marseille|43.288452|5.364721|urbaine     |
|ST0008    |Marseille-periurbaine-2 |Marseille|43.274319|5.40

In [14]:
## Affichage des stations (avec leurs données) avec le plus d'enregistrements (ordre décroissant)
df_stations_enregistrements_count.join(df_stations, on="station_id", how="left").orderBy(F.col("nbre_d_enregistrements").desc()).show()

+----------+----------------------+--------------------+----------+---------+---------+------------+
|station_id|nbre_d_enregistrements|        station_name|      city|      lat|      lon|station_type|
+----------+----------------------+--------------------+----------+---------+---------+------------+
|    ST0032|                 26264|Strasbourg-periur...|Strasbourg|48.563516| 7.708719| periurbaine|
|    ST0012|                 26244| Marseille-urbaine-6| Marseille|43.256721| 5.357793|     urbaine|
|    ST0028|                 26241|Nantes-periurbaine-2|    Nantes|47.267352|  -1.5396| periurbaine|
|    ST0003|                 26239|Paris-industrielle-3|     Paris| 48.87427| 2.391418|industrielle|
|    ST0029|                 26235|Nantes-industriel...|    Nantes|47.224095|-1.535139|industrielle|
|    ST0024|                 26235|Lille-industrielle-3|     Lille|50.646218| 3.077482|industrielle|
|    ST0020|                 26235|Bordeaux-periurba...|  Bordeaux|44.816739|-0.621221| per

---

## 1.7 Synthese des problemes de qualite identifies

In [15]:
## Nombre total des enregistrements
total_enregistrements = df_air_raw.count()

## Valeurs non numeriques
non_numeric = df_air_raw.filter(
    ~F.col("value").rlike("^-?[0-9]+[.,]?[0-9]*$")
).count()

## Valeurs avec virgule
with_comma = df_air_raw.filter(F.col("value").contains(",")).count()

## Valeurs negatives (apres conversion)
negative = df_air_numeric.filter(F.col("value_clean") < 0).count()

## Valeurs aberrantes > 1000
outliers = df_air_numeric.filter(F.col("value_clean") > 1000).count()

## Doublons
duplicates = total_enregistrements - df_air_raw.dropDuplicates(["station_id", "timestamp", "pollutant"]).count()


print(f"Total enregistrements : {total_enregistrements:,}")
print()
print(f"Problemes identifies :")
print(f"  - Valeurs non numeriques: {non_numeric:,} ({non_numeric/total_enregistrements*100:.2f}%)")
print(f"  - Valeurs avec virgule decimale: {with_comma:,} ({with_comma/total_enregistrements*100:.2f}%)")
print(f"  - Valeurs negatives: {negative:,} ({negative/total_enregistrements*100:.2f}%)")
print(f"  - Valeurs aberrantes (>1000): {outliers:,} ({outliers/total_enregistrements*100:.2f}%)")
print(f"  - Doublons: {duplicates:,} ({duplicates/total_enregistrements*100:.2f}%)")
print(f"  - Formats de dates multiples : 4 formats differents detectes")

Total enregistrements : 1,230,951

Problemes identifies :
  - Valeurs non numeriques: 6,076 (0.49%)
  - Valeurs avec virgule decimale: 184,556 (14.99%)
  - Valeurs negatives: 12,310 (1.00%)
  - Valeurs aberrantes (>1000): 12,327 (1.00%)
  - Doublons: 24,136 (1.96%)
  - Formats de dates multiples : 4 formats differents detectes


---

## Fermer la session Spark

In [16]:
spark.stop()