# Loading ERA5

This notebook extracts, filters, and loads ERA5 data from Google's public analysis-ready, cloud-optimised (ARCO) ERA5 mirror into Azure Blob Storage. The mirror stores hourly ERA5 data for the 1940-01-01 to 2025-12-31 period (continually, if irregularly, updated) in Zarr format. Beyond format, the sole difference between the re-gridded data in the mirror and the data available through the Copernicus Climate Data Store (CDS) is variable naming: the mirror uses longnames and the CDS uses shortnames.

By default, subset data is written to the default blob storage container for the workspace, "workspaceblobstore".

This notebook should be run in the Notebooks area of Azure Machine Learning. It does not require GPU-capable compute and, because extracted data is lazily loaded and the selected subset is written as a stream, it has a modest memory footprint.

In [None]:
# using the Python 3.10 - SDK v2 kernel
%pip install xarray zarr fsspec gcsfs dask adlfs azure-ai-ml azure-identity

In [None]:
import sys
from datetime import datetime, timezone
from pathlib import Path
from uuid import uuid4

import numpy as np
import xarray as xr  # also requires zarr, fsspec, gcsfs, dask
from adlfs import AzureBlobFileSystem
from azure.ai.ml import MLClient
from azure.ai.ml.entities import Data
from azure.core.exceptions import ResourceNotFoundError
from azure.identity import DefaultAzureCredential

# insert parent directory to path for proper absolute local imports
sys.path.insert(0, str(Path.cwd().parent.parent.resolve()))
from setup.common.utils import get_aml_ci_env_vars, get_latest_asset
from setup.components.common.constants import (
    ATMOS_LEVELS,
    ATMOS_VAR_MAP,
    STATIC_VAR_MAP,
    SURF_VAR_MAP,
)

Define the GCP ERA5 dataset from which to extract a subset.

**NOTE:** See the [GCP ERA5 ARCO bucket](https://console.cloud.google.com/storage/browser/gcp-public-data-arco-era5) for other datasets including alternatively gridded Zarr and raw source NetCDF files. Not all datasets contain every variable or the same time range / frequency.

In [None]:
GCP_ERA5_PATH = "gs://gcp-public-data-arco-era5/ar/full_37-1h-0p25deg-chunk-1.zarr-v3"

Define the parameters that dictate which timestamps to load: start date, end date, and timestep (frequency, in hours).
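
As a rough illustration of what these parameters select, the sketch below lists the timestamps expected for a given range and step using only the standard library. It assumes the source data is strictly hourly with no gaps and that the start date falls on a stored hour, so that taking every `step_hours`-th record is equivalent to stepping forward `step_hours` hours at a time. The helper name `expected_timestamps` is hypothetical and not part of the notebook.

```python
from datetime import datetime, timedelta, timezone


def expected_timestamps(start: datetime, end: datetime, step_hours: int) -> list[datetime]:
    """List timestamps from start to end (inclusive), one every step_hours hours."""
    if step_hours < 1:
        raise ValueError("step_hours must be a positive integer")
    stamps = []
    current = start
    while current <= end:
        stamps.append(current)
        current += timedelta(hours=step_hours)
    return stamps


# Example range: January 2025 at a 6-hour step (assumed values for illustration).
start = datetime(2025, 1, 1, 0, tzinfo=timezone.utc)
end = datetime(2025, 1, 31, 23, tzinfo=timezone.utc)
stamps = expected_timestamps(start, end, step_hours=6)
print(len(stamps), stamps[0].isoformat(), stamps[-1].isoformat())
```

Comparing this count against the length of the `time` dimension of the subset is a quick sanity check that the selection behaved as intended.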

In [None]:
START_DATE = datetime(2025, 1, 1, 0, tzinfo=timezone.utc).isoformat()
END_DATE = datetime(2025, 1, 31, 23, tzinfo=timezone.utc).isoformat()
FREQUENCY = 6  # step between selected timestamps, in hours (source data is hourly)

Define the surface and pressure level variables and the pressure levels to load. Variable longnames are mapped to shortnames for convenience (particularly when reading data into Aurora `Batch` objects); the shortnames do not affect which data is loaded.

To ingest new variables and levels, add the former by longname to the appropriate dictionary and the latter by integer pressure level (hPa) to the given list.

**NOTE:** The two variable mappings are not strict. That is, single-level variables can be added to the pressure level variable mapping without error; the separate mappings simply afford separation and readability. For the minimum set of variables required for Aurora 0.25 pre-trained, see `setup/components/common/constants.py`.
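
Because only the longnames drive the selection, a misspelt or unavailable longname only surfaces once the dataset is indexed. The sketch below shows one way to merge the mappings into a de-duplicated list of longnames and check them against the names a dataset offers. The mapping contents and the `available` set are illustrative stand-ins (the real mappings live in `setup/components/common/constants.py` and may differ), and `select_long_names` is a hypothetical helper.

```python
# Illustrative stand-ins only; the real mappings are defined in
# setup/components/common/constants.py and may contain different entries.
SURF_VAR_MAP = {"2m_temperature": "t2m", "mean_sea_level_pressure": "msl"}
ATMOS_VAR_MAP = {"temperature": "t", "geopotential": "z"}
EXTRA_SFC_VARS = {"2m_dewpoint_temperature": "d2m"}
EXTRA_ATMOS_VARS = {}

# Stand-in for the variable names offered by the source dataset.
available = {
    "2m_temperature",
    "mean_sea_level_pressure",
    "temperature",
    "geopotential",
    "2m_dewpoint_temperature",
}


def select_long_names(available: set[str], *maps: dict[str, str]) -> list[str]:
    """Return de-duplicated longnames in first-seen order, or raise if any are unavailable."""
    requested = list(dict.fromkeys(name for mapping in maps for name in mapping))
    missing = [name for name in requested if name not in available]
    if missing:
        raise KeyError(f"Variables not found in dataset: {missing}")
    return requested


variables = select_long_names(
    available, SURF_VAR_MAP, ATMOS_VAR_MAP, EXTRA_SFC_VARS, EXTRA_ATMOS_VARS
)
print(variables)
```

In the notebook itself, the `available` set could plausibly be built from the opened dataset with `set(ds.data_vars)`.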

In [None]:
EXTRA_SFC_VARS = {
    "2m_dewpoint_temperature": "d2m",
}
EXTRA_ATMOS_VARS = {}
EXTRA_LEVELS = []

Lazily load the dataset and subset it by variables, levels, time range, and timestep.

**NOTE:** This will take at least 1 minute regardless of subset size due to the need to load metadata for this petabyte-scale dataset.

In [None]:
ds = xr.open_zarr(GCP_ERA5_PATH, chunks={})
variables = [
    *SURF_VAR_MAP.keys(),
    *STATIC_VAR_MAP.keys(),
    *ATMOS_VAR_MAP.keys(),
    *EXTRA_SFC_VARS.keys(),
    *EXTRA_ATMOS_VARS.keys(),
]
var_subset_ds = ds[variables]
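# the slice step is positional: with hourly source data, every FREQUENCY-th
# timestamp between START_DATE and END_DATE (inclusive) is kept
subset_ds = var_subset_ds.sel(
    time=slice(np.datetime64(START_DATE), np.datetime64(END_DATE), FREQUENCY),
    level=ATMOS_LEVELS + EXTRA_LEVELS,
)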
# update metadata attributes to reflect the subset, not original, data
subset_ds.attrs.update(valid_time_start=START_DATE, valid_time_stop=END_DATE)
subset_ds

Obtain and create necessary environment parameters and Azure interface objects.

**NOTE:** When run on AML, the compute instance should have an identity with the Azure AI Administrator and Storage Blob Data Contributor roles. You may need to use the `azure.identity.ManagedIdentityCredential(client_id="...")` class with a client ID referencing an identity assigned the aforementioned roles.

In [None]:
az_cred = DefaultAzureCredential()
sub_id, rg_name, ws_name = get_aml_ci_env_vars()
ml_client = MLClient(
    credential=az_cred,
    subscription_id=sub_id,
    resource_group_name=rg_name,
    workspace_name=ws_name,
)

Define the destination and write the subset to the default workspace blob storage container ("workspaceblobstore") in the corresponding storage account, using a UUID v4 store name to avoid inadvertent naming collisions.

**NOTE:** The filesystem object and mapper can be avoided by passing an "abfs://" protocol path and the `storage_options` parameter to `.to_zarr()`, though doing so can result in bugs arising from the event loops created and managed by `xarray` / `zarr` and `fsspec` / `adlfs`. For example:
```python
subset_ds.to_zarr(
    f"abfs://{dst_datastore.container_name}/{uuid4()}.zarr",
    mode="w",
    compute=True,
    consolidated=True,
    zarr_format=2,
    storage_options={
        "credential": DefaultAzureCredential(),
        "account_name": dst_datastore.account_name,
    },
)
```

In [None]:
dst_datastore = ml_client.datastores.get("workspaceblobstore")
path = f"aurora-workshop/input/{uuid4()}.zarr"
fs = AzureBlobFileSystem(dst_datastore.account_name, credential=az_cred)
store = fs.get_mapper(f"{dst_datastore.container_name}/{path}")
subset_ds.to_zarr(
    store,
    mode="w",
    compute=True,
    consolidated=True,
    zarr_format=2,
)
print(
    f"Wrote to: account={dst_datastore.account_name}, "
    f"container={dst_datastore.container_name}, store={path}",
)

Confirm persisted data is available and valid.

**NOTE:** An equality check with the original `subset_ds` (e.g. `new_ds.equals(subset_ds)`) can be used for the avoidance of doubt but requires loading data into memory, which may take time and, depending on subset size, result in an OOM error.
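
To gauge whether such a check is likely to fit in memory, the uncompressed size of a subset can be estimated up front. The sketch below is back-of-the-envelope arithmetic under stated assumptions: a 0.25 degree global grid of 721 x 1440 points, 4-byte (float32) values, and every single-level variable carrying a time dimension. The variable and level counts in the example call are hypothetical, and `estimate_uncompressed_bytes` is not part of the notebook.

```python
def estimate_uncompressed_bytes(
    n_times: int,
    n_surface_vars: int,
    n_level_vars: int,
    n_levels: int,
    n_lat: int = 721,  # assumed 0.25 degree grid, poles included
    n_lon: int = 1440,  # assumed 0.25 degree grid
    itemsize: int = 4,  # assumed float32 storage
) -> int:
    """Estimate the in-memory size of a subset, in bytes."""
    # Each single-level variable is one 2D field per timestep;
    # each pressure level variable is one 2D field per level per timestep.
    fields_per_time = n_surface_vars + n_level_vars * n_levels
    return n_times * fields_per_time * n_lat * n_lon * itemsize


# Hypothetical subset: 124 timesteps, 5 single-level variables,
# 5 pressure level variables on 13 levels.
size = estimate_uncompressed_bytes(
    n_times=124, n_surface_vars=5, n_level_vars=5, n_levels=13
)
print(f"{size / 1024**3:.1f} GiB uncompressed")
```

Within the notebook, xarray's `subset_ds.nbytes` attribute should give a comparable figure without manual arithmetic.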

In [None]:
xr.open_dataset(store, engine="zarr", chunks={})

Define and create / update the Azure Machine Learning data asset entity for persisted data.
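
The asset registration relies on two small pieces of logic: incrementing the latest version and composing the `azureml://` datastore URI. The sketch below isolates both as pure functions so they can be checked without a workspace. `next_version` and `build_asset_uri` are hypothetical helpers, not part of the Azure SDK or of this repository, and `next_version` assumes all existing versions are integer strings.

```python
def next_version(existing_versions: list[str]) -> str:
    """Return the next integer version as a string, starting at "1" when none exist."""
    latest = max((int(version) for version in existing_versions), default=0)
    return str(latest + 1)


def build_asset_uri(
    sub_id: str, rg_name: str, ws_name: str, datastore_name: str, path: str
) -> str:
    """Compose an azureml:// datastore URI following the pattern used in this notebook."""
    return (
        f"azureml://subscriptions/{sub_id}/resourcegroups/{rg_name}"
        f"/workspaces/{ws_name}/datastores/{datastore_name}"
        f"/paths/{path.lstrip('/')}"
    )


# Placeholder identifiers for illustration only.
uri = build_asset_uri(
    "sub", "rg", "ws", "workspaceblobstore", "aurora-workshop/input/example.zarr"
)
print(next_version(["1", "2", "10"]), uri)
```

Comparing versions as integers rather than strings matters here: a string comparison would rank "2" above "10".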

In [None]:
asset_name = "gcp-era5-arco"
try:
    new = int(get_latest_asset(ml_client.data, asset_name).version) + 1
except ResourceNotFoundError:
    new = 1

data_asset = Data(
    name=asset_name,
    version=str(new),
    description="Zarr subset of ERA5 data from the GCP ERA5 ARCO dataset.",
    path=f"azureml://subscriptions/{sub_id}/resourcegroups/{rg_name}/workspaces/{ws_name}/datastores/{dst_datastore.name}/paths/{path}",
)
ml_client.data.create_or_update(data_asset)
print(
    f"Created or updated asset: name={data_asset.name}, version={data_asset.version}, "
    f"path={data_asset.path}",
)