Import des librairies n√©cessaires √† la pr√©paration des donn√©es

pip install fsspec[hdfs] pandas geopandas shapely pyproj geopy

In [17]:
import pandas as pd
import os
import glob
from IPython.display import display

R√©cup√©ration du fichier avec la correspondance id_station et coordonn√©es GPS pour ne garder que les fichiers des stations pr√®s de NYC

Phase 1 : Initialisation de l'Environnement

In [18]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pandas as pd
import requests
import json
from math import radians, cos, sin, asin, sqrt

# --- 1. Configuration de Robustesse et Initialisation Spark ---
# Augmentation des timeouts et allocation de m√©moire stricte pour √©viter les crashs JVM
spark = SparkSession.builder \
    .appName("DataLake_NOAA_NYC_Prep") \
    .config("spark.executor.memory", "3g") \
    .config("spark.driver.memory", "2g") \
    .config("spark.network.timeout", "800s") \
    .config("spark.rpc.askTimeout", "800s") \
    .getOrCreate()

# --- 2. D√©finition des Param√®tres G√©ographiques et HDFS ---
# Bo√Æte englobante de la r√©gion de NYC
MIN_LAT, MAX_LAT = 40.0, 41.5
MIN_LON, MAX_LON = -75.0, -73.0

# Chemin HDFS BRUT (Corrig√© avec l'h√¥te et le port du namenode)
RAW_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_raw_2005_2023.parquet"

# Plage d'ann√©es
START_YEAR = 2005
END_YEAR = 2023

print("Session Spark initialis√©e avec les configurations de robustesse.")

Session Spark initialis√©e avec les configurations de robustesse.


Phase 2 : M√©tadonn√©es et Identification des Stations NOAA

In [19]:
# --- 1. T√©l√©chargement des M√©tadonn√©es des Stations (via Pandas car petit fichier) ---
stations_url = "https://www.ncei.noaa.gov/pub/data/noaa/isd-history.csv"
pdf_stations = pd.read_csv(stations_url,
                         dtype={'USAF': str, 'WBAN': str})

pdf_stations['STN_ID'] = pdf_stations['USAF'].str.strip() + pdf_stations['WBAN'].str.strip()
pdf_stations = pdf_stations.rename(columns={'LAT': 'LATITUDE', 'LON': 'LONGITUDE'})
pdf_stations = pdf_stations.dropna(subset=['LATITUDE', 'LONGITUDE', 'STATION NAME'])
spark_stations_df = spark.createDataFrame(pdf_stations)

# --- 2. Filtrage G√©ographique ---
nyc_stations_spark = spark_stations_df.filter(
    (F.col('LATITUDE') >= MIN_LAT) & (F.col('LATITUDE') <= MAX_LAT) &
    (F.col('LONGITUDE') >= MIN_LON) & (F.col('LONGITUDE') <= MAX_LON)
)

# R√©cup√©ration de la liste des IDs pertinents (pour filtrage par nom de fichier)
relevant_station_ids = [row.STN_ID for row in nyc_stations_spark.select("STN_ID").collect()]

print(f"\n‚úÖ {nyc_stations_spark.count()} Stations NOAA pertinentes trouv√©es pr√®s de New York.")
# Gardons ce DataFrame pour la jointure des coordonn√©es plus tard


‚úÖ 93 Stations NOAA pertinentes trouv√©es pr√®s de New York.


Phase 2 bis : T√©l√©chargement des donn√©es

In [22]:
import fsspec
import requests
import time
from tqdm import tqdm

# --- Param√®tres de configuration ---
BASE_URL = "https://www.ncei.noaa.gov/data/global-summary-of-the-day/access"
HDFS_BASE_DIR = "/user/mathis/raw/noaa_gsod"  # dossier cible HDFS
START_YEAR = 2005
END_YEAR = 2023

# Connexion HDFS via fsspec
fs = fsspec.filesystem("hdfs", host="namenode", port=9000)

# V√©rification de la liste des stations
if "relevant_station_ids" not in locals():
    print("‚ö†Ô∏è ATTENTION: La liste 'relevant_station_ids' n'est pas d√©finie.")
    exit()

print(f"D√©marrage du t√©l√©chargement NOAA GSOD dans HDFS pour {len(relevant_station_ids)} stations.")

total_files = len(relevant_station_ids) * (END_YEAR - START_YEAR + 1)

# --- Barre de progression ---
with tqdm(total=total_files, desc="T√©l√©chargement GSOD vers HDFS") as pbar:

    for year in range(START_YEAR, END_YEAR + 1):
        year_dir = f"{HDFS_BASE_DIR}/{year}"

        # Cr√©e le dossier HDFS si n√©cessaire
        if not fs.exists(year_dir):
            fs.mkdir(year_dir)

        for station_id in relevant_station_ids:

            file_name = f"{station_id}.csv"
            hdfs_path = f"{year_dir}/{file_name}"
            remote_url = f"{BASE_URL}/{year}/{file_name}"

            # V√©rifie si le fichier existe d√©j√† dans HDFS
            if fs.exists(hdfs_path):
                pbar.update(1)
                continue

            try:
                # T√©l√©chargement HTTP
                response = requests.get(remote_url, timeout=10)
                response.raise_for_status()

                # √âcriture dans HDFS
                with fs.open(hdfs_path, "wb") as f:
                    f.write(response.content)

                pbar.update(1)
                time.sleep(0.1)

            except requests.exceptions.HTTPError as errh:
                if response.status_code == 404:
                    # Station sans donn√©es cette ann√©e ‚Üí normal
                    pass
                else:
                    print(f"\n‚ùå Erreur HTTP pour {remote_url}: {errh}")

            except requests.exceptions.RequestException as e:
                print(f"\n‚ùå Erreur Connexion/Timeout pour {remote_url}: {e}")

