
This code retrieves data from the Canadian Regional Climate Model V5 driven by CMIP6 global climate models (CRCM5-CMIP6).
https://www.ouranos.ca/en/ouranos-climate-data/crcm5-cmip6

To optimize storage efficiency, this code generates a Zarr file for each pilot, time step, and data type (historical or projections).

It should be noted that some variables are instantaneous, meaning the value corresponds to the stated hour. Others, however, are time-stamped at the half hour (e.g., 4:30) and represent the average, minimum, or maximum value over a period (for example, an average flux over 3 hours). To optimize storage, all variables time-stamped at the half hour are shifted to the previous instantaneous time step, e.g., a value at 4:30 is set at 3:00. This is not a problem for the purpose of this dataset, as these variables will be aggregated at a daily time step. However, it might be more appropriate to shift them to the next instantaneous time step (e.g., 6:00).

In [0]:
import json
import os
import re
from datetime import datetime, timedelta
from urllib.parse import urljoin

import dask
import numpy as np
import requests
import xarray as xr
import zarr
from bs4 import BeautifulSoup
from pyspark.sql import SparkSession

In [0]:
def get_urls_from_catalog(catalog_url, list_url=None):
    """
    Create a list of OPeNDAP URLs for the .nc files listed in a THREDDS catalog.

    This function fetches the HTML page of the catalog and parses all of its links.
    It recurses into sub-catalogs (links ending in ``catalog.html``). Every link ending
    in ``.nc`` is converted into a PAVICS OPeNDAP (``dodsC``) URL.

    Links that are empty or contain "twitcher", "https:" or "fx" are skipped.

    Args:
        catalog_url (str): The URL of the THREDDS catalog page (``.../catalog.html``).
        list_url (list, optional): A list to which the collected URLs are appended.
            A new list is created when None. Defaults to None.

    Returns:
        list: The collected URLs (the same object as ``list_url`` when one is given).

    Raises:
        requests.HTTPError: If a catalog page answers with an HTTP error status.
        requests.Timeout: If a catalog page does not answer within the timeout.
    """
    if list_url is None:
        list_url = []

    # Without a timeout a stalled server would hang the whole crawl indefinitely.
    response = requests.get(catalog_url, timeout=60)
    # Fail loudly instead of parsing an error page and silently finding no files.
    response.raise_for_status()
    soup = BeautifulSoup(response.content, "html.parser")

    for link in soup.find_all("a", href=True):
        href = link["href"]
        if not href or any(sbt in href for sbt in ("twitcher", "https:", "fx")):
            continue
        if href.endswith("catalog.html"):
            # urljoin resolves relative links like a browser does and, unlike
            # os.path.join, never inserts OS-specific separators into a URL.
            get_urls_from_catalog(urljoin(catalog_url, href), list_url)
        elif href.endswith(".nc"):
            # The dataset path is expected after the first "=" of the link
            # (e.g. "...?dataset=<path>.nc"); maxsplit=1 keeps paths containing "=" whole.
            dataset_path = href.split("=", 1)[1]
            list_url.append(
                f"https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/{dataset_path}"
            )
    return list_url


def create_dict_from_urls(list_url):
    """
    Group .nc file URLs into a nested dictionary keyed by fields of their file names.

    The basename of each URL is split on "_". The fields used are:
    0 = variable, 2 = pilot, 3 = experiment (ssp/historical), 4 = member,
    7 = version, 8 = time step.

    The nesting order is pilot -> time step -> experiment -> member -> version -> variable.
    The innermost value is the list of URLs sharing those fields, in input order.

    Args:
        list_url (list): A list of .nc file URLs.

    Returns:
        dict: The nested dictionary (empty when ``list_url`` is empty).

    Raises:
        IndexError: If a file name has fewer than nine "_"-separated fields.
    """
    tree = {}
    for url in list_url:
        fields = url.split("/")[-1].split("_")
        var, model, ssp, member, version, ts = (fields[i] for i in (0, 2, 3, 4, 7, 8))

        # Walk (creating as needed) down to the per-version level...
        node = tree
        for level in (model, ts, ssp, member, version):
            node = node.setdefault(level, {})
        # ...then append the URL to the list of its variable.
        node.setdefault(var, []).append(url)
    return tree


