In [1]:
%pip install -q -r requirements.txt

Note: you may need to restart the kernel to use updated packages.


In [2]:
%load_ext autoreload
%autoreload 1
from imports import *
from functions import *

In [3]:
import dask.dataframe as dd

# get all stations and some metadata as a Dask DataFrame
stations_df = api.stations()
# parse the response as a Dask DataFrame
stations_df = dd.from_pandas(api.stations(as_df=True), npartitions=4)

print(len(stations_df))

146


# Filter Buoys by Remarks

In [4]:
import dask.dataframe as dd
import numpy as np

access_error_url_list = []

# Liste de mots √† rechercher dans la colonne "Remark"
blacklist = ["Failure", "ceased", "failed", "recovered", "stopped", 'adrift']
stations_id_set = set()

print(f'Avant Filtre: {stations_df.shape[0].compute()}')  # Utiliser .compute() pour √©valuer l'expression et obtenir le nombre r√©el

# Liste pour collecter les indices √† supprimer
indices_a_supprimer = []

# Fonction qui sera appliqu√©e √† chaque partition
def filter_rows(df):
    indices_a_supprimer_partition = []
    for idx, row in df.iterrows():
        station_id = row["Station"]
        station_Location = row["Hull No./Config and Location"]  # Extraire la valeur de la cellule pour chaque ligne

        # V√©rifier si station_Location n'est pas NaN
        if isinstance(station_Location, str) and ")" in station_Location:
            station_name = station_Location.split(')')[1].rstrip(" )")  # On enl√®ve l'espace et la parenth√®se en fin de cha√Æne
        else:
            station_name = station_Location.strip() if isinstance(station_Location, str) else ""  # Si pas de ")", on garde toute la cha√Æne

        station_name = station_name.rstrip(" )").replace("(", "").replace(")", "").strip()

        # Nettoyage final pour enlever toute parenth√®se ou espace en fin de nom
        station_name = station_name.rstrip(" )")

        # V√©rifier si "Remark" n'est pas NaN et si un des √©l√©ments de blacklist est dans "Remark"
        if isinstance(row["Remark"], str) and any(blacklist_word.lower() in row["Remark"].lower() for blacklist_word in blacklist):
            # Ajouter l'index √† la liste
            indices_a_supprimer_partition.append(idx)
            url = get_buoy_url(station_id)
            access_error_url_list.append(url)
    
    # Retourner les lignes restantes apr√®s suppression
    return df.loc[~df.index.isin(indices_a_supprimer_partition)]

# D√©finir un meta (structure de donn√©es pour Dask)
meta = stations_df.head(0)  # Cela va retourner la structure des colonnes sans les donn√©es.

# Appliquer le filtre sur toutes les partitions
stations_df = stations_df.map_partitions(filter_rows, meta=meta)

# Utiliser .compute() pour obtenir la taille r√©elle apr√®s le filtrage
print(f'Apr√®s Filtre: {stations_df.shape[0].compute()}')  # Utiliser .compute() ici aussi pour √©valuer le r√©sultat


Avant Filtre: 146
Apr√®s Filtre: 43


# Build Buoys_datas Dict

In [5]:
# Dictionnaire pour stocker les DataFrames, cl√© : ID de la bou√©e, valeur : DataFrame
buoy_datas = {}
buoy_list = []

# Parcours de chaque bou√©e dans stations_df
for index, row in stations_df.iterrows():
    buoy_id = row['Station']
    metadata = get_station_metadata(buoy_id)

    # ‚úÖ R√©cup√©rer les donn√©es sous forme de dictionnaire
    buoy_info = parse_buoy_json(metadata)

    # ‚úÖ Stocker directement les donn√©es dans buoy_datas
    buoy_datas[buoy_id] = buoy_info
    buoy_list.append(buoy_id)

# Affichage du nombre de bou√©es r√©ussies et √©chou√©es
print(f"Nombre de bou√©es trait√©es : {len(buoy_datas)}\n")

# Afficher le contenu de buoy_datas

first_key = next(iter(buoy_datas))
first_key
buoy_datas[first_key]


üîç D√©but du parsing de la bou√©e...
üåç Zone de la station : southwest pass, la
üÜî Station ID : BURL1
‚úÖ Coordonn√©es extraites : Latitude = 28.91N, Longitude = 89.43W
üåä Water Depth : N/A
üå°Ô∏è Sea Temp Depth : None
üå¨Ô∏è Barometer Elevation (m): 13.4
üí® Anemometer Height : 38
üå§Ô∏è Air Temp Height : 13.7
üîó URL de la bou√©e : https://www.ndbc.noaa.gov/station_page.php?station=BURL1
‚úÖ Parsing termin√© !


