In [None]:
import xarray as xr
import matplotlib.pyplot as plt
import pandas as pd

import cdsapi
from pathlib import Path
import time

# IBTrACS DataSet


- We restrict the dataset to time steps where the cyclone position is explicitly reported by the U.S. agency, since spatial localization is required for any subsequent coupling with gridded meteorological data.

In [None]:
ds = xr.open_dataset("data/IBTrACS.ALL.v04r01.nc")


In [None]:
# SID is a unique identifier for a storm in the dataset
print(len(ds["sid"].values) == len(set(ds["sid"].values)))

In [None]:
sources = [
    "usa", "tokyo", "cma", "hko", "kma",
    "newdelhi", "reunion", "bom",
    "nadi", "wellington"
]

rows = []

for src in sources:
    var = f"{src}_wind"
    if var in ds:
        n_storms = int(
            ds[var]
            .notnull()
            .any(dim="date_time")
            .sum()
            .values
        )
        rows.append({
            "source": src,
            "storms_covered": n_storms
        })

df_storms = pd.DataFrame(rows).sort_values("storms_covered", ascending=False)

# Plot
plt.figure()
plt.bar(df_storms["source"], df_storms["storms_covered"])
plt.xlabel("Source")
plt.ylabel("Number of storms")
plt.title("Storm Coverage")
plt.xticks(rotation=45)
plt.tight_layout()
plt.grid()
plt.show()


Several tropical cyclones are documented by multiple agencies simultaneously; therefore, the sum of per-agency storm counts exceeds the number of unique cyclones.
The USA storm coverage is the largest one, therefore it is the one on which we will restrict ourselves during the project.

In [None]:
import xarray as xr

# Data Loading
ds = xr.open_dataset("data/IBTrACS.ALL.v04r01.nc")

print(f"Number of storms : {len(ds.storm)}")

In [None]:
# Only keep relevant variable related to US sources
vars_usa = [
    "sid", "name", "season", "basin",
    "time",
    "usa_lat", "usa_lon",
    "usa_wind", "usa_pres",
    "storm_speed", "storm_dir"
]
ds_tc = ds[vars_usa]

# Filter when the position (lat, long) is NaN
ds_tc_filtered = ds_tc.where(
    ds_tc["usa_lat"].notnull()
    & ds_tc["usa_lon"].notnull(),
    drop=True
)

# Convert to a pandas DataFrame
df_tc_pd = (
    ds_tc_filtered
    .to_dataframe()
    .reset_index()
)

# Drop global estimate, the storm index column and rename
df_tc_pd = (
    df_tc_pd
    .drop(columns=["storm", "lat", "lon"])
    .rename(columns={
        "usa_lat": "lat",
        "usa_lon": "lon",
        "usa_wind": "wind",
        "usa_pres": "pressure",
        "time": "time_stamp",
    })
    .dropna(subset=["lat", "lon"])
    .reset_index(drop=True)
)

# Conversion in datetime pandas
df_tc_pd["time_stamp"] = pd.to_datetime(df_tc_pd["time_stamp"])
df_tc_pd["year"] = df_tc_pd["time_stamp"].dt.year
df_tc_pd["month"] = df_tc_pd["time_stamp"].dt.month
df_tc_pd["day"] = df_tc_pd["time_stamp"].dt.day
df_tc_pd["hour"] = df_tc_pd["time_stamp"].dt.hour

# Sort by the most recent
df = df_tc_pd.sort_values(
    by=["time_stamp", "sid"],
    ascending=[False, True]
)


df.head(20)

In [None]:
obs_per_cyclone = df["sid"].value_counts()

plt.figure()
plt.hist(obs_per_cyclone.values, bins=10)
plt.xlabel("Number of observations per cyclone")
plt.ylabel("Number of cyclones")
plt.title("Histogram of observations per cyclone")
plt.grid(True)
plt.show()

Some comments on the original acquisition methods of such data.

- For all of these they came from estimation based on observation retrieved on-ground, satelite data, drone or planes.

In [None]:
df_filtered = df[
    (df["time_stamp"].dt.year >= 2000) &
    (df["time_stamp"].dt.year < 2025)
].copy()

