# Transformation des données infoclimat

**Auteur :**  Steve Caron  
**Date de création :** 2023/08/23  
**Présentation :** Ce notebook transforme des données stockées dans un bucket GCP et les envoie vers BigQuery


**Prérequis :** 
- Un bucket gcp pour le stockage de données. (BUCKET_NAME)
- Une table BigQuerry contenant la liste des station du reseau infoclimat (TABLE_ID_INPUT)

**Inputs :** 

**Params:**
* NOM_BUCKET : nom du bucket GCP, doit être dans le même projet.
* NOM_REPERTOIRE : Repertoire du Bucket dans lequel se trouvent les fichiers csv.
* HEADER_CSV : Paramètre qui précise si les fichiers CSV ont des header (oui = True, non=False)
* ID_TABLE_INPUT : ID de la table BigQuery, doit être dans le même projet.
* ID_TABLE_OUTPUT : ID de la table BigQuery, doit être dans le même projet.
* NOM_BUCKET_TEMP : Nom du bucket temporaire pour l'écriture dans BigQuery.
* MODE_ECRITURE : Mode d'écriture de la table BigQuery ( exemple : "overwrite", "append")

# Import des librairies

In [81]:
from google.cloud import bigquery, storage
import os
from pyspark.sql.functions import col,sum,avg,max,min,when,lower
from pyspark.sql.types import StringType,DateType,FloatType,IntegerType,DecimalType

# Definition des Paramètres

In [82]:
NOM_BUCKET = 'code_de_source_lake'
NOM_REPERTOIRE = "infoclimat"
HEADER_CSV = True
ID_TABLE_INPUT = "code-de-source.donnees_code_de_source.stations_meteo"
ID_TABLE_OUTPUT = "code-de-source.donnees_code_de_source.donnees_meteo_test"
NOM_BUCKET_TEMP = "traitement_meteo_temp"
MODE_ECRITURE = "overwrite"

# Recupération des fichiers dans le Bucket

In [83]:
# Initialisation de la liste de fichier
noms_fichier = []

# Definition du delimiter
delimiter = None

# Initialisation du client
storage_client = storage.Client()

# Note: Client.list_blobs requires at least package version 1.17.0.
blobs = storage_client.list_blobs(NOM_BUCKET, prefix=NOM_REPERTOIRE, delimiter=delimiter)

# Parcourir les objets (fichiers) dans le bucket
for blob in blobs:
    noms_fichier.append("gs://{}/{}".format(NOM_BUCKET,blob.name))

# Lecture des fichier avec Spark

In [84]:
donnees_brut = spark.read \
    .option("header", HEADER_CSV) \
    .csv(noms_fichier)

                                                                                

# Renommer les colonnes

In [85]:
donnees_renommees = donnees_brut.withColumnRenamed("station_id","code_station")\
           .withColumnRenamed("dh_utc","date")\
           .withColumnRenamed("vent_moyen","vitesse_du_vent")\
           .withColumnRenamed("vent_direction","direction_du_vent")\
           .withColumnRenamed("pluie_3h","precipitation_3h")\
           .withColumnRenamed("pluie_1h","precipitation_1h")

In [86]:
# donnees_renommees.printSchema()
# print(type(donnees_renommees))

# Nettoyage des lignes avec vitesses de vent mais pas de direction

Dans certain cas, les stations mesures des vitesses de vent sans mesurer la direction du vent. Pouvant entrainer des vitesses de vent négative.  
Nous remplaçons les enregistrement de vent sans direction par la valeur nulle.

In [87]:
donnees_nettoyees = donnees_renommees.withColumn("vitesse_du_vent", \
        when((donnees_renommees["vitesse_du_vent"].isNotNull()) & (donnees_renommees["direction_du_vent"].isNull()), None)\
        .otherwise(donnees_renommees["vitesse_du_vent"]))

# Typage des colonnes

In [88]:
donnees_typees = donnees_nettoyees.withColumn("date",col("date").cast(DateType()))\
    .withColumn("temperature",col("temperature").cast(DecimalType(10,1)))\
    .withColumn("pression",col("pression").cast(IntegerType()))\
    .withColumn("humidite",col("humidite").cast(IntegerType()))\
    .withColumn("point_de_rosee",col("point_de_rosee").cast(DecimalType(10,1)))\
    .withColumn("vitesse_du_vent",col("vitesse_du_vent").cast(DecimalType(10,2)))\
    .withColumn("vent_rafales",col("vent_rafales").cast(DecimalType(10,2)))\
    .withColumn("direction_du_vent",col("direction_du_vent").cast(IntegerType()))\
    .withColumn("precipitation_3h",col("precipitation_3h").cast(DecimalType(10,1)))\
    .withColumn("precipitation_1h",col("precipitation_1h").cast(DecimalType(10,1)))