print("\n‚úÖ T√©l√©chargement termin√© ‚Äî fichiers GSOD stock√©s dans HDFS.")


OSError: Prior attempt to load libhdfs failed

In [14]:
import fsspec
import pandas as pd

# --- Configuration HDFS ---
HDFS_BASE_DIR = "/user/mathis/raw/noaa_gsod"
TARGET_YEAR = 2005

# Connexion HDFS
fs = fsspec.filesystem("hdfs", host="namenode", port=9000)

# 1. Lister les fichiers CSV pour l'ann√©e cible dans HDFS
file_pattern = f"{HDFS_BASE_DIR}/{TARGET_YEAR}/*.csv"
all_files_2005 = fs.glob(file_pattern)

if not all_files_2005:
    print(f"‚ùå Aucun fichier CSV trouv√© dans HDFS : {file_pattern}")

else:
    # 2. Trier les fichiers par nom (comme en local)
    all_files_2005.sort()
    first_file_path = all_files_2005[0]
    file_name = first_file_path.split("/")[-1]

    print(f"üìÅ Fichier HDFS cibl√© : {first_file_path}")

    # 3. Lecture des 5 premi√®res lignes via pandas
    try:
        with fs.open(first_file_path, "rb") as f:
            df_head = pd.read_csv(f, nrows=5)

        print(f"\n--- HEAD du fichier {file_name} ---")
        display(df_head)

    except Exception as e:
        print(f"‚ùå Erreur lors de la lecture du fichier CSV HDFS : {e}")


Fichier local cibl√© : /home/jovyan/work/data/noaa_gsod/2005/72224799999.csv

--- HEAD du fichier 72224799999.csv ---


Unnamed: 0,STATION,DATE,LATITUDE,LONGITUDE,ELEVATION,NAME,TEMP,TEMP_ATTRIBUTES,DEWP,DEWP_ATTRIBUTES,...,MXSPD,GUST,MAX,MAX_ATTRIBUTES,MIN,MIN_ATTRIBUTES,PRCP,PRCP_ATTRIBUTES,SNDP,FRSHTT
0,72224799999,2005-02-18,40.633,-74.667,32.0,"SOMERSET, NJ US",27.6,24,12.6,24,...,16.9,23.9,39.9,,24.1,,0.0,H,999.9,1000
1,72224799999,2005-02-19,40.633,-74.667,32.0,"SOMERSET, NJ US",23.0,24,3.9,24,...,13.0,20.0,32.0,,15.1,,0.0,I,999.9,0
2,72224799999,2005-02-20,40.633,-74.667,32.0,"SOMERSET, NJ US",27.3,24,13.4,24,...,7.0,999.9,37.9,,15.1,,0.0,I,999.9,0
3,72224799999,2005-02-21,40.633,-74.667,32.0,"SOMERSET, NJ US",32.4,24,29.4,24,...,9.9,16.9,37.9,,17.1,,0.41,D,999.9,101000
4,72224799999,2005-02-22,40.633,-74.667,32.0,"SOMERSET, NJ US",36.6,24,26.6,24,...,14.0,19.0,44.6,*,32.0,*,0.09,B,999.9,0


Phase 3 : Ingestion Cibl√©e et Persistance (Couche Raw)

In [19]:
import fsspec
from pyspark.sql.types import *

# --- Configuration des chemins ---
HDFS_BASE_DIR = "/user/mathis/raw/noaa_gsod"
RAW_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_raw_2005_2023.parquet"
START_YEAR = 2005
END_YEAR = 2023

# --- Connexion HDFS ---
fs = fsspec.filesystem("hdfs", host="namenode", port=9000)

# --- 1. Construction de la liste des fichiers GSOD existants dans HDFS ---
gsod_data_paths = []

for year in range(START_YEAR, END_YEAR + 1):
    for station_id in relevant_station_ids:
        hdfs_path = f"{HDFS_BASE_DIR}/{year}/{station_id}.csv"
        if fs.exists(hdfs_path):
            # IMPORTANT : Spark doit recevoir un chemin HDFS complet
            gsod_data_paths.append(f"hdfs://namenode:9000{hdfs_path}")

if not gsod_data_paths:
    raise FileNotFoundError("‚ùå Aucun fichier GSOD trouv√© dans HDFS.")

print(f"üìÅ Total de {len(gsod_data_paths)} fichiers GSOD trouv√©s dans HDFS pour lecture Spark.")