print(
    "Période retenue:",
    df_filtered["time_stamp"].dt.year.min(),
    "→",
    df_filtered["time_stamp"].dt.year.max(),
    "| obs:", len(df_filtered),
    "| cyclones:", df_filtered["sid"].nunique()
)


# ERA5 DataSet

In [None]:
def fetch_era5_for_row(
    row,
    out_dir="data/era5",
    buffer_deg=5.0,
    variables=None,
    max_available_time=pd.Timestamp("2025-12-10 22:00"),
):
    """
    Download ERA5 single-level data around one IBTrACS observation.
    """

    # Default variables to retrieve
    if variables is None:
        variables = [
            "mean_sea_level_pressure",
            "10m_u_component_of_wind",
            "10m_v_component_of_wind",
            "2m_temperature",
        ]

    # Sanity check for the time
    time = pd.to_datetime(row["time_stamp"])
    if time > max_available_time:
        return None

    # Cast Python types
    lat = float(row["lat"])
    lon = float(row["lon"])
    year = int(time.year)
    month = int(time.month)
    day = int(time.day)
    hour = int(time.hour)

    # Directory and file name
    out_dir = Path(out_dir)
    out_dir.mkdir(parents=True, exist_ok=True)

    fname = (
        f"era5_{row['sid'].decode() if isinstance(row['sid'], bytes) else row['sid']}_"
        f"{year}{month:02d}{day:02d}_{hour:02d}.nc"
    )
    out_path = out_dir / fname

    # Cache
    if out_path.exists():
        return out_path

    # Request ERA5
    c = cdsapi.Client()

    dataset = "reanalysis-era5-single-levels"
    request = {
        "product_type": ["reanalysis"],
        "variable": [
            "10m_u_component_of_wind",
            "10m_v_component_of_wind",
            "2m_temperature",
            "mean_sea_level_pressure"
        ],
        "year": ["2017"],
        "month": ["10"],
        "day": ["23", "24", "25"],
        "time": [
            "00:00", "01:00", "02:00",
            "03:00", "04:00", "05:00",
            "06:00", "07:00", "08:00",
            "09:00", "10:00", "11:00",
            "12:00", "13:00", "14:00",
            "15:00", "16:00", "17:00",
            "18:00", "19:00", "20:00",
            "21:00", "22:00", "23:00"
        ],
        "format": "netcdf",
        "area": [
            30, 
            -80, 
            10, 
            -60
            ]
    }

    c.retrieve(dataset, request, str(out_path))

    return out_path


def extract_era5_at_point(ds_era5, lat, lon):
    ds_point = ds_era5.sel(
        latitude=lat,
        longitude=lon,
        method="nearest"
    )

    return {
        "era5_msl": ds_point["msl"].values.item() / 100.0, 
        "era5_u10": ds_point["u10"].values.item(),
        "era5_v10": ds_point["v10"].values.item(),
        "era5_t2m": ds_point["t2m"].values.item(),
    }


def build_enriched_row(row, era5_features):
    """
    Combine a row from IBTrACS and ERA5 features in a single pandas row.
    """
    base = row.copy()

    for col in ["sid", "name", "basin"]:
        if col in base and isinstance(base[col], bytes):
            base[col] = base[col].decode()

    for k, v in era5_features.items():
        base[k] = v

    return pd.DataFrame([base])



def enrich_ibtracs_with_era5(
    df,
    max_rows=None,
    drop_failed=True,
    verbose=True,
    throttle_seconds=2.0, 
):
    """
    Loop on rows of IBTrACS
    """

    enriched_rows = []

    iterable = df.itertuples(index=False)
    if max_rows is not None:
        iterable = list(iterable)[:max_rows]

    for i, row in enumerate(iterable):
        row = pd.Series(row._asdict())

        if verbose and i % 10 == 0:
            print(f"[{i}] Processing {row.get('sid')} @ {row.get('time_stamp')}")

        try:
            path = fetch_era5_for_row(row)

            if path is None:
                if not drop_failed:
                    enriched_rows.append(pd.DataFrame([row]))
                continue

            with xr.open_dataset(path) as ds_era5:
                features = extract_era5_at_point(
                    ds_era5,
                    row["lat"],
                    row["lon"]
                )

            Path(path).unlink(missing_ok=True)

            df_row = build_enriched_row(row, features)
            enriched_rows.append(df_row)

        except Exception as e:
            if verbose:
                print(f"Failed for row {i}: {e}")

            if not drop_failed:
                enriched_rows.append(pd.DataFrame([row]))

            if throttle_seconds > 0:
                time.sleep(throttle_seconds)

    if len(enriched_rows) == 0:
        return pd.DataFrame()

    return pd.concat(enriched_rows, ignore_index=True)


