In [None]:
%pip install -q -r requirements.txt

In [None]:
from urllib.parse import quote_plus

from imports import *
from functions import *
from IPython.core.display import *

In [None]:
# Load the PostgreSQL credentials from a JSON file with the keys
# "user", "password", "host" and "port".
path_postgresql_creds = r"C:\Users\f.gionnane\Documents\Data Engineering\Credentials\postgresql_creds.json"
with open(path_postgresql_creds, 'r') as file:
    content = json.load(file)
    user = content["user"]
    password = content["password"]
    host = content["host"]
    port = content["port"]

db = "MyProjects"
schema = "End_To_End_Oceanography_ML"

# Create the PostgreSQL engine.
# The user and password are percent-encoded so that characters such as
# "@", ":" or "/" inside them cannot corrupt the connection URL.
engine = create_engine(
    f"postgresql+psycopg2://{quote_plus(str(user))}:{quote_plus(str(password))}@{host}:{port}/{db}"
)
conn = engine.connect()

Get Available Stations ID List
Filter Dysfunctional Stations

In [None]:
# Get all stations and some metadata as a pandas DataFrame.
# A single request is enough: the result of the earlier duplicate call was
# overwritten immediately, and the cells below use DataFrame methods
# (.head(), .iterrows()) on this object.
stations_df = api.stations(as_df=True)
stations_df.head()

In [None]:
def _clean_station_name(location):
    """Return a printable station name from a "Hull No./Config and Location" cell.

    When the cell contains a ")", the text between the first and the second
    ")" is kept; otherwise the whole cell is used. All parentheses and the
    surrounding whitespace are then removed.

    A non-string cell (for example NaN) yields an empty string instead of
    raising, because the name is only used in log messages.
    """
    if not isinstance(location, str):
        return ""
    segment = location.split(")")[1] if ")" in location else location
    return segment.replace("(", "").replace(")", "").strip()


access_error_url_list = []

# Words that flag a station as dysfunctional when found in the "Remark" column
blacklist = ["Failure", "ceased", "failed", "recovered", "stopped", 'adrift']
stations_id_set = set()

print(f'Avant Filtre: {stations_df.shape[0]}')

# Index labels of the rows to drop once the loop is over
indices_a_supprimer = []

for idx, row in stations_df.iterrows():
    station_id = row["Station"]
    station_Location = row["Hull No./Config and Location"]
    station_name = _clean_station_name(station_Location)

    # "Remark" may be NaN, so only string remarks are searched (case-insensitively)
    remark = row["Remark"]
    if isinstance(remark, str) and any(word.lower() in remark.lower() for word in blacklist):
        indices_a_supprimer.append(idx)
        continue

    try:
        buoy_data = NDBC.realtime_observations(station_id)

        # A station is kept only when the API returns a non-empty DataFrame
        if not buoy_data.empty:
            print(f'Buoy {station_id}: {station_name} passed the Remarks and API Test!')
            stations_id_set.add(station_id)
        else:
            print(f'Buoy {station_id}: {station_name} did not return valid data. Deleting.')
            indices_a_supprimer.append(idx)

    except HTTPError as e:
        # HTTP failures carry a status code worth showing
        print(f'Buoy {station_id}: {station_name} API Call returned {e.code}. Deleting.')
        indices_a_supprimer.append(idx)

    except Exception as e:
        print(f'Buoy {station_id}: {station_name} API Call encountered an error. Deleting.')

        # Keep the station page URL of access errors for the report printed in the next cell
        if str(e).startswith("Error accessing"):
            url = f"https://www.ndbc.noaa.gov/station_page.php?station={station_id}"
            access_error_url_list.append([station_id, url])
        indices_a_supprimer.append(idx)

# Drop the rejected rows after the loop, so the frame is not modified while iterating
stations_df.drop(index=indices_a_supprimer, inplace=True)

print(f'Après Filtre: {stations_df.shape[0]}')

In [None]:
# Report every buoy whose API call failed with an access error, with its station page URL.
for failed_station_id, station_page_url in access_error_url_list:
    print(f"Access error for buoy {failed_station_id}")
    print(f"{station_page_url}\n")