def calculate_time_steps(file_url, ts):
    """
    Calculate the number of time steps of a file from the date range in its name.

    The file name must contain ``<start>-<end>.nc``. Each date is written as
    ``YYYYMMDDhhmm`` (sub-daily files), ``YYYYMMDDhh`` or ``YYYYMMDD`` (daily files).
    Both end points are counted.

    Args:
        file_url (str): The URL of the .nc file containing the date range.
        ts (str): The time step interval ('day', '6hr', '3hr' or '1hr').

    Returns:
        int or None: The number of time steps, or None when no date range is found in
        ``file_url``.

    Raises:
        ValueError: If ``ts`` is not a supported time step interval.
    """
    hours_per_step = {"day": 24, "6hr": 6, "3hr": 3, "1hr": 1}
    if ts not in hours_per_step:
        raise ValueError(f"Invalid time step interval: {ts}")

    # Each date has 8, 10 or 12 digits: YYYYMMDD[hh[mm]].
    match = re.search(r"(\d{8}(?:\d{2}){0,2})-(\d{8}(?:\d{2}){0,2})\.nc", file_url)
    if match is None:
        return None

    date_formats = {8: "%Y%m%d", 10: "%Y%m%d%H", 12: "%Y%m%d%H%M"}
    start_date, end_date = (
        datetime.strptime(text, date_formats[len(text)]) for text in match.groups()
    )

    # Inclusive count: both the first and the last time stamps hold a value.
    return int((end_date - start_date) / timedelta(hours=hours_per_step[ts])) + 1


def process_file(url, store, offset, ssp, member):
    """
    Copy one NetCDF file into its slot of the pre-allocated Zarr store.

    Steps:
    - Open the file at ``url``.
    - Crop it to the box given by the module-level globals ``lon_min``, ``lon_max``,
      ``lat_min`` and ``lat_max``. These are globals, not parameters, so they must be
      defined before this function runs.
    - Load the cropped data into memory.
    - Write its first data variable into ``group[variable]`` block by block. The time
      positions are shifted by ``offset``; the (ssp, member) positions are looked up in
      the store's ``ssp`` and ``member`` arrays.

    Args:
        url (str): The URL of the NetCDF file to be processed.
        store (str): The Zarr store where the data will be stored.
        offset (int): Index along the store's time axis at which this file starts.
        ssp (str): The SSP scenario identifier (must be stored in ``group["ssp"]``).
        member (str): The member identifier (must be stored in ``group["member"]``).

    Raises:
        ValueError: If ``ssp`` or ``member`` is not stored in the Zarr group.
    """

    def _position(labels, label, dimension):
        # Index of ``label`` in the 1-D array ``labels``, with a descriptive error
        # when the label is absent.
        hits = np.flatnonzero(labels == label)
        if hits.size == 0:
            raise ValueError(f"{dimension} {label!r} is not stored in the Zarr group")
        return hits[0]

    # The context manager closes the source dataset once the cropped copy has been
    # loaded into memory by .compute().
    with xr.open_dataset(url) as source:
        ds = source.where(
            (source.lon >= lon_min)
            & (source.lon <= lon_max)
            & (source.lat >= lat_min)
            & (source.lat <= lat_max),
            drop=True,
        ).compute()

    variable = list(ds.data_vars.keys())[0]
    da = ds[variable]

    group = zarr.group(store)

    index_ssp = _position(group["ssp"][:], ssp, "ssp")
    index_member = _position(group["member"][:], member, "member")

    # Split the in-memory array into blocks (the chunking dask picks for it) and write
    # them one at a time, shifting each block's time slice by ``offset``.
    slices = dask.array.core.slices_from_chunks(dask.array.empty_like(da).chunks)
    for slice_ in slices:
        time_slice, *rest = slice_
        time_slice = slice(time_slice.start + offset, time_slice.stop + offset)
        target_slice = (time_slice,) + tuple(rest)
        group[variable][target_slice + (index_ssp, index_member)] = da[slice_].values


