# Apache Spark: Analysis of Global Climate Data

## Objective
Analyze global climate trends with Spark, covering data cleaning, exploratory data analysis (EDA), and insight extraction.

### Dataset
**Global Surface Summary of the Day (GSOD)** from NOAA.
Source: https://www.ncei.noaa.gov/data/global-summary-of-the-day/archive/

---
## 1. Downloading the data

GSOD data is published per year. Each tar.gz archive contains CSV files for all stations.
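
Because each archive is a plain tar.gz of CSV files, extraction can be restricted to the members that are actually wanted. The following is a minimal sketch, not part of the original notebook: the helper name `extract_csv_members` is hypothetical, and it assumes that only regular `.csv` members are of interest. It skips any member whose target path would land outside the destination directory.

```python
import tarfile
from pathlib import Path


def extract_csv_members(tar_path, dest_dir):
    """Extract only regular .csv files whose target path stays inside dest_dir."""
    dest = Path(dest_dir).resolve()
    dest.mkdir(parents=True, exist_ok=True)
    extracted = []
    with tarfile.open(tar_path, "r:gz") as tar:
        for member in tar.getmembers():
            target = (dest / member.name).resolve()
            # Skip directories, links, and non-CSV files.
            if not member.isfile() or target.suffix.lower() != ".csv":
                continue
            # Skip members such as "../x.csv" that would escape dest.
            if dest not in target.parents:
                continue
            tar.extract(member, path=dest)
            extracted.append(target)
    return extracted
```

A call such as `extract_csv_members(data_dir / "2020.tar.gz", data_dir / "2020")` would return the list of extracted paths, which is also a quick way to count the station files for a year.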

In [None]:
import os
import tarfile
import requests
from pathlib import Path

# Create the data directory (parents=True also creates missing parent directories)
data_dir = Path('/home/jovyan/work/data')
data_dir.mkdir(parents=True, exist_ok=True)

# Years to download (adjust as needed)
years = [2019, 2020, 2021, 2022, 2023]

def download_year(year):
    """Download and extract the data for a given year."""
    url = f"https://www.ncei.noaa.gov/data/global-summary-of-the-day/archive/{year}.tar.gz"
    tar_path = data_dir / f"{year}.tar.gz"
    extract_dir = data_dir / str(year)
    
    # Download unless the archive is already present
    if not tar_path.exists():
        print(f"Downloading {year}...")
        response = requests.get(url, stream=True, timeout=60)
        response.raise_for_status()  # fail early instead of saving an HTTP error page
        with open(tar_path, 'wb') as f:
            for chunk in response.iter_content(chunk_size=8192):
                f.write(chunk)
        print(f"✓ {year} downloaded")
    
    # Extract
    if not extract_dir.exists():
        print(f"Extracting {year}...")
        with tarfile.open(tar_path, 'r:gz') as tar:
            tar.extractall(path=extract_dir)
        print(f"✓ {year} extracted")
    
    return extract_dir

# Download the data (uncomment to run)
# for year in years:
#     download_year(year)

print("\n⚠️ NOTE: If the download does not work from the notebook, you can:")
print("1. Download manually from: https://www.ncei.noaa.gov/data/global-summary-of-the-day/archive/")
print("2. Place the .tar.gz files in /home/jovyan/work/data/")
print("3. Extract with: mkdir -p /home/jovyan/work/data/YEAR && tar -xzf YEAR.tar.gz -C /home/jovyan/work/data/YEAR/")

---
## 2. Initializing Spark

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

# Create the Spark session in local mode
spark = SparkSession.builder \
    .appName("Analyse Climatique GSOD") \
    .master("local[*]") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .getOrCreate()

# Set the log level
spark.sparkContext.setLogLevel("WARN")

print(f"Spark version: {spark.version}")
print("✓ Spark session initialized")

---
## 3. Loading and exploring the data

GSOD files include the following columns:
- STATION: Station identifier
- DATE: Observation date
- LATITUDE, LONGITUDE: Coordinates
- ELEVATION: Elevation
- NAME: Station name
- TEMP: Mean temperature (°F)
- TEMP_ATTRIBUTES: Temperature attributes
- DEWP: Dew point
- SLP: Sea-level pressure
- STP: Station pressure
- VISIB: Visibility
- WDSP: Wind speed
- MXSPD: Maximum wind speed
- GUST: Wind gust
- MAX: Maximum temperature
- MIN: Minimum temperature
- PRCP: Precipitation
- SNDP: Snow depth
- FRSHTT: Weather indicators (fog, rain, snow, hail, thunder, tornado)
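
The FRSHTT column packs six indicators into one string. The sketch below decodes it under an assumption that should be checked against the GSOD documentation: each of the six characters is a 0/1 flag, in the order listed above. The helper name `decode_frshtt` is hypothetical.

```python
FRSHTT_FLAGS = ("fog", "rain", "snow", "hail", "thunder", "tornado")


def decode_frshtt(code):
    """Map a FRSHTT value such as '010000' to a dict of booleans."""
    # Left-pad in case leading zeros were lost (e.g. the value was parsed as an integer).
    digits = str(code).strip().zfill(len(FRSHTT_FLAGS))
    if len(digits) != len(FRSHTT_FLAGS) or not set(digits) <= {"0", "1"}:
        raise ValueError(f"unexpected FRSHTT value: {code!r}")
    return {name: digit == "1" for name, digit in zip(FRSHTT_FLAGS, digits)}
```

Reading the column as a string, as the schema in this notebook does, preserves leading zeros; the padding step only matters if the value was parsed as a number somewhere along the way.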

In [None]:
# Define the schema for the GSOD data
gsod_schema = StructType([
    StructField("STATION", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("LATITUDE", DoubleType(), True),
    StructField("LONGITUDE", DoubleType(), True),
    StructField("ELEVATION", DoubleType(), True),
    StructField("NAME", StringType(), True),
    StructField("TEMP", DoubleType(), True),
    StructField("TEMP_ATTRIBUTES", IntegerType(), True),
    StructField("DEWP", DoubleType(), True),
    StructField("DEWP_ATTRIBUTES", IntegerType(), True),
    StructField("SLP", DoubleType(), True),
    StructField("SLP_ATTRIBUTES", IntegerType(), True),
    StructField("STP", DoubleType(), True),
    StructField("STP_ATTRIBUTES", IntegerType(), True),
    StructField("VISIB", DoubleType(), True),
    StructField("VISIB_ATTRIBUTES", IntegerType(), True),
    StructField("WDSP", DoubleType(), True),
    StructField("WDSP_ATTRIBUTES", IntegerType(), True),
    StructField("MXSPD", DoubleType(), True),
    StructField("GUST", DoubleType(), True),
    StructField("MAX", DoubleType(), True),
    StructField("MAX_ATTRIBUTES", StringType(), True),
    StructField("MIN", DoubleType(), True),
    StructField("MIN_ATTRIBUTES", StringType(), True),
    StructField("PRCP", DoubleType(), True),
    StructField("PRCP_ATTRIBUTES", StringType(), True),
    StructField("SNDP", DoubleType(), True),
    StructField("FRSHTT", StringType(), True)
])

In [None]:
# Load all CSV files
# Adjust the path to match your directory layout
data_path = "/home/jovyan/work/data/*/*.csv"

df = spark.read.csv(
    data_path,
    header=True,
    schema=gsod_schema
)

print(f"Total number of records: {df.count():,}")
print(f"Number of partitions: {df.rdd.getNumPartitions()}")

# Print the schema
df.printSchema()

In [None]:
# Show a few records
df.show(5, truncate=False)

In [None]:
# Descriptive statistics
df.select("TEMP", "PRCP", "MAX", "MIN").describe().show()

---
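## 4. Cleaning the data

GSOD encodes missing or invalid values with sentinel numbers rather than empty fields: the cleaning cell below treats 9999.9 as missing for TEMP, MAX, and MIN, and 99.99 as missing for PRCP.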
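
The sentinel rule and the unit conversions can be sketched in plain Python, independent of Spark. The sentinel values and conversion formulas are the ones used in this notebook's cleaning cell; the helper names and the sample record are hypothetical.

```python
# Sentinel values taken from the cleaning cell of this notebook.
SENTINELS = {"TEMP": 9999.9, "MAX": 9999.9, "MIN": 9999.9, "PRCP": 99.99}


def clean_value(column, value):
    """Return None for a missing value (None or the column's sentinel)."""
    if value is None or value == SENTINELS.get(column):
        return None
    return value


def fahrenheit_to_celsius(temp_f):
    """Convert °F to °C, propagating missing values."""
    return None if temp_f is None else (temp_f - 32) * 5.0 / 9.0


def inches_to_mm(inches):
    """Convert inches to millimetres, propagating missing values."""
    return None if inches is None else inches * 25.4


# Made-up record for illustration only.
record = {"TEMP": 50.0, "MAX": 9999.9, "PRCP": 99.99}
cleaned = {name: clean_value(name, raw) for name, raw in record.items()}
temp_c = fahrenheit_to_celsius(cleaned["TEMP"])
```

Replacing sentinels before converting units matters: a sentinel left in place would be converted into a very large but plausible-looking number and would distort every average.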

In [None]:
# Replace the missing-value sentinels (9999.9, and 99.99 for PRCP) with null, derive year and month, and convert units
df_clean = df \
    .withColumn("TEMP", when(col("TEMP") == 9999.9, None).otherwise(col("TEMP"))) \
    .withColumn("PRCP", when(col("PRCP") == 99.99, None).otherwise(col("PRCP"))) \
    .withColumn("MAX", when(col("MAX") == 9999.9, None).otherwise(col("MAX"))) \
    .withColumn("MIN", when(col("MIN") == 9999.9, None).otherwise(col("MIN"))) \
    .withColumn("year", year(col("DATE"))) \
    .withColumn("month", month(col("DATE"))) \
    .withColumn("TEMP_C", (col("TEMP") - 32) * 5.0 / 9.0) \
    .withColumn("MAX_C", (col("MAX") - 32) * 5.0 / 9.0) \
    .withColumn("MIN_C", (col("MIN") - 32) * 5.0 / 9.0) \
    .withColumn("PRCP_MM", col("PRCP") * 25.4)

# Keep only records with a valid temperature
df_clean = df_clean.filter(col("TEMP").isNotNull())

# Cache first so that the count below materializes the cache in a single pass
df_clean.cache()
print(f"Records after cleaning: {df_clean.count():,}")

print("✓ Data cleaned and cached")

In [None]:
# Check for missing values
from pyspark.sql.functions import count, when, isnan, col

df_clean.select(
    [count(when(col(c).isNull(), c)).alias(c) for c in ["TEMP_C", "PRCP_MM", "MAX_C", "MIN_C"]]
).show()

---
## 5. Analysis with Spark SQL

Let's create a temporary view so that we can use SQL.

In [None]:
# Create a temporary view
df_clean.createOrReplaceTempView("climate_data")

print("✓ View 'climate_data' created")

### Question 1: Which was the coldest year on record, and what was its mean temperature?
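
Before running the query on the full dataset, its logic can be checked on a tiny hand-made table. The sketch below uses SQLite from the Python standard library as a stand-in for Spark SQL; this is a substitution for illustration, and the rows are made up. The query text matches the one in the cell that follows.

```python
import sqlite3

rows = [  # (year, TEMP_C): made-up values for illustration only
    (2019, 10.0), (2019, 12.0),
    (2020, 9.0), (2020, 10.0),
    (2021, 11.0), (2021, None),
]

conn = sqlite3.connect(":memory:")
conn.execute("CREATE TABLE climate_data (year INTEGER, TEMP_C REAL)")
conn.executemany("INSERT INTO climate_data VALUES (?, ?)", rows)

coldest = conn.execute("""
    SELECT year, ROUND(AVG(TEMP_C), 2) AS avg_temperature_c, COUNT(*) AS num_records
    FROM climate_data
    WHERE TEMP_C IS NOT NULL
    GROUP BY year
    ORDER BY avg_temperature_c ASC
    LIMIT 1
""").fetchone()
conn.close()
print(coldest)
```

With these rows, 2020 has the lowest mean (9.5 °C over 2 records). The null row for 2021 is excluded by the `WHERE` clause, so it does not inflate `num_records`.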

In [None]:
# SQL query to find the coldest year
coldest_year_sql = """
SELECT 
    year,
    ROUND(AVG(TEMP_C), 2) as avg_temperature_c,
    COUNT(*) as num_records
FROM climate_data
WHERE TEMP_C IS NOT NULL
GROUP BY year
ORDER BY avg_temperature_c ASC
LIMIT 1
"""

coldest_year = spark.sql(coldest_year_sql)
coldest_year.show()

# With the DataFrame API
coldest_year_df = df_clean \
    .groupBy("year") \
    .agg(
        round(avg("TEMP_C"), 2).alias("avg_temperature_c"),
        count("*").alias("num_records")
    ) \
    .orderBy("avg_temperature_c") \
    .limit(1)

print("\nWith the DataFrame API:")
coldest_year_df.show()

### Question 2: Which station contributed the largest number of records?
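
One point to keep in mind when counting records per station: grouping by STATION together with NAME, LATITUDE, and LONGITUDE splits a station's total whenever its metadata differs between records. Whether this happens in the GSOD files is not verified here. The sketch below uses hypothetical data to show how the ranking can change.

```python
from collections import Counter

# Hypothetical (STATION, NAME) pairs; station "A1" was renamed part-way through.
records = [
    ("A1", "ALPHA"), ("A1", "ALPHA"), ("A1", "ALPHA AIRPORT"), ("A1", "ALPHA AIRPORT"),
    ("B2", "BRAVO"), ("B2", "BRAVO"), ("B2", "BRAVO"),
]

# Grouping by the full tuple splits A1 into two groups of 2 records each.
by_station_and_name = Counter(records)
# Grouping by the identifier alone keeps A1's 4 records together.
by_station = Counter(station for station, _ in records)

top_by_pair = by_station_and_name.most_common(1)[0]
top_by_id = by_station.most_common(1)[0]
```

If the two rankings disagree on real data, one option is to group by STATION alone and attach a representative name and location afterwards.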

In [None]:
# SQL query to find the most active stations
top_station_sql = """
SELECT 
    STATION,
    NAME,
    LATITUDE,
    LONGITUDE,
    COUNT(*) as num_records
FROM climate_data
GROUP BY STATION, NAME, LATITUDE, LONGITUDE
ORDER BY num_records DESC
LIMIT 10
"""

top_stations = spark.sql(top_station_sql)
top_stations.show(10, truncate=False)

# With the DataFrame API
top_stations_df = df_clean \
    .groupBy("STATION", "NAME", "LATITUDE", "LONGITUDE") \
    .agg(count("*").alias("num_records")) \
    .orderBy(col("num_records").desc()) \
    .limit(10)

print("\nWith the DataFrame API:")
top_stations_df.show(10, truncate=False)

### Additional analyses
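
A plain `AVG(TEMP_C)` over all records weights each station by how many records it contributes, so regions with dense or frequently reporting stations dominate the result. The sketch below contrasts that record-weighted mean with a mean of per-station means. The data is made up, and the station-balanced variant is an alternative for comparison, not what the queries in this notebook compute.

```python
from collections import defaultdict
from statistics import mean

# Hypothetical (station, TEMP_C) observations: "COLD" reports far more often.
observations = [("COLD", -10.0)] * 8 + [("WARM", 20.0)] * 2

# Every record counts equally.
record_weighted = mean(temp for _, temp in observations)

# Every station counts equally.
per_station = defaultdict(list)
for station, temp in observations:
    per_station[station].append(temp)
station_balanced = mean(mean(temps) for temps in per_station.values())
```

Here the record-weighted mean is -4.0 °C while the station-balanced mean is 5.0 °C, which shows how much the choice of weighting can matter.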

In [None]:
# Yearly temperature trends
yearly_stats = spark.sql("""
SELECT 
    year,
    ROUND(AVG(TEMP_C), 2) as avg_temp_c,
    ROUND(AVG(MAX_C), 2) as avg_max_c,
    ROUND(AVG(MIN_C), 2) as avg_min_c,
    ROUND(AVG(PRCP_MM), 2) as avg_prcp_mm,
    COUNT(*) as num_records
FROM climate_data
WHERE TEMP_C IS NOT NULL
GROUP BY year
ORDER BY year
""")

yearly_stats.show()

In [None]:
# Monthly patterns
monthly_patterns = spark.sql("""
SELECT 
    month,
    ROUND(AVG(TEMP_C), 2) as avg_temp_c,
    ROUND(AVG(PRCP_MM), 2) as avg_prcp_mm,
    COUNT(*) as num_records
FROM climate_data
WHERE TEMP_C IS NOT NULL
GROUP BY month
ORDER BY month
""")

monthly_patterns.show(12)

---
## 6. Visualizations

### Question 1: Plot of global mean temperatures

In [None]:
import matplotlib.pyplot as plt
import seaborn as sns
import pandas as pd

# Style configuration
sns.set_style("whitegrid")
plt.rcParams['figure.figsize'] = (14, 6)

# Convert to Pandas for plotting
yearly_stats_pd = yearly_stats.toPandas()

# Temperature plots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Mean temperature per year
ax1.plot(yearly_stats_pd['year'], yearly_stats_pd['avg_temp_c'], 
         marker='o', linewidth=2, markersize=8, color='#e74c3c')
ax1.fill_between(yearly_stats_pd['year'], 
                  yearly_stats_pd['avg_min_c'], 
                  yearly_stats_pd['avg_max_c'], 
                  alpha=0.3, color='#e74c3c')
ax1.set_xlabel('Year', fontsize=12)
ax1.set_ylabel('Temperature (°C)', fontsize=12)
ax1.set_title('Global mean temperature over time', fontsize=14, fontweight='bold')
ax1.grid(True, alpha=0.3)

# Temperature per month (seasonal pattern)
monthly_patterns_pd = monthly_patterns.toPandas()
months = ['Jan', 'Feb', 'Mar', 'Apr', 'May', 'Jun', 'Jul', 'Aug', 'Sep', 'Oct', 'Nov', 'Dec']
ax2.bar(monthly_patterns_pd['month'], monthly_patterns_pd['avg_temp_c'], 
        color='#3498db', alpha=0.7)
ax2.set_xlabel('Month', fontsize=12)
ax2.set_ylabel('Mean temperature (°C)', fontsize=12)
ax2.set_title('Seasonal temperature pattern', fontsize=14, fontweight='bold')
ax2.set_xticks(range(1, 13))
ax2.set_xticklabels(months)
ax2.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('/home/jovyan/work/temperature_trends.png', dpi=300, bbox_inches='tight')
plt.show()

print("✓ Temperature plot saved: temperature_trends.png")

### Question 2: Plot of global precipitation

In [None]:
# Precipitation plots
fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Precipitation per year
ax1.plot(yearly_stats_pd['year'], yearly_stats_pd['avg_prcp_mm'], 
         marker='s', linewidth=2, markersize=8, color='#2ecc71')
ax1.set_xlabel('Year', fontsize=12)
ax1.set_ylabel('Mean precipitation (mm)', fontsize=12)
ax1.set_title('Global mean precipitation over time', fontsize=14, fontweight='bold')
ax1.grid(True, alpha=0.3)

# Precipitation per month
ax2.bar(monthly_patterns_pd['month'], monthly_patterns_pd['avg_prcp_mm'], 
        color='#1abc9c', alpha=0.7)
ax2.set_xlabel('Month', fontsize=12)
ax2.set_ylabel('Mean precipitation (mm)', fontsize=12)
ax2.set_title('Seasonal precipitation pattern', fontsize=14, fontweight='bold')
ax2.set_xticks(range(1, 13))
ax2.set_xticklabels(months)
ax2.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('/home/jovyan/work/precipitation_trends.png', dpi=300, bbox_inches='tight')
plt.show()

print("✓ Precipitation plot saved: precipitation_trends.png")

### Question 3: Additional analyses - Global warming
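
The trend in this section is an ordinary least-squares line fitted to the yearly means. A minimal sketch of that fit without SciPy, on made-up values, also shows how to turn the slope into a change over the whole period: N yearly points span N - 1 years, so the multiplier is the difference between the last and the first year. The helper name `linear_trend` is hypothetical.

```python
def linear_trend(xs, ys):
    """Ordinary least-squares slope and intercept for y = slope * x + intercept."""
    n = len(xs)
    mean_x = sum(xs) / n
    mean_y = sum(ys) / n
    sxx = sum((x - mean_x) ** 2 for x in xs)
    sxy = sum((x - mean_x) * (y - mean_y) for x, y in zip(xs, ys))
    slope = sxy / sxx
    return slope, mean_y - slope * mean_x


years = [2019, 2020, 2021, 2022, 2023]
temps = [10.0, 10.1, 10.2, 10.3, 10.4]  # made-up values rising by 0.1 °C per year

slope, intercept = linear_trend(years, temps)
# Five yearly points span four years, so the fitted change is slope * 4.
total_change = slope * (years[-1] - years[0])
```

With only five yearly points, any fitted slope carries large uncertainty, which is why the cell below also reports R² and the p-value.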

In [None]:
# Compute the warming trend
from scipy import stats

# Linear regression for the trend
slope, intercept, r_value, p_value, std_err = stats.linregress(
    yearly_stats_pd['year'], 
    yearly_stats_pd['avg_temp_c']
)

# Build the trend line
trend_line = slope * yearly_stats_pd['year'] + intercept

# Plot the trend
plt.figure(figsize=(14, 8))

plt.subplot(2, 1, 1)
plt.plot(yearly_stats_pd['year'], yearly_stats_pd['avg_temp_c'], 
         marker='o', linewidth=2, markersize=8, label='Observed temperature', color='#e74c3c')
plt.plot(yearly_stats_pd['year'], trend_line, 
         '--', linewidth=2, label=f'Trend ({slope:.4f}°C/year)', color='#c0392b')
plt.xlabel('Year', fontsize=12)
plt.ylabel('Temperature (°C)', fontsize=12)
plt.title('Warming trend', fontsize=14, fontweight='bold')
plt.legend(fontsize=11)
plt.grid(True, alpha=0.3)

# Anomaly relative to the mean over the analyzed period
plt.subplot(2, 1, 2)
mean_temp = yearly_stats_pd['avg_temp_c'].mean()
anomaly = yearly_stats_pd['avg_temp_c'] - mean_temp
colors = ['#e74c3c' if x > 0 else '#3498db' for x in anomaly]
plt.bar(yearly_stats_pd['year'], anomaly, color=colors, alpha=0.7)
plt.axhline(y=0, color='black', linestyle='-', linewidth=0.8)
plt.xlabel('Year', fontsize=12)
plt.ylabel('Temperature anomaly (°C)', fontsize=12)
plt.title(f'Temperature anomaly (reference: {mean_temp:.2f}°C)', fontsize=14, fontweight='bold')
plt.grid(True, alpha=0.3, axis='y')

plt.tight_layout()
plt.savefig('/home/jovyan/work/warming_trend.png', dpi=300, bbox_inches='tight')
plt.show()

print("\n📊 Trend statistics:")
print(f"   Warming rate: {slope:.4f}°C per year")
print(f"   R²: {r_value**2:.4f}")
print(f"   P-value: {p_value:.4e}")
# The period spans (last year - first year) years, not len(yearly_stats_pd) years
period_years = yearly_stats_pd['year'].max() - yearly_stats_pd['year'].min()
print(f"   Total increase over the period: {slope * period_years:.2f}°C")

if p_value < 0.05:
    print("   ✓ The trend is statistically significant (p < 0.05)")
else:
    print("   ⚠ The trend is not statistically significant")

### Question 4: Temperature increase by country
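
The cell below approximates geography with latitude bands. The same classification can be sketched as a plain Python function, which makes the boundary cases easy to test. In Spark, a comparison against a null LATITUDE evaluates to null, so such rows fall through a `when` chain to its `otherwise` branch; the sketch makes the missing case explicit. The function name and the "Unknown" label are hypothetical.

```python
def latitude_band(latitude):
    """Classify a latitude in degrees into one of six bands, or 'Unknown'."""
    if latitude is None:
        return "Unknown"
    if latitude >= 60:
        return "Arctic/Subarctic"
    if latitude >= 30:
        return "Northern temperate zone"
    if latitude >= 0:
        return "Northern tropical zone"
    if latitude >= -30:
        return "Southern tropical zone"
    if latitude >= -60:
        return "Southern temperate zone"
    return "Antarctic/Subantarctic"
```

Each lower bound is inclusive, so a station exactly on the equator belongs to the northern tropical band and one at exactly -30 degrees to the southern tropical band.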

In [None]:
# The country could in principle be derived from the station name (simplified: its last two letters),
# but this cell groups stations by latitude band instead.
# Note: a more robust approach would use a geolocation dataset

# Mean temperature over the whole period and all records
global_avg_temp = df_clean.select(avg("TEMP_C")).collect()[0][0]

print(f"Global mean temperature: {global_avg_temp:.2f}°C")

# Assign a geographic region based on latitude
df_regions = df_clean.withColumn(
    "region",
    when(col("LATITUDE") >= 60, "Arctic/Subarctic")
    .when((col("LATITUDE") >= 30) & (col("LATITUDE") < 60), "Northern temperate zone")
    .when((col("LATITUDE") >= 0) & (col("LATITUDE") < 30), "Northern tropical zone")
    .when((col("LATITUDE") >= -30) & (col("LATITUDE") < 0), "Southern tropical zone")
    .when((col("LATITUDE") >= -60) & (col("LATITUDE") < -30), "Southern temperate zone")
    .when(col("LATITUDE") < -60, "Antarctic/Subantarctic")
    .otherwise("Unknown")  # a null LATITUDE would otherwise be counted as Antarctic
)

# Mean temperature per region and year
regional_temps = df_regions.groupBy("region", "year") \
    .agg(avg("TEMP_C").alias("avg_temp"))

# Mean per region and its gap to the global mean
# (a percentage of a °C value depends on the scale's arbitrary zero; treat it as a rough indicator)
regional_avg = df_regions.groupBy("region") \
    .agg(
        avg("TEMP_C").alias("avg_temp_c"),
        count("*").alias("num_records")
    ) \
    .withColumn(
        "pct_difference_from_global",
        ((col("avg_temp_c") - global_avg_temp) / global_avg_temp * 100)
    ) \
    .orderBy(col("pct_difference_from_global").desc())

regional_avg_pd = regional_avg.toPandas()
regional_avg_pd

In [None]:
# Plot by region
import numpy as np  # needed for np.linspace below

fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(16, 6))

# Mean temperature per region
colors_regional = plt.cm.RdYlBu_r(np.linspace(0, 1, len(regional_avg_pd)))
ax1.barh(regional_avg_pd['region'], regional_avg_pd['avg_temp_c'], color=colors_regional)
ax1.axvline(x=global_avg_temp, color='red', linestyle='--', linewidth=2, label=f'Global mean: {global_avg_temp:.2f}°C')
ax1.set_xlabel('Mean temperature (°C)', fontsize=12)
ax1.set_title('Mean temperature by region', fontsize=14, fontweight='bold')
ax1.legend(fontsize=10)
ax1.grid(True, alpha=0.3, axis='x')

# Percentage difference from the global mean
colors_pct = ['#e74c3c' if x > 0 else '#3498db' for x in regional_avg_pd['pct_difference_from_global']]
ax2.barh(regional_avg_pd['region'], regional_avg_pd['pct_difference_from_global'], color=colors_pct, alpha=0.7)
ax2.axvline(x=0, color='black', linestyle='-', linewidth=0.8)
ax2.set_xlabel('Difference from the global mean (%)', fontsize=12)
ax2.set_title('Temperature gap by region', fontsize=14, fontweight='bold')
ax2.grid(True, alpha=0.3, axis='x')

# Add the values next to the bars
for i, (idx, row) in enumerate(regional_avg_pd.iterrows()):
    ax2.text(row['pct_difference_from_global'], i, f" {row['pct_difference_from_global']:.1f}%", 
             va='center', fontsize=10)

plt.tight_layout()
plt.savefig('/home/jovyan/work/regional_analysis.png', dpi=300, bbox_inches='tight')
plt.show()

print("✓ Regional plot saved: regional_analysis.png")

---
## 7. Conclusions and recommended further analyses

### Recommended analyses:

1. **Advanced time-series analysis:**
   - Seasonal decomposition (trend, seasonality, residuals)
   - Change-point detection in the time series
   - Forecasting with ARIMA/Prophet models

2. **Spatial analysis:**
   - Mapping of warming hotspots
   - Spatial correlation analysis (Moran's I)
   - Identification of high-risk areas

3. **Extreme-event analysis:**
   - Frequency and intensity of heat waves
   - Extreme precipitation events
   - Prolonged drought periods

4. **Correlations with other factors:**
   - Elevation and temperature
   - Proximity to the ocean and climate variability
   - Latitude and temperature range

5. **Machine Learning:**
   - Prediction of future temperatures
   - Classification of climate regimes
   - Anomaly detection
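
As an illustration of the extreme-event item above, heat waves can be counted as runs of consecutive hot days at one station. The sketch below is a starting point under stated assumptions: the 35 °C threshold and the 3-day minimum are illustrative defaults, not an official definition, and the input is assumed to be one station's daily maxima in date order with no missing dates. The function name is hypothetical.

```python
def heat_wave_lengths(daily_max_c, threshold_c=35.0, min_days=3):
    """Lengths of runs of at least min_days consecutive days above threshold_c."""
    runs, current = [], 0
    for temp in daily_max_c:
        if temp is not None and temp > threshold_c:
            current += 1
            continue
        # A cooler day or a missing value ends the current run.
        if current >= min_days:
            runs.append(current)
        current = 0
    if current >= min_days:  # a run may end on the last day
        runs.append(current)
    return runs
```

For example, `heat_wave_lengths([30, 36, 37, 38, 30, 36, 36, 30, 36, 37, 38, 39])` yields two heat waves of 3 and 4 days; the 2-day run in the middle is too short to count.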

### Isolating the effects of global warming:

To isolate the effects of warming:
- Compare recent data (2020-2023) with reference periods (1980-2000)
- Analyze long-term trends (several decades)
- Use regression models to control for natural variability
- Compare with the IPCC climate models
- Analyze secondary indicators (ice melt, sea-level rise, etc.)
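
The first item, comparing recent years with a reference period, amounts to subtracting a baseline mean from each yearly mean. A minimal sketch on made-up yearly means follows; the helper name is hypothetical. Since the download cell in this notebook only lists 2019 to 2023, a 1980-2000 baseline would require downloading those years as well.

```python
def anomalies_vs_baseline(yearly_means, baseline_years):
    """Subtract the mean over baseline_years from every yearly mean."""
    baseline_values = [yearly_means[year] for year in baseline_years if year in yearly_means]
    if not baseline_values:
        raise ValueError("no data available for the baseline period")
    baseline = sum(baseline_values) / len(baseline_values)
    return {year: value - baseline for year, value in yearly_means.items()}


# Made-up yearly means in °C, for illustration only.
yearly_means = {1980: 10.0, 1990: 10.2, 2000: 10.4, 2020: 11.0, 2023: 11.2}
anomalies = anomalies_vs_baseline(yearly_means, range(1980, 2001))
```

Unlike the anomaly plot in section 6, which uses the mean of the analyzed years themselves as its reference, a fixed earlier baseline keeps the anomalies comparable when more recent years are added.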

In [None]:
# Extreme-event analysis (each counted record is one station-day)
extreme_events = spark.sql("""
SELECT 
    year,
    COUNT(CASE WHEN MAX_C > 40 THEN 1 END) as days_above_40c,
    COUNT(CASE WHEN MIN_C < -20 THEN 1 END) as days_below_minus20c,
    COUNT(CASE WHEN PRCP_MM > 100 THEN 1 END) as heavy_rainfall_days,
    MAX(MAX_C) as max_temp_recorded,
    MIN(MIN_C) as min_temp_recorded
FROM climate_data
WHERE MAX_C IS NOT NULL AND MIN_C IS NOT NULL
GROUP BY year
ORDER BY year
""")

extreme_events.show()

In [None]:
# Plot the extreme events
extreme_pd = extreme_events.toPandas()

fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(16, 12))

# Station-days > 40°C
ax1.bar(extreme_pd['year'], extreme_pd['days_above_40c'], color='#e74c3c', alpha=0.7)
ax1.set_title('Station-days with temperatures > 40°C', fontsize=12, fontweight='bold')
ax1.set_ylabel('Number of station-days', fontsize=11)
ax1.grid(True, alpha=0.3, axis='y')

# Station-days < -20°C
ax2.bar(extreme_pd['year'], extreme_pd['days_below_minus20c'], color='#3498db', alpha=0.7)
ax2.set_title('Station-days with temperatures < -20°C', fontsize=12, fontweight='bold')
ax2.set_ylabel('Number of station-days', fontsize=11)
ax2.grid(True, alpha=0.3, axis='y')

# Station-days with heavy precipitation
ax3.bar(extreme_pd['year'], extreme_pd['heavy_rainfall_days'], color='#2ecc71', alpha=0.7)
ax3.set_title('Station-days with precipitation > 100 mm', fontsize=12, fontweight='bold')
ax3.set_ylabel('Number of station-days', fontsize=11)
ax3.set_xlabel('Year', fontsize=11)
ax3.grid(True, alpha=0.3, axis='y')

# Record temperatures
ax4.plot(extreme_pd['year'], extreme_pd['max_temp_recorded'], 
         marker='o', label='Max record', color='#e74c3c', linewidth=2)
ax4.plot(extreme_pd['year'], extreme_pd['min_temp_recorded'], 
         marker='s', label='Min record', color='#3498db', linewidth=2)
ax4.set_title('Record temperatures per year', fontsize=12, fontweight='bold')
ax4.set_ylabel('Temperature (°C)', fontsize=11)
ax4.set_xlabel('Year', fontsize=11)
ax4.legend(fontsize=10)
ax4.grid(True, alpha=0.3)

plt.tight_layout()
plt.savefig('/home/jovyan/work/extreme_events.png', dpi=300, bbox_inches='tight')
plt.show()

print("✓ Extreme-events plot saved: extreme_events.png")

---
## 8. Summary of the SQL queries used

In [None]:
print("""
=== MAIN SQL QUERIES ===

1. COLDEST YEAR:
SELECT year, ROUND(AVG(TEMP_C), 2) as avg_temperature_c, COUNT(*) as num_records
FROM climate_data
WHERE TEMP_C IS NOT NULL
GROUP BY year
ORDER BY avg_temperature_c ASC
LIMIT 1

2. MOST ACTIVE STATIONS:
SELECT STATION, NAME, LATITUDE, LONGITUDE, COUNT(*) as num_records
FROM climate_data
GROUP BY STATION, NAME, LATITUDE, LONGITUDE
ORDER BY num_records DESC
LIMIT 10

3. YEARLY TRENDS:
SELECT year, 
       ROUND(AVG(TEMP_C), 2) as avg_temp_c,
       ROUND(AVG(MAX_C), 2) as avg_max_c,
       ROUND(AVG(MIN_C), 2) as avg_min_c,
       ROUND(AVG(PRCP_MM), 2) as avg_prcp_mm,
       COUNT(*) as num_records
FROM climate_data
WHERE TEMP_C IS NOT NULL
GROUP BY year
ORDER BY year

4. MONTHLY PATTERNS:
SELECT month,
       ROUND(AVG(TEMP_C), 2) as avg_temp_c,
       ROUND(AVG(PRCP_MM), 2) as avg_prcp_mm,
       COUNT(*) as num_records
FROM climate_data
WHERE TEMP_C IS NOT NULL
GROUP BY month
ORDER BY month

5. EXTREME EVENTS:
SELECT year,
       COUNT(CASE WHEN MAX_C > 40 THEN 1 END) as days_above_40c,
       COUNT(CASE WHEN MIN_C < -20 THEN 1 END) as days_below_minus20c,
       COUNT(CASE WHEN PRCP_MM > 100 THEN 1 END) as heavy_rainfall_days,
       MAX(MAX_C) as max_temp_recorded,
       MIN(MIN_C) as min_temp_recorded
FROM climate_data
WHERE MAX_C IS NOT NULL AND MIN_C IS NOT NULL
GROUP BY year
ORDER BY year
""")

---
## 9. Cleanup and shutdown

In [None]:
# Export the main results
yearly_stats.coalesce(1).write.mode("overwrite").csv(
    "/home/jovyan/work/results/yearly_statistics", 
    header=True
)

regional_avg.coalesce(1).write.mode("overwrite").csv(
    "/home/jovyan/work/results/regional_analysis", 
    header=True
)

print("✓ Results exported to /home/jovyan/work/results/")

In [None]:
# Stop the Spark session
# spark.stop()
# print("✓ Spark session stopped")

---
## Final notes

### Performance and optimization:
- The data was cached to speed up repeated queries
- `coalesce()` is used to reduce the number of partitions when exporting
- Conversions to Pandas are limited to aggregated results for visualization

### Possible improvements:
1. Integration with HDFS for distributed storage
2. Spark Structured Streaming for real-time analysis
3. Integration with MLlib for advanced predictions
4. Interactive dashboard with Plotly Dash or Streamlit
5. Automated ETL pipeline with Airflow

### Additional resources:
- GSOD documentation: https://www.ncei.noaa.gov/data/global-summary-of-the-day/doc/
- Spark SQL Guide: https://spark.apache.org/docs/latest/sql-programming-guide.html
- Climate Data Analysis: https://www.ipcc.ch/

---
**Author:** Climate analysis with Apache Spark  
**Date:** 2024  
**Version:** 1.0