In [None]:
%pip install -q -r requirements.txt

In [103]:
from imports import *
from functions import *
from IPython.core.display import *

In [104]:
# Local import: only needed here to make the credentials safe inside a URL.
from urllib.parse import quote_plus

# The PostgreSQL credentials live in a JSON file kept outside the repository.
path_postgresql_creds = r"C:\Users\f.gionnane\Documents\Data Engineering\Credentials\postgresql_creds.json"
with open(path_postgresql_creds, 'r') as file:
    content = json.load(file)

# Parsing happens after the file is closed; the raw values stay available under their usual names.
user = content["user"]
password = content["password"]
host = content["host"]
port = content["port"]

db = "MyProjects"
schema = "End_To_End_Oceanography_ML"

# Create the PostgreSQL engine.
# User and password are percent-encoded because characters such as "@", ":" or "/"
# would otherwise be parsed as URL delimiters and corrupt the connection string.
engine = create_engine(
    f"postgresql+psycopg2://{quote_plus(str(user))}:{quote_plus(str(password))}@{host}:{port}/{db}"
)
conn = engine.connect()

Get Available Stations ID List
Filter Dysfunctional Stations

In [105]:
# Fetch every station and its status metadata once, as a pandas DataFrame.
# (A previous unparameterised call was dropped: its result was overwritten
# immediately, so it only cost an extra network round trip.)
stations_df = api.stations(as_df=True)
stations_df.head()

Unnamed: 0,Station,Hull No./Config and Location,Location Lat/Long,Wind Speed,Wind Direction,Sea Level Pressure,Wave Height,Dominant Period,Air Temp,Water Temp,Dew Point,Remark
0,41001,East Hatteras,34.70N 72.23W,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,"Buoy recovered 10/26/24, data release stopped."
1,41002,3DV33 (SC) South Hatteras,31.75N 74.93W,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Sensor/system failure.,Buoy adrift 12/30/24.
2,41004,3DV02 (SC) Edisto,32.50N 79.08W,100,100,100,100,100,100,99,97,Dewpoint is intermittent.
3,41008,3D36 (SC) Grays Reef,31.40N 80.85W,97,97,100,99,99,97,99,96,All data is intermittent after 8/18/23.
4,41009,3D65 (SC) Canaveral,28.50N 80.18W,100,100,100,1,1,95,95,95,Wave data ceased 3/6/25. Water temperature cea...


In [106]:
# Stations whose realtime feed could not be reached, stored as [station_id, station_page_url].
access_error_url_list = []

# Words looked up (case-insensitively) in the "Remark" column; any hit disqualifies the station.
blacklist = ["Failure", "ceased", "failed", "recovered", "stopped", 'adrift']
stations_id_set = set()

print(f'Avant Filtre: {stations_df.shape[0]}')

# Index labels of the rows to drop once the loop is over
# (rows are not removed while the frame is being iterated).
indices_a_supprimer = []

for idx, row in stations_df.iterrows():
    station_id = row["Station"]
    station_Location = row["Hull No./Config and Location"]

    # A missing cell is not a string (pandas yields NaN); fall back to an empty
    # name instead of raising on the string operations below.
    if not isinstance(station_Location, str):
        station_Location = ""

    # Locations look like "3DV02 (SC) Edisto": keep the text that follows the first ")".
    # Without a ")" the whole value is the name.
    if ")" in station_Location:
        station_name = station_Location.split(')')[1]
    else:
        station_name = station_Location

    # One pass is enough: the selected fragment can no longer contain ")",
    # so only "(" and the surrounding whitespace remain to be removed.
    station_name = station_name.replace("(", "").strip()

    # Reject the station when its remark (if any) contains a blacklisted word.
    remark = row["Remark"]
    if isinstance(remark, str) and any(blacklist_word.lower() in remark.lower() for blacklist_word in blacklist):
        indices_a_supprimer.append(idx)
    else:
        try:
            # Probe the realtime feed: a station is kept only if it returns rows.
            buoy_data = NDBC.realtime_observations(station_id)

            if not buoy_data.empty:
                print(f'Buoy {station_id}: {station_name} passed the Remarks and API Test!')
                stations_id_set.add(station_id)
            else:
                print(f'Buoy {station_id}: {station_name} did not return valid data. Deleting.')
                indices_a_supprimer.append(idx)

        except HTTPError as e:
            # HTTP failures carry a status code worth reporting.
            print(f'Buoy {station_id}: {station_name} API Call returned {e.code}. Deleting.')
            indices_a_supprimer.append(idx)

        except Exception as e:
            # Any other failure also disqualifies the station.
            print(f'Buoy {station_id}: {station_name} API Call encountered an error. Deleting.')

            # Remember the station page of feeds that could not be accessed so
            # they can be checked by hand afterwards.
            if str(e).startswith("Error accessing"):
                url = f"https://www.ndbc.noaa.gov/station_page.php?station={station_id}"
                access_error_url_list.append([station_id, url])
            indices_a_supprimer.append(idx)