In [None]:
# A set has no defined order; sorting makes the listing identical from run to run.
# key=str keeps the sort safe even if the IDs are not all of the same type.
stations_id_list = sorted(stations_id_set, key=str)

for item in stations_id_list:
    metadata = get_station_metadata(item)

    # Skip stations without metadata instead of passing an empty value to the parser
    if not metadata:
        print(f"No metadata for station {item}; skipping.\n")
        continue

    station_name, station_id, station_zone, lat_buoy, lon_buoy, marine_data_table_name = parse_buoy_json(metadata)
    print(f"Name: {station_name}")
    print(f"Station ID: {station_id}")
    print(f"Lat: {lat_buoy}")
    print(f"Lon: {lon_buoy}")
    print(f"Zone: {station_zone}")
    print(f"Base Table Name: {marine_data_table_name}\n")

In [None]:
# Inspect the raw metadata of one station (FFIA2): print its parsed zone,
# then every key/value pair followed by a line holding a single space.
metadata = get_station_metadata("FFIA2")
(
    station_name,
    station_id,
    station_zone,
    lat_buoy,
    lon_buoy,
    marine_data_table_name,
) = parse_buoy_json(metadata)
print(station_zone)
for key, value in metadata.items():
    print(key, value, " ", sep="\n")

In [None]:
# Map each station ID to its parsed metadata.
stations_dict = {}

for station in stations_id_list:
    metadata = get_station_metadata(station)

    # Nothing to parse when the metadata lookup returned an empty value
    if not metadata:
        continue

    (
        station_name,
        station_id,
        station_zone,
        lat_buoy,
        lon_buoy,
        marine_data_table_name,
    ) = parse_buoy_json(metadata)

    stations_dict[station_id] = {
        "Station Name": station_name,
        "Lat": lat_buoy,
        "Lon": lon_buoy,
        "Zone": station_zone,
        "Table Name": marine_data_table_name,
    }

    # One summary per station, followed by a blank line
    print(
        f"Name: {station_name}",
        f"Station ID: {station_id}",
        f"Lat: {lat_buoy}",
        f"Lon: {lon_buoy}",
        f"Zone: {station_zone}",
        f"Base Table Name: {marine_data_table_name}\n",
        sep="\n",
    )

# Display the final dictionary as the cell output
stations_dict

In [None]:
def print_with_flush(message):
    """Overwrite the current output line with `message`.

    The text is prefixed with a carriage return (back to the start of the
    line) and padded with two spaces, written to standard output without a
    trailing newline, and flushed immediately.
    """
    print(f'\r{message}  ', end='', flush=True)

Build Bronze Layer Table Names

In [None]:
# Pick one of the validated stations at random for the cells below.
stations_id_list = [*stations_id_set]
buoy_chosen = random.choice(stations_id_list)

In [None]:
# Fetch the metadata of the randomly chosen buoy and show it as the cell output.
buoy_chosen_metadata = get_station_metadata(buoy_chosen)
buoy_chosen_metadata

In [None]:
# Parse the metadata of the chosen buoy.
# The result is unpacked in the same order as every other parse_buoy_json call
# in this notebook (name, id, zone, lat, lon, table name). The previous order
# (lat, lon, name, id, ...) bound the wrong values to station_id and the other
# names that later cells reuse for the station page URL and the table name.
try:
    station_name, station_id, station_zone, lat_buoy, lon_buoy, marine_data_table_name = parse_buoy_json(buoy_chosen_metadata)
    print(lat_buoy, lon_buoy, station_name, station_id, station_zone, marine_data_table_name)
except ValueError as e:
    print(f"Erreur lors du traitement des données: {e}")

In [None]:
# Download the realtime observations of the chosen buoy, print the type of its
# metadata object, and preview the first rows of the observations.
df_marine = NDBC.realtime_observations(buoy_chosen)
print(type(buoy_chosen_metadata))
df_marine.head()

Marine API

