In [1]:
import boto3
from botocore import UNSIGNED
from botocore.config import Config
from boto3.s3.transfer import TransferConfig
import xarray as xr
import numpy as np
import pandas as pd

import requests
from bs4 import BeautifulSoup
import re

from datetime import datetime, timedelta
import os
import shutil
import time

import concurrent.futures
import glob

In [2]:
places_coordinates = {
    "Europe": {'lat_min': 35.25, 'lat_max': 71.75, 'lon_min': -10.25, 'lon_max': 31},
    "Canada": {'lat_min': 44.0, 'lat_max': 70.0, 'lon_min': -141.5, 'lon_max': -51.5}, 
    "US": {'lat_min': 24, 'lat_max': 49, 'lon_min': -125.5, 'lon_max': -65.5},
    "BlackSea": {'lat_min': 41.25, 'lat_max': 57.0, 'lon_min':22.75, 'lon_max': 61.5},
    "Australia": {'lat_min': -39, 'lat_max': -10.5, 'lon_min': 112.75, 'lon_max': 153.25},
    "Argentina": {'lat_min': -56.25, 'lat_max': -21.75, 'lon_min': -75.5, 'lon_max': -51.75}
}

In [3]:
def find_dates(year, max_retries=5, start_date=False):
    retries = 0
    while retries < max_retries:
        try:
            r = requests.get(f'https://tds.gdex.ucar.edu/thredds/catalog/files/g/d084001/{year}/catalog.html', timeout=10)
            break
        except requests.exceptions.RequestException as e:
            retries += 1
            time.sleep(5)
            
    soup = BeautifulSoup(r.text, "html.parser")
    dates = [f.get_text() for f in soup.find_all('code') if re.fullmatch(r"\d{8}", f.get_text())]
    if start_date:
        return [d for d in dates if datetime.strptime(d, '%Y%m%d') >= datetime.strptime(str(start_date), '%Y%m%d')]
    else:
        return dates

In [4]:
def find_files(year, date, run, forecast_horizon, max_retries=5):
    retries = 0
    while retries < max_retries:
        try:
            r = requests.get(f'https://tds.gdex.ucar.edu/thredds/catalog/files/g/d084001/{year}/{date}/catalog.html')
            break
        except requests.exceptions.RequestException as e:
            retries += 1
            time.sleep(5)
            
    soup = BeautifulSoup(r.text, "html.parser")
    files =  [f.get_text() for f in soup.find_all('code') if f.get_text()[-6:] == '.grib2']
    current_run = [f for f in files if f.split('.')[2][-2:] == run  # check if run is 00
                   and int(f.split('.f')[1][:3]) > 0 #remove the first data, where data is initial values of the model and not forecast
                   and int(f.split('.f')[1][:3]) <= forecast_horizon] #where files are max forecast horizon
    return current_run

In [5]:
def download_single_file(year, date, file, coords, filepath, max_retries=5):
    forecast_hour = int(file.split('.f')[1][:3])
    forecast_hour_timedelta = timedelta(hours=forecast_hour)
    ref_date =  datetime.strptime(file.split('.')[2], "%Y%m%d%H")
    final_dt = ref_date + forecast_hour_timedelta
    iso_date = final_dt.strftime("%Y-%m-%dT%H:%M:%SZ")        

    filename = f"gfs.0p25.{iso_date.replace(':','-')}.nc"

    endpoint = "https://tds.gdex.ucar.edu/thredds/ncss/grid/files/g/d084001/"    
    
    var_precip = [
        'Total_precipitation_surface_3_Hour_Accumulation',
        'Total_precipitation_surface_6_Hour_Accumulation',
        'Total_precipitation_surface_Mixed_intervals_Accumulation'
    ]

    for attempt in range(max_retries):
        for var in var_precip:
            try:
                url = (
                    f"{year}/{date}/{file}"
                    f"?var=Temperature_height_above_ground"
                    f"&var={var}"
                    f"&north={coords['lat_max']}&west={coords['lon_min']}"
                    f"&east={coords['lon_max']}&south={coords['lat_min']}"
                    f"&horizStride=1"
                    f"&time_start={iso_date}&time_end={iso_date}"
                    f"&accept=netcdf3"
                )

                r = requests.get(endpoint + url, timeout=10)
                r.raise_for_status()
                content_type = r.headers.get("Content-Type", "").lower()
                if "netcdf" in content_type and len(r.content) > 1000:
                    with open(os.path.join(filepath, filename), "wb") as f:
                        f.write(r.content)
                    return True
                else:
                    continue

            except requests.exceptions.RequestException:
                continue

        time.sleep(1)

    print("Max retries reached. Download failed for", file)
    return False