# Remove the rejected rows after the loop; reassigning (rather than inplace=True)
# keeps the operation explicit.
stations_df = stations_df.drop(index=indices_a_supprimer)

print(f'Après Filtre: {stations_df.shape[0]}')

Avant Filtre: 146
Buoy 41004: Edisto passed the Remarks and API Test!
Buoy 41008: Grays Reef passed the Remarks and API Test!
Buoy 41010: Canaveral East passed the Remarks and API Test!
Buoy 41013: Frying Pan Shoals, Nc buoy passed the Remarks and API Test!
Buoy 41040: North Equatorial One passed the Remarks and API Test!
Buoy 41043: Ne Puerto Rico passed the Remarks and API Test!
Buoy 41044: Ne St Martin passed the Remarks and API Test!
Buoy 41049: South Bermuda passed the Remarks and API Test!
Buoy 42001: Mid Gulf passed the Remarks and API Test!
Buoy 42002: West Gulf passed the Remarks and API Test!
Buoy 42003: East Gulf API Call encountered an error. Deleting.
Buoy 42012: Orange Beach, Al passed the Remarks and API Test!
Buoy 42019: Freeport, Tx passed the Remarks and API Test!
Buoy 42020: Corpus Christi, Tx passed the Remarks and API Test!
Buoy 42035: Galveston, Tx passed the Remarks and API Test!
Buoy 42036: West Tampa passed the Remarks and API Test!
Buoy 42056: Yucatan Basin pa

In [107]:
# List every buoy whose realtime feed was unreachable, with the station page to check by hand.
for buoy_code, page_url in access_error_url_list:
    print(f"Access error for buoy {buoy_code}")
    print(f"{page_url}\n")

Access error for buoy 42003
https://www.ndbc.noaa.gov/station_page.php?station=42003

Access error for buoy KTNF1
https://www.ndbc.noaa.gov/station_page.php?station=KTNF1

Access error for buoy PTGC1
https://www.ndbc.noaa.gov/station_page.php?station=PTGC1

Access error for buoy SMKF1
https://www.ndbc.noaa.gov/station_page.php?station=SMKF1



In [108]:
# Freeze the validated station IDs into a list, then print the parsed metadata of each one.
stations_id_list = list(stations_id_set)

for item in stations_id_list:
    metadata = get_station_metadata(item)
    (station_name, station_id, station_zone,
     lat_buoy, lon_buoy, marine_data_table_name) = parse_buoy_json(metadata)
    # A single print call; the trailing "\n" keeps the blank line between stations.
    print(
        f"Name: {station_name}",
        f"Station ID: {station_id}",
        f"Lat: {lat_buoy}",
        f"Lon: {lon_buoy}",
        f"Zone: {station_zone}",
        f"Base Table Name: {marine_data_table_name}\n",
        sep="\n",
    )

Name: station 41040
Station ID: 41040
Lat: 14.536
Lon: -53.136
Zone: north equatorial one
Base Table Name: station_41040_north_equatorial_one_14_536_-53_136

Name: station ffia2
Station ID: FFIA2
Lat: 57.272
Lon: -133.63
Zone: five fingers, ak
Base Table Name: station_ffia2_five_fingers_ak_57_272_-133_63

Name: station 44027
Station ID: 44027
Lat: 44.284
Lon: -67.301
Zone: jonesport, me
Base Table Name: station_44027_jonesport_me_44_284_-67_301

Name: station 44009
Station ID: 44009
Lat: 38.46
Lon: -74.692
Zone: delaware bay 26 nm southeast of cape may, nj
Base Table Name: station_44009_delaware_bay_26_nm_southeast_of_cape_may_nj_38_46_-74_692

Name: station 46014
Station ID: 46014
Lat: 39.225
Lon: -123.98
Zone: pt arena
Base Table Name: station_46014_pt_arena_39_225_-123_98

Name: station 46072
Station ID: 46072
Lat: 51.645
Lon: -172.145
Zone: central aleutians 230 nm sw dutch harbor
Base Table Name: station_46072_central_aleutians_230_nm_sw_dutch_harbor_51_645_-172_145

Name: station

In [109]:
# Inspect one station (FFIA2): its parsed zone first, then every raw metadata entry.
metadata = get_station_metadata("FFIA2")
parsed_fields = parse_buoy_json(metadata)
station_name, station_id, station_zone, lat_buoy, lon_buoy, marine_data_table_name = parsed_fields
print(station_zone)
for key, value in metadata.items():
    # Key, value and a one-space separator line, each on its own line.
    print(key, value, " ", sep="\n")

five fingers, ak
Barometer elevation
21.3 m above mean sea level
 
Anemometer height
22 m above site elevation
 
Air temp height
4.3 m above site elevation
 
Site elevation
6.7 m above mean sea level
 
