In [None]:
import logging
import os
from datetime import timedelta
from pathlib import Path
import re

import matplotlib.pyplot as plt
from metpy.constants import g
import numpy as np
import pandas as pd
import requests
import xarray as xr

# GEFS data source used as the starting point for this notebook:
#   https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod/gefs.20250530/
# The files have to be downloaded before they can be opened.
#
# Directory pattern (as it appears on the server):
#   https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod/gefs.YYYYMMDD/
# A file is further identified by ensemble member and forecast hour, e.g.
#   gefs.t00z.pgrb2a.0p50.f006.grib2
# where t00z is the cycle time (00Z), pgrb2a the grid type and 0p50 the resolution.

now = pd.Timestamp.now()

# First and last initialization dates, and the forecast length in hours.
init_start, init_end = pd.to_datetime("20250608"), pd.to_datetime("20250614")
forecast_length = 120

# One initialization every 24 h; both end points are rounded down to a
# multiple of 24 h before the range is built.
init_time_range = pd.date_range(
    start=init_start.floor("24h"),
    end=init_end.floor("24h"),
    freq="24h",
)

In [None]:
# Sanity check: print every initialization time in init_time_range, one per line.
for init_time in init_time_range:
    print(init_time)

In [None]:
# Field analysed throughout the notebook: the short name of the variable and
# the isobaric level it is taken from (used as the GRIB "level" filter).
shortName = "t"
isobaricInhPa = 850


def build_file_url(init_time, mem, forecast_hour):
    """Return the NOMADS URL of one GEFS ``pgrb2a.0p50`` file.

    Parameters
    ----------
    init_time : datetime-like
        Initialization time; it is formatted with ``%Y%m%d`` (directory) and
        ``%H`` (cycle).
    mem : str
        Member prefix of the file name (e.g. ``"gec00"``).
    forecast_hour : int
        Forecast hour, zero-padded to three digits in the file name.

    Returns
    -------
    str
        The full URL of the file.
    """
    day = f"{init_time:%Y%m%d}"
    cycle = f"{init_time:%H}"
    directory = (
        f"https://nomads.ncep.noaa.gov/pub/data/nccf/com/gens/prod/gefs.{day}/"
        f"{cycle}/atmos/pgrb2ap5/"
    )
    filename = f"{mem}.t{cycle}z.pgrb2a.0p50.f{forecast_hour:03d}"
    return directory + filename


def local_path_from_url(gefsdir, file_url_str):
    """Map a remote file URL onto a path below ``gefsdir``.

    The last five components of the URL are appended to ``gefsdir``. The
    parent directory of the resulting path is created if it does not exist.

    Parameters
    ----------
    gefsdir : pathlib.Path
        Root directory of the local copy.
    file_url_str : str
        URL of the remote file.

    Returns
    -------
    pathlib.Path
        Local path for the file (the file itself is not created).
    """
    tail = Path(file_url_str).parts[-5:]
    destination = gefsdir.joinpath(*tail)
    os.makedirs(destination.parent, exist_ok=True)
    return destination


def open_grib_dataset(path):
    """Open one GRIB file with cfgrib, keeping a single field on a single level.

    The field and level come from the module-level ``shortName`` and
    ``isobaricInhPa``. When ``shortName`` is ``"z"`` the file is filtered on
    ``"gh"`` and the variable is renamed to ``"z"`` afterwards.

    Parameters
    ----------
    path : str or path-like
        Location of the GRIB file.

    Returns
    -------
    xarray.Dataset
        The dataset returned by ``xr.open_dataset`` (opened with ``chunks={}``).
    """
    wants_z = shortName == "z"
    grib_filter = {
        "typeOfLevel": "isobaricInhPa",
        "level": isobaricInhPa,
        "shortName": "gh" if wants_z else shortName,
    }
    ds = xr.open_dataset(
        path,
        engine="cfgrib",
        backend_kwargs={"errors": "ignore"},
        filter_by_keys=grib_filter,
        decode_timedelta=True,
        chunks={},  # chunking can help reduce memory usage
    )
    return ds.rename(gh='z') if wants_z else ds


