# Traitement de données
---
Dans cette partie, nous nous chargerons d'anlyser et de traiter les données de plusieurs maniere:
- Nettoyage 
- Transformation
- Normalisation
---
Nous utiliserons pour cela pySpark pour l'analyse de ces données.

## Bibliothéques et librairie utiles
Nous utiliserons les librairies suivantes :
1. **`pandas`** : Utilisé pour manipuler et analyser des données sous forme de tableaux (DataFrames), principalement en mémoire.

2. **`os`** : Permet d'interagir avec le système de fichiers, par exemple pour manipuler les chemins de fichiers.

3. **`IPython.display`** : Fournit des outils pour afficher des objets dans un environnement IPython, comme des tableaux, des graphiques ou des images.

4. **`pyspark.sql`** : Fournit des fonctions et classes pour travailler avec Apache Spark, une plateforme de traitement de données distribuées. `SparkSession` est l'interface principale pour interagir avec Spark, et `col`, `max`, `min` sont des fonctions pour manipuler des données dans Spark DataFrame.

In [None]:
import pandas as pd
import os
from IPython.display import display
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, max, min, to_date

import requests
import json
import time

Nous allons récupérer les données de la NASA avant de les changer un fichier CSV 

Collecte et mis à jours

In [33]:
def get_json_data():
    #Donnée Sénégal-Dakar
    lat = "14.692"
    lon = "-17.4467"
    start_date = "20220101"
    end_date = "20240101"
    url = f"https://power.larc.nasa.gov/api/temporal/daily/point?parameters=T2M,RH2M,T2MWET,PRECTOT,WS10M,WD10M,T2MDEW,V10M,PS,QV2M,U10M&community=AG&longitude={lon}&latitude={lat}&start={start_date}&end={end_date}&format=json"
    
    response = requests.get(url)
    data = response.json()
    return data["properties"]["parameter"]


def convert_to_df(parameters):
    # Créer une structure de dictionnaire pour stocker les données par date
    data = {'Date': []}

    # Ajouter les colonnes pour chaque paramètre
    for param in parameters.keys():
        data[param] = []

    # Remplir les données
    for date in parameters['T2M'].keys():  # Itérer sur les dates communes
        data['Date'].append(date)  # Ajouter la date
        for param in parameters.keys():
            data[param].append(parameters[param].get(date, None))  # Ajouter la valeur pour chaque paramètre

    # Créer un DataFrame
    df = pd.DataFrame(data)
    return df

In [34]:
data = get_json_data()
df = convert_to_df(data)
df.to_csv("Nasa_POWER_dataset.csv",columns=df.columns,index=False)

Importons le dataset et affichons les 5 premieres lignes du dataset

In [35]:
dataset_path = "Nasa_POWER_dataset.csv"
df = pd.read_csv(dataset_path)
df.head(5)

Unnamed: 0,Date,T2MWET,WD10M,T2M,PS,RH2M,U10M,T2MDEW,QV2M,WS10M,V10M,PRECTOTCORR
0,20220101,21.41,57.7,25.52,101.26,60.78,-4.7,17.3,12.26,5.74,-2.97,0.0
1,20220102,21.14,49.3,25.01,101.29,62.47,-4.53,17.27,12.23,6.3,-3.89,0.0
2,20220103,20.66,43.1,24.54,101.21,62.79,-3.29,16.78,11.86,5.01,-3.51,0.0
3,20220104,20.69,42.8,24.44,101.11,64.11,-2.41,16.95,12.04,3.73,-2.61,0.0
4,20220105,20.7,52.0,24.35,101.2,64.04,-3.58,17.04,12.08,4.82,-2.8,0.01


Créons notre Session Spark

In [36]:
spark = SparkSession.builder \
    .appName("Manipulation weatherHistory avec Spark") \
    .getOrCreate()

Testons la session spark

In [37]:
df = spark.read.csv(dataset_path, header=True, inferSchema=True)
display(df.limit(5).toPandas())