Location
57.272 N 133.630 W (57°16'18" N 133°37'48" W)
 
Statation Type
Owned and maintained by National Data Buoy Center, C-MAN Station, Sutron XPERT payload
 
Name
Station FFIA2  - Five Fingers, AK
 


In [110]:
# Map each station ID to its parsed metadata, printing every entry as it is added.
stations_dict = {}

for station in stations_id_list:
    metadata = get_station_metadata(station)

    # Skip stations for which no metadata came back.
    if not metadata:
        continue

    (station_name, station_id, station_zone,
     lat_buoy, lon_buoy, marine_data_table_name) = parse_buoy_json(metadata)

    # Build the record in one literal, keyed by the parsed station ID.
    stations_dict[station_id] = {
        "Station Name": station_name,
        "Lat": lat_buoy,
        "Lon": lon_buoy,
        "Zone": station_zone,
        "Table Name": marine_data_table_name,
    }

    print(
        f"Name: {station_name}",
        f"Station ID: {station_id}",
        f"Lat: {lat_buoy}",
        f"Lon: {lon_buoy}",
        f"Zone: {station_zone}",
        f"Base Table Name: {marine_data_table_name}\n",
        sep="\n",
    )

# Display the resulting dictionary as the cell output.
stations_dict

Name: station 41040
Station ID: 41040
Lat: 14.536
Lon: -53.136
Zone: north equatorial one
Base Table Name: station_41040_north_equatorial_one_14_536_-53_136

Name: station ffia2
Station ID: FFIA2
Lat: 57.272
Lon: -133.63
Zone: five fingers, ak
Base Table Name: station_ffia2_five_fingers_ak_57_272_-133_63

Name: station 44027
Station ID: 44027
Lat: 44.284
Lon: -67.301
Zone: jonesport, me
Base Table Name: station_44027_jonesport_me_44_284_-67_301

Name: station 44009
Station ID: 44009
Lat: 38.46
Lon: -74.692
Zone: delaware bay 26 nm southeast of cape may, nj
Base Table Name: station_44009_delaware_bay_26_nm_southeast_of_cape_may_nj_38_46_-74_692

Name: station 46014
Station ID: 46014
Lat: 39.225
Lon: -123.98
Zone: pt arena
Base Table Name: station_46014_pt_arena_39_225_-123_98

Name: station 46072
Station ID: 46072
Lat: 51.645
Lon: -172.145
Zone: central aleutians 230 nm sw dutch harbor
Base Table Name: station_46072_central_aleutians_230_nm_sw_dutch_harbor_51_645_-172_145

Name: station

{'41040': {'Station Name': 'station 41040',
  'Lat': 14.536,
  'Lon': -53.136,
  'Zone': 'north equatorial one',
  'Table Name': 'station_41040_north_equatorial_one_14_536_-53_136'},
 'FFIA2': {'Station Name': 'station ffia2',
  'Lat': 57.272,
  'Lon': -133.63,
  'Zone': 'five fingers, ak',
  'Table Name': 'station_ffia2_five_fingers_ak_57_272_-133_63'},
 '44027': {'Station Name': 'station 44027',
  'Lat': 44.284,
  'Lon': -67.301,
  'Zone': 'jonesport, me',
  'Table Name': 'station_44027_jonesport_me_44_284_-67_301'},
 '44009': {'Station Name': 'station 44009',
  'Lat': 38.46,
  'Lon': -74.692,
  'Zone': 'delaware bay 26 nm southeast of cape may, nj',
  'Table Name': 'station_44009_delaware_bay_26_nm_southeast_of_cape_may_nj_38_46_-74_692'},
 '46014': {'Station Name': 'station 46014',
  'Lat': 39.225,
  'Lon': -123.98,
  'Zone': 'pt arena',
  'Table Name': 'station_46014_pt_arena_39_225_-123_98'},
 '46072': {'Station Name': 'station 46072',
  'Lat': 51.645,
  'Lon': -172.145,
  'Zone'

In [95]:
def print_with_flush(message):
    """Rewrite the current output line with ``message`` and flush immediately.

    The text is prefixed with a carriage return (so it starts at the beginning
    of the line) and followed by two spaces; no newline is emitted.
    """
    line = f'\r{message}  '
    print(line, end='', flush=True)

Build Bronze Layer Table Names

In [33]:
# Prefix every mapped table name with "bronze_" to get the bronze-layer table names.
bronze_tables_list = [
    f'bronze_{mapping["table name"]}'
    for mapping in station_table_mapping.values()
]
bronze_tables_list

['bronze_station_smkf1_sombrero_key_fl_24_628_-81_109',
 'bronze_station_46025_santa_monica_basin_33_755_-119_045',
 'bronze_station_46086_san_clemente_basin_32_499_-118_052',
 'bronze_station_46022_eel_river_40_716_-124_54',
 'bronze_station_41044_ne_st_martin_21_582_-58_63',
 'bronze_station_46071_western_aleutians_51_04_179_764',
 'bronze_station_46072_central_aleutians_230_nm_sw_dutch_harbor_51_645_-172_145',
 'bronze_station_42058_central_caribbean_14_512_-75_153',
 'bronze_station_mrka2_middle_rock_light_ak_61_082_-146_662',
 'bronze_station_46014_pt_arena_39_225_-123_98',
 'bronze_station_ffia2_five_fingers_ak_57_272_-133_63',
 'bronze_station_46001_western_gulf_of_alaska_56_296_-148_027',
 'bronze_station_sbio1_south_bass_island_oh_41_629_-82_841',
 'bronze_station_46069_south_santa_rosa_33_677_-120_213',
 'bronze_station_51002_southwest_hawaii_17_07_-157_755',
 'bronze_station_42001_mid_gulf_25_926_-89_662',
 'bronze_station_46084_cape_edgecumbe_56_614_-136_04',
 'bronze_stati

In [39]:
# Realtime file types to probe, in this order, until one can be fetched.
data_types = ['txt', 'drift', 'cwind', 'spec', 'ocean', 'srad', 'dart', 'supl', 'rain']

# NOTE(review): each pass replaces station_table_mapping[key] with a status record,
# so whatever was stored under that key before this cell is discarded — confirm
# this is intended.
for key in station_table_mapping:
    try:
        # Report the key together with its Python type.
        key_type = type(key)
        print(f"Key: {key}, Type: {key_type}")

        # Becomes True as soon as one data type is fetched without error.
        data_fetched = False

        for data_type in data_types:
            try:
                print(f"Attempting to fetch data for buoy {key} with data type: {data_type}")
                buoy_data = NDBC.realtime_observations(key, data_type=data_type)
            except HTTPError as e:
                # Report and fall through to the next data type.
                print(f"HTTPError occurred for buoy {key} with data type {data_type}. Error: {str(e)}")
            except Exception as e:
                # Same policy for any other failure: report, then try the next type.
                print(f"An unexpected error occurred for buoy {key} with data type {data_type}. Error: {str(e)}")
            else:
                # First successful fetch wins: record it and stop probing this buoy.
                station_table_mapping[key] = {"data": "success", "error": None, "data_type": data_type}
                print(f"API Call successful for buoy {key}. Data: success with type {data_type}\n")
                data_fetched = True
                break

        # Every data type failed: record the failure for this buoy.
        if not data_fetched:
            station_table_mapping[key] = {"data": None, "error": "No data available for all data types", "data_type": None}
            print(f"Failed to fetch data for buoy {key} after trying all data types.\n")

    except Exception as e:
        # Safety net for anything raised outside the per-type attempts.
        station_table_mapping[key] = {"data": None, "error": str(e), "data_type": None}
        print(f"An unexpected error occurred for buoy {key}. Error: {str(e)}")

# Header only; the mapping itself is not printed by this cell.
print("\nFinal station_table_mapping with API data and errors:")



Key: SMKF1, Type: <class 'str'>
Attempting to fetch data for buoy SMKF1 with data type: txt
An unexpected error occurred for buoy SMKF1 with data type txt. Error: Error accessing https://www.ndbc.noaa.gov/data/realtime2/SMKF1.txt
Server Error ( 404: Not Found)
Attempting to fetch data for buoy SMKF1 with data type: drift
An unexpected error occurred for buoy SMKF1 with data type drift. Error: Error accessing https://www.ndbc.noaa.gov/data/realtime2/SMKF1.drift
Server Error ( 404: Not Found)
Attempting to fetch data for buoy SMKF1 with data type: cwind
An unexpected error occurred for buoy SMKF1 with data type cwind. Error: Error accessing https://www.ndbc.noaa.gov/data/realtime2/SMKF1.cwind
Server Error ( 404: Not Found)
Attempting to fetch data for buoy SMKF1 with data type: spec
An unexpected error occurred for buoy SMKF1 with data type spec. Error: Error accessing https://www.ndbc.noaa.gov/data/realtime2/SMKF1.spec
Server Error ( 404: Not Found)
Attempting to fetch data for buoy SMK

In [40]:
# Print one "<buoy id>: <status record>" line per entry of the mapping.
for buoy_key, fetch_status in station_table_mapping.items():
    print(f"{buoy_key}: {fetch_status}")

SMKF1: {'data': None, 'error': 'No data available for all data types', 'data_type': None}
46025: {'data': 'success', 'error': None, 'data_type': 'txt'}
46086: {'data': 'success', 'error': None, 'data_type': 'txt'}
46022: {'data': 'success', 'error': None, 'data_type': 'txt'}
41044: {'data': 'success', 'error': None, 'data_type': 'txt'}
46071: {'data': 'success', 'error': None, 'data_type': 'txt'}
46072: {'data': 'success', 'error': None, 'data_type': 'txt'}
42058: {'data': 'success', 'error': None, 'data_type': 'txt'}
MRKA2: {'data': 'success', 'error': None, 'data_type': 'txt'}
46014: {'data': 'success', 'error': None, 'data_type': 'txt'}
FFIA2: {'data': 'success', 'error': None, 'data_type': 'txt'}
46001: {'data': 'success', 'error': None, 'data_type': 'txt'}
SBIO1: {'data': 'success', 'error': None, 'data_type': 'txt'}
46069: {'data': 'success', 'error': None, 'data_type': 'txt'}
51002: {'data': 'success', 'error': None, 'data_type': 'txt'}
42001: {'data': 'success', 'error': None, 

In [None]:
# Draw one station at random among the validated IDs.
stations_id_list = [*stations_id_set]
buoy_chosen = random.choice(stations_id_list)

In [None]:
buoy_chosen_metadata = get_station_metadata(buoy_chosen)
buoy_chosen_metadata

In [None]:
# Parse the metadata dictionary of the chosen buoy.
# The targets must follow the order used by every other parse_buoy_json call in
# this notebook: (station_name, station_id, station_zone, lat_buoy, lon_buoy,
# marine_data_table_name). Unpacking latitude/longitude first would silently
# store the name and ID in lat_buoy/lon_buoy and corrupt the cells that use them.
try:
    station_name, station_id, station_zone, lat_buoy, lon_buoy, marine_data_table_name = parse_buoy_json(buoy_chosen_metadata)
    print(lat_buoy, lon_buoy, station_name, station_id, station_zone, marine_data_table_name)
except ValueError as e:
    print(f"Erreur lors du traitement des données: {e}")

In [None]:
df_marine = NDBC.realtime_observations(buoy_chosen)
print(type(buoy_chosen_metadata))
df_marine.head()

Marine API

Get Data From JSON

In [None]:
import requests
from bs4 import BeautifulSoup
from urllib.parse import urljoin
# display/HTML are imported from the public IPython.display module;
# the IPython.core.display location is deprecated for these names.
from IPython.display import display, HTML

# Station page of the station currently held in `station_id`.
url = f"https://www.ndbc.noaa.gov/station_page.php?station={station_id}"

# Fetch the page; the timeout keeps an unresponsive server from hanging the notebook.
response = requests.get(url, timeout=30)

if response.status_code == 200:
    soup = BeautifulSoup(response.text, 'html.parser')

    # The block of interest is the element whose id is 'stationmetadata'.
    station_metadata = soup.find(id="stationmetadata")

    if station_metadata:
        # Turn every image source inside the block into an absolute URL so it
        # renders outside the NDBC site. This works for any station, instead of
        # relying on image file names that are specific to one buoy and one date.
        for img in station_metadata.find_all('img', src=True):
            img['src'] = urljoin(url, img['src'])

        # Render the block as HTML in the notebook.
        display(HTML(str(station_metadata)))
    else:
        print("La division avec l'ID 'stationmetadata' n'a pas été trouvée.")
else:
    print(f"Erreur lors de la récupération de la page, statut: {response.status_code}")


In [None]:
coord_buoy_1 = (lat_buoy, lon_buoy)

# Named buoy_map rather than map so the built-in map() is not shadowed for the
# rest of the kernel session.
buoy_map = folium.Map(location=[lat_buoy, lon_buoy], zoom_start=6)

folium.Marker(location=[lat_buoy, lon_buoy], popup=f"Chosen Buoy : {station_name}, lat :{lat_buoy},lon :{lon_buoy}").add_to(buoy_map)

# Make the map the cell's last expression so the notebook renders it
# (previously the cell ended on the add_to() call instead).
buoy_map


Get Stations from Caribbean Zone

In [None]:
#### Handle several buoys at once
# Keep the stations located inside the Caribbean bounding box.
# NOTE(review): the stations_df preview at the top of this notebook shows a single
# "Location Lat/Long" text column, not numeric 'Lat'/'Lon' columns — confirm
# where these two columns are created before relying on this filter.
caribbean_df = stations_df[(stations_df['Lat'] >= 9) &
                           (stations_df['Lat'] <= 25) &
                           (stations_df['Lon'] >= -85) &
                           (stations_df['Lon'] <= -60)]

# Station IDs of the selected rows (direct column extraction, no row loop needed).
buoys_ids = caribbean_df['Station'].tolist()
print(len(buoys_ids))

# One DataFrame per buoy that answered.
all_dataframes = []

# Counters for the progress line, plus the detail of each failure so that errors
# are not silently lost.
successful_buoy = 0
failed_buoy = 0
failed_buoy_ids = []

for buoy_id in buoys_ids:
    try:
        # Realtime observations of this buoy.
        df_caribbean_buoy = NDBC.realtime_observations(buoy_id)
        all_dataframes.append(df_caribbean_buoy)
        successful_buoy += 1
    except Exception as e:
        # Any error counts as a failed buoy; keep its ID and message for inspection.
        failed_buoy += 1
        failed_buoy_ids.append((buoy_id, str(e)))

    # Single progress line holding both counters: separate success/failure lines
    # would overwrite each other because both start with a carriage return.
    print(f"\r✅ Bouées réussies : {successful_buoy} | ❌ Bouées échouées : {failed_buoy}", end='', flush=True)

# Leave the progress line and add a blank line.
print("\n")

# Concatenate whatever was retrieved.
if all_dataframes:
    final_df = pd.concat(all_dataframes, ignore_index=True)
    print(f"✔️ Concaténation terminée. Nombre de lignes : {final_df.shape[0]} \n Nombre de colonnes : {final_df.shape[1]}")
    # One counter per line (they used to be printed without a line break between them).
    print(f"✅ Bouées réussies : {successful_buoy}")
    print(f"❌ Bouées échouées : {failed_buoy}")

else:
    print("⚠️ Aucune donnée récupérée.")
    

In [None]:
# Fetch the realtime observations of the station held in `nearest`.
# NOTE(review): `nearest` is not assigned in any cell visible in this notebook —
# confirm where it is defined (presumably the ID of the buoy closest to a point
# of interest), otherwise this cell fails on a fresh kernel.
df_marine = NDBC.realtime_observations(nearest)

# Show the result
print(df_marine.shape)
df_marine.head()

In [None]:
# Assemble the bronze marine table name from the station name, zone and coordinates:
# spaces become "_" in the name/zone, "." becomes "-" in the coordinates.
marine_name_parts = [
    "Bronze_marine_data",
    station_name.replace(' ', '_'),
    station_zone.replace(' ', '_'),
    str(lat_buoy).replace('.', '-'),
    str(lon_buoy).replace('.', '-'),
]
bronze_marine_data_table_name = "_".join(marine_name_parts)
print(bronze_marine_data_table_name)

# Load the marine observations into that table, keyed on the 'time' column.
load_data_in_table(
    db=db,
    schema=schema,
    table_name=bronze_marine_data_table_name,
    df=df_marine,
    conn=conn,
    key_column='time',
)

In [None]:
coordinates = [lat_buoy, lon_buoy]
df_meteo = meteo_api_request(coordinates=coordinates)
print(df_meteo.shape)
df_meteo.head()

In [None]:
# Build the bronze weather table name from the station name, zone and coordinates,
# then replace any "." still present in the assembled name with "-".
meteo_name_parts = [
    "bronze_meteo_data",
    station_name.replace(' ', '_'),
    station_zone.replace(' ', '_'),
    str(lat_buoy).replace('.', '-'),
    str(lon_buoy).replace('.', '-'),
]
bronze_meteo_data_table_name = "_".join(meteo_name_parts).replace('.', '-')

# Load the weather data into that table, keyed on the 'date' column.
load_data_in_table(
    db=db,
    schema=schema,
    table_name=bronze_meteo_data_table_name,
    df=df_meteo,
    conn=conn,
    key_column='date',
)

In [None]:
# Local import: only needed here to make the credentials safe inside a URL.
from urllib.parse import quote_plus

# Saved configuration: target database, schema and bronze table names.
with open("buoy_near_SM.json", "r") as f:
    buoy_near_SM = json.load(f)

# Read the database and schema BEFORE building the engine: the connection URL
# below depends on `db`, so assigning it afterwards would connect to whatever
# value `db` held previously (or fail on a fresh kernel).
db = buoy_near_SM["db"]
schema = buoy_near_SM["schema"]

bronze_marine_data_table_name = buoy_near_SM["bronze_marine"]
bronze_meteo_data_table_name = buoy_near_SM["bronze_meteo"]

# PostgreSQL credentials, kept in a JSON file outside the repository.
path_postgresql_creds = r"C:\Users\f.gionnane\Documents\Data Engineering\Credentials\postgresql_creds.json"
with open(path_postgresql_creds, 'r') as file:
    content = json.load(file)

user = content["user"]
password = content["password"]
host = content["host"]
port = content["port"]

# Create the PostgreSQL engine.
# User and password are percent-encoded because characters such as "@", ":" or "/"
# would otherwise be parsed as URL delimiters and corrupt the connection string.
engine = create_engine(
    f"postgresql+psycopg2://{quote_plus(str(user))}:{quote_plus(str(password))}@{host}:{port}/{db}"
)
conn = engine.connect()

In [None]:
# Read both bronze tables back from PostgreSQL.
try:
    df_marine_to_clean = fetch_table_data(conn=conn, schema=schema, table_name= bronze_marine_data_table_name)
    df_meteo_to_clean = fetch_table_data(conn=conn, schema=schema, table_name= bronze_meteo_data_table_name)
except Exception as e:
    # Show the message, then re-raise: the following cells need both frames, so
    # swallowing the error would only postpone the failure to a less explicit
    # NameError further down.
    print(e)
    raise

In [None]:
df_marine_to_clean = handle_null_values(df_marine_to_clean)

In [None]:
df_meteo_to_clean = auto_convert(df_meteo_to_clean)
df_meteo_to_clean.dtypes

In [None]:
df_marine_to_clean = handle_null_values(df_marine_to_clean)
df_meteo_to_clean = handle_null_values(df_meteo_to_clean)

In [None]:
print(df_marine_to_clean.columns)
print(df_meteo_to_clean.columns)

In [None]:
df_meteo_to_clean.head()

open-meteo API

In [None]:
df_meteo_to_clean.isna().sum()

In [None]:
# Drop the weather columns that are not used downstream (each column listed once).
# NOTE(review): the preceding cells clean `df_meteo_to_clean`, while this one works
# on `df_meteo` — confirm which frame is meant to be carried forward.
df_meteo = drop_columns_if_exist(
    df_meteo,
    ['rain', 'showers', 'soil_moisture_0_to_1cm', 'cloud_cover', 'soil_temperature_0cm', 'is_day'],
)
df_meteo.head()

In [None]:
# Give both frames human-readable column names.
# The labels no longer start with a stray space (' Sea Level Pressure (hPa)' and
# ' Visibility (%)' previously did), which made those columns awkward to select.
# NOTE(review): the unit suffixes are labels only — nothing here converts values.
# Confirm the source units, in particular that the marine wind speed really is in
# km/h, since both frames use the same 'Wind Speed (km/h)' label.
df_meteo = df_meteo.rename(columns={
    'temperature_2m': 'T°(C°)',
    'relative_humidity_2m': 'Relative Humidity (%)',
    'dew_point_2m': 'Dew Point (°C)',
    'precipitation': 'Precipitation (mm)',
    'pressure_msl': 'Sea Level Pressure (hPa)',
    'cloud_cover_low': 'Low Clouds (%)',
    'cloud_cover_mid': 'Middle Clouds (%)',
    'cloud_cover_high': 'High Clouds (%)',
    'visibility': 'Visibility (%)',
    'wind_speed_10m': 'Wind Speed (km/h)',
})
df_marine = df_marine.rename(columns={
    'wind_direction': 'Wind Direction (°)',
    'wind_speed': 'Wind Speed (km/h)',
    'wind_gust': 'Wind Gusts (km/h)',
    'wave_height': 'Wave Height (m)',
    'average_wave_period': 'Average Wave Period (s)',
    'dominant_wave_direction': 'Dominant Wave Direction (°)',
    'pressure': 'Pressure (hPA)',
    'air_temperature': 'Air T°',
    'water_temperature': 'Water T°',
})

# Quick check of the resulting columns and shapes.
print(df_meteo.columns)
print(df_marine.columns)
print(df_marine.shape)
print(df_meteo.shape)

Merging Dataframes

In [None]:
# Inner join of the marine and weather frames on their shared 'Datetime' column
# (marine frame on the left, weather frame on the right).
df_merged = df_marine.merge(df_meteo, on='Datetime', how='inner')

# Inspect the result: shape, column types, then the first 20 rows.
print(df_merged.shape)
print(df_merged.dtypes)
df_merged.head(20)

In [None]:
# Exemple d'utilisation
df_merged = add_daytime_and_month_column(df_merged,'Datetime')

print(df_merged.columns)
df_merged.head(10)

In [None]:
# Both source frames contribute a 'Wind Speed (km/h)' column, suffixed _x and _y
# by the merge; replace the pair with their arithmetic mean.
wind_speed_x = df_merged['Wind Speed (km/h)_x']
wind_speed_y = df_merged['Wind Speed (km/h)_y']
df_merged['Wind Speed (km/h)'] = (wind_speed_x + wind_speed_y) / 2

# The two suffixed columns and the gust column are no longer needed.
df_merged = drop_columns_if_exist(
    df_merged,
    ['Wind Speed (km/h)_x', 'Wind Speed (km/h)_y', 'Wind Gusts (km/h)'],
)

print(df_merged.columns)
df_merged.head()

BigQuery Connection

In [None]:
# from google.oauth2 import service_account
# from google.cloud import storage  # Exemple pour Google Cloud Storage
# from google.cloud import bigquery
# from google.cloud.exceptions import NotFound
# import pandas as pd
# import pyarrow

# path_to_google_creds = r"C:\Users\f.gionnane\Documents\Data Engineering\Credentials\google_credentials.json"

# bq_client = bigquery.Client.from_service_account_json(
#     path_to_google_creds)
# project_id = "rare-bloom-419220"
# dataset_End_To_End_Oceanography_ML = "End_To_End_Oceanography_ML"
# bq_client
# # dataset_ref = bq_client.dataset('my_dataset_name', project=project_id)


# # LIST DATASETS AND FIND ONE
# datasets = list(bq_client.list_datasets())  # Make an API request.
# project = client.project
# bq_datasets_list =[]

# if datasets:
#     print("Datasets in project {}:".format(project))
#     for dataset in datasets:
#         print("\t{}".format(dataset.dataset_id))
#         bq_datasets_list.append(dataset.dataset_id)
#     if dataset_End_To_End_Oceanography_ML in bq_datasets_list:
#         dataset =  dataset_End_To_End_Oceanography_ML
#         print("Dataset Found !")
# else:
#     print("{} project does not contain any datasets.".format(project))

# # (developer): Set table_id to the ID of the table to determine existence.
# # table_id = "your-project.your_dataset.your_table"

# try:
#     table_ref = bq_client.dataset(dataset).table(table_name)
#     bq_client.get_table(table_ref)  # Make an API request.
#     print("Table {} already exists.".format(table_name))
# except NotFound:
#     print("Table {} is not found.".format(table_name))


# def clean_column_names(df):

#     cleaned_columns = []
#     for column in df.columns:
#         # Remplacer tous les caractères non alphanumériques (sauf underscores) par un underscore
#         cleaned_column = re.sub(r'[^A-Za-z0-9_]', '_', column)
        
#         # Ajouter le nom de colonne nettoyé à la liste
#         cleaned_columns.append(cleaned_column)
    
#     # Appliquer les nouveaux noms de colonnes au DataFrame
#     df.columns = cleaned_columns
#     return df

# df_merged = clean_column_names(df_merged)

# table_id = f"{project_id}.{dataset}.{table_name}"


# try:
#     bq_client.get_table(table_id)  # Make an API request.
#     print("Table {} already exists.".format(table_id))
# except NotFound:
#     print("Table {} is not found.".format(table_id))
#     bq_client.create_table(table_id)
#     print("Creation of the Table {}.".format(table_id))


# def load_data_to_bigquery(client, dataset: str = None, table: str = None, df: pd.DataFrame = None, key_column: str = 'Datetime', table_id: str = None):
   
    
#     # Fonction pour détecter et convertir les types de données
#     def convert_column_types(df):
#         for column in df.columns:
#             dtype = df[column].dtype

#             if dtype == 'object':  # Chaînes de caractères
#                 df[column] = df[column].astype(str)
#             elif dtype == 'datetime64[ns]':  # Datetime
#                 df[column] = pd.to_datetime(df[column], errors='coerce').dt.tz_localize('UTC', ambiguous='NaT').dt.tz_localize(None)
#             elif dtype == 'float64':  # Float
#                 df[column] = df[column].astype('float32')
#             elif dtype == 'int64':  # Integer
#                 df[column] = df[column].astype('int64')  # On garde int64, car BigQuery supporte ce type
#             else:
#                 # Autres types, on les convertit en string
#                 df[column] = df[column].astype(str)
        
#         return df

#     # Convertir les types des colonnes
#     df = convert_column_types(df)

#     if table_id:
#         # Si table_id est fourni, on l'utilise directement.
#         full_table_id = table_id
#     elif dataset and table:
#         # Si table_id n'est pas fourni, on construit table_id à partir de dataset et table.
#         full_table_id = f"{client.project}.{dataset}.{table}"
#     else:
#         raise ValueError("Il faut fournir soit 'table_id' ou les paramètres 'dataset' et 'table' séparés.")

#     # Vérifier si le dataset existe, sinon le créer
#     try:
#         dataset = full_table_id.split('.')[1]
#         client.get_dataset(dataset)  # Vérifie si le dataset existe
#         print(f"Le dataset {dataset} existe déjà.")
#     except NotFound:
#         print(f"Le dataset {dataset} n'existe pas. Création du dataset...")
#         client.create_dataset(dataset)  # Crée le dataset s'il n'existe pas
#         print(f"Le dataset {dataset} a été créé.")

#     # Vérifier si la table existe, sinon la créer
#     try:
#         client.get_table(full_table_id)  # Vérifie si la table existe
#         print(f"La table {full_table_id} existe déjà.")
#     except NotFound:
#         print(f"La table {full_table_id} n'existe pas. Création de la table...")
#         # Créer la table avec le schéma du DataFrame
#         schema = []
#         for name, dtype in df.dtypes.items():
#             if name == 'Datetime':
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.TIMESTAMP))
#             elif dtype == 'float32' or dtype == 'float64':
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.FLOAT64))
#             elif dtype == 'int64':
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.INTEGER))
#             else:
#                 schema.append(bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING))
        
