In [0]:
%sql
/* Création d'un catalogue dedidé pour le travail  */
CREATE CATALOG climat;


In [0]:
%sql
/* Création des schemas dans le catalogue dedidé */
CREATE SCHEMA climat.raw;
CREATE SCHEMA climat.silver;
CREATE SCHEMA climat.gold;

In [0]:
%sql
/* Création d'un volume pour stocker les données originales */
CREATE VOLUME climat.raw.gsod_raw;

##Étape 1 — Ingestion (RAW)
###Télécharger les fichiers GSOD (.tar.gz) depuis le site NOAA et les stocker

In [0]:
#Télécharger les données natives de NOAA entre 2000 à 2024
import requests

volume_path = "/Volumes/climat/raw/gsod_raw/"
years = range(2000, 2025)

for year in years:
    url = f"https://www.ncei.noaa.gov/data/global-summary-of-the-day/archive/{year}.tar.gz"
    dest = f"{volume_path}{year}.tar.gz"

    print(f"Téléchargement de {year}...")

    r = requests.get(url, stream=True)

    if r.status_code == 200:
        with open(dest, "wb") as f:
            for chunk in r.iter_content(1024):
                f.write(chunk)
        print(f"✔️ {year}.tar.gz téléchargé")
    else:
        print(f"❌ Impossible de télécharger {year} (code {r.status_code})")

##Étape 2:Décompression des données (RAW → BRONZE)

In [0]:

import tarfile
import os

volume_path = "/Volumes/climat/raw/gsod_raw/"

# Lister tous les fichiers .tar.gz dans le Volume
archives = [f.name for f in dbutils.fs.ls(volume_path) if f.name.endswith(".tar.gz")]

for archive_file in archives:
    archive_path = os.path.join(volume_path, archive_file)
    year = archive_file.replace(".tar.gz", "")
    extract_path = os.path.join(volume_path, year)  # chaque année dans son dossier

    print(f"⏳ Extraction de {archive_file} dans {extract_path}...")

    os.makedirs(extract_path, exist_ok=True)

    with tarfile.open(archive_path, "r:gz") as tar:
        tar.extractall(path=extract_path)

    print(f"✔️ {archive_file} extrait avec succès !")

In [0]:
volume_path = "/Volumes/climat/raw/gsod_raw/"

df = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(f"{volume_path}*/**/*.csv")
)

In [0]:
display(dbutils.fs.ls("/Volumes/climat/raw/gsod_raw/"))

In [0]:
display(dbutils.fs.ls("/Volumes/climat/raw/gsod_raw/2020/"))

In [0]:
df_test = spark.read.option("header", True).csv("dbfs:/Volumes/climat/raw/gsod_raw/2020/*.csv")
df_test.show(5)

##Étape 3 : Sélectionner un echantillon de données à étudier et créer la partie Bronze

In [0]:
from pyspark.sql.functions import col

# Définir les années d'échantillo  et les chemins CSV
years = [2020, 2021, 2022, 2023, 2024]
paths = [f"dbfs:/Volumes/climat/raw/gsod_raw/{y}/*.csv" for y in years]

# Lire les CSV
df = (
    spark.read
         .option("header", True)
         .option("inferSchema", True)
         .csv(paths)
)

# Sélectionner seulement les colonnes essentielles
df_selected = df.select(
    col("STATION"),
    col("NAME"),
    col("DATE").cast("date"),
    col("TEMP").cast("float"),
    col("PRCP").cast("float"),
    col("LATITUDE").cast("float"),
    col("LONGITUDE").cast("float")
)

# Vérifier
df_selected.show(5)
df_selected.printSchema()


In [0]:
# Créer le schéma bronze
spark.sql("CREATE SCHEMA IF NOT EXISTS climat.bronze")

# Enregistrer la table Delta
df_selected.write.format("delta") \
           .mode("overwrite") \
           .option("overwriteSchema", "true") \
           .saveAsTable("climat.bronze.gsod_bronze_2020_2024")

In [0]:
# Nombre total de lignes
spark.sql("SELECT COUNT(*) FROM climat.bronze.gsod_bronze_2020_2024").show()

# Années disponibles dans la table
spark.sql("""
SELECT DISTINCT YEAR(DATE) AS year 
FROM climat.bronze.gsod_bronze_2020_2024 
ORDER BY year
""").show()

##Analyse préliminaire 
###Présentation + Valeurs nulles + Statistiques descriptives

In [0]:
#Charger la table dans un DataFrame PySpark
df = spark.table("climat.bronze.gsod_bronze_2020_2024")

In [0]:
df.show(5)
df.printSchema()
print("Nombre de lignes :", df.count())

