# Transforming Infoclimat data

**Author:** Steve Caron  
**Created:** 2023/08/23  
**Overview:** This notebook transforms data stored in a GCP bucket and loads it into BigQuery.


**Prerequisites:**  
* A GCP bucket containing the data as CSV files (NOM_BUCKET)
* A BigQuery table containing the weather station information: code, name, coordinates (ID_TABLE_INPUT)
* A BigQuery table that can receive the final data (ID_TABLE_OUTPUT)

**Params:**
* NOM_BUCKET: name of the GCP bucket; must be in the same project.
* NOM_REPERTOIRE: prefix (folder) under which the CSV files are stored in the bucket.
* ID_TABLE_INPUT: ID of the BigQuery stations table; must be in the same project.
* ID_TABLE_OUTPUT: ID of the BigQuery output table; must be in the same project.
* NOM_BUCKET_TEMP: GCS bucket passed as temporaryGcsBucket when writing to BigQuery.

# Importing libraries

In [1]:
from google.cloud import bigquery
from google.cloud import storage
import os
# Note: sum, max, min and round imported here shadow the Python builtins of the same name.
from pyspark.sql.functions import col,sum,avg,max,min,round,lit
from pyspark.sql.types import StringType,BooleanType,DateType,FloatType,IntegerType,StructType,StructField,DecimalType

# Defining parameters

In [2]:
NOM_BUCKET = 'code_de_source_lake'
NOM_REPERTOIRE = "infoclimat"
ID_TABLE_INPUT = "code-de-source.donnees_code_de_source.stations_meteo"
ID_TABLE_OUTPUT = "code-de-source.donnees_code_de_source.donnees_meteo"
NOM_BUCKET_TEMP = "traitement_meteo_temp"
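
Before launching the job, it can help to check that the table identifiers have the `project.dataset.table` shape used by the values above, and that both tables sit in the same project. The following is a minimal sketch; `verifier_id_table` is a hypothetical helper, not part of the notebook or of any Google library.

```python
def verifier_id_table(id_table: str) -> tuple:
    """Split a 'project.dataset.table' identifier and reject other shapes."""
    parties = id_table.split(".")
    if len(parties) != 3 or not all(parties):
        raise ValueError(f"Expected 'project.dataset.table', got {id_table!r}")
    projet, dataset, table = parties
    return projet, dataset, table


# Values copied from the parameter cell above.
ID_TABLE_INPUT = "code-de-source.donnees_code_de_source.stations_meteo"
ID_TABLE_OUTPUT = "code-de-source.donnees_code_de_source.donnees_meteo"

# Both tables are expected to live in the same project.
projet_entree, _, _ = verifier_id_table(ID_TABLE_INPUT)
projet_sortie, _, _ = verifier_id_table(ID_TABLE_OUTPUT)
assert projet_entree == projet_sortie
```

Failing early here is cheaper than discovering a malformed identifier after the Spark job has already read and aggregated every file.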

# Retrieving the files from the bucket

In [3]:
# Initialize the list of file URIs
noms_fichier = []

# No delimiter: list every object under the prefix, including nested ones
delimiter = None

# Initialize the storage client
storage_client = storage.Client()

# Note: Client.list_blobs requires at least package version 1.17.0.
blobs = storage_client.list_blobs(NOM_BUCKET, prefix=NOM_REPERTOIRE, delimiter=delimiter)

# Iterate over the objects (files) in the bucket
for blob in blobs:
    noms_fichier.append("gs://{}/{}".format(NOM_BUCKET,blob.name))

In [4]:
# print(noms_fichier)
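
Listing by prefix returns every object under that prefix, which can include folder placeholder objects (names ending in `/`) or files that are not CSV. Below is a minimal sketch of a filter, assuming only `.csv` objects should be read. `construire_uris` is a hypothetical helper that works on plain object names, so it can be tried without a GCS connection; the object names in the example are made up.

```python
def construire_uris(nom_bucket, noms_objets, extension=".csv"):
    """Build gs:// URIs, keeping only object names that end with `extension`."""
    return [
        f"gs://{nom_bucket}/{nom}"
        for nom in noms_objets
        if nom.lower().endswith(extension)
    ]


# Made-up object names, for illustration only.
noms = ["infoclimat/", "infoclimat/meteo_2022.csv", "infoclimat/LISEZMOI.txt"]
print(construire_uris("code_de_source_lake", noms))
```

In the notebook, this could be called as `construire_uris(NOM_BUCKET, (blob.name for blob in blobs))` in place of the loop.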

# Reading the files with Spark

In [5]:
donnees_brut = spark.read \
    .option("header", "true") \
    .csv(noms_fichier)

23/08/23 13:56:26 WARN GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=497; previousMaxLatencyMs=422; operationCount=50; context=gs://code_de_source_lake/infoclimat/météo_000EN_2022.csv
23/08/23 13:56:26 WARN GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=583; previousMaxLatencyMs=497; operationCount=50; context=gs://code_de_source_lake/infoclimat/météo_000EN_2023.csv
23/08/23 13:56:26 WARN GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=618; previousMaxLatencyMs=583; operationCount=50; context=gs://code_de_source_lake/infoclimat/météo_000EN_2017.csv
23/08/23 13:56:26 WARN GhfsStorageStatistics: Detected potential high latency for operation op_get_file_status. latencyMs=730; previousMaxLatencyMs=618; operationCount=50; context=gs://code_de_source_lake/infoclimat/météo_000BS_2020.csv
                                                    

In [6]:
# donnees_brut.show(1)
# print(type(donnees_brut))

# Renaming the columns

In [7]:
donnees_renommees = donnees_brut.withColumnRenamed("station_id","code_station")\
           .withColumnRenamed("dh_utc","date")\
           .withColumnRenamed("vent_moyen","vitesse_du_vent")\
           .withColumnRenamed("vent_direction","direction_du_vent")\
           .withColumnRenamed("pluie_3h","precipitation_3h")\
           .withColumnRenamed("pluie_1h","precipitation_1h")

In [8]:
# donnees_renommees.printSchema()
# print(type(donnees_renommees))
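
The chain of `withColumnRenamed` calls can also be driven by a mapping, which keeps the old and new names side by side. This is a sketch of an alternative, not the notebook's code: `CORRESPONDANCES` and `renommer_colonnes` are hypothetical names, and the helper only assumes that its argument exposes `withColumnRenamed(old, new)` as a Spark DataFrame does.

```python
from functools import reduce

# Mapping taken from the renaming cell above: source name -> target name.
CORRESPONDANCES = {
    "station_id": "code_station",
    "dh_utc": "date",
    "vent_moyen": "vitesse_du_vent",
    "vent_direction": "direction_du_vent",
    "pluie_3h": "precipitation_3h",
    "pluie_1h": "precipitation_1h",
}


def renommer_colonnes(df, correspondances):
    """Apply withColumnRenamed once per (old, new) pair and return the result."""
    return reduce(
        lambda courant, paire: courant.withColumnRenamed(paire[0], paire[1]),
        correspondances.items(),
        df,
    )
```

Usage would be `donnees_renommees = renommer_colonnes(donnees_brut, CORRESPONDANCES)`.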

# Casting the column types

In [9]:
donnees_typees = donnees_renommees.withColumn("date",col("date").cast(DateType()))\
    .withColumn("temperature",col("temperature").cast(DecimalType(10,1)))\
    .withColumn("pression",col("pression").cast(IntegerType()))\
    .withColumn("humidite",col("humidite").cast(IntegerType()))\
    .withColumn("point_de_rosee",col("point_de_rosee").cast(DecimalType(10,1)))\
    .withColumn("vitesse_du_vent",col("vitesse_du_vent").cast(DecimalType(10,2)))\
    .withColumn("vent_rafales",col("vent_rafales").cast(DecimalType(10,2)))\
    .withColumn("direction_du_vent",col("direction_du_vent").cast(IntegerType()))\
    .withColumn("precipitation_3h",col("precipitation_3h").cast(DecimalType(10,1)))\
    .withColumn("precipitation_1h",col("precipitation_1h").cast(DecimalType(10,1)))

In [10]:
#  donnees_typees.printSchema()
# donnees_typees.show(1)
# print(type(donnees_typees))
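
To make the effect of the casts concrete, here is a plain-Python approximation for a single raw row. It assumes that `dh_utc` looks like `YYYY-MM-DD HH:MM:SS`, that unparsable values become null (Spark's behaviour when ANSI mode is off), and that decimals are rounded half up; these are assumptions about the data and the cluster configuration, the sample values are made up, and the helper names are hypothetical.

```python
from datetime import datetime
from decimal import Decimal, InvalidOperation, ROUND_HALF_UP


def vers_date(texte):
    """Keep only the calendar day of a 'YYYY-MM-DD HH:MM:SS' string, or None."""
    try:
        return datetime.strptime(texte, "%Y-%m-%d %H:%M:%S").date()
    except (TypeError, ValueError):
        return None


def vers_decimal(texte, decimales=1):
    """Parse a string into a Decimal with a fixed number of decimal places, or None."""
    try:
        pas = Decimal(1).scaleb(-decimales)  # Decimal('0.1') for one decimal place
        return Decimal(texte).quantize(pas, rounding=ROUND_HALF_UP)
    except (TypeError, InvalidOperation):
        return None


# Made-up raw row, every value still a string as read from the CSV.
ligne = {"dh_utc": "2023-08-23 13:00:00", "temperature": "21.45", "pluie_1h": "n/a"}
print(vers_date(ligne["dh_utc"]), vers_decimal(ligne["temperature"]), vers_decimal(ligne["pluie_1h"]))
```

Casting the timestamp to a date drops the time of day, which is what lets the later aggregation group all readings of one day together.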

# Adding columns for maximum temperature, minimum temperature, maximum pressure, minimum pressure, and maximum wind speed

In [11]:
donnees_ajout_col = donnees_typees.withColumn("temperature_max",col("temperature"))\
.withColumn("temperature_min",col("temperature"))\
.withColumn("pression_max",col("pression"))\
.withColumn("pression_min",col("pression"))\
.withColumn("vitesse_du_vent_max",col("vitesse_du_vent"))

In [12]:
# donnees_ajout_col.printSchema()
# print(type(donnees_ajout_col))

# Aggregating by station and date

In [13]:
donnees_aggregees = donnees_ajout_col.groupBy("code_station","date") \
    .agg(avg("temperature").cast(DecimalType(10,1)).alias("temperature"), \
         min("temperature_min").alias("temperature_min"), \
         max("temperature_max").alias("temperature_max"), \
         avg("pression").cast(IntegerType()).alias("pression"),\
         min("pression_min").cast(IntegerType()).alias("pression_min"), \
         max("pression_max").cast(IntegerType()).alias("pression_max"), \
         avg("humidite").cast(IntegerType()).alias("humidite"),\
         avg("point_de_rosee").cast(DecimalType(10,1)).alias("point_de_rosee"),\
         avg("vitesse_du_vent").cast(DecimalType(10,2)).alias("vitesse_du_vent"),\
         max("vitesse_du_vent_max").alias("vitesse_du_vent_max"), \
         avg("direction_du_vent").cast(IntegerType()).alias("direction_du_vent"),\
         sum("precipitation_3h").alias("precipitation")) \
        .orderBy("date")

In [14]:
# donnees_aggregees.show(5)
# print(type(donnees_aggregees))
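
The grouping logic can be illustrated on a handful of made-up readings without Spark. The sketch below groups by `(code_station, date)` and computes the mean, minimum, and maximum temperature plus the precipitation total. It illustrates the aggregation only and is not the notebook's implementation; like Spark's aggregate functions, it skips missing values.

```python
from collections import defaultdict
from decimal import Decimal

# Made-up readings: (code_station, date, temperature, precipitation_3h).
releves = [
    ("000EN", "2023-08-22", Decimal("18.0"), Decimal("0.0")),
    ("000EN", "2023-08-22", Decimal("24.0"), Decimal("1.5")),
    ("000EN", "2023-08-22", None, Decimal("0.5")),
    ("000BS", "2023-08-22", Decimal("20.5"), None),
]

# Collect the non-missing values of each (station, day) group.
groupes = defaultdict(lambda: {"temperatures": [], "pluies": []})
for code, jour, temperature, pluie in releves:
    if temperature is not None:
        groupes[(code, jour)]["temperatures"].append(temperature)
    if pluie is not None:
        groupes[(code, jour)]["pluies"].append(pluie)

# sum, min and max below are the Python builtins, not the pyspark functions.
resume = {}
for cle, valeurs in groupes.items():
    temps = valeurs["temperatures"]
    pluies = valeurs["pluies"]
    resume[cle] = {
        "temperature": sum(temps) / len(temps) if temps else None,
        "temperature_min": min(temps) if temps else None,
        "temperature_max": max(temps) if temps else None,
        "precipitation": sum(pluies) if pluies else None,
    }

for cle, ligne in sorted(resume.items()):
    print(cle, ligne)
```

A group whose values are all missing yields `None`, mirroring the null that Spark returns in that case.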

# Collecting the station information

In [15]:
stations = spark.read.format('bigquery') \
  .option('table', ID_TABLE_INPUT) \
  .load()

stations = stations.select("code_station", "nom_station", "coordonnees_x", "coordonnees_y")

In [16]:
# stations.printSchema()
# stations.show(5)
# print(type(stations))

# Joining the tables

In [17]:
data_jointes = donnees_aggregees.join(stations,
                                      donnees_aggregees["code_station"] == stations["code_station"]
                                     ).drop(stations["code_station"])\
                                     .withColumnRenamed("coordonnees_x","longitude_x")\
                                     .withColumnRenamed("coordonnees_y","latitude_y")\
                                     .withColumnRenamed("nom_station","localite")\
                                     .withColumn("localite",col("localite").cast(StringType()))\
                                     .withColumn("longitude_x",col("longitude_x").cast(FloatType()))\
                                     .withColumn("latitude_y",col("latitude_y").cast(FloatType()))

data_jointes = data_jointes.select("date","localite","code_station",\
                                   "longitude_x","latitude_y",\
                                   "temperature","temperature_min","temperature_max",\
                                   "pression","pression_min","pression_max",\
                                   "humidite","point_de_rosee",\
                                   "vitesse_du_vent","vitesse_du_vent_max","direction_du_vent",\
                                   "precipitation")\
                                    .withColumn("temperature",col("temperature").cast(DecimalType(10,1)))\
                                    .withColumn("precipitation",col("precipitation").cast(DecimalType(10,1)))\
                                    .withColumn("humidite",col("humidite").cast(DecimalType(10,1)))\
                                    .withColumn("vitesse_du_vent",col("vitesse_du_vent").cast(DecimalType(10,1)))\
                                    .withColumn("vitesse_du_vent_max",col("vitesse_du_vent_max").cast(DecimalType(10,1)))


In [18]:
data_jointes.printSchema()
# data_jointes.show(1)
# print(type(data_jointes))

root
 |-- date: date (nullable = true)
 |-- localite: string (nullable = true)
 |-- code_station: string (nullable = true)
 |-- longitude_x: float (nullable = true)
 |-- latitude_y: float (nullable = true)
 |-- temperature: decimal(10,1) (nullable = true)
 |-- temperature_min: decimal(10,1) (nullable = true)
 |-- temperature_max: decimal(10,1) (nullable = true)
 |-- pression: integer (nullable = true)
 |-- pression_min: integer (nullable = true)
 |-- pression_max: integer (nullable = true)
 |-- humidite: decimal(10,1) (nullable = true)
 |-- point_de_rosee: decimal(10,1) (nullable = true)
 |-- vitesse_du_vent: decimal(10,1) (nullable = true)
 |-- vitesse_du_vent_max: decimal(10,1) (nullable = true)
 |-- direction_du_vent: integer (nullable = true)
 |-- precipitation: decimal(10,1) (nullable = true)
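
Because `join` is called without a `how` argument, Spark performs an inner join: aggregated rows whose `code_station` is absent from the stations table are dropped from the result. The pure-Python sketch below, using made-up station codes, shows one way to list such unmatched codes before writing; `codes_sans_station` is a hypothetical helper.

```python
def codes_sans_station(codes_mesures, codes_stations):
    """Return, sorted, the codes present in the measurements but not in the stations table."""
    return sorted(set(codes_mesures) - set(codes_stations))


# Made-up codes, for illustration only.
codes_mesures = ["000EN", "000BS", "000ZZ", "000EN"]
codes_stations = ["000EN", "000BS"]
print(codes_sans_station(codes_mesures, codes_stations))
```

On the Spark side, a comparable check could be a `left_anti` join of `donnees_aggregees` against `stations` on `code_station`, which keeps only the rows without a match.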



# Saving to a table

In [19]:
data_jointes.write \
  .format("bigquery") \
  .option("table",ID_TABLE_OUTPUT)\
  .option("temporaryGcsBucket",NOM_BUCKET_TEMP) \
  .mode("append") \
  .save()

23/08/23 13:57:31 WARN GhfsStorageStatistics: Detected potential high latency for operation op_delete. latencyMs=190; previousMaxLatencyMs=0; operationCount=1; context=gs://traitement_meteo_temp/.spark-bigquery-application_1692795747682_0005-d2ab9648-3da5-495c-b065-e81221760d79/_temporary
23/08/23 13:57:31 WARN GhfsStorageStatistics: Detected potential high latency for operation stream_write_close_operations. latencyMs=111; previousMaxLatencyMs=0; operationCount=1; context=gs://traitement_meteo_temp/.spark-bigquery-application_1692795747682_0005-d2ab9648-3da5-495c-b065-e81221760d79/_SUCCESS
ERROR:root:Exception while sending command.
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/py4j-0.10.9.5-src.zip/py4j/clientserver.py", line 516, in send_command
    raise Py4JNetworkError("Answer from Java side is empty")
py4j.protocol.Py4JNetworkError: Answer from Java side is empty

During handling of the above exception, another exception occurred:

Traceback (most recent c