Get Data From JSON

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
from IPython.display import display, HTML

# Station page of the buoy whose ID is currently held in `station_id`
url = f"https://www.ndbc.noaa.gov/station_page.php?station={station_id}"

# Fetch the page HTML; the timeout keeps an unresponsive server from hanging the notebook
response = requests.get(url, timeout=30)

if response.status_code == 200:
    soup = BeautifulSoup(response.text, 'html.parser')

    # The block of interest is the element with id "stationmetadata"
    station_metadata = soup.find(id="stationmetadata")

    if station_metadata:
        # Turn every image link of the block into an absolute URL so the images
        # render in the notebook, whatever the station or the picture file names.
        for img in station_metadata.find_all('img', src=True):
            img['src'] = urljoin(url, img['src'])

        # Render the block as HTML in the cell output
        display(HTML(str(station_metadata)))
    else:
        print("La division avec l'ID 'stationmetadata' n'a pas été trouvée.")
else:
    print(f"Erreur lors de la récupération de la page, statut: {response.status_code}")


Get Stations from Caribbean Zone

In [None]:
# Build the bronze meteo table name from the station name, zone and coordinates:
# spaces become "_" and every "." becomes "-".
# NOTE(review): df_meteo is not assigned in any earlier cell visible here — confirm where it is created.
table_name_parts = [
    "bronze_meteo_data",
    station_name.replace(' ', '_'),
    station_zone.replace(' ', '_'),
    str(lat_buoy).replace('.', '-'),
    str(lon_buoy).replace('.', '-'),
]
bronze_meteo_data_table_name = "_".join(table_name_parts).replace('.', '-')
load_data_in_table(
    db=db,
    schema=schema,
    table_name=bronze_meteo_data_table_name,
    df=df_meteo,
    conn=conn,
    key_column='date',
)

In [None]:
with open("buoy_near_SM.json", "r") as f:
    buoy_near_SM = json.load(f)

# Read the target database and schema BEFORE building the engine, so the
# connection below actually points at the database named in the JSON file
# (previously the engine was created from the old value of `db`).
db = buoy_near_SM["db"]
schema = buoy_near_SM["schema"]

bronze_marine_data_table_name = buoy_near_SM["bronze_marine"]
bronze_meteo_data_table_name = buoy_near_SM["bronze_meteo"]

# Load the PostgreSQL credentials ("user", "password", "host", "port")
path_postgresql_creds = r"C:\Users\f.gionnane\Documents\Data Engineering\Credentials\postgresql_creds.json"
with open(path_postgresql_creds, 'r') as file:
    content = json.load(file)
    user = content["user"]
    password = content["password"]
    host = content["host"]
    port = content["port"]

# Create the PostgreSQL engine.
# The user and password are percent-encoded so that characters such as
# "@", ":" or "/" inside them cannot corrupt the connection URL.
engine = create_engine(
    f"postgresql+psycopg2://{quote_plus(str(user))}:{quote_plus(str(password))}@{host}:{port}/{db}"
)
conn = engine.connect()

In [None]:
# Load both bronze tables.
# The error is still printed, then re-raised: the following cells use both
# frames unconditionally, so continuing after a failure would only fail later
# on a missing (or stale) variable.
try:
    df_marine_to_clean = fetch_table_data(conn=conn, schema=schema, table_name= bronze_marine_data_table_name)
    df_meteo_to_clean = fetch_table_data(conn=conn, schema=schema, table_name= bronze_meteo_data_table_name)
except Exception as e:
    print(e)
    raise

In [None]:
def auto_convert(df):
    """Convert the object-dtype columns of `df` to datetime or numeric when possible.

    Each object column is first parsed as dates in the '%Y-%m-%d' format. If
    that fails, it is parsed as numbers. A column that fits neither is left
    untouched. Columns of any other dtype are skipped.

    The frame is modified in place, and the same object is returned.
    """
    def _as_datetime(series):
        return pd.to_datetime(series, format='%Y-%m-%d', errors='raise')

    def _as_numeric(series):
        return pd.to_numeric(series, errors='raise')

    for column_name in df.columns:
        # Try the parsers in order; stop as soon as the column is no longer object-typed
        for parse in (_as_datetime, _as_numeric):
            if not df[column_name].dtype == 'object':
                break
            try:
                df[column_name] = parse(df[column_name])
            except Exception:
                # This parser does not fit the column; fall through to the next one
                continue

    return df