# --- 2. Sch√©ma GSOD ---
gsod_schema = StructType([
    StructField("STATION", StringType(), True),
    StructField("DATE", StringType(), True),
    StructField("LATITUDE", DoubleType(), True), 
    StructField("LONGITUDE", DoubleType(), True),
    StructField("ELEVATION", DoubleType(), True),
    StructField("NAME", StringType(), True),
    StructField("TEMP", DoubleType(), True),
    StructField("TEMP_ATTRIBUTES", StringType(), True),
    StructField("DEWP", DoubleType(), True),
    StructField("DEWP_ATTRIBUTES", StringType(), True),
    StructField("SLP", DoubleType(), True),
    StructField("SLP_ATTRIBUTES", StringType(), True),
    StructField("STP", DoubleType(), True),
    StructField("STP_ATTRIBUTES", StringType(), True),
    StructField("VISIB", DoubleType(), True),
    StructField("VISIB_ATTRIBUTES", StringType(), True),
    StructField("WDSP", DoubleType(), True),
    StructField("WDSP_ATTRIBUTES", StringType(), True),
    StructField("MXSPD", DoubleType(), True),
    StructField("GUST", DoubleType(), True),
    StructField("MAX", DoubleType(), True),
    StructField("MAX_ATTRIBUTES", StringType(), True),
    StructField("MIN", DoubleType(), True),
    StructField("MIN_ATTRIBUTES", StringType(), True),
    StructField("PRCP", DoubleType(), True),
    StructField("PRCP_ATTRIBUTES", StringType(), True),
    StructField("SNDP", DoubleType(), True),
    StructField("FRSHHT", StringType(), True),
])

# --- 3. Lecture Spark directement depuis HDFS ---
print(f"üì• Lecture distribu√©e des fichiers GSOD depuis HDFS...")

all_gsod_data = spark.read.csv(
    gsod_data_paths,
    header=True,
    schema=gsod_schema,
    sep=','
)

nyc_gsod_data = all_gsod_data.withColumnRenamed("STATION", "ID_STATION")

print(f"üìä Nombre de lignes charg√©es : {nyc_gsod_data.count()}")

# --- 4. Persistance de la couche brute ---
print(f"\nüíæ Sauvegarde sur HDFS dans : {RAW_OUTPUT_PATH}")

nyc_gsod_data.write.mode("overwrite").parquet(RAW_OUTPUT_PATH)

print("‚úÖ Donn√©es GSOD NYC brutes sauvegard√©es dans HDFS.")


Total de 552 fichiers existants seront lus par Spark.

Sauvegarde de la copie BRUTE filtr√©e (2005-2023) dans : hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_raw_2005_2023.parquet...
‚úÖ Copie brute sauvegard√©e sur HDFS. Le traitement peut se poursuivre.


Phase 4 : Nettoyage et Jointure des Coordonn√©es (ETL)

In [20]:
import pyspark.sql.functions as F
from pyspark.sql.types import *

# --- Chemins de configuration ---
RAW_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_raw_2005_2023.parquet"
# nyc_stations_spark doit √™tre le DataFrame des stations filtr√©es de la Phase 2

# Lecture du fichier Parquet depuis HDFS pour la couche de Traitement/Nettoyage (ETL)
data_for_cleaning = spark.read.parquet(RAW_OUTPUT_PATH)

# --- 1. Nettoyage et Renommage des Colonnes ---
renaming_map = {
    "DATE": "DATE_OBSERVATION",
    "TEMP": "TEMP_MOYENNE_F",
    "DEWP": "POINT_ROSEE_F",
    "PRCP": "PRECIPITATION_POUCES",
    "MIN": "TEMP_MIN_F",
    "MAX": "TEMP_MAX_F",
    "WDSP": "VITESSE_VENT_NOEUDS",
    "VISIB": "VISIBILITE_MILLES",
    "SLP": "PRESSION_ATM_MER",
    "FRSHHT": "PHENOMENES"
}

gsod_renamed_df = data_for_cleaning
for old_name, new_name in renaming_map.items():
    if old_name in gsod_renamed_df.columns:
        gsod_renamed_df = gsod_renamed_df.withColumnRenamed(old_name, new_name)


# --- 2. Jointure des Coordonn√©es des Stations ---
# Ajoute la LATITUDE_STATION et LONGITUDE_STATION pour le calcul de distance
clean_final_gsod_df = gsod_renamed_df.join(
    nyc_stations_spark.select(
        'STN_ID', 
        F.col('LATITUDE').alias('LATITUDE_STATION'), 
        F.col('LONGITUDE').alias('LONGITUDE_STATION')
    ), 
    on=[F.col("ID_STATION") == F.col("STN_ID")],
    how='left'
).drop("STN_ID").drop("LATITUDE").drop("LONGITUDE") # Supprime les colonnes brutes des coordonn√©es


print("\nDonn√©es NOAA nettoy√©es et enrichies des coordonn√©es des stations (Couche ETL Compl√®te).")
clean_final_gsod_df.printSchema()