üîç D√©but du parsing de la bou√©e...
üåç Zone de la station : grays reef
üÜî Station ID : 41008
‚úÖ Coordonn√©es extraites : Latitude = 31.40N, Longitude = 80.87W
üåä Water Depth : 16 m
üå°Ô∏è Sea Temp Depth : 2
üå¨Ô∏è Barometer Elevation (m): 2.4
üí® Anemometer Height : 3.8
üå§Ô∏è Air Temp Height : 3.4
üîó URL de la bou√©e : https://www.ndbc.noaa.gov/station_page.php?station=41008
‚úÖ Parsing termin√© !


üîç D√©but du parsing de la bou√©e...
üåç Zone de la station : five fingers, ak
üÜî Station ID : FFIA2
‚úÖ Coordonn√©es extraites : Latitude = 57

{'station_zone': 'southwest pass, la',
 'lat_buoy': '28.91N',
 'lon_buoy': '89.43W',
 'Water_depth': 'N/A',
 'sea_temp_depth': None,
 'Barometer_elevation': '13.4',
 'Anemometer_height': '38',
 'Air_temp_height': '13.7',
 'url': 'https://www.ndbc.noaa.gov/station_page.php?station=BURL1'}

# Marine and Meteo Data Collection Process

In [6]:
# üöÄ D√©marrage du processus
print("\nüöÄ D√©marrage du processus de collecte des donn√©es...\n")

# Initialisation des compteurs
marine_data_collected_successfully = marine_data_collected_failed = 0
meteo_data_collected_successfully = meteo_data_collected_failed = 0

success = False
total_stations = stations_df.shape[0]
count = 0

# üîÑ Parcours des bou√©es / stations
for idx, row in stations_df.iterrows():
    buoy_id = row["Station"]

    ######### üåä MARINE DATA #########
    try:
        df_marine = NDBC.realtime_observations(buoy_id)
        if df_marine is None or df_marine.empty:
            marine_data_collected_failed += 1
            continue

        marine_data_collected_successfully += 1
    except Exception as e:
        print(f"‚ö†Ô∏è Erreur collecte marine {buoy_id}: {e}")
        marine_data_collected_failed += 1
        continue

    # Ajout des m√©tadonn√©es
    try:
        buoy_info = buoy_datas.get(buoy_id, {})
        Lat, Lon = buoy_info.get('lat_buoy'), buoy_info.get('lon_buoy')
        if Lat is None or Lon is None:
            raise ValueError(f"Donn√©es manquantes pour {buoy_id}")

        df_marine['Lat'] = Lat
        df_marine['Lon'] = Lon
        df_marine['Water_depth'] = buoy_info.get('Water_depth', None)
        df_marine.columns = ['Datetime' if 'date' in col.lower() or 'time' in col.lower() else col for col in df_marine.columns]
        df_marine['Datetime'] = df_marine['Datetime'].dt.tz_localize(None)

        buoy_datas[buoy_id]["Marine"] = df_marine

        station_zone = safe_get(parse_buoy_json(get_station_metadata(buoy_id)), "station_zone")
        Bronze_Marine_table_Name = f"br_{buoy_id}_marine_{station_zone}".replace('.', '_').replace('-', '_').replace(' ', '_').lower()

    except Exception as e:
        print(f"‚ö†Ô∏è Erreur m√©tadonn√©es marine {buoy_id}: {e}")
        marine_data_collected_failed += 1
        continue

    ######### ‚õÖ METEO DATA #########
    try:
        df_meteo = meteo_api_request([Lat, Lon])
        if df_meteo is None or df_meteo.empty:
            meteo_data_collected_failed += 1
            continue
        
        rename_columns(df_meteo, {'date':'Datetime'})
        df_meteo.columns = ['Datetime' if 'date' in col.lower() or 'time' in col.lower() else col for col in df_meteo.columns]
        df_meteo['Datetime'] = df_meteo['Datetime'].dt.tz_localize(None)
    
        buoy_datas[buoy_id]["Meteo"] = df_meteo
        meteo_data_collected_successfully += 1
    except Exception as e:
        print(f"‚ö†Ô∏è Erreur collecte m√©t√©o {buoy_id}: {e}")
        meteo_data_collected_failed += 1
        continue