# Convert the columns of the marine bronze table, then show the resulting dtypes and the first rows.
df_marine_to_clean = auto_convert(df_marine_to_clean)
print(df_marine_to_clean.dtypes)
df_marine_to_clean.head()

In [None]:
# Apply the project's null-value handling to the marine frame.
df_marine_to_clean = handle_null_values(df_marine_to_clean)

In [None]:
# Convert the columns of the meteo bronze table and show the resulting dtypes.
df_meteo_to_clean = auto_convert(df_meteo_to_clean)
df_meteo_to_clean.dtypes

In [None]:
# Apply the project's null-value handling to both frames.
# NOTE(review): an earlier cell already calls handle_null_values on
# df_marine_to_clean, so the marine frame goes through it twice — confirm this is intended.
df_marine_to_clean = handle_null_values(df_marine_to_clean)
df_meteo_to_clean = handle_null_values(df_meteo_to_clean)

In [None]:
# Show the column index of each cleaned frame, one per line.
print(df_marine_to_clean.columns, df_meteo_to_clean.columns, sep="\n")

In [None]:
# Preview the first rows of the meteo frame.
df_meteo_to_clean.head()

open-meteo API

In [None]:
# Count the missing values in each column of the meteo frame.
df_meteo_to_clean.isna().sum()

In [None]:
# Drop the meteo columns that are not kept for the rest of the notebook.
# 'soil_moisture_0_to_1cm' was listed twice; it is now listed once.
# NOTE(review): df_meteo is not assigned in any earlier cell visible here
# (only df_meteo_to_clean is) — confirm where it is created.
df_meteo = drop_columns_if_exist(
    df_meteo,
    ['rain', 'showers', 'soil_moisture_0_to_1cm', 'cloud_cover', 'soil_temperature_0cm', 'is_day'],
)
df_meteo.head()

In [None]:
# Display labels for the meteo columns.
# NOTE(review): ' Sea Level Pressure (hPa)' and ' Visibility (%)' start with a
# space; the labels are kept exactly as they were — confirm whether that is intended.
meteo_column_labels = {
    'temperature_2m': 'T°(C°)',
    'relative_humidity_2m': 'Relative Humidity (%)',
    'dew_point_2m': 'Dew Point (°C)',
    'precipitation': 'Precipitation (mm)',
    'pressure_msl': ' Sea Level Pressure (hPa)',
    'cloud_cover_low': 'Low Clouds (%)',
    'cloud_cover_mid': 'Middle Clouds (%)',
    'cloud_cover_high': 'High Clouds (%)',
    'visibility': ' Visibility (%)',
    'wind_speed_10m': 'Wind Speed (km/h)',
}

# Display labels for the marine columns.
marine_column_labels = {
    'wind_direction': 'Wind Direction (°)',
    'wind_speed': 'Wind Speed (km/h)',
    'wind_gust': 'Wind Gusts (km/h)',
    'wave_height': 'Wave Height (m)',
    'average_wave_period': 'Average Wave Period (s)',
    'dominant_wave_direction': 'Dominant Wave Direction (°)',
    'pressure': 'Pressure (hPA)',
    'air_temperature': 'Air T°',
    'water_temperature': 'Water T°',
}

# Both frames are renamed in place
df_meteo.rename(columns=meteo_column_labels, inplace=True)
df_marine.rename(columns=marine_column_labels, inplace=True)

# Columns of each frame, then the shape of each frame
for summary in (df_meteo.columns, df_marine.columns, df_marine.shape, df_meteo.shape):
    print(summary)

Merging Dataframes

In [None]:
# Inner join of the marine and meteo frames on their shared 'Datetime' column
df_merged = df_marine.merge(df_meteo, on='Datetime', how='inner')