In [0]:
#Afficher les valeurs nulles par colonne
from pyspark.sql.functions import col, sum
null_counts = df.select([
    sum(col(c).isNull().cast("int")).alias(c) for c in df.columns
])

null_counts.show()

In [0]:
#Description statistique des colonnes numériques
df.describe(["TEMP", "PRCP", "LATITUDE", "LONGITUDE"]).show()

## ÉTAPE 5: Création de la couche SILVER

###Définir les règles de qualité
-Règles météo réalistes GSOD

-TEMP ∈ [-80°C ; 60°C]

-PRCP ≥ 0 et PRCP ≤ 500 mm/jour

-LATITUDE ∈ [-90 ; 90]

-LONGITUDE ∈ [-180 ; 180]

In [0]:
#filtre uniquement ce qui est clairement impossible
from pyspark.sql.functions import col

df_filtered = df.filter(
    (col("TEMP") > -80) & (col("TEMP") < 60) &
    (col("PRCP") >= 0) & (col("PRCP") <= 500) &
    (col("LATITUDE").between(-90, 90)) &
    (col("LONGITUDE").between(-180, 180))
)

In [0]:
#Retirer uniquement les lignes sans info essentielle
df_clean = df_filtered.dropna(subset=["STATION", "DATE", "TEMP", "LATITUDE", "LONGITUDE"])


In [0]:
#Ajouter une colonne YEAR (pour faciliter la partie EDA)
from pyspark.sql.functions import year

df_clean = df_clean.withColumn("YEAR", year(col("DATE")))

In [0]:
from pyspark.sql.functions import regexp_replace, col

# Nettoyer la colonne NAME dans df_clean
df_clean = df_clean.withColumn(
    "NAME",
    regexp_replace(col("NAME"), r"(, NO$|, TX US$)", "")
)

# Vérifier le résultat
df_clean.select("STATION", "NAME").show(5, truncate=False)

# Sauvegarde en TABLE DELTA (Silver)
df_clean.write.format("delta").mode("overwrite").saveAsTable(
    "climat.silver.gsod_silver_2020_2024"
)

In [0]:
#Vérification immédiate
spark.sql("SELECT * FROM climat.silver.gsod_silver_2020_2024 LIMIT 5").show()


In [0]:
#Comptage Bronze vs Silver :
print("Bronze :", df.count())
print("Silver :", df_clean.count())

##Étape 6 : EDA (Analyse Exploratoire des Données)

In [0]:
#charger la table Silver
df_silver = spark.table("climat.silver.gsod_silver_2020_2024")
df_silver.show(5)

##Température moyenne par année
Détecter une tendance climatique

In [0]:
from pyspark.sql.functions import avg

temp_year = (
    df_silver.groupBy("YEAR")
             .agg(avg("TEMP").alias("avg_temp"))
             .orderBy("YEAR")
)

temp_year.show()  

In [0]:
pip install matplotlib

In [0]:
#Import de la librairie matplotlib
from matplotlib import pyplot as plt

In [0]:
# Convertir en Pandas pour tracer le graphique
temp_year_pd = temp_year.toPandas()

# Tracer la température moyenne par année
plt.figure(figsize=(10,5))
plt.plot(temp_year_pd["YEAR"], temp_year_pd["avg_temp"], marker='o', linestyle='-')
plt.title("Température moyenne par année (2020-2024)")
plt.xlabel("Année")
plt.ylabel("Température moyenne (°C)")
plt.grid(True)
plt.show()

##Histogramme de distribution de la température

In [0]:
import pandas as pd
import matplotlib.pyplot as plt

pdf = df_silver.select("TEMP").toPandas()

plt.hist(pdf["TEMP"], bins=50)
plt.title("Distribution des températures")
plt.xlabel("Température (°C)")
plt.ylabel("Fréquence")
plt.show()

##Détection des anomalies (z-score)
_Objectif : identifier les températures anormales ou extrêmes_

In [0]:
from pyspark.sql.functions import mean, stddev, col

stats = df_silver.select(
    mean("TEMP").alias("mean_temp"),
    stddev("TEMP").alias("std_temp")
).collect()[0]

mean_temp = stats["mean_temp"]
std_temp = stats["std_temp"]

df_anomalies = df_silver.filter(
    (col("TEMP") > mean_temp + 3*std_temp) |
    (col("TEMP") < mean_temp - 3*std_temp)
)

df_anomalies.show()

### Analyse des tendances temporelles
_Température moyenne par mois/année_

In [0]:
from pyspark.sql.functions import month, year, avg
df_silver = spark.table("climat.silver.gsod_silver_2020_2024")

temp_month = (
    df_silver
        .groupBy(year("DATE").alias("YEAR"), month("DATE").alias("MONTH"))
        .agg(avg("TEMP").alias("avg_temp"))
        .orderBy("YEAR", "MONTH")
)

