# Projet Apache Spark : Analyse des données climatiques mondiales

## Objectif :
Analyser les tendances climatiques mondiales à l'aide de Spark, y compris le nettoyage des données, l'EDA et l'extraction d'informations.

### Jeu de données :
[Global Surface Summary of the Day (GSOD) provenant de NOAA](https://www.ncei.noaa.gov/access/metadata/landing-page/bin/iso?id=gov.noaa.ncdc:C00516)


In [5]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql import functions as F
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.window import Window
from pyspark.sql.functions import col

In [2]:
from pyspark.sql import SparkSession

# Créer ou obtenir une session Spark active
spark = SparkSession.builder \
    .appName("Meteo Analysis") \
    .config("spark.network.timeout", "14400s") \
    .config("spark.executor.memory", "10g") \
    .config("spark.driver.memory", "10g") \
    .getOrCreate()

# Définir le chemin vers les fichiers CSV contenant les données des cyclistes
path = "./data/*/*.csv"

# Charger les données CSV dans un DataFrame Spark
climat = spark.read.format("csv") \
    .option("header", "true") \
    .option("inferSchema", "true") \
    .load(path)

# Afficher les types de données (schéma) de chaque colonne dans le DataFrame
print(climat.dtypes)

[('STATION', 'string'), ('DATE', 'date'), ('LATITUDE', 'double'), ('LONGITUDE', 'double'), ('ELEVATION', 'double'), ('NAME', 'string'), ('TEMP', 'double'), ('TEMP_ATTRIBUTES', 'double'), ('DEWP', 'double'), ('DEWP_ATTRIBUTES', 'double'), ('SLP', 'double'), ('SLP_ATTRIBUTES', 'double'), ('STP', 'double'), ('STP_ATTRIBUTES', 'double'), ('VISIB', 'double'), ('VISIB_ATTRIBUTES', 'double'), ('WDSP', 'double'), ('WDSP_ATTRIBUTES', 'double'), ('MXSPD', 'double'), ('GUST', 'double'), ('MAX', 'double'), ('MAX_ATTRIBUTES', 'string'), ('MIN', 'double'), ('MIN_ATTRIBUTES', 'string'), ('PRCP', 'double'), ('PRCP_ATTRIBUTES', 'string'), ('SNDP', 'double'), ('FRSHTT', 'int')]


---
## Exploration du jeu de données


In [None]:
# Charger les données
# Charger le jeu de données GSOD dans Spark en tant que DataFrame (Remplacer par le chemin réel)
# df = spark.read.csv('chemin/vers/dataset.csv', header=True, inferSchema=True)

In [6]:
# Inspection du schéma et calcul des statistiques
climat.printSchema()
climat.count(), len(climat.columns)

root
 |-- STATION: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullable = tru

(19622973, 28)

**Questions :**
1. Combien d'enregistrements contient le jeu de données ?

In [13]:
climat.count()

19622973


2. Quels sont les noms et types de colonnes ?


In [4]:
climat.printSchema()

root
 |-- STATION: string (nullable = true)
 |-- DATE: date (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: double (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: double (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: double (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: double (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: double (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: double (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES: string (nullable = true)
 |-- PRCP: double (nullable = tru


3. Combien de stations météorologiques uniques sont présentes ?

In [15]:
climat.select("STATION").distinct().count()

12948

---
## Nettoyage des données


In [16]:
# Identifier les colonnes avec des valeurs manquantes et les traiter
missing_values = climat.select([F.count(F.when(F.col(c).isNull(), c)) for c in climat.columns])

In [17]:
# Supprimer les lignes avec des valeurs critiques manquantes
climat_annee_delete = climat.dropna(subset=['TEMP', 'DATE'])

**Questions :**
1. Quelles colonnes ont le plus de valeurs manquantes ?


In [18]:
# Calculer le nombre de valeurs manquantes pour chaque colonne
missing_values = climat.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in climat.columns]
)

# Afficher chaque ligne (ici une seule ligne avec les résultats pour chaque colonne)
for row in missing_values.collect():
    print(row.asDict())

{'STATION': 0, 'DATE': 0, 'LATITUDE': 61659, 'LONGITUDE': 61659, 'ELEVATION': 63375, 'NAME': 61657, 'TEMP': 0, 'TEMP_ATTRIBUTES': 0, 'DEWP': 0, 'DEWP_ATTRIBUTES': 0, 'SLP': 0, 'SLP_ATTRIBUTES': 0, 'STP': 0, 'STP_ATTRIBUTES': 0, 'VISIB': 0, 'VISIB_ATTRIBUTES': 0, 'WDSP': 0, 'WDSP_ATTRIBUTES': 0, 'MXSPD': 0, 'GUST': 0, 'MAX': 0, 'MAX_ATTRIBUTES': 0, 'MIN': 0, 'MIN_ATTRIBUTES': 0, 'PRCP': 0, 'PRCP_ATTRIBUTES': 0, 'SNDP': 0, 'FRSHTT': 0}



2. Après nettoyage, combien d'enregistrements restent ?


In [19]:
# Supprimer les lignes avec des valeurs manquantes dans n'importe quelle colonne
drop_climat = climat.dropna()

# Compter le nombre d'enregistrements restants après nettoyage
remaining_records = drop_climat.count()

print(f"Nombre d'enregistrements après nettoyage : {remaining_records}")

Nombre d'enregistrements après nettoyage : 19559598



3. Quelle technique avez-vous utilisée pour traiter les valeurs manquantes dans la colonne des précipitations ?

In [20]:
#Pour l'ensemble de notre étude, nous avons choisi d'adopter la méthode d'imputation des 
#valeurs manquantes en utilisant une valeur fixe, telle que la moyenne, la médiane, ou d'autres 
#valeurs appropriées.

# Récupérer la liste des colonnes numériques
numeric_columns = [field.name for field in climat.schema.fields if isinstance(field.dataType, (IntegerType, DoubleType))]

# Calculer la moyenne de chaque colonne numérique et remplir les valeurs manquantes
mean_values = climat.select([F.mean(c).alias(c) for c in numeric_columns]).collect()[0].asDict()

# Remplacer les valeurs manquantes pour chaque colonne par la moyenne
climat_cleaned = climat.fillna(mean_values)

# Afficher le nombre d'enregistrements après nettoyage
remaining_records = climat_cleaned.count()
print(f"Nombre d'enregistrements après nettoyage : {remaining_records}")

Nombre d'enregistrements après nettoyage : 19622973


---
## Transformation des données


In [21]:
def remove_unwanted_columns(climat_cleaned: DataFrame, columns_to_remove: list) -> DataFrame:
    """
    Supprime les colonnes spécifiées d'un DataFrame PySpark.
    
    :param df: DataFrame d'entrée
    :param columns_to_remove: Liste des colonnes à supprimer
    :return: DataFrame nettoyé sans les colonnes spécifiées
    """
    return climat_cleaned.drop(*columns_to_remove)

# Liste des colonnes à supprimer
columns_to_remove = [
    "TEMP_ATTRIBUTES",
    "DEWP_ATTRIBUTES",
    "SLP_ATTRIBUTES",
    "STP_ATTRIBUTES",
    "VISIB_ATTRIBUTES",
    "WDSP_ATTRIBUTES",
    "MAX_ATTRIBUTES",
    "MIN_ATTRIBUTES",
    "PRCP_ATTRIBUTES"
]

# Suppression des colonnes dans le DataFrame climat
climat_col_cleaned = remove_unwanted_columns(climat_cleaned, columns_to_remove)
#Rajout de la colonne Celcius
climat_col_cleaned = climat_col_cleaned.withColumn("TEMP_C", bround((col("TEMP") - 32) / 1.8, 1))
 
# Afficher le DataFrame avec les colonnes 'TEMP' et 'TEMP_C' arrondies
climat_col_cleaned.select("TEMP", "TEMP_C").show()
# Afficher les colonnes restantes pour vérifier
print("Colonnes restantes :", climat_col_cleaned.columns)

+----+------+
|TEMP|TEMP_C|
+----+------+
|63.3|  17.4|
|65.0|  18.3|
|42.6|   5.9|
|26.9|  -2.8|
|34.4|   1.3|
|44.2|   6.8|
|35.3|   1.8|
|26.9|  -2.8|
|40.0|   4.4|
|43.7|   6.5|
|30.0|  -1.1|
|31.8|  -0.1|
|38.0|   3.3|
|42.0|   5.6|
|32.7|   0.4|
|25.2|  -3.8|
|34.7|   1.5|
|33.6|   0.9|
|36.6|   2.6|
|40.8|   4.9|
+----+------+
only showing top 20 rows

Colonnes restantes : ['STATION', 'DATE', 'LATITUDE', 'LONGITUDE', 'ELEVATION', 'NAME', 'TEMP', 'DEWP', 'SLP', 'STP', 'VISIB', 'WDSP', 'MXSPD', 'GUST', 'MAX', 'MIN', 'PRCP', 'SNDP', 'FRSHTT', 'TEMP_C']


In [22]:
# Transformer le jeu de données
# Ajouter de nouvelles colonnes pour l'année, le mois et le jour
climat_transformed = (
    climat_col_cleaned
    .withColumn('year', F.year(F.col('DATE')))  # Extraire l'année
    .withColumn('month', F.month(F.col('DATE')))  # Extraire le mois
    .withColumn('day', F.dayofmonth(F.col('DATE')))  # Extraire le jour
)

# Classifier les jours chauds (les données sont en degrées F°)
climat_transformed = climat_transformed.withColumn('is_hot', F.col('TEMP') > 68)

climat_transformed.show()

+-----------+----------+---------+----------+---------+--------------------+----+----+------+-----+-----+----+-----+-----+----+----+----+-----+------+------+----+-----+---+------+
|    STATION|      DATE| LATITUDE| LONGITUDE|ELEVATION|                NAME|TEMP|DEWP|   SLP|  STP|VISIB|WDSP|MXSPD| GUST| MAX| MIN|PRCP| SNDP|FRSHTT|TEMP_C|year|month|day|is_hot|
+-----------+----------+---------+----------+---------+--------------------+----+----+------+-----+-----+----+-----+-----+----+----+----+-----+------+------+----+-----+---+------+
|72401599999|2022-01-01|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|63.3|60.7|9999.9|999.9|  9.5| 8.5| 15.0| 25.1|75.2|53.1| 0.0|999.9| 10000|  17.4|2022|    1|  1| false|
|72401599999|2022-01-02|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|65.0|61.2|9999.9|999.9|  9.5|10.5| 21.0| 28.9|75.2|56.1|0.29|999.9| 10000|  18.3|2022|    1|  2| false|
|72401599999|2022-01-03|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|42.6|41.8|1013.3|999.9| 

**Questions :**
1. Quels sont les 5 premiers enregistrements après le parsing de la colonne `date` ?


In [23]:
climat_transformed.show(5)

+-----------+----------+---------+----------+---------+--------------------+----+----+------+-----+-----+----+-----+-----+----+----+----+-----+------+------+----+-----+---+------+
|    STATION|      DATE| LATITUDE| LONGITUDE|ELEVATION|                NAME|TEMP|DEWP|   SLP|  STP|VISIB|WDSP|MXSPD| GUST| MAX| MIN|PRCP| SNDP|FRSHTT|TEMP_C|year|month|day|is_hot|
+-----------+----------+---------+----------+---------+--------------------+----+----+------+-----+-----+----+-----+-----+----+----+----+-----+------+------+----+-----+---+------+
|72401599999|2022-01-01|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|63.3|60.7|9999.9|999.9|  9.5| 8.5| 15.0| 25.1|75.2|53.1| 0.0|999.9| 10000|  17.4|2022|    1|  1| false|
|72401599999|2022-01-02|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|65.0|61.2|9999.9|999.9|  9.5|10.5| 21.0| 28.9|75.2|56.1|0.29|999.9| 10000|  18.3|2022|    1|  2| false|
|72401599999|2022-01-03|37.074194|-77.957528|    133.8|ALLEN C PERKINSON...|42.6|41.8|1013.3|999.9| 


2. Combien de jours ont été classés comme 'chauds' ?


In [24]:
# Compter le nombre de jours chauds (is_hot = true)
hot_days_count = climat_transformed.filter(F.col("is_hot") == True).count()

# Afficher le résultat
print(f"Nombre total de jours classés comme 'chauds' : {hot_days_count}")

Nombre total de jours classés comme 'chauds' : 6799157



3. Quelles transformations ont été appliquées au jeu de données ?

In [None]:
# Nous avons supprimé les colonnes qui nous semblaient inutiles, géré les valeurs manquantes, 
#ajouté de nouvelles colonnes, et créé une colonne supplémentaire intitulée is_hot et une autre colonne pour
# transformer les degrées Fahrenheit en degrées Celsius

---
##  Analyse exploratoire des données (EDA)


In [None]:
# Calculer les moyennes annuelles et autres statistiques
# avg_temp_by_year = df_transformed.groupBy('year').avg('temperature')
#avg_temp_by_year.show()

**Questions :**
1. Quelle est la température moyenne mondiale pour 2020 ?


In [25]:
# Définir le chemin vers les fichiers CSV contenant les données des cyclistes
climat_2020 = climat_transformed.filter(F.col('year') == 2020)

avg_temp_2020 = climat_2020.agg({"TEMP": "mean", "TEMP_C": "mean"}).withColumnRenamed("avg(TEMP)", "Mean_Temperature_Fahrenheit") \
                                                                   .withColumnRenamed("avg(TEMP_C)", "Mean_Temperature_Celcius")

# Afficher les résultats
avg_temp_2020.show()


+---------------------------+------------------------+
|Mean_Temperature_Fahrenheit|Mean_Temperature_Celcius|
+---------------------------+------------------------+
|         55.551539799548614|      13.084178913164052|
+---------------------------+------------------------+




2. Quelles sont les 5 stations ayant enregistré les températures moyennes les plus élevées, et où sont-elles situées ?


In [26]:

# Calculer la température moyenne par station et obtenir les autres informations
avg_temp_by_station = climat_transformed.groupBy('STATION', 'NAME', 'LATITUDE', 'LONGITUDE').agg({"TEMP": "mean", "TEMP_C": "mean"}).withColumnRenamed("avg(TEMP)", "Mean_Temperature_Fahrenheit") \
                                                                                                                                    .withColumnRenamed("avg(TEMP_C)", "Mean_Temperature_Celcius")
# Trier par température moyenne (ordre décroissant) et prendre les 5 premières stations
top_5_stations = avg_temp_by_station.orderBy(col('Mean_Temperature_Fahrenheit').desc()).limit(5)
 
# Afficher les résultats
top_5_stations.show()

+-----------+----------------+---------+-----------+---------------------------+------------------------+
|    STATION|            NAME| LATITUDE|  LONGITUDE|Mean_Temperature_Fahrenheit|Mean_Temperature_Celcius|
+-----------+----------------+---------+-----------+---------------------------+------------------------+
|42770099999| NARSINGHPUR, IN|    22.95| 79.1833333|                      100.0|                    37.8|
|61437099999|     AKJOUJT, MR|    19.75|-14.3666667|                      95.42|      35.239999999999995|
|67976099999|      RUPIKE, ZI|   -20.55| 31.0833333|                       95.2|                    35.1|
|41216599999|SIR ABU NAIR, AE|25.216778|  54.233778|                       94.8|                    34.9|
|41216199999|AL HAMRA AUX, AE|24.073981|  52.463644|                       94.1|                    34.5|
+-----------+----------------+---------+-----------+---------------------------+------------------------+




3. Comment les précipitations mondiales ont-elles changé au cours des 50 dernières années ?

In [27]:
 
# Calculer la somme des précipitations mondiales par année
# Calculer la température moyenne par station et obtenir les autres informations
total_precipitation = climat_transformed.groupBy('year').agg({"PRCP": "sum"}).withColumnRenamed("sum(PRCP)", "total_precipitation")
                                                                                                                                
# Trier par température moyenne (ordre décroissant) et prendre les 5 premières stations
precip_by_year = total_precipitation.orderBy(col('year').desc()).limit(50)

# Afficher l'évolution des précipitations par année
precip_by_year.orderBy('year').show()

+----+--------------------+
|year| total_precipitation|
+----+--------------------+
|2020| 3.088141948999998E7|
|2021|3.0669664580000024E7|
|2022|3.1215037149999894E7|
|2023|3.3003350469999846E7|
|2024|2.7592773610000014E7|
+----+--------------------+



---
## Requêtes avancées


In [None]:
# Enregistrer comme vue SQL temporaire et exécuter des requêtes
climat_transformed.createOrReplaceTempView('climate')
# spark.sql('SELECT ...')

**Questions :**
1. Quelle a été l'année la plus froide enregistrée, et quelle était la température moyenne ?


In [28]:
# Calculer la température moyenne par année
coldest_month = spark.sql('''
    SELECT year, AVG(TEMP) AS Mean_Temperature_Fahrenheit, AVG(TEMP_C) AS Mean_Temperature_Celcius
    FROM climate
    GROUP BY year
    ORDER BY Mean_Temperature_Fahrenheit ASC
    LIMIT 1
''')

# Afficher le mois le plus froid et sa température moyenne
coldest_month.show()

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `climate` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 3 pos 9;
'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Sort ['Mean_Temperature_Fahrenheit ASC NULLS FIRST], true
      +- 'Aggregate ['year], ['year, 'AVG('TEMP) AS Mean_Temperature_Fahrenheit#1778, 'AVG('TEMP_C) AS Mean_Temperature_Celcius#1779]
         +- 'UnresolvedRelation [climate], [], false



2. Quelle station a contribué avec le plus grand nombre d'enregistrements ?


In [29]:
# Trouver la station avec le plus grand nombre d'enregistrements
most_records_station = spark.sql('''
    SELECT STATION, NAME, COUNT(*) AS Nombre_Enregistrement
    FROM climate
    GROUP BY STATION, NAME
    ORDER BY Nombre_Enregistrement DESC
    LIMIT 1
''')

# Afficher le résultat
most_records_station.show(truncate=False)

AnalysisException: [TABLE_OR_VIEW_NOT_FOUND] The table or view `climate` cannot be found. Verify the spelling and correctness of the schema and catalog.
If you did not qualify the name with a schema, verify the current_schema() output, or qualify the name with the correct schema and catalog.
To tolerate the error on drop use DROP VIEW IF EXISTS or DROP TABLE IF EXISTS.; line 3 pos 9;
'GlobalLimit 1
+- 'LocalLimit 1
   +- 'Sort ['Nombre_Enregistrement DESC NULLS LAST], true
      +- 'Aggregate ['STATION, 'NAME], ['STATION, 'NAME, count(1) AS Nombre_Enregistrement#1780L]
         +- 'UnresolvedRelation [climate], [], false



3. Fournissez la requête SQL utilisée pour trouver les réponses ci-dessus.

In [None]:
#Pour trouver le mois le plus froid et la température moyenne :
    #SELECT year, AVG(TEMP) AS Mean_Temperature_Fahrenheit, AVG(TEMP_C) AS Mean_Temperature_Celcius
    #FROM climate
    #GROUP BY year
    #ORDER BY Mean_Temperature_Fahrenheit ASC
    #LIMIT 1
#Pour trouver la station avec le plus grand nombre d'enregistrements :
    #SELECT STATION, NAME, COUNT(*) AS Nombre_Enregistrement
    #FROM climate
    #GROUP BY STATION, NAME
    #ORDER BY Nombre_Enregistrement DESC
    #LIMIT 1

---
## Visualisation


In [None]:
# Exporter les données vers Pandas et créer des visualisations
# import matplotlib.pyplot as plt
# df_pandas = avg_temp_by_year.toPandas()
# plt.plot(df_pandas['year'], df_pandas['avg_temperature'])

**Questions :**
1. Quelles tendances sont visibles dans le graphique des températures moyennes mondiales ?



2. Quelles anomalies sont présentes dans l'histogramme des précipitations ?



3. Quelles analyses supplémentaires recommanderiez-vous en fonction des visualisations ?

---
## Optimisation et réflexion


In [36]:
# Appliquer le cache et le repartitionnement
# df_transformed.cache()
# df_repartitioned = df_transformed.repartition(4)

**Questions :**
1. Comment la mise en cache a-t-elle amélioré les performances de vos requêtes ?



2. Comment le repartitionnement a-t-il affecté le temps d'exécution des tâches ?



3. Résumez l'information la plus surprenante que vous avez dérivée du jeu de données.