Donn√©es NOAA nettoy√©es et enrichies des coordonn√©es des stations (Couche ETL Compl√®te).
root
 |-- ID_STATION: string (nullable = true)
 |-- DATE_OBSERVATION: string (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP_MOYENNE_F: double (nullable = true)
 |-- TEMP_ATTRIBUTES: string (nullable = true)
 |-- POINT_ROSEE_F: double (nullable = true)
 |-- DEWP_ATTRIBUTES: string (nullable = true)
 |-- PRESSION_ATM_MER: double (nullable = true)
 |-- SLP_ATTRIBUTES: string (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: string (nullable = true)
 |-- VISIBILITE_MILLES: double (nullable = true)
 |-- VISIB_ATTRIBUTES: string (nullable = true)
 |-- VITESSE_VENT_NOEUDS: double (nullable = true)
 |-- WDSP_ATTRIBUTES: string (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- TEMP_MAX_F: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- TEMP_MIN_F: do

In [15]:
import pyspark.sql.functions as F
from pyspark.sql import SparkSession

# --- Chemin de la Couche Raw sur HDFS ---
RAW_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_raw_2005_2023.parquet"

# Recharger la session Spark si elle a √©t√© arr√™t√©e
spark = SparkSession.builder.appName("Check_Raw_Data").getOrCreate()

try:
    # 1. Lecture du DataFrame Brute depuis HDFS
    df_raw_check = spark.read.parquet(RAW_OUTPUT_PATH)
    
    # Renommage de la colonne STATION pour la lisibilit√©
    df_raw_check = df_raw_check.withColumnRenamed("STATION", "ID_STATION")
    
    print(f"Schema des donn√©es brutes ({df_raw_check.count()} enregistrements) :")
    df_raw_check.printSchema()
    
    # 2. Affichage des 5 premi√®res lignes
    # On affiche les colonnes cl√©s (ID, Date, Temp√©rature, Pr√©cipitation)
    print("\n--- HEAD des Donn√©es Brutes Filtr√©es (NOAA 2005-2023) ---")
    df_raw_check.select(
        "ID_STATION",
        "DATE",
        "NAME",
        "TEMP",
        "PRCP",
        "DEWP",
        "SLP",
        "FRSHHT"
    ).show(5, truncate=False)

except Exception as e:
    print(f"‚ùå ERREUR lors de la lecture du fichier HDFS : {e}")

Schema des donn√©es brutes (193170 enregistrements) :
root
 |-- ID_STATION: string (nullable = true)
 |-- DATE: string (nullable = true)
 |-- LATITUDE: double (nullable = true)
 |-- LONGITUDE: double (nullable = true)
 |-- ELEVATION: double (nullable = true)
 |-- NAME: string (nullable = true)
 |-- TEMP: double (nullable = true)
 |-- TEMP_ATTRIBUTES: string (nullable = true)
 |-- DEWP: double (nullable = true)
 |-- DEWP_ATTRIBUTES: string (nullable = true)
 |-- SLP: double (nullable = true)
 |-- SLP_ATTRIBUTES: string (nullable = true)
 |-- STP: double (nullable = true)
 |-- STP_ATTRIBUTES: string (nullable = true)
 |-- VISIB: double (nullable = true)
 |-- VISIB_ATTRIBUTES: string (nullable = true)
 |-- WDSP: double (nullable = true)
 |-- WDSP_ATTRIBUTES: string (nullable = true)
 |-- MXSPD: double (nullable = true)
 |-- GUST: double (nullable = true)
 |-- MAX: double (nullable = true)
 |-- MAX_ATTRIBUTES: string (nullable = true)
 |-- MIN: double (nullable = true)
 |-- MIN_ATTRIBUTES:

Phase 5 : Pr√©paration des Coordonn√©es des Zones de Qualit√© de l'Air

In [16]:
# --- 1. T√©l√©chargement et Extraction du GeoJSON ---
geojson_url = "https://raw.githubusercontent.com/nycehs/NYC_geography/master/UHF42.geo.json"

response = requests.get(geojson_url)
geo_data_raw = response.json()

geo_records = []
for feature in geo_data_raw['features']:
    properties = feature['properties']
    geometry = feature['geometry']
    coords = geometry['coordinates']
    
    try:
        # Approximation du centro√Øde pour la distance
        if geometry['type'] == 'Polygon':
            lon = coords[0][0][0]
            lat = coords[0][0][1]
        elif geometry['type'] == 'MultiPolygon':
            lon = coords[0][0][0][0]
            lat = coords[0][0][0][1]
        else:
            continue
            
        geo_records.append({
            "GEOJOIN_ID": properties['GEOCODE'], 
            "LATITUDE_ZONE": lat,
            "LONGITUDE_ZONE": lon
        })
    except (IndexError, TypeError):
        continue

# --- 2. Cr√©ation du DataFrame Spark des Coordonn√©es des Zones ---
geo_schema = StructType([
    StructField("GEOJOIN_ID", StringType(), False),
    StructField("LATITUDE_ZONE", DoubleType(), True),
    StructField("LONGITUDE_ZONE", DoubleType(), True)
])

geo_df = spark.createDataFrame(geo_records, schema=geo_schema)

print(f"\n{geo_df.count()} zones de qualit√© de l'air (GeoJoin ID) pr√©par√©es avec leurs coordonn√©es.")

# --- Affichage des premi√®res lignes du DataFrame ---
print("\nPremi√®res lignes du DataFrame des coordonn√©es g√©ographiques :")
geo_df.show(5)


43 zones de qualit√© de l'air (GeoJoin ID) pr√©par√©es avec leurs coordonn√©es.

Premi√®res lignes du DataFrame des coordonn√©es g√©ographiques :
+----------+-------------+--------------+
|GEOJOIN_ID|LATITUDE_ZONE|LONGITUDE_ZONE|
+----------+-------------+--------------+
|       101|    40.905562|    -73.877928|
|       102|    40.889002|    -73.815044|
|       103|     40.88315|    -73.856266|
|       104|    40.821533|    -73.883659|
|       105|    40.859781|    -73.914444|
+----------+-------------+--------------+
only showing top 5 rows



Phase 6 : Calcul de la Distance (Haversine) et Voisin le Plus Proche

In [22]:
from pyspark.sql.window import Window

# Constante du rayon de la Terre en kilom√®tres (km)
R = 6371.0

# Formule de la Haversine (UDF)
def haversine(lon1, lat1, lon2, lat2):
    # Conversion de degr√©s en radians
    lon1_rad, lat1_rad, lon2_rad, lat2_rad = map(radians, [lon1, lat1, lon2, lat2])
    dlon = lon2_rad - lon1_rad
    dlat = lat2_rad - lat1_rad

    a = sin(dlat / 2)**2 + cos(lat1_rad) * cos(lat2_rad) * sin(dlon / 2)**2
    c = 2 * asin(sqrt(a))
    
    return R * c

# Enregistrement de la fonction comme une UDF (User Defined Function) pour Spark
haversine_udf = F.udf(haversine, DoubleType())

# --- 1. Pr√©paration des Coordonn√©es des Stations Uniques ---
stations_coords_df = clean_final_gsod_df.select(
    "ID_STATION", "LATITUDE_STATION", "LONGITUDE_STATION"
).distinct()

# --- 2. Jointure Cart√©sienne (Toutes les zones vs. Toutes les stations) ---
cross_joined_df = geo_df.crossJoin(stations_coords_df)


# --- 3. Calcul de la Distance pour chaque paire ---
distance_df = cross_joined_df.withColumn(
    "DISTANCE_KM",
    haversine_udf(
        F.col("LONGITUDE_ZONE"), 
        F.col("LATITUDE_ZONE"), 
        F.col("LONGITUDE_STATION"), 
        F.col("LATITUDE_STATION")
    )
)

# --- 4. Identification du Voisin le Plus Proche ---
# Trouve la ligne (station) avec la distance minimale pour chaque GEOJOIN_ID
window_spec = Window.partitionBy("GEOJOIN_ID").orderBy(F.col("DISTANCE_KM"))

nearest_station_df = distance_df.withColumn(
    "rank", 
    F.rank().over(window_spec)
).filter(F.col("rank") == 1).drop("rank")


print("\n‚úÖ Table de correspondance (Voisin le plus proche) cr√©√©e :")
nearest_station_df.select(
    "GEOJOIN_ID", 
    "ID_STATION", 
    "DISTANCE_KM"
).show(5, truncate=False)


‚úÖ Table de correspondance (Voisin le plus proche) cr√©√©e :
+----------+-----------+------------------+
|GEOJOIN_ID|ID_STATION |DISTANCE_KM       |
+----------+-----------+------------------+
|0         |74486094789|5.760247595051896 |
|101       |72503014732|14.074131616303024|
|102       |99728099999|9.640847117034312 |
|103       |99728099999|11.103908014210129|
|104       |72503014732|4.739471712784995 |
+----------+-----------+------------------+
only showing top 5 rows



Phase 7 : Jointure Finale et Persistance (Couche Insight)

In [23]:
import pyspark.sql.functions as F

# --- Configuration HDFS ---
# Chemin o√π sera stock√© le jeu de donn√©es final, pr√™t pour l'analyse
INSIGHT_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_enriched_clean.parquet"

# --- 1. Jointure d'Enrichissement ---
# On joint toutes les observations m√©t√©o (clean_final_gsod_df) aux informations de GeoJoin ID (nearest_station_df)
# La jointure s'effectue sur l'ID de la station m√©t√©o.

# NOTE: Ce bloc n√©cessite d'avoir ex√©cut√© la Phase 6 juste avant pour que 'nearest_station_df'
# et 'clean_final_gsod_df' soient disponibles en m√©moire.

final_insights_df = clean_final_gsod_df.join(
    nearest_station_df.select("GEOJOIN_ID", 
                              F.col("ID_STATION").alias("NEAREST_STATION_ID"), 
                              "DISTANCE_KM",
                              "LATITUDE_ZONE", 
                              "LONGITUDE_ZONE"),
    # La condition est que l'ID de la station m√©t√©o DOIT correspondre √† l'ID du voisin le plus proche.
    F.col("ID_STATION") == F.col("NEAREST_STATION_ID"),
    "inner" 
).drop("NEAREST_STATION_ID") # On retire cette colonne apr√®s la jointure


# --- 2. S√©lection et Ordre Final ---
# Cr√©e le sch√©ma final pour l'analyse, en pla√ßant les cl√©s d'analyse en t√™te
final_insights_df = final_insights_df.select(
    "GEOJOIN_ID", 
    "DATE_OBSERVATION", 
    # Mesures M√©t√©o
    "TEMP_MOYENNE_F", 
    "PRECIPITATION_POUCES", 
    "VITESSE_VENT_NOEUDS", 
    "PHENOMENES",
    # M√©ta-donn√©es de jointure
    "ID_STATION", 
    "DISTANCE_KM",
    "LATITUDE_ZONE", 
    "LONGITUDE_ZONE" # Coordonn√©es de la zone (utile pour la cartographie)
)

print("\nStructure du DataFrame Final (M√©t√©o attribu√©e √† chaque zone de qualit√© de l'air) :")
final_insights_df.printSchema()
final_insights_df.show(5)

# --- 3. Persistance de la Couche Insight sur HDFS ---
print(f"\nSauvegarde du DataFrame final Nettoy√©/Enrichi (Couche Insight) dans : {INSIGHT_OUTPUT_PATH}...")
# Sauvegarde au format Parquet
final_insights_df.write.mode("overwrite").parquet(INSIGHT_OUTPUT_PATH)
print("‚úÖ Jointure finale et persistance termin√©es. Le jeu de donn√©es m√©t√©o est pr√™t pour l'analyse.")


Structure du DataFrame Final (M√©t√©o attribu√©e √† chaque zone de qualit√© de l'air) :
root
 |-- GEOJOIN_ID: string (nullable = false)
 |-- DATE_OBSERVATION: string (nullable = true)
 |-- TEMP_MOYENNE_F: double (nullable = true)
 |-- PRECIPITATION_POUCES: double (nullable = true)
 |-- VITESSE_VENT_NOEUDS: double (nullable = true)
 |-- PHENOMENES: string (nullable = true)
 |-- ID_STATION: string (nullable = true)
 |-- DISTANCE_KM: double (nullable = true)
 |-- LATITUDE_ZONE: double (nullable = true)
 |-- LONGITUDE_ZONE: double (nullable = true)

+----------+----------------+--------------+--------------------+-------------------+----------+-----------+------------------+-------------+--------------+
|GEOJOIN_ID|DATE_OBSERVATION|TEMP_MOYENNE_F|PRECIPITATION_POUCES|VITESSE_VENT_NOEUDS|PHENOMENES| ID_STATION|       DISTANCE_KM|LATITUDE_ZONE|LONGITUDE_ZONE|
+----------+----------------+--------------+--------------------+-------------------+----------+-----------+------------------+------

Phase 8 (A) : T√©l√©chargement et Nettoyage du JSON Socrata

In [9]:
import requests
import json
import os

# --- Configuration des Chemins ---
LOCAL_BASE_DIR = "/home/jovyan/work/data/air_quality"
LOCAL_JSON_PATH = os.path.join(LOCAL_BASE_DIR, "nyc_air_quality_raw.json")
AIR_QUALITY_URL = "https://data.cityofnewyork.us/api/views/c3uy-2p5r/rows.json?accessType=DOWNLOAD"

# Cr√©e le r√©pertoire local si n√©cessaire
os.makedirs(LOCAL_BASE_DIR, exist_ok=True)

# --- 1. T√©l√©chargement et Nettoyage de la structure JSON Socrata ---
print(f"‚¨áÔ∏è T√©l√©chargement du JSON Socrata depuis l'API de NYC...")
try:
    response = requests.get(AIR_QUALITY_URL, timeout=300) # Timeout de 5 minutes
    response.raise_for_status()
    data = response.json()
    
    # La cl√© 'data' contient le tableau des enregistrements bruts que Spark doit lire.
    raw_records = data.get('data', [])

    if not raw_records:
        print("‚ùå Erreur : La cl√© 'data' est vide dans le JSON t√©l√©charg√©. Arr√™t du processus.")
        exit()
    
    # √âcriture du tableau de donn√©es brutes SEULEMENT dans le nouveau fichier JSON.
    # Ceci est essentiel pour que le RDD/toDF fonctionne correctement.
    with open(LOCAL_JSON_PATH, 'w') as f:
        json.dump(raw_records, f)

    print(f"‚úÖ Fichier JSON brut sauvegard√© et nettoy√© structurellement √† : {LOCAL_JSON_PATH}")
    
except Exception as e:
    print(f"‚ùå Erreur lors du t√©l√©chargement/nettoyage : {e}")
    exit()

‚¨áÔ∏è T√©l√©chargement du JSON Socrata depuis l'API de NYC...
‚úÖ Fichier JSON brut sauvegard√© et nettoy√© structurellement √† : /home/jovyan/work/data/air_quality/nyc_air_quality_raw.json


Phase 8 (B) : Ingestion, Normalisation et Pivotage de la Qualit√© de l'Air

In [11]:
import pyspark.sql.functions as F
from pyspark.sql.types import *
import requests
import json
import os

# --- Configurations des Chemins ---
LOCAL_BASE_DIR = "/home/jovyan/work/data/air_quality"
LOCAL_JSON_PATH = os.path.join(LOCAL_BASE_DIR, "nyc_air_quality_raw.json")
AIR_QUALITY_URL = "https://data.cityofnewyork.us/api/views/c3uy-2p5r/rows.json?accessType=DOWNLOAD"
AIR_QUALITY_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/nyc_air_quality_pivot.parquet"

# Cr√©e le r√©pertoire local si n√©cessaire
os.makedirs(LOCAL_BASE_DIR, exist_ok=True)


# --- 1. Lecture du tableau JSON en m√©moire Python (Assum√©e r√©ussie) ---
# Ce bloc d√©pend du succ√®s de l'√©tape de t√©l√©chargement et de nettoyage du JSON brut.
with open(LOCAL_JSON_PATH, 'r') as f:
    raw_socrata_array = json.load(f)

# --- 2. D√©finition du Sch√©ma RDD/toDF : Correction du format Socrata ---

# Mappage des index Socrata confirm√©s (Index dans le tableau data)
column_names = {
    17: "DATE_MESURE_BRUTE",         # Index 17 -> Start_Date
    14: "GEOJOIN_ID_BRUT",           # Index 14 -> Geo Join ID
    10: "NOM_POLLUANT",              # Index 10 -> Name (Polluant)
    18: "VALEUR_MESURE_BRUTE"        # Index 18 -> Data Value
}

# D√©finir tous les noms de colonnes jusqu'√† l'indice max (19)
# La correction est ici : range(20) pour obtenir 20 colonnes (0 √† 19)
col_labels = [f"col_{i}" for i in range(20)] # ANCIEN: range(19)

# Mappage des indices Socrata confirm√©s (inchang√©, ils sont tous < 20)
column_names = {
    17: "DATE_MESURE_BRUTE",
    14: "GEOJOIN_ID_BRUT",
    10: "NOM_POLLUANT",
    18: "VALEUR_MESURE_BRUTE"
}

# (Le reste du mappage est inchang√©)
for index, name in column_names.items():
    col_labels[index] = name

# CR√âATION DU SCH√âMA STRUCTUR√â (Tous en StringType)
target_schema = StructType([
    StructField(name, StringType(), True) for name in col_labels
])

# Cr√©ation du DataFrame Spark √† partir du RDD avec le sch√©ma explicite
air_quality_data_raw = spark.sparkContext.parallelize(raw_socrata_array).toDF(target_schema)

print(f"Qualit√© de l'air : {air_quality_data_raw.count()} enregistrements bruts lus.")
air_quality_data_raw.printSchema()


# --- 3. S√©lection et Nettoyage des Colonnes (ETL) ---

air_quality_df = air_quality_data_raw.select(
    "DATE_MESURE_BRUTE",
    "GEOJOIN_ID_BRUT",
    "NOM_POLLUANT",
    "VALEUR_MESURE_BRUTE"
)

air_quality_clean = air_quality_df.filter(
    F.col("VALEUR_MESURE_BRUTE").isNotNull()
).withColumn(
    # Conversion de la valeur de mesure en Double
    "VALEUR_MESURE", F.col("VALEUR_MESURE_BRUTE").cast(DoubleType())
).withColumn(
    # GeoJoin ID en String pour la jointure
    "GEOJOIN_ID", F.col("GEOJOIN_ID_BRUT").cast(StringType())
).withColumn(
    # Conversion de la date (YYYY-MM-DD...) en format YYYYMMDD entier pour la jointure avec NOAA.
    "DATE_OBSERVATION", 
    F.regexp_replace(F.substring(F.col("DATE_MESURE_BRUTE"), 1, 10), "-", "").cast(IntegerType())
).drop("DATE_MESURE_BRUTE", "VALEUR_MESURE_BRUTE", "GEOJOIN_ID_BRUT")

# --- 4. Croisement (Pivot) des Donn√©es de Qualit√© de l'Air ---
# Transforme les lignes de polluants en colonnes
air_quality_pivot = air_quality_clean.groupBy("GEOJOIN_ID", "DATE_OBSERVATION").pivot("NOM_POLLUANT").agg(
    F.mean("VALEUR_MESURE")
)

# Renommage des colonnes pivot√©es pour la clart√©
for col_name in air_quality_pivot.columns:
    if col_name not in ["GEOJOIN_ID", "DATE_OBSERVATION"]:
        # Nettoyage du nom du polluant
        new_col_name = f"QA_{col_name.replace(' ', '_').replace('.', '_').replace('(', '').replace(')', '').replace('/', '_')}_MOYENNE"
        air_quality_pivot = air_quality_pivot.withColumnRenamed(col_name, new_col_name)

print("\nStructure pivot√©e des donn√©es de qualit√© de l'air (pr√™te pour la jointure finale) :")
air_quality_pivot.printSchema()
air_quality_pivot.show(5)

# --- 5. Persistance sur HDFS ---
air_quality_pivot.write.mode("overwrite").parquet(AIR_QUALITY_OUTPUT_PATH)
print(f"‚úÖ Qualit√© de l'air pivot√©e et persist√©e sur HDFS √† : {AIR_QUALITY_OUTPUT_PATH}")

Qualit√© de l'air : 18862 enregistrements bruts lus.
root
 |-- col_0: string (nullable = true)
 |-- col_1: string (nullable = true)
 |-- col_2: string (nullable = true)
 |-- col_3: string (nullable = true)
 |-- col_4: string (nullable = true)
 |-- col_5: string (nullable = true)
 |-- col_6: string (nullable = true)
 |-- col_7: string (nullable = true)
 |-- col_8: string (nullable = true)
 |-- col_9: string (nullable = true)
 |-- NOM_POLLUANT: string (nullable = true)
 |-- col_11: string (nullable = true)
 |-- col_12: string (nullable = true)
 |-- col_13: string (nullable = true)
 |-- GEOJOIN_ID_BRUT: string (nullable = true)
 |-- col_15: string (nullable = true)
 |-- col_16: string (nullable = true)
 |-- DATE_MESURE_BRUTE: string (nullable = true)
 |-- VALEUR_MESURE_BRUTE: string (nullable = true)
 |-- col_19: string (nullable = true)


Structure pivot√©e des donn√©es de qualit√© de l'air (pr√™te pour la jointure finale) :
root
 |-- GEOJOIN_ID: string (nullable = true)
 |-- DATE_OBSERV

Phase 9 : Jointure Finale et Persistance du Jeu de Donn√©es Complet

In [12]:
import pyspark.sql.functions as F

# --- Configuration des Chemins (R√©utilis√©s) ---
# Chemin des donn√©es NOAA enrichies (Couche Insight apr√®s Phase 7)
INSIGHT_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/noaa_gsod_nyc_enriched_clean.parquet"
# Chemin des donn√©es Qualit√© de l'Air pivot√©es (Couche Pivot apr√®s Phase 8)
AIR_QUALITY_OUTPUT_PATH = "hdfs://namenode:9000/user/mathis/datalake/nyc_air_quality_pivot.parquet"
# Chemin du jeu de donn√©es final
FINAL_DATASET_PATH = "hdfs://namenode:9000/user/mathis/datalake/nyc_final_air_weather_dataset.parquet"

# --- 1. Lecture des deux sources enrichies depuis HDFS ---
final_insights_df = spark.read.parquet(INSIGHT_OUTPUT_PATH)
air_quality_pivot = spark.read.parquet(AIR_QUALITY_OUTPUT_PATH)

# --- 2. Jointure Finale Temporelle et G√©ographique ---
# La jointure se fait sur deux cl√©s : l'ID de la zone et la Date de l'observation
final_dataset = final_insights_df.join(
    air_quality_pivot,
    on=["GEOJOIN_ID", "DATE_OBSERVATION"],
    how="inner" # Utilisation de 'inner' pour ne garder que les jours o√π les DEUX mesures sont pr√©sentes
)

# --- 3. S√©lection Finale et Sauvegarde ---
# R√©organisation et s√©lection des colonnes pour le jeu de donn√©es d'analyse finale
final_dataset = final_dataset.select(
    "GEOJOIN_ID", 
    "DATE_OBSERVATION", 
    # M√©t√©o
    "TEMP_MOYENNE_F", 
    "PRECIPITATION_POUCES", 
    "VITESSE_VENT_NOEUDS", 
    "PHENOMENES",
    # Qualit√© de l'Air (toutes les colonnes commen√ßant par QA_)
    *[col for col in final_dataset.columns if col.startswith("QA_")],
    # M√©tadonn√©es de jointure
    F.col("DISTANCE_KM").alias("DISTANCE_STATION_KM"),
    "ID_STATION",
    "LATITUDE_ZONE", 
    "LONGITUDE_ZONE" 
)

print("\nStructure du Jeu de Donn√©es Final (M√©t√©o + Qualit√© de l'Air) :")
final_dataset.printSchema()
final_dataset.show(5)

# --- 4. Persistance Finale sur HDFS ---
final_dataset.write.mode("overwrite").parquet(FINAL_DATASET_PATH)

print(f"\n‚úÖ Jeu de donn√©es final (M√©t√©o + Qualit√© de l'Air) cr√©√© et persist√© √† : {FINAL_DATASET_PATH}")
print("Le pipeline d'Ingestion, Persistance et Traitement est termin√©. Votre jeu de donn√©es est pr√™t pour l'Insight. üìä")


Structure du Jeu de Donn√©es Final (M√©t√©o + Qualit√© de l'Air) :
root
 |-- GEOJOIN_ID: string (nullable = true)
 |-- DATE_OBSERVATION: string (nullable = true)
 |-- TEMP_MOYENNE_F: double (nullable = true)
 |-- PRECIPITATION_POUCES: double (nullable = true)
 |-- VITESSE_VENT_NOEUDS: double (nullable = true)
 |-- PHENOMENES: string (nullable = true)
 |-- QA_Annual_vehicle_miles_traveled_MOYENNE: double (nullable = true)
 |-- QA_Annual_vehicle_miles_traveled_cars_MOYENNE: double (nullable = true)
 |-- QA_Annual_vehicle_miles_traveled_trucks_MOYENNE: double (nullable = true)
 |-- QA_Asthma_emergency_department_visits_due_to_PM2_5_MOYENNE: double (nullable = true)
 |-- QA_Asthma_emergency_departments_visits_due_to_Ozone_MOYENNE: double (nullable = true)
 |-- QA_Asthma_hospitalizations_due_to_Ozone_MOYENNE: double (nullable = true)
 |-- QA_Boiler_Emissions-_Total_NOx_Emissions_MOYENNE: double (nullable = true)
 |-- QA_Boiler_Emissions-_Total_PM2_5_Emissions_MOYENNE: double (nullable = tr