def process_expanding(variable, url, store, offset):
    """
    Write the values of a time-like coordinate of one NetCDF file into the Zarr group.

    The values of ``variable`` are written to ``group[variable][offset:offset + n]``,
    where ``n`` is the number of values in the file. Only 3-hourly data is supported.
    If any stamp falls on minute 30 (a half-hour stamp), all stamps are moved 90
    minutes earlier, e.g. 04:30 -> 03:00. This aligns them with the instantaneous
    variables.

    Args:
        variable (str): The name of the variable to be processed (e.g. "time").
        url (str): The URL of the NetCDF file to be processed.
        store (str): The Zarr store where the data will be stored.
        offset (int): Index along the time dimension at which this file starts.

    Raises:
        ValueError: If the file holds fewer than two time stamps, or if the time step
            is not 3 hours.
    """
    group = zarr.group(store)
    # Read the values, then close the dataset immediately.
    with xr.open_dataset(url) as ds:
        values = ds[variable].values

    slice_ = (slice(offset, values.shape[0] + offset),)

    if values.shape[0] < 2:
        raise ValueError(
            f"At least two time stamps are needed to infer the time step of {url}"
        )
    nh = (values[1] - values[0]) / np.timedelta64(1, "h")

    # This is necessary to align the variables in the time dimension. Check the source.
    if nh == 3:  # TODO Implement for different time steps
        # Minute of the hour of every stamp (minutes since the epoch, modulo 60).
        minutes = values.astype("datetime64[m]").astype(np.int64) % 60
        if (minutes == 30).any():
            values = values - np.timedelta64(90, "m")
        group[variable][slice_] = values
    else:
        raise ValueError(
            "This function currently does not support time steps different from 3 hours"
        )


def decode_encode_attr(v):
    """
    Return an attribute value prepared for storage in Zarr attributes.

    A ``bytes`` value is first decoded as UTF-8 text. The (possibly decoded) value is
    then passed through xarray's ``encode_zarr_attr_value``, which turns NumPy
    arrays/scalars into plain Python objects.
    """
    value = v.decode("utf-8") if isinstance(v, bytes) else v
    return xr.backends.zarr.encode_zarr_attr_value(value)


def run_parallel(pair, store, ssp, member):
    """
    Spark worker entry point: write one (url, offset) pair into the Zarr store.

    Args:
        pair (tuple): ``(url, offset)``. The first item is the .nc file URL; the second
            is its starting index along the store's time axis.
        store (str): Path of the Zarr store.
        ssp (str): The SSP scenario identifier.
        member (str): The member identifier.
    """
    url, offset = pair[0], pair[1]
    # Only the data variable is written here. The time coordinate is written
    # separately with process_expanding.
    process_file(url=url, store=store, offset=offset, ssp=ssp, member=member)

# 1. Fetching and processing catalog URLs

In this section, the code fetches the content of the catalog URL, parses it to find all links, and recursively processes sub-catalogs to gather `.nc` file URLs. This process may take several minutes, so it is recommended to save the resulting dictionary for easier manipulation and future use.


In [0]:
# Root THREDDS catalog of the CRCM5-CMIP6 simulations (NAM-12 domain) on PAVICS.
base_url = (
    "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/catalog/"
    "birdhouse/disk2/ouranos/CORDEX/CMIP6/DD/NAM-12/OURANOS/catalog.html"
)
# Recursive crawl of every sub-catalog; this can take several minutes.
list_url = get_urls_from_catalog(catalog_url=base_url)

In [0]:
dict_url = create_dict_from_urls(list_url)
# Cache the catalog structure on disk so the slow crawl need not be repeated.
with open("./dic_nc.json", "w") as fh:
    fh.write(json.dumps(dict_url))

In [0]:
# Reload the cached catalog structure written by the previous cell.
with open("./dic_nc.json") as fh:
    dict_url = json.loads(fh.read())

# 2. Create an empty Zarr store

An empty Zarr store with the desired structure must be created first, so that the data can then be written to it in parallel.


In this section, specify the pilot, time step, type (historical or projections), and variables to be downloaded. Additionally, define the desired spatial extent. This section also requires identifying the final length of the time dimension of the Zarr store (**len_time**). An example of how this can be obtained is provided, but it is recommended to double-check this value.


In [0]:
directory = "./CRCM5-CMIP6/"  # Output folder of the Zarr stores

pilot = "CNRM-ESM2-1"  # Driving (pilot) global climate model
time_step = "3hr"
typ = "ssp"  #'ssp' or 'historical'

VARIABLES = [
    "prra",
    "prsn",
    "snw",
    "clwvi",
    "prw",
]

# Desired spatial extent
# This is primarily done to facilitate data manipulation; it may not improve download time
lat_min, lat_max = 43, 50
lon_min, lon_max = -82, -71