# Retirer les bou√©es avec des DataFrames vides ou None
buoy_datas = {buoy_id: data for buoy_id, data in buoy_datas.items() 
              if "Marine" in data and data["Marine"] is not None and not data["Marine"].empty
              and "Meteo" in data and data["Meteo"] is not None and not data["Meteo"].empty}

# üîö R√©sum√© final

print("\nüìù R√©sum√© final :")
print(f"üåä Marine - Collecte ‚úÖ {marine_data_collected_successfully} ‚ùå {marine_data_collected_failed}")
print(f"‚õÖ M√©t√©o - Collecte ‚úÖ {meteo_data_collected_successfully} ‚ùå {meteo_data_collected_failed}")

# Afficher la longueur du dictionnaire (nombre de bou√©es avec des donn√©es valides)
print(f"\nüìä Nombre de bou√©es avec des donn√©es valides : {len(buoy_datas)}")


üöÄ D√©marrage du processus de collecte des donn√©es...


üîç D√©but du parsing de la bou√©e...
üåç Zone de la station : southwest pass, la
üÜî Station ID : BURL1
‚úÖ Coordonn√©es extraites : Latitude = 28.91N, Longitude = 89.43W
üåä Water Depth : N/A
üå°Ô∏è Sea Temp Depth : None
üå¨Ô∏è Barometer Elevation (m): 13.4
üí® Anemometer Height : 38
üå§Ô∏è Air Temp Height : 13.7
üîó URL de la bou√©e : https://www.ndbc.noaa.gov/station_page.php?station=BURL1
‚úÖ Parsing termin√© !

üìä station_zone : southwest pass, la
üîÑ Colonne 'date' renomm√©e en 'Datetime'
‚úÖ Colonnes renomm√©es : {'date': 'Datetime'}

üîç D√©but du parsing de la bou√©e...
üåç Zone de la station : grays reef
üÜî Station ID : 41008
‚úÖ Coordonn√©es extraites : Latitude = 31.40N, Longitude = 80.87W
üåä Water Depth : 16 m
üå°Ô∏è Sea Temp Depth : 2
üå¨Ô∏è Barometer Elevation (m): 2.4
üí® Anemometer Height : 3.8
üå§Ô∏è Air Temp Height : 3.4
üîó URL de la bou√©e : https://www.ndbc.noaa.gov/station_page.ph

In [7]:
display_buoys_missing_df_counts(buoy_datas)


üåä Nombre de bou√©es sans donn√©es 'Marine' : 0/41

‚òÅÔ∏è Nombre de bou√©es sans donn√©es 'Meteo' : 0/41


# Data Enrichment with MetaDatas

In [None]:
# Liste des colonnes √† ne pas inclure dans l'ajout
list_not_include = ['lon_buoy', "lat_buoy", "url"]

# Parcours des bou√©es
for buoy_id, value in buoy_datas.items():
    print(f"\nüîç Traitement de la Station ID: {buoy_id}")

    marine_df = buoy_datas[buoy_id]["Marine"]
    meteo_df = buoy_datas[buoy_id]["Meteo"]

    try:
        # R√©cup√©rer les m√©tadonn√©es de la station
        buoy_metadata = get_station_metadata(buoy_id)
        parsed_data = parse_buoy_json(buoy_metadata)

        # Mise √† jour du dictionnaire avec les m√©tadonn√©es
        data = buoy_datas[buoy_id]
        data.update(parsed_data)
        
        # Si marine_df est un DataFrame Pandas, on le convertit en Dask
        if marine_df is not None:
            # V√©rifier si le DataFrame est valide
            if not marine_df.empty:
                marine_df = dd.from_pandas(marine_df, npartitions=4)
                marine_df["Station ID"] = str(buoy_id)  # Ajout de la station ID

                # Ajouter les m√©tadonn√©es
                for key, value in parsed_data.items():
                    if key not in list_not_include:
                        marine_df[key] = value
                        print(f"‚úÖ Colonne '{key}' ajout√©e au DataFrame de la station {buoy_id}")

                # Mise √† jour de la bou√©e avec le DataFrame Dask
                buoy_datas[buoy_id]["Marine"] = marine_df

    except Exception as e:
        print(f"‚ùå Erreur pour la station {buoy_id}: {e}")

# V√©rification de l'ajout des colonnes en prenant un id au hasard
station_id = random.choice(list(buoy_datas.keys()))
marine_df = buoy_datas[station_id]["Marine"]