#         # Créer la table avec le schéma
#         table = bigquery.Table(full_table_id, schema=schema)
#         client.create_table(table)  # Crée la table si elle n'existe pas
#         print(f"La table {full_table_id} a été créée.")

#     # Préparer les données pour l'insertion
#     if key_column in df.columns:
#         # Si la colonne clé est fournie, supprimer les doublons en fonction de cette colonne
#         df = df.drop_duplicates(subset=[key_column])

#     # Charger les données dans la table BigQuery
#     job_config = bigquery.LoadJobConfig(
#         schema=[
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.TIMESTAMP) if name == 'Datetime' else
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.FLOAT64) if dtype == 'float32' or dtype == 'float64' else
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.INTEGER) if dtype == 'int64' else
#             bigquery.SchemaField(name, bigquery.enums.SqlTypeNames.STRING)
#             for name, dtype in df.dtypes.items()
#         ],
#         write_disposition="WRITE_APPEND"  # Ajoute les données sans écraser les anciennes
#     )

#     # Charger le DataFrame dans BigQuery
#     job = client.load_table_from_dataframe(df, full_table_id, job_config=job_config)
#     job.result()  # Attendre la fin de la tâche
#     print(f"Données chargées dans la table {full_table_id}.")

# # Exemple d'appel à la fonction
# load_data_to_bigquery(table_id=table_id, client=bq_client, df=df_merged)


PostgreSQL

In [None]:
df_merged.columns