# Double-check: the time length is derived from the file names of one reference
# variable/member (clwvi, r1i1p1f2, v1-r1). For projections only ssp126 is inspected,
# and every other SSP is assumed to have the same length.
if typ == "historical":
    dic4len = dict_url[pilot][time_step]["historical"]["r1i1p1f2"]["v1-r1"]["clwvi"]
else:
    dic4len = dict_url[pilot][time_step]["ssp126"]["r1i1p1f2"]["v1-r1"]["clwvi"]

steps_per_file = [calculate_time_steps(n, time_step) for n in dic4len]
# calculate_time_steps returns None when it cannot read a date range; report those
# files explicitly instead of failing inside the sum.
undated_files = [n for n, steps in zip(dic4len, steps_per_file) if steps is None]
if undated_files:
    raise ValueError(
        f"Could not read the date range from these file names: {undated_files}"
    )
len_time = sum(steps_per_file)  # int, also for an empty list (np.sum would give 0.0)

If the above variables are modified, the dictionary `ATTRS_ARRAY_DIMENSIONS` must be updated to assign the coordinates on which the new variables depend.

In [0]:
# Coordinates copied from the template file into the store.
COORDINATES = [
    "time",
    "rlat",
    "rlon",
    "lat",
    "lon",
]
# Dimensions added by this notebook to stack scenarios and members.
NEW_COOR = ["ssp", "member"]

# Dimensions of every data variable of the store.
DATA_DIMENSIONS = ["time", "rlat", "rlon", "ssp", "member"]

# "_ARRAY_DIMENSIONS" attribute (dimension names used by xarray) of each array.
# Data-variable entries are generated from VARIABLES, so adding a variable that lives
# on (time, rlat, rlon) needs no manual edit. To give a variable other dimensions,
# add an explicit entry below; later keys override the generated ones.
ATTRS_ARRAY_DIMENSIONS = {
    **{name: list(DATA_DIMENSIONS) for name in VARIABLES},
    "time": ["time"],
    "rlat": ["rlat"],
    "rlon": ["rlon"],
    "ssp": ["ssp"],
    "member": ["member"],
    "lat": ("rlat", "rlon"),
    "lon": ("rlat", "rlon"),
}

# Coordinates whose length is the full store length rather than the template's.
EXPANDING = {"time"}

missing_value = np.nan

# Attributes that describe a single source file/run and are not copied to the store.
ATTRS_DROP = {
    "driving_experiment_id",
    "driving_variant_label",
    "variable_id",
    "history",
    "tracking_id",
    "version_realization",
    "_ChunkSizes",
}

Create the Zarr group where the data will be stored. Note that `overwrite=True` deletes any existing store at this path.

In [0]:
# One store per pilot / time step / experiment type,
# e.g. ./CRCM5-CMIP6/test_fixed_CNRM-ESM2-1_3hr_ssp.zarr
store = os.path.join(directory, f"test_fixed_{pilot}_{time_step}_{typ}.zarr")
# overwrite=True deletes whatever already exists at this path.
group = zarr.group(store, overwrite=True)

## 2.1. Static metadata

Static metadata, such as 'frequency', 'variable_id', 'units', etc., is assigned to the Zarr file using some .nc files as templates. It may be necessary to modify the `dic_vars_templates` variable according to the pilot or the desired time step.

In [0]:
# Templates come from the historical run, member r1i1p1f2, version v1-r1; adjust if
# this combination does not exist for the chosen pilot/time step.
dic_vars_templates = dict_url[pilot][time_step]["historical"]["r1i1p1f2"]["v1-r1"]

# First .nc file of each requested variable, in the order of dic_vars_templates.
templates_list = []
for var_name, var_files in dic_vars_templates.items():
    if var_name in VARIABLES:
        templates_list.append(var_files[0])

# The first template also provides the global attributes and the coordinates.
ds_templ = xr.open_dataset(templates_list[0])

Next, the global metadata, along with the metadata for variables and coordinates, is added to the Zarr file.

In [0]:
# Global attributes of the template, minus the run-specific ones in ATTRS_DROP.
global_attrs = {
    k: decode_encode_attr(v) for k, v in ds_templ.attrs.items() if k not in ATTRS_DROP
}
attrs = {".zattrs": global_attrs}
# Store them on the root group. The attrs[".zattrs"] entry alone is never written
# anywhere, so without this update the store would have no global metadata.
group.attrs.update(global_attrs)