In [6]:
def process_files(year, date, place):
    time_vars = ['time1', 'time2']
    height_vars = ['height_above_ground1', 'height_above_ground2', 'height_above_ground3']
    precip_vars = ['Total_precipitation_surface_3_Hour_Accumulation', 'Total_precipitation_surface_6_Hour_Accumulation', 'Total_precipitation_surface_Mixed_intervals_Accumulation']
    time_bounds_vars = ['time1_bounds', 'time2_bounds']

    def fix_coords(ds):
        if ds.longitude.max() > 180:
            ds = ds.assign_coords(longitude=((ds.longitude + 180) % 360) - 180)
            ds = ds.sortby("longitude")

        return ds
    
    def normalize_time(ds, var):
        time_dim_candidates = [d for d in ds.dims if d.startswith('time')]
        if not time_dim_candidates:
            raise ValueError("Aucune dimension time trouvée")
        main_time = [d for d in ds[var].dims if d.startswith('time')][0]
        extra_dims = [d for d in time_dim_candidates if d != main_time]
        if extra_dims:
            ds = ds.drop_dims(extra_dims)
        ds = ds.rename({main_time: 'time'})
        return ds
    
    def process_temp(ds):    
        for h in height_vars:
            if h in ds.coords:
                ds = ds.rename({h: 'height_above_ground'})
                break

        if 'Temperature_height_above_ground' in ds.data_vars:
            ds = ds.rename_vars({'Temperature_height_above_ground': 't2m'})

        ds = normalize_time(ds, 't2m')        
        ds = fix_coords(ds)
        
        return ds
    
    def process_precip(ds):
        for var in precip_vars:
            if var in ds.data_vars:
                ds = ds.rename_vars({var: 'tp'})
                break        
        for tb in time_bounds_vars:
            if tb in ds.data_vars:
                ds = ds.rename_vars({tb: 'time_bounds'})
                break

        ds = normalize_time(ds, 'tp')
        ds = fix_coords(ds)
        
        return ds      
    
    ds_temp = xr.open_mfdataset(f'{place}/{year}/{date}/*.nc', engine='netcdf4', preprocess=process_temp)
    ds_precip = xr.open_mfdataset(f'{place}/{year}/{date}/*.nc', engine='netcdf4', preprocess=process_precip)

    ds_precip['time'] = ds_temp.time

    ds = xr.merge([ds_temp, ds_precip])
    t2m_mean = ds.get('t2m').sel(height_above_ground=2).mean(dim='time')

    precip = ds.get(['tp', 'time_bounds'])
    precip_height = [d for d in precip.dims if d.startswith('height_above_ground')]
    if len(precip_height) > 0:
        precip = precip.isel({precip_height[0]:2})
    precip = precip.sel(time=ds.time.dt.hour % 6 == 0)
    bounds = [bound[1] - bound[0] for bound in precip.time_bounds.values]
    bounds_6h = all(x == np.timedelta64(21600000000000, 'ns') for x in bounds)

    if bounds_6h:
        precip_sum = precip.get('tp').sum(dim='time')
    else:
        precip_sum = None

    t2m_df = t2m_mean.to_dataframe().set_index('reftime', append=True)
    t2m_df['t2m'] = t2m_df['t2m'] - 273.15
    precip_df = precip_sum.to_dataframe().set_index('reftime', append=True)

    df = pd.concat([t2m_df, precip_df], axis=1)
    if df.empty:
        print(f'Empty df for {place}/{year}/{date}')
        return 0
    df = df[['t2m', 'tp']]
    df = df.to_parquet(f'{place}/{year}/{date}_processed.parquet')