# Shape and dtypes of the result, then its first 20 rows
print(df_merged.shape, df_merged.dtypes, sep="\n")
df_merged.head(20)

In [None]:
# Example usage: run the project helper on the 'Datetime' column of the merged frame
# (presumably it adds daytime and month columns — confirm against the helper).
df_merged = add_daytime_and_month_column(df_merged,'Datetime')

print(df_merged.columns)
df_merged.head(10)

In [None]:
# Both source frames had a 'Wind Speed (km/h)' column, so the merge suffixed them:
# "_x" comes from the left frame (df_marine) and "_y" from the right one (df_meteo).
marine_wind_col, meteo_wind_col = 'Wind Speed (km/h)_x', 'Wind Speed (km/h)_y'

# Replace the two readings by their average, and drop the gust column as well
df_merged['Wind Speed (km/h)'] = (df_merged[marine_wind_col] + df_merged[meteo_wind_col]) / 2
df_merged = drop_columns_if_exist(df_merged, [marine_wind_col, meteo_wind_col, 'Wind Gusts (km/h)'])

print(df_merged.columns)
df_merged.head()

BigQuery Connection

In [None]:
# NOTE(review): every line of this cell is commented out, so running it does nothing.
# If the code is re-enabled, note that `project = client.project` refers to `client`,
# which this cell never defines (the client created here is `bq_client`) — confirm before uncommenting.
# from google.oauth2 import service_account
# from google.cloud import storage  # Exemple pour Google Cloud Storage
# from google.cloud import bigquery
# from google.cloud.exceptions import NotFound
# import pandas as pd
# import pyarrow

# path_to_google_creds = r"C:\Users\f.gionnane\Documents\Data Engineering\Credentials\google_credentials.json"

# bq_client = bigquery.Client.from_service_account_json(
#     path_to_google_creds)
# project_id = "rare-bloom-419220"
# dataset_End_To_End_Oceanography_ML = "End_To_End_Oceanography_ML"
# bq_client
# # dataset_ref = bq_client.dataset('my_dataset_name', project=project_id)


# # LIST DATASETS AND FIND ONE
# datasets = list(bq_client.list_datasets())  # Make an API request.
# project = client.project
# bq_datasets_list =[]

# if datasets:
#     print("Datasets in project {}:".format(project))
#     for dataset in datasets:
#         print("\t{}".format(dataset.dataset_id))
#         bq_datasets_list.append(dataset.dataset_id)
#     if dataset_End_To_End_Oceanography_ML in bq_datasets_list:
#         dataset =  dataset_End_To_End_Oceanography_ML
#         print("Dataset Found !")
# else:
#     print("{} project does not contain any datasets.".format(project))

# # (developer): Set table_id to the ID of the table to determine existence.
# # table_id = "your-project.your_dataset.your_table"

# try:
#     table_ref = bq_client.dataset(dataset).table(table_name)
#     bq_client.get_table(table_ref)  # Make an API request.
#     print("Table {} already exists.".format(table_name))
# except NotFound:
#     print("Table {} is not found.".format(table_name))


# def clean_column_names(df):

#     cleaned_columns = []
#     for column in df.columns:
#         # Remplacer tous les caractères non alphanumériques (sauf underscores) par un underscore
#         cleaned_column = re.sub(r'[^A-Za-z0-9_]', '_', column)
        
#         # Ajouter le nom de colonne nettoyé à la liste
#         cleaned_columns.append(cleaned_column)
    
#     # Appliquer les nouveaux noms de colonnes au DataFrame
#     df.columns = cleaned_columns
#     return df

# df_merged = clean_column_names(df_merged)

# table_id = f"{project_id}.{dataset}.{table_name}"


# try:
#     bq_client.get_table(table_id)  # Make an API request.
#     print("Table {} already exists.".format(table_id))
# except NotFound:
#     print("Table {} is not found.".format(table_id))
#     bq_client.create_table(table_id)
#     print("Creation of the Table {}.".format(table_id))