In [0]:
# Attributes of each data variable, read from its own template file.
for file in templates_list:
    # The context manager closes each template once its attributes have been read.
    with xr.open_dataset(file) as temp_i:
        variable = list(temp_i.data_vars.keys())[0]
        dataset = temp_i[variable]
        attrs[variable] = {
            k: decode_encode_attr(v)
            for k, v in dataset.attrs.items()
            if k not in ATTRS_DROP
        }
    attrs[variable]["_ARRAY_DIMENSIONS"] = ATTRS_ARRAY_DIMENSIONS[variable]
    attrs[variable]["missing_value"] = missing_value

In [0]:
# Attributes of the coordinates, read from the first template.
for coordinate in COORDINATES:
    dataset = ds_templ[coordinate]
    attrs[coordinate] = {
        k: decode_encode_attr(v)
        for k, v in dataset.attrs.items()
        if k not in ATTRS_DROP
    }
    attrs[coordinate]["_ARRAY_DIMENSIONS"] = ATTRS_ARRAY_DIMENSIONS.get(coordinate, [])
    # The netCDF attribute is spelled "_FillValue" (not "_FILL_VALUE"), and xarray moves
    # it from .attrs to .encoding when decoding, so check both places. A NaN
    # missing_value is only meaningful for floating-point coordinates.
    has_fill_value = "_FillValue" in dataset.attrs or "_FillValue" in dataset.encoding
    if has_fill_value and np.issubdtype(dataset.dtype, np.floating):
        attrs[coordinate]["missing_value"] = missing_value

In [0]:
# ssp and member are created by this notebook, so they only carry their dimension name.
for new_coor in NEW_COOR:
    attrs[new_coor] = {"_ARRAY_DIMENSIONS": ATTRS_ARRAY_DIMENSIONS.get(new_coor, [])}

## 2.2. Pre-allocate Zarr groups for variables and coordinates
The size of the Zarr file is defined based on the templates from the previous section.

In [0]:
# Experiments stacked along the "ssp" dimension.
if typ not in ("ssp", "historical"):
    raise ValueError(f"Invalid type: {typ}. It must be ssp or historical")
if typ == "historical":
    ssp_list = np.array(["historical"])
else:
    ssp_list = np.array(
        [key for key in dict_url[pilot][time_step].keys() if key != "historical"]
    )

# Members are taken from the first experiment.
# TODO: check that all the ssp have the same members.
member_list = np.array(list(dict_url[pilot][time_step][ssp_list[0]].keys()))

n_ssp = len(ssp_list)
n_member = len(member_list)

Crop the `ds_templ` dataset to the specified longitude and latitude boundaries, and define the size of the Zarr arrays.

In [0]:
# Boolean mask of the grid cells inside the requested lon/lat box.
inside_box = (
    (ds_templ.lon >= lon_min)
    & (ds_templ.lon <= lon_max)
    & (ds_templ.lat >= lat_min)
    & (ds_templ.lat <= lat_max)
)
# drop=True removes the rlat/rlon labels whose cells all fall outside the box.
templ = ds_templ.where(inside_box, drop=True).compute()

# Shape of every data variable in the store: (time, rlat, rlon, ssp, member).
full_shape = (len_time, templ.rlat.shape[0], templ.rlon.shape[0], n_ssp, n_member)

Each variable is allocated from its template: it gets the cropped spatial size, the full length of the time dimension, and one entry per SSP and per member. The coordinates are then allocated from the cropped template.

In [0]:
for file in templates_list:
    # The context manager closes each template once its name and dtype are known.
    with xr.open_dataset(file) as temp_i:
        # Crop exactly as process_file does, so the allocated dtype matches the data
        # written later.
        ds3 = temp_i.where(
            (temp_i.lon >= lon_min)
            & (temp_i.lon <= lon_max)
            & (temp_i.lat >= lat_min)
            & (temp_i.lat <= lat_max),
            drop=True,
        )
        variable = list(ds3.data_vars.keys())[0]
        dataset = ds3[variable]
        name = dataset.name
        dtype = dataset.dtype

    # Chunk sizes follow the axis order of full_shape: (time, rlat, rlon, ssp, member).
    # NOTE(review): parallel workers write one file each. Writes stay within separate
    # chunks only if every file has exactly templ.time.shape[0] time steps — confirm.
    chunks = (templ.time.shape[0], templ.rlat.shape[0], templ.rlon.shape[0], 1, 1)

    v = group.empty(
        name,
        shape=full_shape,
        dtype=dtype,
        chunks=chunks,
        overwrite=True,
    )
    v.attrs.update(attrs[name.lstrip("/")])