In [None]:
years = [i for i in range(2025, 2026)]
for year in years:
    dates = find_dates(year, start_date=20251106)
    for date in dates:
        files = find_files(year, date, '00', 168)
        for place, coords in places_coordinates.items():
            filepath = f'{place}/{year}/{date}'
            os.makedirs(filepath, exist_ok=True)
            files_tuple = [(year, date, file, coords, filepath) for file in files]
            # Télécharger en parallèle
            with concurrent.futures.ThreadPoolExecutor(max_workers=14) as executor:
                futures = [executor.submit(download_single_file, year_, date_, file_, coords_, filepath_) for year_, date_, file_, coords_, filepath_ in files_tuple]
                results = [f.result() for f in futures]
            # Vérifier que tout s'est bien passé
            if all(results):
                for variable in ['Temp', 'Precip']:
                    process_files(year, date, place)
            else:
                failed_files = [file for file, success in zip(files, results) if not success]
                print("Downloads failed for files:", failed_files)

AWS

In [3]:
GFS_BUCKET_NAME = "noaa-gfs-bdp-pds"
FORECAST_CYCLE = "00"
FORECAST_MODEL = "atmos"
FILE_TYPE = "pgrb2"
GRID_RESOLUTION = "0p25" 

TEMP_KEYS = {'filter_by_keys': {'typeOfLevel': 'heightAboveGround', 'level': 2, 'cfVarName': 't2m'}, 'indexpath': ''}
PRECIP_KEYS = {'filter_by_keys': {'typeOfLevel': 'surface', 'cfVarName': 'tp'}, 'indexpath': ''}

In [4]:
def get_files(s3, date):    
    folder = f"gfs.{date}/{FORECAST_CYCLE}/{FORECAST_MODEL}/"
    try:
        response = s3.list_objects_v2(Bucket=GFS_BUCKET_NAME, Prefix=folder, Delimiter='/')
        if not 'Contents' in response.keys():
            folder = f"gfs.{date}/{FORECAST_CYCLE}/"
            response = s3.list_objects_v2(Bucket=GFS_BUCKET_NAME, Prefix=folder, Delimiter='/')
    except Exception as e:
        print(f"Error accessing S3: {e}")

    pgrb2_files = [obj['Key'] for obj in response['Contents'] if f"gfs.t{FORECAST_CYCLE}z.{FILE_TYPE}.{GRID_RESOLUTION}.f" in obj['Key']]
    file_res = [file for file in pgrb2_files if file[-4:] != ".idx"]
    file_horizon = [
        file for file in file_res
        if 0 < (h := int(file.split('.')[5][-3:])) <= 168
        and (
            (h <= 84 and h % 3 == 0) or
            (h > 84 and h % 6 == 0)
        )
    ]
                    
    return file_horizon

In [34]:
def download_file(s3, file, filepath):
    try:
        filename = file.split('/')[-1]
        os.makedirs(filepath, exist_ok=True)
        s3.download_file(GFS_BUCKET_NAME, file, f'{filepath}/{filename}')
        return True
    except Exception as e:
        print(f"Error downloading {file}: {e}")
        return False