# def load_data_to_bigquery(client, dataset: str = None, table: str = None, df: pd.DataFrame = None, key_column: str = 'Datetime', table_id: str = None):
   
    
#     # Fonction pour détecter et convertir les types de données
#     def convert_column_types(df):
#         for column in df.columns:
#             dtype = df[column].dtype

#             if dtype == 'object':  # Chaînes de caractères
#                 df[column] = df[column].astype(str)
#             elif dtype == 'datetime64[ns]':  # Datetime
#                 df[column] = pd.to_datetime(df[column], errors='coerce').dt.tz_localize('UTC', ambiguous='NaT').dt.tz_localize(None)
#             elif dtype == 'float64':  # Float
#                 df[column] = df[column].astype('float32')
#             elif dtype == 'int64':  # Integer
#                 df[column] = df[column].astype('int64')  # On garde int64, car BigQuery supporte ce type
#             else:
#                 # Autres types, on les convertit en string
#                 df[column] = df[column].astype(str)
        
#         return df

#     # Convertir les types des colonnes
#     df = convert_column_types(df)

#     if table_id:
#         # Si table_id est fourni, on l'utilise directement.
#         full_table_id = table_id
#     elif dataset and table:
#         # Si table_id n'est pas fourni, on construit table_id à partir de dataset et table.
#         full_table_id = f"{client.project}.{dataset}.{table}"
#     else:
#         raise ValueError("Il faut fournir soit 'table_id' ou les paramètres 'dataset' et 'table' séparés.")

#     # Vérifier si le dataset existe, sinon le créer
#     try:
#         dataset = full_table_id.split('.')[1]
#         client.get_dataset(dataset)  # Vérifie si le dataset existe
#         print(f"Le dataset {dataset} existe déjà.")
#     except NotFound:
#         print(f"Le dataset {dataset} n'existe pas. Création du dataset...")
#         client.create_dataset(dataset)  # Crée le dataset s'il n'existe pas
#         print(f"Le dataset {dataset} a été créé.")

#     # Vérifier si la table existe, sinon la créer
#     try:
#         client.get_table(full_table_id)  # Vérifie si la table existe
#         print(f"La table {full_table_id} existe déjà.")
#     except NotFound:
#         print(f"La table {full_table_id} n'existe pas. Création de la table...")
#         # Créer la table avec le schéma du DataFrame
#         schema = []
#         for name, dtype in df.dtypes.items():
#             if name == 'Datetime':
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.TIMESTAMP))
#             elif dtype == 'float32' or dtype == 'float64':
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.FLOAT64))
#             elif dtype == 'int64':
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.INTEGER))
#             else:
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING))
        
#         # Créer la table avec le schéma
#         table = bigquery.Table(full_table_id, schema=schema)
#         client.create_table(table)  # Crée la table si elle n'existe pas
#         print(f"La table {full_table_id} a été créée.")

#     # Préparer les données pour l'insertion
#     if key_column in df.columns:
#         # Si la colonne clé est fournie, supprimer les doublons en fonction de cette colonne
#         df = df.drop_duplicates(subset=[key_column])

#     # Charger les données dans la table BigQuery
#     job_config = bigquery.LoadJobConfig(
#         schema=[
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.TIMESTAMP) if name == 'Datetime' else
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.FLOAT64) if dtype == 'float32' or dtype == 'float64' else
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.INTEGER) if dtype == 'int64' else
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING)
#             for name, dtype in df.dtypes.items()
#         ],
#         write_disposition="WRITE_APPEND"  # Ajoute les données sans écraser les anciennes
#     )

#     # Charger le DataFrame dans BigQuery
#     job = client.load_table_from_dataframe(df, full_table_id, job_config=job_config)
#     job.result()  # Attendre la fin de la tâche
#     print(f"Données chargées dans la table {full_table_id}.")

# # Exemple d'appel à la fonction
# load_data_to_bigquery(table_id=table_id, client=bq_client, df=df_merged)


PostgreSQL

In [None]:
# Show the columns of the merged frame at the start of the PostgreSQL section.
df_merged.columns