In [0]:
for coord in COORDINATES:  # TODO: this is slowish. Maybe parallelize on 1 machine.

    dataset = templ[coord]
    print("handle", coord)

    # Expanding coordinates (time) span the whole store; the others keep the shape of
    # the cropped template.
    if coord in EXPANDING:
        shape = (full_shape[0],) + dataset.shape[1:]
    else:
        shape = dataset.shape
    shape = tuple(int(x) for x in shape)

    # A single chunk per coordinate array.
    v = group.empty(
        dataset.name, shape=shape, dtype=dataset.dtype, overwrite=True, chunks=shape
    )
    if coord not in EXPANDING:
        # Static coordinates are copied now; the time values are written later by
        # process_expanding. A full-array assignment also covers the 2-D lat/lon.
        v[:] = np.array(dataset)
    v.attrs.update(attrs[dataset.name.lstrip("/")])

In [0]:
# Fixed-width string dtype for the labels: at least 10 characters, and wide enough for
# the longest ssp/member label. A truncated label would no longer match in
# process_file's lookup.
label_width = max([10] + [len(str(label)) for label in (*ssp_list, *member_list)])
label_dtype = np.dtype(f"U{label_width}")

for name, labels in (("ssp", ssp_list), ("member", member_list)):
    v = group.empty(
        name, shape=(len(labels),), dtype=label_dtype, overwrite=True, chunks=(1,)
    )
    v[:] = labels
    v.attrs.update(attrs[name])

# 3. Download data
Pay close attention to the `TODO` lines.

In [0]:
spark = SparkSession.builder.appName("Parallelize_download_crcm5").getOrCreate()
sc = spark.sparkContext

for ssp in ssp_list:
    for member in member_list:  # TODO: some members get error when downloading
        for key in VARIABLES:
            try:
                names = dict_url[pilot][time_step][ssp][member]["v1-r1"][
                    key
                ]  # TODO create another loop for "v1-r1" when there is more than one version

                # Number of time steps per file. Its running sum gives the index of
                # the store's time axis where each file starts. (Named steps_per_file
                # so the store shape `full_shape` is not overwritten.)
                steps_per_file = np.array(
                    [calculate_time_steps(n, time_step) for n in names]
                )
                csum = np.cumsum(steps_per_file)
                offsets = np.concatenate([np.array([0]), csum[:-1]])

                # A single RDD of (url, offset) pairs. This avoids RDD.zip, which
                # requires two RDDs with identical partitioning.
                pairs = sc.parallelize(
                    [(url, int(off)) for url, off in zip(names, offsets)]
                )
                # run_parallel((names[0], int(offsets[0])), store, ssp, member)  # Helps to debug the code

                # Bind the current ssp/member explicitly (defensive against late binding).
                pairs.foreach(
                    lambda pair, ssp=ssp, member=member: run_parallel(
                        pair, store, ssp, member
                    )
                )
                print("ssp:" + ssp + ", member:" + member + ", var:" + key + ", done")
            except Exception as err:  # report and continue with the next combination
                print(
                    "############### ERROR: ssp:"
                    + ssp
                    + ", member:"
                    + member
                    + ", var:"
                    + key
                    + f" ({type(err).__name__}: {err})"
                )

Write the dates (the time coordinate) to the Zarr store. This uses the `.nc` files in the variable `names`, and their `offsets`, from the last iteration of the previous loop.

In [0]:
# Write the time coordinate from the files (and offsets) of the last combination
# processed in the previous cell. This assumes all variables share the same time axis.
for url, offset in zip(names, offsets):
    process_expanding(variable="time", url=url, store=store, offset=offset)

In [0]:
# Gather the metadata of all arrays and groups into a single key (".zmetadata" by
# default in zarr v2), so the store can be opened with consolidated metadata.
zarr.consolidate_metadata(store)