if marine_df is not None:
    print("\nColonnes ajout√©es au DataFrame de la station", station_id)
    print(marine_df.columns.to_list())  # Affiche les colonnes sans faire de .compute()



üîç Traitement de la Station ID: BURL1


RuntimeError: Failed to generate metadata for df. This operation may not be supported by the current backend.

In [None]:
display_row_values(df_marine)

In [None]:
display(df_marine.columns)
display(df_meteo.columns)

# Handle Null Values

In [None]:
important_columns_oceanography = [
    'wind_direction',             
    'wind_speed',                 
    'wave_height',                   
    'pressure',                   
    'air_temperature',            
    'water_temperature',          
    'Datetime',
    'Lat',
    'Lon'                 
]

important_columns_meteorology = [
    'temperature_2m',             
    'relative_humidity_2m',       
    'dew_point_2m',               
    'precipitation',              
    'pressure_msl',               
    'cloud_cover',                
    'wind_speed_10m',             
    'Datetime'
]

stations_depart = len(buoy_datas)
ignored_buoys = {}  # Dictionary to track ignored buoys and their reasons

for station_id, data in buoy_datas.items():
    print(f"\nüîÑ Nettoyage des donn√©es pour la station {station_id}")

    marine_df = data.get("Marine")
    meteo_df = data.get("Meteo")

    if marine_df is None or meteo_df is None:
        ignored_buoys[station_id] = "Marine DataFrame ou Meteo DataFrame manquant(e)"
        print(f"‚ö†Ô∏è Station {station_id} ignor√©e: Marine DataFrame ou Meteo DataFrame manquant(e)")
        continue

    try:
        # Convertir les pandas DataFrames en Dask DataFrames
        marine_df = dd.from_pandas(marine_df, npartitions=4)
        meteo_df = dd.from_pandas(meteo_df, npartitions=4)

        # Nettoyage des DataFrames
        cleaned_marine_df = handle_null_values(marine_df)
        cleaned_meteo_df = handle_null_values(meteo_df)

        # V√©rification des colonnes importantes apr√®s nettoyage
        marine_columns_ok = all(col in cleaned_marine_df.columns for col in important_columns_oceanography)
        meteo_columns_ok = all(col in cleaned_meteo_df.columns for col in important_columns_meteorology)

        # Track which columns are missing
        missing_marine_columns = [col for col in important_columns_oceanography if col not in cleaned_marine_df.columns]
        missing_meteo_columns = [col for col in important_columns_meteorology if col not in cleaned_meteo_df.columns]

        if missing_marine_columns or missing_meteo_columns:
            ignored_buoys[station_id] = f"Colonnes manquantes: Marine: {missing_marine_columns}, Meteo: {missing_meteo_columns}"
            print(f"‚ö†Ô∏è Station {station_id} ignor√©e: Colonnes manquantes - Marine: {missing_marine_columns}, Meteo: {missing_meteo_columns}")
            continue

        # Ajouter le DataFrame nettoy√© au dictionnaire des r√©sultats
        buoy_datas[station_id]['Cleaned Marine'] = cleaned_marine_df
        buoy_datas[station_id]['Cleaned Meteo'] = cleaned_meteo_df
        print(f"‚úÖ Nettoyage r√©ussi pour la station {station_id} ({cleaned_marine_df.shape[0].compute()} lignes)")

    except Exception as e:
        ignored_buoys[station_id] = f"Erreur lors du nettoyage: {e}"
        print(f"‚ùå Erreur lors du nettoyage pour {station_id}: {e}")

# üî• Suppression des stations ignor√©es du dictionnaire principal
for station_id in ignored_buoys:
    buoy_datas.pop(station_id, None)

len_cleaned_data = len([data for data in buoy_datas.values() if 'Cleaned Marine' in data and 'Cleaned Meteo' in data])

# R√©sum√© final du nettoyage
print("\nüìä R√âSUM√â DU NETTOYAGE:")
print(f"üìå Stations au d√©part : {stations_depart}")
print(f"‚úÖ Stations nettoy√©es : {len_cleaned_data}")
print(f"üèÅ Stations restantes apr√®s filtrage :")

for station_id, reason in ignored_buoys.items():
    print(f"üõë Station {station_id} ignor√©e: {reason}")

print(f"\nüßπ Cl√©s restantes dans buoy_datas apr√®s purge : {len(buoy_datas)} (attendu : {len_cleaned_data})")