def download_file(url, local_file_path):
    """Stream ``url`` to ``local_file_path`` without leaving truncated files behind.

    The body is written to a sibling ``<name>.part`` file and renamed to
    ``local_file_path`` only after the whole response has been received. An
    interrupted transfer therefore never leaves a partial file under the final
    name, where a later run would mistake it for a complete download.
    Request failures are reported on stdout and not raised (best effort).

    Parameters
    ----------
    url : str
        Address of the remote file.
    local_file_path : str or pathlib.Path
        Destination path; its parent directory must already exist.

    Returns
    -------
    bool
        ``True`` if the file was downloaded, ``False`` if the request failed.
    """
    destination = Path(local_file_path)
    partial = destination.with_name(destination.name + ".part")
    try:
        # The context manager releases the connection even if streaming fails.
        with requests.get(url, stream=True, timeout=30) as response:
            response.raise_for_status()
            with open(partial, "wb") as f:
                for chunk in response.iter_content(chunk_size=8192):
                    f.write(chunk)
        partial.replace(destination)
        print(f"Successfully downloaded: {local_file_path}")
        return True
    except requests.exceptions.RequestException as e:
        print(f"Error during download: {e}")
        print(f"Could not download {url}")
        print("Please verify the date of the file exists on the server.")
        return False
    finally:
        # After a successful rename the .part file no longer exists; after any
        # failure (including non-request errors) the leftover is removed.
        partial.unlink(missing_ok=True)

In [None]:
def pangu_ifiles(pangu_dir, init_time, mem, forecast_length):
    """List the Pangu forecast files of one member for one initialization.

    Parameters
    ----------
    pangu_dir : pathlib.Path
        Root directory of the Pangu runs.
    init_time : datetime-like
        Initialization time; formatted as ``%Y%m%d%H`` for the run directory.
    mem : int
        Ensemble member number used in the ``ens{mem}`` directory and file names.
    forecast_length : int
        Last forecast hour to include.

    Returns
    -------
    list of pathlib.Path
        One path per forecast hour 24, 48, ... up to ``forecast_length``.
    """
    run_dir = pangu_dir / init_time.strftime("%Y%m%d%H") / f"ens{mem}" / "pangu_forecast_data"
    ifiles = []
    for lead in range(24, forecast_length + 1, 24):
        ifiles.append(run_dir / f"pangu_ens{mem}_pred_{lead:03d}.nc")
    return ifiles


def add_ensemble_number(ds):
    """
    Preprocessing hook for xr.open_mfdataset.

    Reads the ensemble member number from the name of the file the dataset
    was opened from (the digits following 'ens', e.g. 0 for
    "pangu_ens0_pred_162.nc") and adds it as a length-1 'number' dimension.
    The dataset is returned unchanged when the source file name is not
    available or contains no such number.
    """
    try:
        source_name = os.path.basename(ds.encoding["source"])
    except (KeyError, TypeError):
        # No usable source path recorded on the dataset.
        return ds

    found = re.search(r'ens(\d+)', source_name)
    if not found:
        return ds

    # New leading dimension 'number' whose only coordinate value is the member.
    return ds.expand_dims(number=[int(found.group(1))])


In [None]:
pangu_dir = Path("/glade/derecho/scratch/sobash/pangu_realtime")
# Number of Pangu ensemble members (ens0 ... ens50); used both to list the
# files and to label the 'number' dimension below.
n_pangu_members = 51

# nested_files[i][m] is the list of forecast-hour files of member m for the
# i-th initialization time. The three nesting levels correspond, in order, to
# the three concat_dim names passed to open_mfdataset.
nested_files = [
    [
        pangu_ifiles(pangu_dir, init_time, mem, forecast_length)
        for mem in range(n_pangu_members)
    ]
    for init_time in init_time_range
]

# The channel label we want to select
channel_label = f"{shortName}{isobaricInhPa}"