In [None]:
def process_files(year, date, places):
    files = sorted(glob.glob(f'{year}/{date}/*'))  

    acc = None
    n = 0

    for f in files:
        #ds = xr.open_dataset(f, engine='cfgrib', backend_kwargs=kwargs, decode_timedelta=True)
        ds_t2m = xr.open_dataset(f, engine="cfgrib", backend_kwargs=TEMP_KEYS, decode_timedelta=True)
        ds_tp = xr.open_dataset(f, engine="cfgrib", backend_kwargs=PRECIP_KEYS, decode_timedelta=True)
        ds = xr.merge([ds_t2m, ds_tp])
        
        if ds.longitude.max() > 180:
            ds = ds.assign_coords(longitude=((ds.longitude + 180) % 360) - 180)
            ds = ds.sortby("longitude")

        if acc is None:
            acc = ds.copy(deep=False)
        else:
            acc += ds

        n += 1

    t2m_mean = acc['t2m'] / n
    t2m_mean -= 273.15
    precip_sum = acc['tp']

    for place, coords in places.items():
        sl = dict(
            latitude=slice(coords['lat_max'], coords['lat_min']),
            longitude=slice(coords['lon_min'], coords['lon_max'])
        )
        t2m_sliced = t2m_mean.sel(**sl)
        precip_sliced = precip_sum.sel(**sl)
    
        df_t2m = t2m_sliced.to_dataframe().reset_index()
        df_t2m = df_t2m[['time', 'latitude', 'longitude', 't2m']]

        df_precip = precip_sliced.to_dataframe().reset_index()
        df_precip = df_precip[['time', 'latitude', 'longitude', 'tp']]
        
        if df_t2m.empty or df_precip.empty:
            raise ValueError(f"{variable}, {date}, one of the 2 df is empty")

        os.makedirs(f"Temp/{place}/{year}/{date}", exist_ok=True)
        os.makedirs(f"Precip/{place}/{year}/{date}", exist_ok=True)
        df_t2m.to_parquet(f"Temp/{place}/{year}/{date}_processed.parquet", compression="snappy")
        df_precip.to_parquet(f"Precip/{place}/{year}/{date}_processed.parquet", compression="snappy")

In [26]:
def get_dates(s3):
    paginator = s3.get_paginator("list_objects_v2")
    pattern = re.compile(r"gfs\.\d{8}/$")
    gfs_dates = []

    for page in paginator.paginate(Bucket=GFS_BUCKET_NAME, Delimiter="/"):
        for cp in page.get("CommonPrefixes", []):
            prefix = cp["Prefix"]
            if pattern.match(prefix):
                file_name = prefix.rstrip("/")
                gfs_dates.append(file_name.split('.')[1])
    return gfs_dates

In [38]:
s3 = boto3.client('s3', config=Config(signature_version=UNSIGNED))
dates = get_dates(s3)
for date in dates:
    files = get_files(s3, date)
    year = files[0].split('/')[0][4:8]
    files_tuple = [(file, f'{year}/{date}') for file in files]
    with concurrent.futures.ThreadPoolExecutor(max_workers=8) as executor:
        futures = [executor.submit(download_file, s3, file, path) for file, path in files_tuple]
        results = [f.result() for f in futures]
    if all(results):
        print("ok")
        process_files(year, date, places_coordinates)
        shutil.rmtree(f'{year}/{date}')
    else:
        failed_files = [file for file, success in zip(files, results) if not success]
        print("Downloads failed for files:", failed_files)
    break

ok


In [None]:
import plotly.express as px
import pandas as pd

df = pd.read_parquet('Precip/Europe/2020/20200101_processed.parquet')

fig = px.scatter_geo(
    df,
    lat="latitude",
    lon="longitude",
    color="Total_precipitation_surface_Mixed_intervals_Accumulation",
    labels={"Total_precipitation_surface_Mixed_intervals_Accumulation": "Total precip"},
    color_continuous_scale=px.colors.sequential.Viridis
)
fig.update_geos(fitbounds="locations")
fig.update_layout(title="Total precip", title_x=0.5)
fig.show()