Unnamed: 0,Date,T2MWET,WD10M,T2M,PS,RH2M,U10M,T2MDEW,QV2M,WS10M,V10M,PRECTOTCORR
0,20220101,21.41,57.7,25.52,101.26,60.78,-4.7,17.3,12.26,5.74,-2.97,0.0
1,20220102,21.14,49.3,25.01,101.29,62.47,-4.53,17.27,12.23,6.3,-3.89,0.0
2,20220103,20.66,43.1,24.54,101.21,62.79,-3.29,16.78,11.86,5.01,-3.51,0.0
3,20220104,20.69,42.8,24.44,101.11,64.11,-2.41,16.95,12.04,3.73,-2.61,0.0
4,20220105,20.7,52.0,24.35,101.2,64.04,-3.58,17.04,12.08,4.82,-2.8,0.01


Déterminons la longueur du dataset
- le dataset a 12 variable
- le dataset a 96453 lignes

In [38]:
df.toPandas().shape

(731, 12)

Etudions les types des variables

In [39]:
df.printSchema()

root
 |-- Date: integer (nullable = true)
 |-- T2MWET: double (nullable = true)
 |-- WD10M: double (nullable = true)
 |-- T2M: double (nullable = true)
 |-- PS: double (nullable = true)
 |-- RH2M: double (nullable = true)
 |-- U10M: double (nullable = true)
 |-- T2MDEW: double (nullable = true)
 |-- QV2M: double (nullable = true)
 |-- WS10M: double (nullable = true)
 |-- V10M: double (nullable = true)
 |-- PRECTOTCORR: double (nullable = true)



Etudions le pourcentage de valeur null
- on constate qu'il n'y a pas de valeur manquante

In [40]:
total_rows = df.count()

for column in df.columns:
    null_count = df.filter(col(column).isNull()).count()
    percent = (null_count / total_rows) * 100 if total_rows > 0 else 0
    print(f"Colonne: {column} -> {percent:.2f}% de valeurs nulles - valeur null: {null_count}")

Colonne: Date -> 0.00% de valeurs nulles - valeur null: 0
Colonne: T2MWET -> 0.00% de valeurs nulles - valeur null: 0
Colonne: WD10M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: T2M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: PS -> 0.00% de valeurs nulles - valeur null: 0
Colonne: RH2M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: U10M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: T2MDEW -> 0.00% de valeurs nulles - valeur null: 0
Colonne: QV2M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: WS10M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: V10M -> 0.00% de valeurs nulles - valeur null: 0
Colonne: PRECTOTCORR -> 0.00% de valeurs nulles - valeur null: 0


In [41]:
for column in df.columns:
    min_value = df.agg(min(col(column))).collect()[0][0]
    max_value = df.agg(max(col(column))).collect()[0][0]
    print(f"Colone {column} -> Max: {max_value} - Min: {min_value}")

Colone Date -> Max: 20240101 - Min: 20220101
Colone T2MWET -> Max: 27.74 - Min: 16.32
Colone WD10M -> Max: 360.0 - Min: 0.1
Colone T2M -> Max: 29.81 - Min: 17.85
Colone PS -> Max: 101.71 - Min: 100.65
Colone RH2M -> Max: 97.68 - Min: 50.74
Colone U10M -> Max: 8.05 - Min: -7.15
Colone T2MDEW -> Max: 26.42 - Min: 12.53
Colone QV2M -> Max: 21.48 - Min: 9.09
Colone WS10M -> Max: 9.54 - Min: 1.47
Colone V10M -> Max: 7.97 - Min: -9.4
Colone PRECTOTCORR -> Max: 61.03 - Min: 0.0


Conversion des dates

In [42]:
df = df.withColumn('Date', to_date(col('Date'), 'yyyyMMdd'))

Suppression des lignes contenant des nan

In [43]:
df_cleaned = df.dropna()

In [44]:
display(df.limit(5).toPandas())