ds_pangu = (
    xr.open_mfdataset(
        nested_files,
        combine="nested",
        concat_dim=["init_time", "number", "prediction_timedelta"],
        chunks="auto",  # Use 'auto' for better performance with dask
    )
    # Rename dimensions and coordinates at the start
    .rename(
        {
            "init_time": "initialization_time",
            "prediction_timedelta": "step",
            "lat": "latitude",
            "lon": "longitude",
            "__xarray_dataarray_variable__": shortName,  # Rename the main variable
        }
    )
    # Assign the integer coordinate for the 'number' dimension
    .assign_coords(number=range(n_pangu_members))
    # Convert 'step' dimension from timedelta to forecast hours
    .pipe(
        lambda ds: ds.assign_coords(
            forecast_hour=("step", ds["step"].data / np.timedelta64(1, "h"))
        ).swap_dims({"step": "forecast_hour"})
    )
    # valid_time must be a *coordinate*: as a data variable it would be
    # discarded by Dataset.mean() (datetime data is not numeric), and the
    # plotting cells index and plot by valid_time after averaging.
    .assign_coords(valid_time=lambda ds: ds.initialization_time + ds.step)
    # Select the desired channel by its label (more readable)
    .sel(channel=channel_label)
    # Add the pressure level as a non-dimension coordinate
    .assign_coords(isobaricInhPa=isobaricInhPa)
)

ds_pangu

In [None]:
gefs_members = ['gec00'] + [f'gep{i:02d}' for i in range(1, 31)]
# Local mirror of the GEFS files. The directory is resolved once so that the
# paths built below and the paths returned by the glob share the same form and
# can be compared directly with `in existing_files`.
gefsdir = Path("/glade/derecho/scratch/ahijevyc/tmp/gefs").resolve()

# Files already present locally (consulted before downloading).
existing_files = set(gefsdir.glob("*/??/atmos/pgrb2ap5/*pgrb2a.0p50.f???"))

# Base URL for the public NOAA GEFS S3 bucket
base_url = "https://noaa-gefs-pds.s3.amazonaws.com"
# ===================================================================
# PHASE 1: DEFINE FILE REQUIREMENTS
# ===================================================================
print("--- Phase 1: Defining all required file paths ---")
required_files = []

# Extend the end by the forecast length so that there is a zero-hour "truth"
# field for every valid time the forecasts reach.
last_valid_time = init_time_range[-1] + pd.Timedelta(hours=forecast_length)
extended_time_range = pd.date_range(start=init_time_range[0], end=last_valid_time, freq="24h")
for init_time in extended_time_range:
    date_str = init_time.strftime('%Y%m%d')
    cycle = init_time.strftime('%H')
    for member in gefs_members:
        for fhr in range(0, forecast_length+1, 24):
            valid_time = init_time + pd.Timedelta(hours=fhr)
            # We don't need non-zero-hour forecasts beyond last_valid_time.
            if fhr > 0 and valid_time > last_valid_time:
                continue
            fhr_str = f"{fhr:03d}"

            # File name and directory layout of the bucket
            s3_filename = f"{member}.t{cycle}z.pgrb2a.0p50.f{fhr_str}"
            file_path_on_s3 = f"gefs.{date_str}/{cycle}/atmos/pgrb2ap5/{s3_filename}"
            url = f"{base_url}/{file_path_on_s3}"

            # Local path mirrors the last five components of the URL
            local_path = local_path_from_url(gefsdir, url)
            required_files.append({'url': url, 'local_path': local_path})

print(f"‚úÖ Defined {len(required_files)} total files required for analysis.\n")

In [None]:
print(f"‚úÖ Data saved in: {gefsdir}")
print("-" * 50)
datasets = []

for required_file in required_files:
    url = required_file["url"]
    local_file_path = Path(required_file["local_path"])
    # existing_files was collected earlier and may be stale or hold paths in a
    # different form, so the file system is consulted as well.
    if local_file_path in existing_files or local_file_path.exists():
        print(f"   -> üü¢ File already exists. Skipping.")
    else:
        print(local_file_path)
        # Ensure the destination directory exists before downloading
        local_file_path.parent.mkdir(parents=True, exist_ok=True)
        print(f"   -> ‚¨áÔ∏è  Attempting to download: {url}")
        download_file(url, local_file_path)
    # download_file reports failures without raising; do not try to open a
    # file that never arrived.
    if not local_file_path.exists():
        print(f"   -> Not available locally, leaving it out: {local_file_path}")
        continue
    datasets.append(open_grib_dataset(local_file_path))

if not datasets:
    raise FileNotFoundError(f"No GEFS files could be opened from {gefsdir}")

