# TP – Analyse de la qualité de l'air
## Etape 1 – Exploration et chargement Spark

### Imports

In [1]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql import types as T

### Créer une session Spark locale:

In [2]:
try:
    spark = SparkSession.builder \
        .appName("TP - Analyse de la qualité de l'air - Etape 1: Exploration et chargement Spark") \
        .master("local[*]") \
        .getOrCreate()
    print(f"\n[ok]: Spark local session creation successful.\n")
except Exception as e:
    print(f"\n[ko]: Spark local session creation failed: {e}\n")
    sys.exit(1)

spark.sparkContext.setLogLevel("ERROR") # keep only error+


[ok]: Spark local session creation successful.



### Charger `air_quality_raw.csv` en DataFrame Spark:

In [3]:
data_file_air_quality = "../data/air_quality_raw.csv"
try:
    df_air = spark.read \
            .option("header", "true") \
            .option("inferSchema", "true") \
            .option("sep", ",") \
            .csv(data_file_air_quality)
    print(f"\n[ok]: read source data file successfull: '{data_file_air_quality}'\n")
except Exception as e:
    print(f"\n[ko]: read source data file failed: {e}\n")
    sys.exit(1)


[ok]: read source data file successfull: '../data/air_quality_raw.csv'



### Afficher le schéma inféré et identifier les problèmes de typage:

In [4]:
df_air.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- timestamp: string (nullable = true)
 |-- pollutant: string (nullable = true)
 |-- value: string (nullable = true)
 |-- unit: string (nullable = true)



Problèmes de typage:
- timestamp: est en string => doit être en timestamp (TimestampType)
- value: est en string => doit être en double (DoubleType)

Solutions pour les problèmes de typages:


In [5]:
## timestamp (parse them to yyyy-MM-dd HH:mm:ss):
df_air = df_air.withColumn(
    "timestamp",
    F.coalesce(
        F.to_timestamp("timestamp", "MM/dd/yyyy HH:mm:ss"),   # e.g. 05/14/2024 04:00:00
        F.to_timestamp("timestamp", "yyyy-MM-dd'T'HH:mm:ss"), # e.g. 2024-06-05T01:00:00
        F.to_timestamp("timestamp", "dd/MM/yyyy HH:mm"),       # e.g. 18/03/2024 12:00
        F.to_timestamp("timestamp", "yyyy-MM-dd HH:mm:ss")    # e.g. 2024-05-23 11:00:00 (to keep the right format)
    )
)
## value:
df_air = df_air.withColumn(
    "value",
    F.regexp_replace("value", ",", ".").cast(T.DoubleType())
)

### Nouveau schéma sans problèmes de typage:

In [6]:
df_air.printSchema()

root
 |-- station_id: string (nullable = true)
 |-- timestamp: timestamp (nullable = true)
 |-- pollutant: string (nullable = true)
 |-- value: double (nullable = true)
 |-- unit: string (nullable = true)



### Calculer des statistiques descriptives par polluant:

In [7]:
## Statistiques descriptives
df_air.groupBy("pollutant").agg(
    F.count("value").alias("count"),
    F.mean("value").alias("mean"),
    F.stddev("value").alias("stddev"),
    F.min("value").alias("min"),
    F.max("value").alias("max")
).show()

+---------+------+------------------+------------------+-------+-------+
|pollutant| count|              mean|            stddev|    min|    max|
+---------+------+------------------+------------------+-------+-------+
|     PM10|204133| 70.10033718213137|335.04846834319403|-132.57|4999.15|
|      NO2|204093| 78.08748526407079|  336.511461014181|-163.48|4998.69|
|       O3|204220|108.78000078346876| 337.0055311006316|-269.95|4999.37|
|       CO|204062| 33.71101126128331|  341.164993171565|  -2.57|4996.48|
|    PM2.5|204189| 55.31060331359669|336.76201626367117| -81.23|4999.69|
|      SO2|204178|40.728096709733634|342.75159712088293| -26.57|4997.95|
+---------+------+------------------+------------------+-------+-------+



### Compter les valeurs nulles par colonne:

In [11]:
df_air.select([
    F.count(F.when(F.col(c).isNull(), c)).alias(c)
    for c in df_air.columns
]).show()

+----------+---------+---------+-----+----+
|station_id|timestamp|pollutant|value|unit|
+----------+---------+---------+-----+----+
|         0|        0|        0| 6076|   0|
+----------+---------+---------+-----+----+



### Identifier les stations avec le plus d'enregistrements:

In [20]:
gp_station = df_air.groupBy("station_id").count().orderBy(F.desc("count")).show()


+----------+-----+
|station_id|count|
+----------+-----+
|    ST0032|26264|
|    ST0012|26244|
|    ST0028|26241|
|    ST0003|26239|
|    ST0029|26235|
|    ST0024|26235|
|    ST0020|26235|
|    ST0037|26233|
|    ST0023|26233|
|    ST0042|26224|
|    ST0015|26221|
|    ST0035|26221|
|    ST0007|26219|
|    ST0044|26207|
|    ST0030|26204|
|    ST0017|26199|
|    ST0014|26199|
|    ST0010|26199|
|    ST0009|26198|
|    ST0040|26198|
+----------+-----+
only showing top 20 rows



Les stations top 3 des stations avec le plus d'enregistrements: 
- La station **ST0032** avec 26264 enregistrement;
- La station **ST0012** avec 26244 enregistrement;
- La station **ST0028** avec 26241 enregistrement.

*La station qui a le moins d'enregistrements est la ST0040 avec 26198 enregistrement.*

### Synthèse des problèmes de qualité identifiés:
Problèmes de typage :
- timestamp: est en string => doit être en timestamp (TimestampType) ==> pas de forme unique valide en spark
- value: est en string => doit être en double (DoubleType) ==> pas de forme unique valide en spark (parfois des ,)

Valeurs négatives dans la colonne value (des polluants), ce qui est pas logique ;
Valeurs nulls également dans la colonne value (des polluants).