temp_month.show()

### Tracer l’évolution mensuelle avec matplotlib

In [0]:
pdf_temp_month = temp_month.toPandas()

import matplotlib.pyplot as plt

plt.figure(figsize=(12,6))

for year_val in pdf_temp_month['YEAR'].unique():
    df_year = pdf_temp_month[pdf_temp_month['YEAR'] == year_val]
    plt.plot(df_year['MONTH'], df_year['avg_temp'], marker='o', label=str(year_val))

plt.title("Évolution mensuelle de la température moyenne")
plt.xlabel("Mois")
plt.ylabel("Température moyenne (°C)")
plt.xticks(range(1,13))
plt.legend(title="Année")
plt.grid(True)
plt.show()

##Étape 7 : Création de la couche GOLD et Agrégations

In [0]:
#Charger la table Silver
df_silver = spark.table("climat.silver.gsod_silver_2020_2024")

In [0]:
#Ajouter des colonnes utiles pour Gold
from pyspark.sql.functions import month, when, col

df_gold = df_silver.withColumn("MONTH", month(col("DATE"))) \
    .withColumn(
        "TEMP_ANOMALY",
        when((col("TEMP") > 50) | (col("TEMP") < -20), 1).otherwise(0)
    ) \
    .withColumn(
        "PRCP_ANOMALY",
        when(col("PRCP") > 100, 1).otherwise(0)
    )

In [0]:
#Conversion en float
from pyspark.sql.functions import col

df_gold = df_gold \
    .withColumn("TEMP", col("TEMP").cast("float")) \
    .withColumn("PRCP", col("PRCP").cast("float")) \
    .withColumn("TEMP_ANOMALY", col("TEMP_ANOMALY").cast("int")) \
    .withColumn("PRCP_ANOMALY", col("PRCP_ANOMALY").cast("int"))

In [0]:
#Température moyenne annuelle par station
from pyspark.sql.functions import avg, sum

temp_year_station = df_gold.groupBy("YEAR", "STATION", "NAME","LATITUDE", "LONGITUDE") \
    .agg(
        avg("TEMP").alias("avg_temp"),
        avg("PRCP").alias("avg_prcp"),
        sum("TEMP_ANOMALY").alias("nb_temp_anomalies"),
        sum("PRCP_ANOMALY").alias("nb_prcp_anomalies")
    ) \
    .orderBy("YEAR", "STATION")

temp_year_station.show(5)

In [0]:
#Température et précipitations mensuelles (moyenne par mois)
from pyspark.sql.functions import month, when, col

df_gold = df_silver.withColumn("MONTH", month(col("DATE"))) \
    .withColumn(
        "TEMP_ANOMALY",
        when((col("TEMP") > 50) | (col("TEMP") < -20), 1).otherwise(0)
    ) \
    .withColumn(
        "PRCP_ANOMALY",
        when(col("PRCP") > 100, 1).otherwise(0)
    )

In [0]:
#Température et précipitations mensuelles (moyenne par mois)
temp_year_station = df_gold.groupBy("YEAR", "STATION", "NAME","LATITUDE", "LONGITUDE") \
    .agg(
        avg("TEMP").alias("avg_temp"),
        avg("PRCP").alias("avg_prcp"),
        sum("TEMP_ANOMALY").alias("nb_temp_anomalies"),
        sum("PRCP_ANOMALY").alias("nb_prcp_anomalies")
    ) \
    .orderBy("YEAR", "STATION")

In [0]:
temp_month_station = df_gold.groupBy("YEAR", "MONTH", "STATION", "NAME","LATITUDE", "LONGITUDE") \
    .agg(
        avg("TEMP").alias("avg_temp"),
        avg("PRCP").alias("avg_prcp")
    ) \
    .orderBy("YEAR", "MONTH", "STATION")

###Sauvegarde de la GOLD en table Delta

In [0]:
# GOLD annuelle par station
temp_year_station.write.format("delta").mode("overwrite").saveAsTable(
    "climat.gold.gsod_gold_yearly2"
)

# GOLD mensuelle par station
temp_month_station.write.format("delta").mode("overwrite").saveAsTable(
    "climat.gold.gsod_gold_monthly2"
)

####Vérification rapide

In [0]:
#Vérification rapide
spark.sql("SELECT * FROM climat.gold.gsod_gold_yearly2 LIMIT 5").show()
spark.sql("SELECT * FROM climat.gold.gsod_gold_monthly2 LIMIT 5").show()

###ÉTAPE 8:Création du Dashboard et visualisations interactives

In [0]:
df_yearly = spark.table("climat.gold.gsod_gold_yearly")
df_monthly = spark.table("climat.gold.gsod_gold_monthly")