# Each file contributes one (time, step, number) combination; stack them along
# 'time' first, then regroup into an initialization_time x step x number grid.
ds_gfs = xr.combine_nested(datasets, concat_dim=["time"])

init_times = ds_gfs["valid_time"] - ds_gfs["step"]
ds_gfs = ds_gfs.assign_coords(initialization_time=init_times)
ds_gfs = ds_gfs.groupby(["initialization_time", "step", "number"]).first()

step_as_hours = ds_gfs["step"].data / pd.to_timedelta("1h")
ds_gfs = ds_gfs.assign_coords(forecast_hour=("step", step_as_hours))
ds_gfs = ds_gfs.swap_dims(step="forecast_hour")
# Keep valid_time as a coordinate (not a data variable) so that it survives
# the reductions applied in the plotting cells.
ds_gfs = ds_gfs.assign_coords(valid_time=ds_gfs.initialization_time + ds_gfs.step)

ds_gfs

In [None]:
# Cut both datasets down to the same latitude/longitude box and load the
# result into memory; the subsets replace the original variables.
# NOTE(review): slice(60, 20) assumes latitude is stored north-to-south in both datasets; confirm for Pangu.
region = {"latitude": slice(60, 20), "longitude": slice(220, 300)}
ds_gfs = ds_gfs.sel(region).load()
ds_pangu = ds_pangu.sel(region).load()

In [None]:
# Display ds_gfs.valid_time for inspection.
ds_gfs.valid_time

In [None]:
# "Truth" is the zero-hour GEFS field averaged over the ensemble members.
truth = ds_gfs.sel(forecast_hour=0, drop=True).mean(dim="number")  # drop forecast_hour coordinate
# Area-mean truth series; identical for both panels, so computed once.
truth_series = truth[shortName].mean(dim=["latitude", "longitude"])

fig, axes = plt.subplots(ncols=2, figsize=(14, 5), sharey=True)

# These will store the handles and labels for shared legend
handles = []
labels = []

for ax, ds_model, title in zip(axes, [ds_gfs, ds_pangu], ["GEFS", "Pangu-Weather"]):
    # Mean over members and over the horizontal domain.
    ensemble_mean = ds_model.mean(dim=["number", "latitude", "longitude"])

    # --- Data Reshaping (Stacking) ---
    # `.stack()` combines the two dimensions into a single MultiIndex dimension
    # named 'point', so each value is indexed by an
    # (initialization_time, forecast_hour) pair.
    stacked_ds = ensemble_mean.stack(point=("initialization_time", "forecast_hour"))

    # --- Swapping the Index ---
    # Replace the 'point' index with one made of 'initialization_time' and
    # 'valid_time', so each value is tied to its start time and valid time.
    tidy_ds = stacked_ds.set_index(point=["initialization_time", "valid_time"])

    # --- Final Pivot (Unstacking) ---
    # `.unstack()` yields a 2D table with 'valid_time' (x-axis) and
    # 'initialization_time' (one line per value).
    plot_ready = tidy_ds[shortName].unstack("point")

    # --- Plotting ---
    forecast_lines = plot_ready.plot.line(
        x="valid_time",
        hue="initialization_time",
        ax=ax,
        add_legend=False,
    )
    (truth_line,) = truth_series.plot.line(
        ax=ax, x="initialization_time", marker="o", color="k"
    )

    # --- Axis formatting ---
    ax.set_title(f"{title} Forecast {isobaricInhPa}-hPa {shortName}")
    ax.set_xlabel("Valid Time")
    ax.grid(True, linestyle="--", alpha=0.6)

    # --- Capture handles and create labels ONCE from the first plot ---
    # Handles are taken from the artists returned by the plot calls, so the
    # number of handles always equals the number of labels and the truth line
    # gets its own legend entry.
    if not handles:  # An empty list is False, so this runs only on the first iteration
        handles = list(forecast_lines) + [truth_line]
        labels = [
            pd.to_datetime(t).strftime("%Y-%m-%d %H:%M")
            for t in plot_ready.initialization_time.values
        ] + ["Truth (GEFS f000 mean)"]

# Add a shared legend to the right of the figure
fig.legend(
    handles, labels, title="Initialization Time", loc="center right", bbox_to_anchor=(1.0, 0.5)
)