In [None]:
df_era5_ready = df[df["time_stamp"] <= "2025-12-10 22:00"]

In [None]:
df_era5 = enrich_ibtracs_with_era5(
    df_era5_ready,
    max_rows=2,
    verbose=True
)

df_era5

### time series track

In [None]:
import time
from pathlib import Path
import numpy as np
import pandas as pd
import xarray as xr
import cdsapi

# =========================================================
# CONFIG GLOBALE
# =========================================================

ERA5_OUT = Path("data/era5_tests")
ERA5_OUT.mkdir(parents=True, exist_ok=True)

VARIABLES = [
    "10m_u_component_of_wind",
    "10m_v_component_of_wind",
    "2m_temperature",
    "mean_sea_level_pressure",
]

ERA5_VAR_MAP = {
    "10m_u_component_of_wind": "u10",
    "10m_v_component_of_wind": "v10",
    "2m_temperature": "t2m",
    "mean_sea_level_pressure": "msl",
}

GRID = (0.5, 0.5)      # résolution ERA5
BBOX_PAD = 2.0         # marge spatiale (degrés)

# =========================================================
# OUTILS DE BASE
# =========================================================

def normalize_lon_0_360(lon):
    lon = np.asarray(lon, dtype=float)
    return np.mod(lon, 360.0)

def compute_bbox(df, pad=BBOX_PAD):
    lat_min = float(df["lat"].min()) - pad
    lat_max = float(df["lat"].max()) + pad

    lon = normalize_lon_0_360(df["lon"].values)
    lon_min = float(lon.min()) - pad
    lon_max = float(lon.max()) + pad

    return [
        float(min(90.0, lat_max)),
        float(max(0.0, lon_min)),
        float(max(-90.0, lat_min)),
        float(min(360.0, lon_max)),
    ]  # [N, W, S, E]

def cds_day_list(days):
    return [f"{int(d):02d}" for d in sorted(set(days))]

def cds_time_list(hours):
    return [f"{int(h):02d}:00" for h in sorted(set(hours))]

# =========================================================
# ERA5 DOWNLOAD
# =========================================================

def download_era5(area, year, month, days, hours, out_nc, force=False):
    if out_nc.exists() and not force:
        return out_nc

    c = cdsapi.Client()
    req = {
        "product_type": "reanalysis",
        "variable": VARIABLES,
        "year": str(year),
        "month": f"{month:02d}",
        "day": cds_day_list(days),
        "time": cds_time_list(hours),
        "area": area,
        "grid": [GRID[0], GRID[1]],
        "data_format": "netcdf",
    }

    print(f"[ERA5] download {year}-{month:02d}")
    c.retrieve("reanalysis-era5-single-levels", req, str(out_nc))
    return out_nc

# =========================================================
# SAMPLING ERA5 -> IBTrACS
# =========================================================

def sample_era5_to_obs(nc_path, df):
    ds = xr.open_dataset(nc_path)
    time_dim = "time" if "time" in ds.dims else "valid_time"

    df = df.copy()
    df["lon_0360"] = normalize_lon_0_360(df["lon"].values)

    p_time = xr.DataArray(df["time_stamp"].values, dims="point")
    p_lat = xr.DataArray(df["lat"].values, dims="point")
    p_lon = xr.DataArray(df["lon_0360"].values, dims="point")

    ds_t = ds.sel({time_dim: p_time}, method="nearest")
    ds_p = ds_t.sel(latitude=p_lat, longitude=p_lon, method="nearest")

    era_vars_nc = [ERA5_VAR_MAP[v] for v in VARIABLES]
    era_df = ds_p[era_vars_nc].to_dataframe().reset_index(drop=True)
    era_df = era_df.rename(columns={v_nc: v for v, v_nc in ERA5_VAR_MAP.items()})

    return pd.concat([df.reset_index(drop=True), era_df], axis=1)

