# Introduction

 Ici, on va fusionner (merger) les bases selon les besoins d'analyse et en fonction des données disponibles.
 On construira une base globale mensuelle et une base globale annuelle.

In [0]:
# Bibliothèques nécessaires
from pyspark.sql import functions as F

In [0]:
# Start with the annual dataset: load the cleaned daily weather observations,
# which will be merged with the economic data prepared in the economic-cleaning notebook.
df_weather = spark.table("weather_senegal_clean")

df_weather.printSchema()
display(df_weather.limit(5))

root
 |-- region: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- tavg: double (nullable = true)
 |-- tmin: double (nullable = true)
 |-- tmax: double (nullable = true)
 |-- prcp: double (nullable = true)
 |-- wspd: double (nullable = true)
 |-- pres: double (nullable = true)



region,time,tavg,tmin,tmax,prcp,wspd,pres
Dakar,2005-01-01T00:00:00.000Z,22.6,18.5,28.0,0.0,16.4,1012.2986111111122
Dakar,2005-01-02T00:00:00.000Z,21.5,18.8,24.0,0.0,17.0,1012.2986111111122
Dakar,2005-01-05T00:00:00.000Z,20.1,18.0,23.0,0.0,16.2,1012.2986111111122
Dakar,2005-01-08T00:00:00.000Z,22.1,19.9,25.0,0.0,21.2,1012.2986111111122
Dakar,2005-01-09T00:00:00.000Z,21.6,19.0,25.0,0.0,19.0,1012.2986111111122


In [0]:
# Add the calendar year of each daily observation; it is the key of the annual aggregation.
df_weather = df_weather.withColumn("Year", F.year("time"))

In [0]:
# Collapse the daily observations (all regions pooled) into one row per year:
# mean temperature, summed precipitation, mean pressure.
# NOTE(review): total_precipitation sums daily rainfall over every region and every
# available day, so its level also depends on how many regions/days are present in a
# given year (the 2007 vs 2012 gap may partly reflect coverage) — consider averaging
# per-region annual totals instead; confirm with the analysis notebooks.
df_weather_annual = df_weather.groupBy("Year").agg(
    F.avg("tavg").alias("avg_temperature"),
    F.sum("prcp").alias("total_precipitation"),
    F.avg("pres").alias("avg_pressure"),
)

In [0]:
# Show the annual weather indicators in chronological order.
display(df_weather_annual.sort("Year"))

Year,avg_temperature,total_precipitation,avg_pressure
2005,28.71919682259484,5224.400000000006,1011.2347672084724
2006,28.531488203266772,3399.0999999999995,1011.2394931916094
2007,28.17781569965869,1702.0,1011.4508161488728
2008,27.88793407886991,3991.7000000000007,1011.3557871285636
2009,27.9764381402679,4925.600000000004,1011.1819275281548
2010,28.764912280701715,5289.700000000004,1011.204935334434
2011,27.861874386653625,3783.8000000000006,1011.29236549284
2012,27.986167146974044,6735.300000000003,1011.2694924187704
2013,28.079271991911074,4491.799999999997,1011.2964570549656
2014,27.59651545036165,3139.8000000000006,1011.414740059626


Les observations météorologiques quotidiennes sont horodatées par la variable `time`. Les indicateurs annuels ont été construits en extrayant l'année de cet horodatage, puis en agrégeant les observations quotidiennes (toutes régions confondues) en moyennes annuelles (température, pression) et en totaux annuels (précipitations).

In [0]:
# Annual weather indicators, rebuilt directly from the cleaned table so that this
# cell does not depend on the exploratory cells above.
# NOTE(review): total_precipitation sums daily rainfall over every region and every
# available day, so its level also depends on data coverage — confirm this is intended.
df_weather_annual = (
    spark.table("weather_senegal_clean")
    .withColumn("Year", F.year("time"))
    .groupBy("Year")
    .agg(
        F.avg("tavg").alias("avg_temperature"),
        F.sum("prcp").alias("total_precipitation"),
        F.avg("pres").alias("avg_pressure"),
    )
)

# Agricultural data (yields, harvested area, production, crops); the Area column and
# the GDP columns (pib_*) are not needed for this dataset and are dropped.
df_agri = spark.table("annual_economic_data").drop(
    "Area", "pib_item", "pib_value", "pib_unit"
)

In [0]:
# Inspect both schemas before joining: "Year" must exist (and be an integer) on both sides.
for frame in (df_weather_annual, df_agri):
    frame.printSchema()


root
 |-- Year: integer (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- total_precipitation: double (nullable = true)
 |-- avg_pressure: double (nullable = true)

root
 |-- Year: integer (nullable = true)
 |-- Item: string (nullable = true)
 |-- Element: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- dep_item: string (nullable = true)
 |-- dep_agri_value: long (nullable = true)
 |-- dep_agri_unit: string (nullable = true)



In [0]:
# Attach the annual weather indicators to every agricultural row of the same year.
# A left join keeps agricultural rows even for years without weather data.
df_merged = df_agri.join(df_weather_annual, "Year", "left")


In [0]:
# Show the merged annual dataset in chronological order.
display(df_merged.sort("Year"))

Year,Item,Element,Unit,Value,dep_item,dep_agri_value,dep_agri_unit,avg_temperature,total_precipitation,avg_pressure
2005,Millet,Production,t,608551.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,"Mangoes, guavas and mangosteens",Production,t,61646.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Sesame seed,Production,t,31839.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Potatoes,Yield,kg/ha,13653.6,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,"Other beans, green",Area harvested,ha,645.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Rice,Production,t,279080.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Carrots and turnips,Yield,kg/ha,14979.8,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,"Onions and shallots, dry (excluding dehydrated)",Yield,kg/ha,25000.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,"Seed cotton, unginned",Area harvested,ha,38254.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Oranges,Area harvested,ha,4000.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724


In [0]:
# Schema after the join: the agricultural columns followed by the three weather indicators.
df_merged.printSchema()

root
 |-- Year: integer (nullable = true)
 |-- Item: string (nullable = true)
 |-- Element: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- dep_item: string (nullable = true)
 |-- dep_agri_value: long (nullable = true)
 |-- dep_agri_unit: string (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- total_precipitation: double (nullable = true)
 |-- avg_pressure: double (nullable = true)



In [0]:
# Persist the annual agriculture + climate dataset as a Delta table,
# replacing any previous version of the table.
df_merged.write.format("delta").mode("overwrite").saveAsTable("annual_agri_climate_data")

print(" Table Delta créée : annual_agri_climate_data")

 Table Delta créée : annual_agri_climate_data


In [0]:
# Reload the saved table to confirm the write: schema, a small sample, and the row count.
df_check = spark.table("annual_agri_climate_data")
df_check.printSchema()
display(df_check.limit(5))
n_saved_rows = df_check.count()
print("Nombre total d'observations :", n_saved_rows)


root
 |-- Year: integer (nullable = true)
 |-- Item: string (nullable = true)
 |-- Element: string (nullable = true)
 |-- Unit: string (nullable = true)
 |-- Value: double (nullable = true)
 |-- dep_item: string (nullable = true)
 |-- dep_agri_value: long (nullable = true)
 |-- dep_agri_unit: string (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- total_precipitation: double (nullable = true)
 |-- avg_pressure: double (nullable = true)



Year,Item,Element,Unit,Value,dep_item,dep_agri_value,dep_agri_unit,avg_temperature,total_precipitation,avg_pressure
2005,Bananas,Area harvested,ha,939.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Bananas,Yield,kg/ha,28026.6,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2005,Bananas,Production,t,26317.0,,,,28.71919682259484,5224.400000000006,1011.2347672084724
2006,Bananas,Area harvested,ha,1222.0,,,,28.531488203266772,3399.0999999999995,1011.2394931916094
2006,Bananas,Yield,kg/ha,24549.9,,,,28.531488203266772,3399.0999999999995,1011.2394931916094


Nombre total d'observations : 2055


Les données météorologiques et agricoles ont été fusionnées en utilisant l'année comme clé commune. Les observations météorologiques quotidiennes ont été agrégées en indicateurs annuels, tandis que les indicateurs de production agricole sont au niveau national. La base obtenue permet d'analyser la variabilité climatique et sa relation avec la production agricole au Sénégal sur la période 2005-2025.

## Base économique annuelle : PIB + températures annuelles

In [0]:
# Load the monthly economic table (source of the constant-price GDP series).
df_econ = spark.table("economics_data")

In [0]:
# Derive the calendar year from the raw date column (still called "Unnamed_0" at this point).
df_econ_year = df_econ.withColumn("Year", F.year("Unnamed_0"))

In [0]:
# Annual GDP at constant prices: one value per year.
# F.first() without an explicit ordering is non-deterministic after the groupBy shuffle,
# and it returns null whenever the row it happens to pick holds a null. F.max() is
# deterministic and ignores nulls. When the series carries a single value per year, both
# give the same number (the January-2005 monthly row shows 6888.61, equal to the 2005
# annual figure).
# NOTE(review): if GDP ever varies within a year, choose explicitly which value to keep.
df_pib_annual = (
    df_econ_year
    .groupBy("Year")
    .agg(F.max("PIB_a_prix_constant").alias("pib_constant"))
    .orderBy("Year")
)

In [0]:
# Check the annual GDP table: its schema, then every row (one per year).
df_pib_annual.printSchema()
display(df_pib_annual)

root
 |-- Year: integer (nullable = true)
 |-- pib_constant: double (nullable = true)



Year,pib_constant
2005,6888.61
2006,7049.17
2007,7248.46
2008,7516.88
2009,7723.75
2010,7985.66
2011,8092.19
2012,8416.12
2013,8619.15
2014,9155.61


In [0]:
# Persist the annual GDP series as a Delta table, replacing any previous version.
df_pib_annual.write.format("delta").mode("overwrite").saveAsTable("pib_annual")

print("✅ Table Delta créée : pib_annual")


✅ Table Delta créée : pib_annual


In [0]:
# Rebuild the annual weather indicators from the cleaned table, then attach the annual
# GDP saved above. The left join keeps every year that has weather data.
# NOTE(review): total_precipitation sums daily rainfall over every region and every
# available day, so its level also depends on data coverage — confirm this is intended.
df_meteo_annuel = (
    spark.table("weather_senegal_clean")
    .withColumn("Year", F.year("time"))
    .groupBy("Year")
    .agg(
        F.avg("tavg").alias("avg_temperature"),
        F.sum("prcp").alias("total_precipitation"),
        F.avg("pres").alias("avg_pressure"),
    )
)

df_pib_annuel = spark.table("pib_annual")
df_meteo_pib = df_meteo_annuel.join(df_pib_annuel, "Year", "left")


In [0]:
# Check the merged weather + GDP table: schema and the first ten years.
df_meteo_pib.printSchema()
display(df_meteo_pib.sort("Year").limit(10))

root
 |-- Year: integer (nullable = true)
 |-- avg_temperature: double (nullable = true)
 |-- total_precipitation: double (nullable = true)
 |-- avg_pressure: double (nullable = true)
 |-- pib_constant: double (nullable = true)



Year,avg_temperature,total_precipitation,avg_pressure,pib_constant
2005,28.71919682259484,5224.400000000006,1011.2347672084724,6888.61
2006,28.531488203266772,3399.0999999999995,1011.2394931916094,7049.17
2007,28.17781569965869,1702.0,1011.4508161488728,7248.46
2008,27.88793407886991,3991.7000000000007,1011.3557871285636,7516.88
2009,27.9764381402679,4925.600000000004,1011.1819275281548,7723.75
2010,28.764912280701715,5289.700000000004,1011.204935334434,7985.66
2011,27.861874386653625,3783.8000000000006,1011.29236549284,8092.19
2012,27.986167146974044,6735.300000000003,1011.2694924187704,8416.12
2013,28.079271991911074,4491.799999999997,1011.2964570549656,8619.15
2014,27.59651545036165,3139.8000000000006,1011.414740059626,9155.61


In [0]:
# Persist the annual weather + GDP dataset as a Delta table, replacing any previous version.
df_meteo_pib.write.format("delta").mode("overwrite").saveAsTable("meteo_pib_annuel")

print(" Table Delta créée : meteo_pib_annuel")


 Table Delta créée : meteo_pib_annuel


## Construisons la base mensuelle. Celle-ci utilise les données économiques mensuelles (indices des prix, indices de production, inflation...)

In [0]:
# Monthly dataset: reload the cleaned daily weather observations; they will be
# aggregated by region and month, then merged with the monthly economic indicators.
df_weather = spark.table("weather_senegal_clean")

df_weather.printSchema()
display(df_weather.limit(5))

root
 |-- region: string (nullable = true)
 |-- time: timestamp (nullable = true)
 |-- tavg: double (nullable = true)
 |-- tmin: double (nullable = true)
 |-- tmax: double (nullable = true)
 |-- prcp: double (nullable = true)
 |-- wspd: double (nullable = true)
 |-- pres: double (nullable = true)



region,time,tavg,tmin,tmax,prcp,wspd,pres
Dakar,2005-01-01T00:00:00.000Z,22.6,18.5,28.0,0.0,16.4,1012.2986111111122
Dakar,2005-01-02T00:00:00.000Z,21.5,18.8,24.0,0.0,17.0,1012.2986111111122
Dakar,2005-01-05T00:00:00.000Z,20.1,18.0,23.0,0.0,16.2,1012.2986111111122
Dakar,2005-01-08T00:00:00.000Z,22.1,19.9,25.0,0.0,21.2,1012.2986111111122
Dakar,2005-01-09T00:00:00.000Z,21.6,19.0,25.0,0.0,19.0,1012.2986111111122


In [0]:
# Add lower-case calendar keys "year" and "month" for the monthly aggregation.
df_weather = (
    df_weather
    .withColumn("year", F.year("time"))
    .withColumn("month", F.month("time"))
)


In [0]:
# Aggregate the daily observations to one row per region and month: means for
# temperatures, wind speed and pressure; sum for precipitation.
# No global orderBy on the dataset itself: downstream it is only counted and joined,
# and a join does not preserve row order, so sorting here is wasted work. Sorting is
# applied only to the sample shown below.
df_weather_monthly = (
    df_weather
    .groupBy("region", "year", "month")
    .agg(
        F.avg("tavg").alias("avg_tavg"),
        F.avg("tmin").alias("avg_tmin"),
        F.avg("tmax").alias("avg_tmax"),
        F.sum("prcp").alias("total_prcp"),
        F.avg("wspd").alias("avg_wspd"),
        F.avg("pres").alias("avg_pres")
    )
)
# Show the first region-months in (region, year, month) order
display(df_weather_monthly.orderBy("region", "year", "month").limit(5))

region,year,month,avg_tavg,avg_tmin,avg_tmax,total_prcp,avg_wspd,avg_pres
Dakar,2005,1,21.78888888888889,18.994444444444444,26.29444444444444,0.0,16.400000000000002,1012.7439814814816
Dakar,2005,2,20.00952380952381,18.009523809523813,23.928571428571427,4.0,21.500000000000004,1013.1857142857144
Dakar,2005,3,21.892,19.068,26.128,0.0,19.22,1012.152
Dakar,2005,4,22.477272727272727,20.10454545454546,26.027272727272724,0.0,20.609090909090902,1012.2986111111128
Dakar,2005,5,25.357142857142858,23.285714285714285,28.81428571428572,0.0,12.721220784033454,1011.5046957671956


In [0]:
# Sanity check: number of region-month rows.
df_weather_monthly.count()

2048

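2048 lignes : une par région et par mois, de janvier 2005 à janvier 2025 (241 mois). Ce total n'étant pas un multiple de 241, certaines régions n'ont pas de données pour tous les mois.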

In [0]:
# Load the monthly economic indicators and take a first look: sample rows, schema, row count.
df_econ = spark.table("economics_data")
df_econ.show(5)
df_econ.printSchema()
df_econ.count()

+----------+--------------------------------------------------------------+------------------------------------+---------------------------------+-------------------------------------------+----------------------------------------+--------------------------------------+-----------------------------------------------------------------+-------------------------------------------------------------------+------------------------------------------------------------+-------------------+
| Unnamed_0|Indice_de_la_production_industrielle_des_produits_alimentaires|Indice_de_la_production_industrielle|Indice_des_prix_a_la_consommation|Indice_des_prix_de_la_fonction_alimentation|Indice_des_prix_de_la_fonction_Transport|Indice_des_prix_de_la_fonction_Loisirs|Taux_d_inflation_en_glissement_annuel_de_la_fonction_alimentation|Taux_d_inflation_en_glissement_annuel_de_la_fonction_alimentation_1|Taux_d_inflation_en_glissement_annuel_de_la_fonction_Loisirs|PIB_a_prix_constant|
+----------+----------------

240

In [0]:
# Replace the long source column names with short ones (source name -> short name).
# NOTE(review): the "..._alimentation_1" column looks like a de-duplicated copy of the
# food-inflation column (the first merged row shows 2.0 in both) rather than a one-period
# lag — confirm before treating "infl_food_lag1" as a lag.
econ_short_names = {
    "Unnamed_0": "date",
    "Indice_de_la_production_industrielle_des_produits_alimentaires": "ipi_alim",
    "Indice_de_la_production_industrielle": "ipi_total",
    "Indice_des_prix_a_la_consommation": "cpi",
    "Indice_des_prix_de_la_fonction_alimentation": "cpi_food",
    "Indice_des_prix_de_la_fonction_Transport": "cpi_transport",
    "Indice_des_prix_de_la_fonction_Loisirs": "cpi_entertain",
    "Taux_d_inflation_en_glissement_annuel_de_la_fonction_alimentation": "infl_food",
    "Taux_d_inflation_en_glissement_annuel_de_la_fonction_alimentation_1": "infl_food_lag1",
    "Taux_d_inflation_en_glissement_annuel_de_la_fonction_Loisirs": "infl_entertain",
    "PIB_a_prix_constant": "pib_constant",
}

df_econ_renamed = df_econ
for long_name, short_name in econ_short_names.items():
    df_econ_renamed = df_econ_renamed.withColumnRenamed(long_name, short_name)

df_econ_renamed.printSchema()


root
 |-- date: date (nullable = true)
 |-- ipi_alim: double (nullable = true)
 |-- ipi_total: double (nullable = true)
 |-- cpi: double (nullable = true)
 |-- cpi_food: double (nullable = true)
 |-- cpi_transport: double (nullable = true)
 |-- cpi_entertain: double (nullable = true)
 |-- infl_food: double (nullable = true)
 |-- infl_food_lag1: double (nullable = true)
 |-- infl_entertain: double (nullable = true)
 |-- pib_constant: double (nullable = true)



In [0]:
# Derive the calendar keys Year and Month from the observation date.
# "date" is already a DateType column (see the schema above), so F.year / F.month
# apply directly. The former `to_timestamp` assignment was dead code: its result was
# immediately overwritten by rebuilding df_econ from df_econ_renamed.
df_econ = (
    df_econ_renamed
    .withColumn("Year", F.year("date"))
    .withColumn("Month", F.month("date"))
)


In [0]:
# Check that Year and Month agree with the date for the first 20 months.
df_econ.select(["date", "Year", "Month"]).show(n=20)

+----------+----+-----+
|      date|Year|Month|
+----------+----+-----+
|2005-01-01|2005|    1|
|2005-02-01|2005|    2|
|2005-03-01|2005|    3|
|2005-04-01|2005|    4|
|2005-05-01|2005|    5|
|2005-06-01|2005|    6|
|2005-07-01|2005|    7|
|2005-08-01|2005|    8|
|2005-09-01|2005|    9|
|2005-10-01|2005|   10|
|2005-11-01|2005|   11|
|2005-12-01|2005|   12|
|2006-01-01|2006|    1|
|2006-02-01|2006|    2|
|2006-03-01|2006|    3|
|2006-04-01|2006|    4|
|2006-05-01|2006|    5|
|2006-06-01|2006|    6|
|2006-07-01|2006|    7|
|2006-08-01|2006|    8|
+----------+----+-----+
only showing top 20 rows


In [0]:
# Attach the monthly weather indicators to each monthly economic row.
# The weather frame uses lower-case keys ("year", "month"). Rename them so the join keys
# match exactly, instead of relying on Spark's default case-insensitive name resolution
# (spark.sql.caseSensitive=false). The output keeps the left-side names "Year"/"Month".
# The weather frame has one row per region and month, so each economic month is repeated
# once per region that has data; the left join keeps months without any weather data.
df_weather_monthly_keyed = (
    df_weather_monthly
    .withColumnRenamed("year", "Year")
    .withColumnRenamed("month", "Month")
)
df_merged = (
    df_econ
    .join(
        df_weather_monthly_keyed,
        on=["Year", "Month"],
        how="left"
    )
)


In [0]:
# Inspect the merged monthly panel: the first 20 rows, then the schema.
df_merged.show(n=20)
df_merged.printSchema()

+----+-----+----------+--------+---------+----+--------+-------------+-------------+---------+--------------+--------------+------------+-----------+------------------+------------------+------------------+-----------------+------------------+------------------+
|Year|Month|      date|ipi_alim|ipi_total| cpi|cpi_food|cpi_transport|cpi_entertain|infl_food|infl_food_lag1|infl_entertain|pib_constant|     region|          avg_tavg|          avg_tmin|          avg_tmax|       total_prcp|          avg_wspd|          avg_pres|
+----+-----+----------+--------+---------+----+--------+-------------+-------------+---------+--------------+--------------+------------+-----------+------------------+------------------+------------------+-----------------+------------------+------------------+
|2005|    1|2005-01-01|  116.37|   100.76|83.2|    74.1|         74.4|        101.0|      2.0|           2.0|         -0.62|     6888.61|Tambacounda| 26.94545454545455|19.336363636363636| 34.14090909090909|     

In [0]:
# Row count of the merged panel (economic months x regions with weather data).
df_merged.count()

2039

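La base finale couvre janvier 2005 à décembre 2024 (les 240 mois de la base économique), avec une ligne par région et par mois, d'où 2039 observations.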

In [0]:
# Persist the monthly economic + weather panel as a Delta table, replacing any previous version.
df_merged.write.format("delta").mode("overwrite").saveAsTable("mensual_data")
print("Table Delta créée : mensual_data")


✅ Table Delta créée : mensual_data


### Les bases mensuelle et annuelles seront utilisées dans les notebooks d'analyse et de visualisation.