# Adjust layout to make room for the legend
fig.tight_layout(rect=[0, 0, 0.88, 1])  # Adjusted rect to give legend more space

In [None]:
# plot_ready is the table left over from the last iteration of the plotting
# loop in the previous cell (the "Pangu-Weather" panel); load and display it.
plot_ready.load()

In [None]:
fig, axes = plt.subplots(ncols=2, figsize=(14, 5), sharex=True, sharey=True)

for ax, ds_model, title in zip(axes, [ds_gfs, ds_pangu], ["GEFS", "Pangu-Weather"]):
    # Collapse only the horizontal dimensions; 'number' is kept so that every
    # ensemble member gets its own curve.
    ds_processed = ds_model.mean(dim=["latitude", "longitude"])

    # One colour per forecast run (initialization time)
    for init_time in ds_processed.initialization_time:
        run_with_members = ds_processed.sel(initialization_time=init_time)
        members = [
            run_with_members.isel(number=k) for k in range(len(run_with_members.number))
        ]
        leader, followers = members[0], members[1:]

        # The first member is drawn without an explicit colour, so matplotlib
        # assigns the next colour of its cycle; it also carries the legend label.
        (leader_line,) = ax.plot(
            leader.valid_time,
            leader[shortName],
            alpha=0.5,
            label=pd.to_datetime(init_time.values).strftime("%Y-%m-%d %H:%M"),
        )
        run_color = leader_line.get_color()

        # Remaining members reuse that colour and add no legend entries.
        for member_data in followers:
            ax.plot(
                member_data.valid_time,
                member_data[shortName],
                alpha=0.5,
                color=run_color,
            )
    truth[shortName].mean(dim=["latitude", "longitude"]).plot.line(
        ax=ax, x="initialization_time", marker="o", color="k"
    )

    ax.set_title(f"{title} Forecast {isobaricInhPa}-hPa {shortName}")
    ax.set_xlabel("Valid Time")
    ax.grid(True, linestyle="--", alpha=0.6)
    ax.legend(title="Initialization Time")

In [None]:
fig, axes = plt.subplots(ncols=2, figsize=(14, 5), sharex=True, sharey=True)

# `truth` is indexed by initialization_time, which for a zero-hour field is
# also its valid time. Re-label that axis as valid_time so the analysis can be
# looked up at the time each forecast verifies. Any leftover valid_time/step
# coordinate is dropped first to avoid a name clash with the renamed dimension.
truth_by_valid_time = (
    truth[shortName]
    .drop_vars(["valid_time", "step"], errors="ignore")
    .rename(initialization_time="valid_time")
)

for ax, ds_model, title in zip(axes, [ds_gfs, ds_pangu], ["GEFS", "Pangu-Weather"]):
    # average over initialization_time, lat and lon. std over ensemble (number).
    ensemble_spread = (
        ds_model[shortName].std(dim="number", ddof=1).mean(dim=["initialization_time", "latitude", "longitude"])
    )

    ensemble_mean = ds_model[shortName].mean(dim="number")

    # Valid time of every (initialization_time, forecast_hour) pair.
    valid_time = ensemble_mean["initialization_time"] + ensemble_mean["step"]
    # reindex() inserts NaN fields for valid times that have no analysis, so
    # those forecasts drop out of the averages instead of raising a KeyError.
    # The indexer carries dimension names only (no coordinates), which gives
    # the result the forecast's (initialization_time, forecast_hour) layout.
    truth_at_valid_time = truth_by_valid_time.reindex(
        valid_time=np.unique(valid_time.values)
    ).sel(valid_time=xr.DataArray(valid_time.values, dims=valid_time.dims))

    # error = ensemble mean - truth valid at the same time as the forecast
    error = ensemble_mean - truth_at_valid_time
    se = error ** 2
    mse = se.mean(dim=["latitude", "longitude"])
    rmse = np.sqrt(mse).mean(dim="initialization_time")
    rmse.plot.line(ax=ax, x="forecast_hour", marker="o", label="rmse")
    ensemble_spread.plot.line(ax=ax, x="forecast_hour", marker="o", label="spread")
    ax.grid(True, linestyle="--", alpha=0.6)
    ax.set_title(f"{title} Forecast {isobaricInhPa}-hPa {shortName}")
    ax.legend()