Unnamed: 0,Date,T2MWET,WD10M,T2M,PS,RH2M,U10M,T2MDEW,QV2M,WS10M,V10M,PRECTOTCORR
0,2022-01-01,21.41,57.7,25.52,101.26,60.78,-4.7,17.3,12.26,5.74,-2.97,0.0
1,2022-01-02,21.14,49.3,25.01,101.29,62.47,-4.53,17.27,12.23,6.3,-3.89,0.0
2,2022-01-03,20.66,43.1,24.54,101.21,62.79,-3.29,16.78,11.86,5.01,-3.51,0.0
3,2022-01-04,20.69,42.8,24.44,101.11,64.11,-2.41,16.95,12.04,3.73,-2.61,0.0
4,2022-01-05,20.7,52.0,24.35,101.2,64.04,-3.58,17.04,12.08,4.82,-2.8,0.01


# Changeons le nom des colonnes pour qu'ils soit plus parlant

## Traduction des Colonnes

| Colonne         | Nom en BD                      | Traduction en Français                     | Unité |
|----------------|--------------------------------|--------------------------------------------|-------|
| **Date**       | **date**                       | Date                                       | -     |
| **WD10M**      | **direction_vent**             | Direction du vent à 10m                    | °     |
| **T2MWET**     | **temperature_humide**        | Température humide à 2m                    | °C    |
| **PS**         | **pression**                   | Pression de surface                        | kPa   |
| **QV2M**       | **humidite_specifique**        | Humidité spécifique à 2m                   | g/kg  |
| **T2MDEW**     | **temperature_point_rosee**    | Température du point de rosée à 2m         | °C    |
| **V10M**       | **vitesse_vent**               | Vitesse du vent à 10m                      | m/s   |
| **WS10M**      | **intensite_vent**             | Intensité du vent à 10m                    | m/s   |
| **T2M**        | **temperature_air**            | Température de l'air à 2m                  | °C    |
| **RH2M**       | **humidite_relative**          | Humidité relative à 2m                     | %     |
| **U10M**       | **composante_est_ouest_vent**  | Composante Est-Ouest du vent à 10m         | m/s   |
| **PRECTOTCORR**| **precipitations_corrigees**   | Précipitations corrigées totales           | mm    |



In [45]:

# Dictionnaire de renommage des colonnes
rename_dict = {
    "Date": "date",
    "WD10M": "direction_vent",
    "T2MWET": "temperature_humide",
    "PS": "pression",
    "QV2M": "humidite_specifique",
    "T2MDEW": "temperature_point_rosee",
    "V10M": "vitesse_vent",
    "WS10M": "intensite_vent",
    "T2M": "temperature_air",
    "RH2M": "humidite_relative",
    "U10M": "composante_est_ouest_vent",
    "PRECTOTCORR": "precipitations_corrigees"
}

for old_name, new_name in rename_dict.items():
    df = df.withColumnRenamed(old_name, new_name)

display(df.limit(5).toPandas())

Unnamed: 0,date,temperature_humide,direction_vent,temperature_air,pression,humidite_relative,composante_est_ouest_vent,temperature_point_rosee,humidite_specifique,intensite_vent,vitesse_vent,precipitations_corrigees
0,2022-01-01,21.41,57.7,25.52,101.26,60.78,-4.7,17.3,12.26,5.74,-2.97,0.0
1,2022-01-02,21.14,49.3,25.01,101.29,62.47,-4.53,17.27,12.23,6.3,-3.89,0.0
2,2022-01-03,20.66,43.1,24.54,101.21,62.79,-3.29,16.78,11.86,5.01,-3.51,0.0
3,2022-01-04,20.69,42.8,24.44,101.11,64.11,-2.41,16.95,12.04,3.73,-2.61,0.0
4,2022-01-05,20.7,52.0,24.35,101.2,64.04,-3.58,17.04,12.08,4.82,-2.8,0.01


Enregistrement du dataset apres nettoyage, transformation, et normalisation 

In [46]:
dataset_path = "Nasa_POWER_dataset_cleaned.csv"
df.toPandas().to_csv(dataset_path, index=False)