In [89]:
#  donnees_typees.printSchema()
# donnees_typees.show(1)

# Ajout d'une colone pour la temperature max, la temperature min, la pression max, la pression min, la vitesse du vent max

In [90]:
donnees_ajout_col = donnees_typees.withColumn("temperature_max",col("temperature"))\
.withColumn("temperature_min",col("temperature"))\
.withColumn("pression_max",col("pression"))\
.withColumn("pression_min",col("pression"))\
.withColumn("vitesse_du_vent_max",col("vitesse_du_vent"))

In [91]:
# donnees_ajout_col.printSchema()
# print(type(donnees_ajout_col))

# Aggregation par date

Je fais une aggrégation par date et par code station pour n'avoir qu'une seule mesure par jour, et non pas une mesure toutes les 20minutes ou une mesure par heure ...

In [92]:
donnees_aggregees = donnees_ajout_col.groupBy("code_station","date") \
    .agg(avg("temperature").cast(DecimalType(10,1)).alias("temperature"), \
         min("temperature_min").alias("temperature_min"), \
         max("temperature_max").alias("temperature_max"), \
         avg("pression").cast(IntegerType()).alias("pression"),\
         min("pression_min").cast(IntegerType()).alias("pression_min"), \
         max("pression_max").cast(IntegerType()).alias("pression_max"), \
         avg("humidite").cast(IntegerType()).alias("humidite"),\
         avg("point_de_rosee").cast(DecimalType(10,1)).alias("point_de_rosee"),\
         avg("vitesse_du_vent").cast(DecimalType(10,2)).alias("vitesse_du_vent"),\
         max("vitesse_du_vent_max").alias("vitesse_du_vent_max"), \
         avg("direction_du_vent").cast(IntegerType()).alias("direction_du_vent"),\
         sum("precipitation_3h").alias("precipitation")) \
        .orderBy("date")

In [93]:
# donnees_aggregees.show(5)
# print(type(donnees_aggregees))

# Collecte des informations sur les stations

Il faut ajouter à ma table les informations sur les stations climatique. Ces informations étant contenues dans une autre table, je charge cette table pour ensuite faire une jointure des deux tables.

In [94]:
stations = spark.read.format('bigquery') \
  .option('table', ID_TABLE_INPUT) \
  .load()

#Je selectionne uniquement les colonnes que je veux garder.
stations = stations.select("code_station", "nom_station", "coordonnees_x", "coordonnees_y")

In [95]:
# stations.printSchema()
# stations.show(5)
# print(type(stations))

# Jointure des tables

In [96]:
data_jointes = donnees_aggregees.join(stations,
                                      donnees_aggregees["code_station"] == stations["code_station"]
                                     ).drop(stations[0])

In [97]:
# data_jointes.printSchema()

# Renommer les nouvelles colonnes

In [98]:
data_jointes_renommees = data_jointes.withColumnRenamed("coordonnees_x","longitude_x")\
                                     .withColumnRenamed("coordonnees_y","latitude_y")\
                                     .withColumnRenamed("nom_station","localite")

In [99]:
# data_jointes_renommees.printSchema()

# Typage des nouvelles colonnes

In [100]:
data_jointes_typees = data_jointes_renommees.select("date","localite",\
                                   "longitude_x","latitude_y",\
                                   "temperature","temperature_min","temperature_max",\
                                   "pression","pression_min","pression_max",\
                                   "humidite","point_de_rosee",\
                                   "vitesse_du_vent","vitesse_du_vent_max","direction_du_vent",\
                                   "precipitation")\
                                    .withColumn("precipitation",col("precipitation").cast(DecimalType(10,1)))\
                                    .withColumn("humidite",col("humidite").cast(DecimalType(10,1)))\
                                    .withColumn("vitesse_du_vent",col("vitesse_du_vent").cast(DecimalType(10,1)))\
                                    .withColumn("vitesse_du_vent_max",col("vitesse_du_vent_max").cast(DecimalType(10,1)))\
                                    .withColumn("longitude_x",col("longitude_x").cast(StringType()))\
                                    .withColumn("latitude_y",col("latitude_y").cast(StringType()))\
                                    .withColumn("localite",lower(col("localite")))


In [101]:
# data_jointes_typees.printSchema()
# data_jointes_typees.show(1)

# Enregistrement dans une Table

In [102]:
data_jointes_typees.write \
  .format("bigquery") \
  .option("table",ID_TABLE_OUTPUT)\
  .option("temporaryGcsBucket",NOM_BUCKET_TEMP) \
  .mode(MODE_ECRITURE) \
  .save()

print(f"Les données sont enregistrées dans la table {ID_TABLE_OUTPUT}")

                                                                                

Les données sont enregistrées dans la table code-de-source.donnees_code_de_source.donnees_meteo_test