# =========================================================
# SUBSETS CONTRÔLÉS
# =========================================================

def subset_n_cyclones(df, n=100, seed=0):
    sids = (
        df.groupby("sid")
          .size()
          .sort_values(ascending=False)
          .head(n)
          .index
    )
    return df[df["sid"].isin(sids)].copy()

def subset_month(df, year, month):
    ts = pd.to_datetime(df["time_stamp"])
    return df[(ts.dt.year == year) & (ts.dt.month == month)].copy()

# =========================================================
# RUNNER DE TEST
# =========================================================

def run_controlled_test(df, label="test", force_download=False):
    df = df.copy()
    df["time_stamp"] = pd.to_datetime(df["time_stamp"])

    df["year"] = df["time_stamp"].dt.year
    df["month"] = df["time_stamp"].dt.month
    df["day"] = df["time_stamp"].dt.day
    df["hour"] = df["time_stamp"].dt.hour

    year = int(df["year"].iloc[0])
    month = int(df["month"].iloc[0])
    df = df[(df["year"] == year) & (df["month"] == month)].copy()

    area = compute_bbox(df)
    days = df["day"].unique()
    hours = df["hour"].unique()

    out_nc = ERA5_OUT / f"era5_{label}_{year}_{month:02d}.nc"

    print("\n====================================")
    print(f"TEST [{label}]")
    print("rows:", len(df))
    print("cyclones:", df['sid'].nunique())
    print("month:", f"{year}-{month:02d}")
    print("bbox:", area)
    print("====================================")

    t0 = time.perf_counter()
    download_era5(area, year, month, days, hours, out_nc, force=force_download)
    t1 = time.perf_counter()

    size_mb = out_nc.stat().st_size / (1024**2)
    print(f"download: {t1-t0:.2f}s | file: {size_mb:.1f} MB")

    merged = sample_era5_to_obs(out_nc, df)
    t2 = time.perf_counter()
    print(f"sample+merge: {t2-t1:.2f}s")

    nan_rate = merged[VARIABLES].isna().mean()
    print("NaN rates:")
    print(nan_rate)

    return merged


In [None]:
df_test = subset_n_cyclones(df_filtered, n=20)
m1 = run_controlled_test(df_test, label="n20")

In [None]:
m1

In [None]:
def augment_recent_years(
    df,
    start_year=2015,
    end_year=2025,
    n_cyclones_per_month=20,
    force_download=False,
):
    """
    Augmente le dataset IBTrACS avec ERA5 sur les années récentes,
    en contrôlant le nombre de cyclones par mois.
    """

    df = df.copy()
    df["time_stamp"] = pd.to_datetime(df["time_stamp"])

    results = []

    # boucle années / mois
    for year in range(start_year, end_year + 1):
        for month in range(1, 13):

            df_month = df[
                (df["time_stamp"].dt.year == year) &
                (df["time_stamp"].dt.month == month)
            ]

            if df_month.empty:
                continue

            # sélection des N cyclones les plus riches DU MOIS
            df_month = subset_n_cyclones(
                df_month,
                n=n_cyclones_per_month
            )

            label = f"y{year}_m{month:02d}_n{n_cyclones_per_month}"

            try:
                merged = run_controlled_test(
                    df_month,
                    label=label,
                    force_download=force_download
                )
                results.append(merged)

            except Exception as e:
                print(f"[SKIP] {label} → {e}")
                continue

    if not results:
        raise RuntimeError("Aucune donnée produite.")

    return pd.concat(results, ignore_index=True)


In [None]:
# RUN TEST on monthly API calls

df_aug = augment_recent_years(
    df_filtered,
    start_year=2020,
    end_year=2022,
    n_cyclones_per_month=15
)

df_aug.to_parquet("data/ibtracs_era5_augmented_recent.parquet")


### Yearly API call test

In [None]:
import numpy as np

ERA5_OUT = Path("data/era5_yearly_tests")
ERA5_OUT.mkdir(parents=True, exist_ok=True)

VARIABLES = [
    "10m_u_component_of_wind",
    "10m_v_component_of_wind",
    "2m_temperature",
    "mean_sea_level_pressure",
]

ERA5_VAR_MAP = {
    "10m_u_component_of_wind": "u10",
    "10m_v_component_of_wind": "v10",
    "2m_temperature": "t2m",
    "mean_sea_level_pressure": "msl",
}

GRID = (0.5, 0.5)      # résolution ERA5
BBOX_PAD = 2.0         # marge spatiale (degrés)

def normalize_lon_0_360(lon):
    lon = np.asarray(lon, dtype=float)
    return np.mod(lon, 360.0)

def compute_bbox(df, pad=BBOX_PAD):
    lat_min = float(df["lat"].min()) - pad
    lat_max = float(df["lat"].max()) + pad

    lon = normalize_lon_0_360(df["lon"].values)
    lon_min = float(lon.min()) - pad
    lon_max = float(lon.max()) + pad

    return [
        float(min(90.0, lat_max)),
        float(max(0.0, lon_min)),
        float(max(-90.0, lat_min)),
        float(min(360.0, lon_max)),
    ]  # [N, W, S, E]

def cds_day_list(days):
    return [f"{int(d):02d}" for d in sorted(set(days))]

def cds_time_list(hours):
    return [f"{int(h):02d}:00" for h in sorted(set(hours))]

def subset_top_cyclones_per_year(df, year, n=50):
    df_y = df[df["time_stamp"].dt.year == year].copy()

    sids = (
        df_y.groupby("sid")
            .size()
            .sort_values(ascending=False)
            .head(n)
            .index
    )

    return df_y[df_y["sid"].isin(sids)].copy()

def sample_era5_to_obs(nc_path, df):
    ds = xr.open_dataset(nc_path)
    time_dim = "time" if "time" in ds.dims else "valid_time"

    df = df.copy()
    df["lon_0360"] = normalize_lon_0_360(df["lon"].values)

    p_time = xr.DataArray(df["time_stamp"].values, dims="point")
    p_lat = xr.DataArray(df["lat"].values, dims="point")
    p_lon = xr.DataArray(df["lon_0360"].values, dims="point")

    ds_t = ds.sel({time_dim: p_time}, method="nearest")
    ds_p = ds_t.sel(latitude=p_lat, longitude=p_lon, method="nearest")

    era_vars_nc = [ERA5_VAR_MAP[v] for v in VARIABLES]
    era_df = ds_p[era_vars_nc].to_dataframe().reset_index(drop=True)
    era_df = era_df.rename(columns={v_nc: v for v, v_nc in ERA5_VAR_MAP.items()})

    return pd.concat([df.reset_index(drop=True), era_df], axis=1)

def run_yearly_test(
    df,
    year,
    n_cyclones=50,
    label=None,
    force_download=False,
):
    df = df.copy()
    df["time_stamp"] = pd.to_datetime(df["time_stamp"])

    df_y = subset_top_cyclones_per_year(df, year, n=n_cyclones)

    if df_y.empty:
        raise ValueError(f"Aucune donnée pour {year}")

    df_y["day"] = df_y["time_stamp"].dt.day
    df_y["month"] = df_y["time_stamp"].dt.month
    df_y["hour"] = df_y["time_stamp"].dt.hour

    area = compute_bbox(df_y)
    days = df_y["day"].unique()
    hours = df_y["hour"].unique()
    months = df_y["month"].unique()

    label = label or f"year{year}_n{n_cyclones}"
    out_nc = ERA5_OUT / f"era5_{label}.nc"

    print("\n====================================")
    print(f"YEAR TEST [{year}]")
    print("cyclones:", df_y["sid"].nunique())
    print("rows:", len(df_y))
    print("bbox:", area)
    print("====================================")

    t0 = time.perf_counter()

    c = cdsapi.Client()
    req = {
        "product_type": "reanalysis",
        "variable": VARIABLES,
        "year": str(year),
        "month": [f"{m:02d}" for m in sorted(months)],
        "day": cds_day_list(days),
        "time": cds_time_list(hours),
        "area": area,
        "grid": [GRID[0], GRID[1]],
        "data_format": "netcdf",
    }

    if not out_nc.exists() or force_download:
        c.retrieve("reanalysis-era5-single-levels", req, str(out_nc))

    t1 = time.perf_counter()
    print(f"download: {t1 - t0:.2f}s | file: {out_nc.stat().st_size / 1e6:.1f} MB")

    merged = sample_era5_to_obs(out_nc, df_y)
    print("merge OK")

    return merged

### 2022

In [None]:
# 18 mn pour 1gb de données, 2022, et 3 cyclones

df_2022 = run_yearly_test(
    df_filtered,
    year=2022,
    n_cyclones=3
)

In [None]:
PROCESSED_DIR = Path("data/processed")
PROCESSED_DIR.mkdir(parents=True, exist_ok=True)

out_path = PROCESSED_DIR / "ibtracs_era5_2022.csv"

df_2022.to_csv(out_path, index=False)

print(f"Saved to {out_path}")

In [None]:
print(df_2022.shape)
print(df_2022.columns)

print("Cyclones:", df_2022["sid"].nunique())
print("Période:",
      df_2022["time_stamp"].min(),
      "→",
      df_2022["time_stamp"].max())

df_2022.head(10)


In [None]:
df_2022[[
    "10m_u_component_of_wind",
    "10m_v_component_of_wind",
    "2m_temperature",
    "mean_sea_level_pressure"
]].describe()

In [None]:
import matplotlib.pyplot as plt

plt.figure()
for sid, g in df_2022.groupby("sid"):
    plt.plot(g["lon"], g["lat"], marker="o", label=sid)

plt.xlabel("Longitude")
plt.ylabel("Latitude")
plt.title("Trajectoires des cyclones (2022)")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
plt.figure()
for sid, g in df_2022.groupby("sid"):
    g = g.sort_values("time_stamp")
    plt.plot(
        g["time_stamp"],
        g["mean_sea_level_pressure"],
        label=sid
    )

plt.xlabel("Temps")
plt.ylabel("Pression niveau mer (Pa)")
plt.title("Évolution de la pression ERA5")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
plt.figure()
plt.scatter(
    df_2022["2m_temperature"],
    df_2022["mean_sea_level_pressure"],
    alpha=0.6
)
plt.xlabel("Température 2m (K)")
plt.ylabel("Pression niveau mer (Pa)")
plt.title("Température vs Pression")
plt.grid(True)
plt.show()


In [None]:
df_2022.isna().mean()


The merged IBTrACS–ERA5 dataset for 2022 shows strong internal consistency and physically plausible behavior. The selected cyclones exhibit coherent trajectories across distinct ocean basins, with smooth spatial evolution and no discontinuities. ERA5 sea-level pressure fields capture the expected cyclone signatures, with pressure minima temporally aligned with the most intense phases of each system. The relationship between near-surface temperature and pressure follows realistic meteorological patterns, with lower pressures generally associated with warmer air masses. No missing values were observed in the merged ERA5 variables, confirming the robustness of the spatial–temporal sampling strategy and the validity of the data integration pipeline.

### 2023

In [None]:
df_2023 = run_yearly_test(
    df_filtered,
    year=2023,
    n_cyclones=3
)

In [None]:
out_path = PROCESSED_DIR / "ibtracs_era5_2023.csv"

df_2023.to_csv(out_path, index=False)

print(f"Saved to {out_path}")

In [None]:
import plotly.express as px

df_plot = df_2023.sort_values("time_stamp")

fig = px.line_geo(
    df_plot,
    lon="lon",
    lat="lat",
    color="sid",
    hover_name="name",
    hover_data={
        "time_stamp": True,
        "wind": True,
        "pressure": True,
        "mean_sea_level_pressure": True,
    },
    title="Cyclone trajectories (2022)",
)

fig.update_traces(
    mode="lines+markers",
    marker=dict(size=5),
    line=dict(width=2),
)

fig.update_layout(
    geo=dict(
        projection_type="natural earth",
        showland=True,
        landcolor="rgb(230, 230, 230)",
        showcountries=True,
        showocean=True,
        oceancolor="rgb(200, 220, 255)",
    ),
    legend_title_text="Cyclone SID",
)

fig.show()
