# Processing Large Climate Datasets with Dask and Xarray
Efficient data processing on PAVICS, or using Xarray/Dask in general, typically requires understanding two different notions of data *chunking*: 1) *on-disk* chunking or the way the data is stored in the filesystem and 2) *in-memory* chunking, or the way that Dask will break up a large dataset into manageable portions and process the data in parallel. In the case of *in-memory* chunks, it is possible to specify any size of in-memory chunking based on system resources and the analysis at hand. However, this does not guarantee efficient computation. Indeed, to efficiently process large datasets and prevent memory overloads, aligning in-memory chunks with the on-disk chunk structure is essential. This allows Dask to load and process data in a way that minimizes memory usage and I/O operations, speeding up computation.



In [14]:
import shutil
import time
from pathlib import Path
from tempfile import TemporaryDirectory

import psutil
import xarray as xr
from dask import compute
from dask.distributed import Client
from IPython.display import clear_output
from xclim import atmos
from xscen.io import save_to_zarr

### How to align memory chunks with on-disk chunks?
I begin by checking the on-disk chunk structure of my dataset by loading it with `decode_times=False`. This skips time decoding and loads only metadata without reading the data into memory.

In [15]:
# open the daily ERA5-land dataset as an example
url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"

# use decode_times=False to skip time decoding and load only metadata
ds = xr.open_dataset(url, decode_times=False, chunks={})

# `._ChunkSizes` attribute shows the chunk structure of the variable
print(ds["tas"]._ChunkSizes)

[366  50  50]


The `._ChunkSizes` attribute shows the chunk sizes of the `tas` variable on disk. I then re-open the dataset with matching Dask chunk sizes for efficient memory alignment. 

When you open a dataset using chunking, it is represented as Dask-backed arrays, where only the metadata is initially loaded, and the data itself remains on disk until needed. Dask uses lazy evalution, meaning it doesn't immediately perform the operation but instead builds a computation graph. This graph tracks the sequence of operations, delaying execution until `compute()` is called. During computation, Dask reads and processes data in chunks and loads only the necessary parts into memory. 

Here's an example showing the difference in computation times when the dataset is loaded with unaligned versus aligned chunking. In this example, I resample daily ERA5-Land `tas` data to yearly means. To keep the computation time manageable, I limit the calculation to a single year and focus on the Eastern North America domain. Given the large spatial coverage and shorter time span, a logical choice might be to chunk the data in-memory for a few days over the `time` dimension, while keeping the entire spatial domain in the `lat` and `lon` dimensions:

In [None]:
# NBVAL_SKIP

# open dataset with unaligned chunks
ds = xr.open_dataset(url, chunks={"time": 10, "lat": -1, "lon": -1})

# resample the 'tas' variable to yearly means
%time tas_resampled = ds['tas'].sel(time=slice('1981-01-01', '1981-12-31'), lat=slice(35, 65), lon=slice(-100, -60)).resample(time='YS').mean().compute()
del ds

However, this original chunking choice resulted in inefficient computation because the on-disk chunk size for the `time` dimension was much larger than 10. Processing such small chunks led to excessive overhead and slow performance. Instead of arbitrary chunking, I align the chunks with the on-disk structure by setting the `time` chunk size to match the smallest on-disk chunk and choosing spatial chunks in multiples of 50 to avoid creating too many small chunks:

In [9]:
# NBVAL_IGNORE_OUTPUT

# open dataset with aligned chunks
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50 * 5, "lon": 50 * 5})

# resample the 'tas' variable to yearly means
%time tas_resampled = ds['tas'].sel(time=slice('1981-01-01', '1981-12-31'), lat=slice(35, 65), lon=slice(-100, -60)).resample(time='YS').mean().compute()
del ds

CPU times: user 3.76 s, sys: 2.79 s, total: 6.55 s
Wall time: 32.5 s


The computation time dropped from approximately 1 minute to 10 seconds after aligning the chunks, as this approach better managed memory and I/O operations, leading to faster execution.

### How to do parallel processing with Dask Distributed Client? 
Processing large climate datasets can be computationally intensive and time-consuming due to their size and complexity. Parallel processing divides a computational task into smaller, independent subtasks that can be executed simultaneously across multiple CPU cores. This can minimize memory overhead and significantly reduce computation time. 

One way to implement parallel processing is through the Dask Distributed Client. Initializing a Dask Client establishes a connection to a system of interconnected computers or processors, known as a cluster. This connection enables interaction with two key components: the scheduler and the workers. The scheduler is the central component that manages task distribution across the workers while the workers are responsible for executing these tasks using one or more CPU cores and threads, depending on the configuration.

A core is a physical processing unit within a CPU that can execute tasks independently. Modern CPUs often have multiple cores, allowing them to perform multiple operations in parallel. A thread, on the other hand, is the smallest sequence of programmed instructions that the CPU can manage independently. Threads run within cores, and each core can handle multiple threads through a process called multithreading. While cores provide true parallelism by handling separate tasks simultaneously, threads allow for more efficient use of each core by managing multiple tasks in a way that can overlap their execution. For example, while one thread performs calculations, another thread within the same core can handle data loading or I/O operations, ensuring the core remains fully utilized.

Let's look at an example of resampling daily ERA5-Land data into yearly means for multiple variables using the Dask Distributed Client. To manage system resources effectively, especially since PAVICS is a shared environment, I set the total memory limit to 20 GB. After experimenting with different configurations, I found that using 5 workers, each with 2 threads and 4 GB memory, provides a good balance between reducing overhead and optimizing computation time. I track memory usage and task progress in real time by displaying the Dask dashboard.

To create delayed tasks, I set `compute=False` with `to_zarr`. This delays the entire pipeline —from reading the data, performing computations, to writing the output— and allows Dask to optimize the execution plan. By scheduling tasks efficiently, Dask reduces unnecessary data movement and enables each worker to write data independently to Zarr format. This approach is much faster than using sequential formats like NetCDF, which don’t support parallel writes as effectively.

Finally, to prevent memory leaks, which occur when the program retains memory it no longer needs, potentially slowing down the system or causing it to crash, I use the Client within a context manager. This ensures that resources are released after execution. 

In [10]:
# NBVAL_IGNORE_OUTPUT

start_time = time.time()

# function to compute the yearly mean for each variable


def compute_annual_mean(dataset, variable_name):
    var = dataset[variable_name]
    annual_mean = var.resample(time="YS").mean()
    return annual_mean


# set up Dask client within a context manager
with Client(n_workers=5, threads_per_worker=2, memory_limit="4GB") as client:
    # display the Dask Dashboard
    display(client)

    # open the dataset with on-disk chunking structure
    url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
    ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})

    # focus on Eastern North America and a single year for this example
    ds = ds.sel(
        time=slice("1981-01-01", "1981-12-31"), lat=slice(35, 65), lon=slice(-100, -60)
    )

    # define the variables to compute yearly means for
    variables = ["tas", "tasmin", "tasmax"]

    # create a list to hold delayed tasks
    tasks = []
    with TemporaryDirectory(prefix="output") as tempdir:
        for var_name in variables:
            output_path = Path(tempdir.name).joinpath(
                f"var_means/{var_name}_1981_yearly_mean.zarr"
            )
            if not output_path.exists():
                yearly_mean = compute_annual_mean(ds, var_name)
                # save to Zarr with compute=False to get a delayed task object
                delayed_task = yearly_mean.chunk(
                    {"time": -1, "lat": 50, "lon": 50}
                ).to_zarr(output_path, mode="w", compute=False)
                tasks.append(delayed_task)

        # trigger the execution of all delayed tasks
        compute(*tasks)

        # fetch memory usage from all workers and display the total usage
        worker_memory = client.run(lambda: psutil.Process().memory_info().rss)
        total_memory = sum(worker_memory.values())
        print(f"Total memory usage across all workers: {total_memory / 1e9:.2f} GB")

end_time = time.time()
elapsed_time = end_time - start_time
print(f"Total computation time: {elapsed_time:.2f} seconds")

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 10,Total memory: 18.63 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:40827,Workers: 5
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 18.63 GiB

0,1
Comm: tcp://127.0.0.1:36563,Total threads: 2
Dashboard: http://127.0.0.1:41405/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:44683,
Local directory: /tmp/dask-scratch-space/worker-1lhu8_ru,Local directory: /tmp/dask-scratch-space/worker-1lhu8_ru

0,1
Comm: tcp://127.0.0.1:33447,Total threads: 2
Dashboard: http://127.0.0.1:45231/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:38867,
Local directory: /tmp/dask-scratch-space/worker-v9t65o25,Local directory: /tmp/dask-scratch-space/worker-v9t65o25

0,1
Comm: tcp://127.0.0.1:46075,Total threads: 2
Dashboard: http://127.0.0.1:36861/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:39841,
Local directory: /tmp/dask-scratch-space/worker-z5bjm3wl,Local directory: /tmp/dask-scratch-space/worker-z5bjm3wl

0,1
Comm: tcp://127.0.0.1:41555,Total threads: 2
Dashboard: http://127.0.0.1:39265/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:34799,
Local directory: /tmp/dask-scratch-space/worker-zo6_505_,Local directory: /tmp/dask-scratch-space/worker-zo6_505_

0,1
Comm: tcp://127.0.0.1:40567,Total threads: 2
Dashboard: http://127.0.0.1:40103/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:36821,
Local directory: /tmp/dask-scratch-space/worker-bbkrl7la,Local directory: /tmp/dask-scratch-space/worker-bbkrl7la


Total memory usage across all workers: 1.22 GB
Total computation time: 102.68 seconds


Let’s look at a more complex analysis involving heatwave indicators. Here, I calculate two climate indicators, `heat_wave_total_length` and `heat_wave_frequency`, using the `atmos` submodule from the `xclim` library. Both indicators rely on the same input data (`tasmin` and `tasmax`), so I create a pipeline of delayed tasks, which minimizes I/O operations by keeping the data in memory until both indicators are calculated and saved.

In [11]:
# NBVAL_IGNORE_OUTPUT

start_time = time.time()

with Client(n_workers=5, threads_per_worker=2, memory_limit="4GB") as client:
    display(client)

    # load data using on-disk chunk sizes
    url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
    ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})

    # focus on Eastern North America and a single year
    ds = ds.sel(
        time=slice("1981-01-01", "1981-12-31"), lat=slice(35, 65), lon=slice(-100, -60)
    )

    # variables 'tasmax' and 'tasmin' has 'cell_methods' set to 'time: point'
    # update it to conform to CF conventions expected by xclim
    ds["tasmax"].attrs["cell_methods"] = "time: maximum"
    ds["tasmin"].attrs["cell_methods"] = "time: minimum"

    # list of heatwave indicator functions
    indicators = [atmos.heat_wave_total_length, atmos.heat_wave_frequency]

    tasks = []
    with TemporaryDirectory(prefix="output1") as tempdir1:
        for indicator in indicators:
            ds_out = xr.Dataset(
                attrs=ds.attrs
            )  # create a new dataset for each indicator

            out = indicator(ds=ds, freq="YS")

            out = out.chunk({"time": -1, "lat": 50, "lon": 50})
            ds_out[out.name] = out

            output_path = Path(tempdir1.name).joinpath(
                f"heatwaves_ex1/{out.name}_1981.zarr"
            )
            output_path.parent.mkdir(parents=True, exist_ok=True)

            if not output_path.exists():
                # save to Zarr with compute=False to get a delayed task object
                delayed_task = ds_out.to_zarr(output_path, mode="w", compute=False)
                tasks.append(delayed_task)

        # trigger computation
        compute(*tasks)

        # fetch memory usage from all workers and display the total usage
        worker_memory = client.run(lambda: psutil.Process().memory_info().rss)
        total_memory = sum(worker_memory.values())
        print(f"Total memory usage across all workers: {total_memory / 1e9:.2f} GB")

end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 10,Total memory: 18.63 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:34335,Workers: 5
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 18.63 GiB

0,1
Comm: tcp://127.0.0.1:42711,Total threads: 2
Dashboard: http://127.0.0.1:32813/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:36533,
Local directory: /tmp/dask-scratch-space/worker-eri9d1g8,Local directory: /tmp/dask-scratch-space/worker-eri9d1g8

0,1
Comm: tcp://127.0.0.1:44179,Total threads: 2
Dashboard: http://127.0.0.1:43289/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:40297,
Local directory: /tmp/dask-scratch-space/worker-a2cxiltl,Local directory: /tmp/dask-scratch-space/worker-a2cxiltl

0,1
Comm: tcp://127.0.0.1:41409,Total threads: 2
Dashboard: http://127.0.0.1:35035/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:40111,
Local directory: /tmp/dask-scratch-space/worker-qj08eev0,Local directory: /tmp/dask-scratch-space/worker-qj08eev0

0,1
Comm: tcp://127.0.0.1:39283,Total threads: 2
Dashboard: http://127.0.0.1:44543/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:36823,
Local directory: /tmp/dask-scratch-space/worker-wdxdz8tb,Local directory: /tmp/dask-scratch-space/worker-wdxdz8tb

0,1
Comm: tcp://127.0.0.1:42915,Total threads: 2
Dashboard: http://127.0.0.1:42003/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:43949,
Local directory: /tmp/dask-scratch-space/worker-xcdrsrd5,Local directory: /tmp/dask-scratch-space/worker-xcdrsrd5


Total memory usage across all workers: 1.26 GB
Total computation time: 77.14 seconds


### What can we do when we have large task graphs / large memory footprint? 
One downside of using a fully delayed computation approach is that it can lead to the creation of large task graphs that are difficult to manage. This can result in excessive memory consumption as the Dask scheduler struggles to handle numerous interdependent tasks.

To address this issue, we can simplify the task graph by computing and saving each indicator one at a time. This method ensures that Dask completes the calculation and writing of the first indicator, then releases the memory used for that computation before moving on to the second indicator. By processing each indicator sequentially, the memory footprint is reduced, and the scheduler has fewer tasks to manage at any given time.

In [12]:
# NBVAL_IGNORE_OUTPUT

start_time = time.time()

with Client(n_workers=5, threads_per_worker=2, memory_limit="4GB") as client:
    display(client)

    url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
    ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})

    ds = ds.sel(
        time=slice("1981-01-01", "1981-12-31"), lat=slice(35, 65), lon=slice(-100, -60)
    )

    ds["tasmax"].attrs["cell_methods"] = "time: maximum"
    ds["tasmin"].attrs["cell_methods"] = "time: minimum"

    indicators = [atmos.heat_wave_total_length, atmos.heat_wave_frequency]

    with TemporaryDirectory(prefix="output2") as tempdir2:
        for indicator in indicators:
            ds_out = xr.Dataset(attrs=ds.attrs)

            out = indicator(ds=ds, freq="YS")

            out = out.chunk({"time": -1, "lat": 50, "lon": 50})
            ds_out[out.name] = out

            output_path = Path(tempdir2.name).joinpath(
                f"heatwaves_ex2/{out.name}_1981.zarr"
            )
            output_path.parent.mkdir(parents=True, exist_ok=True)

            if not output_path.exists():
                # save to Zarr, triggering computation immediately
                ds_out.to_zarr(output_path)

        # fetch memory usage from all workers and display the total usage
        worker_memory = client.run(lambda: psutil.Process().memory_info().rss)
        total_memory = sum(worker_memory.values())
        print(f"Total memory usage across all workers: {total_memory / 1e9:.2f} GB")

end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 10,Total memory: 18.63 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:38913,Workers: 5
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 18.63 GiB

0,1
Comm: tcp://127.0.0.1:42089,Total threads: 2
Dashboard: http://127.0.0.1:44733/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:33511,
Local directory: /tmp/dask-scratch-space/worker-2q5owxhc,Local directory: /tmp/dask-scratch-space/worker-2q5owxhc

0,1
Comm: tcp://127.0.0.1:34459,Total threads: 2
Dashboard: http://127.0.0.1:43461/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:34549,
Local directory: /tmp/dask-scratch-space/worker-xwk51ct_,Local directory: /tmp/dask-scratch-space/worker-xwk51ct_

0,1
Comm: tcp://127.0.0.1:41171,Total threads: 2
Dashboard: http://127.0.0.1:37373/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:37253,
Local directory: /tmp/dask-scratch-space/worker-axm0lvtz,Local directory: /tmp/dask-scratch-space/worker-axm0lvtz

0,1
Comm: tcp://127.0.0.1:33777,Total threads: 2
Dashboard: http://127.0.0.1:37925/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:44565,
Local directory: /tmp/dask-scratch-space/worker-dxgg4wk6,Local directory: /tmp/dask-scratch-space/worker-dxgg4wk6

0,1
Comm: tcp://127.0.0.1:32999,Total threads: 2
Dashboard: http://127.0.0.1:35735/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:34381,
Local directory: /tmp/dask-scratch-space/worker-o1y5drv9,Local directory: /tmp/dask-scratch-space/worker-o1y5drv9


Total memory usage across all workers: 1.47 GB
Total computation time: 138.56 seconds


Even with the sequential computation approach, there may be scenarios where the output data is still too large to write in a single step, for example when calculating heatwave indicators over the entire North America domain. In such cases, an effective strategy is to split the dataset into smaller, manageable spatial chunks, which would allow for more efficient processing and data writing.

For instance, we can calculate the heatwave indicators over the entire dataset spanning North America, and then divide the results into smaller latitude bins (e.g., groups of 50 latitudes each). Each bin can then be processed and saved individually to a temporary `.zarr` file using Dask. Once all bins are saved, they can be merged back into a single dataset and written out as a final `.zarr` file. This method distributes the data processing load and helps minimize the strain on system resources.

In [13]:
# NBVAL_IGNORE_OUTPUT

start_time = time.time()

url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"
ds = xr.open_dataset(url, chunks={"time": 366, "lat": 50, "lon": 50})

# keep the entire North America domain and select a single year
ds = ds.sel(time=slice("1981-01-01", "1981-12-31"))

ds["tasmax"].attrs["cell_methods"] = "time: maximum"
ds["tasmin"].attrs["cell_methods"] = "time: minimum"

# create an empty output dataset
dsout = xr.Dataset(attrs=ds.attrs)

# calculation on the entire dataset
out = atmos.heat_wave_total_length(ds=ds, freq="YS")
dsout[out.name] = out

with TemporaryDirectory(prefix="output3") as tempdir3:
    outdir_ex3 = Path(tempdir3.name).joinpath(f"heatwaves_ex3/spatial_ex")
    outdir_ex3.mkdir(parents=True, exist_ok=True)
    outzarr3 = outdir_ex3.joinpath(f"{out.name}_1981.zarr")

    if not outzarr3.exists():

        # if data is too big to write in single step - make individual datasets by binning the `lat` dim (approx n=50 latitudes at a time)
        grp_dim = "lat"
        bins = round(len(dsout.lat) / 50)
        _, datasets = zip(*dsout.groupby_bins(grp_dim, bins))
        print(f"Number of individual datasets: {len(datasets)}")
        if sum([len(d[grp_dim]) for d in datasets]) == len(dsout[grp_dim]):
            print("Data is binned correctly.")

        # export each chunk of 50-latitudes to a temporary location
        with TemporaryDirectory(prefix="output") as outtmp:
            for ii, dds in enumerate(datasets):
                dds = dds.chunk(time=-1, lon=50, lat=50)
                filename = Path(outtmp).joinpath(f"{ii}.zarr")
                with Client(
                    n_workers=5, threads_per_worker=2, memory_limit="4GB"
                ) as client:
                    display(client)
                    save_to_zarr(
                        ds=dds,
                        filename=filename,
                        mode="o",
                    )
                    # clear the output for the next iteration
                    clear_output(wait=True)

            # reassemble pieces and export joined
            inzarrs = sorted(list(filename.parent.glob(f"*.zarr")))

            # open the files as a combined multi-file dataset
            ds = xr.open_mfdataset(inzarrs, engine="zarr", decode_timedelta=False)

            # define the final chunking configuration
            final_chunks = dict(time=-1, lon=50, lat=50)

            # save the final combined dataset
            tmpzarr = Path(outtmp).joinpath(outzarr3.name)
            with Client(n_workers=10) as c:
                display(client)
                save_to_zarr(
                    ds=ds.chunk(final_chunks),
                    filename=tmpzarr,
                    mode="o",
                )
            # move the final combined file to the output location
            outzarr3.parent.mkdir(exist_ok=True, parents=True)
            shutil.move(tmpzarr, outzarr3)

end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")

0,1
Connection method: Cluster object,Cluster type: distributed.LocalCluster
Dashboard: http://127.0.0.1:8787/status,

0,1
Dashboard: http://127.0.0.1:8787/status,Workers: 5
Total threads: 10,Total memory: 18.63 GiB
Status: running,Using processes: True

0,1
Comm: tcp://127.0.0.1:36997,Workers: 5
Dashboard: http://127.0.0.1:8787/status,Total threads: 10
Started: Just now,Total memory: 18.63 GiB

0,1
Comm: tcp://127.0.0.1:46675,Total threads: 2
Dashboard: http://127.0.0.1:45847/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:46091,
Local directory: /tmp/dask-scratch-space/worker-hnvv8uu5,Local directory: /tmp/dask-scratch-space/worker-hnvv8uu5

0,1
Comm: tcp://127.0.0.1:44297,Total threads: 2
Dashboard: http://127.0.0.1:39407/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:32805,
Local directory: /tmp/dask-scratch-space/worker-daqwqzzv,Local directory: /tmp/dask-scratch-space/worker-daqwqzzv

0,1
Comm: tcp://127.0.0.1:41215,Total threads: 2
Dashboard: http://127.0.0.1:43245/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:36569,
Local directory: /tmp/dask-scratch-space/worker-rii7oq8g,Local directory: /tmp/dask-scratch-space/worker-rii7oq8g

0,1
Comm: tcp://127.0.0.1:41093,Total threads: 2
Dashboard: http://127.0.0.1:36645/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:37445,
Local directory: /tmp/dask-scratch-space/worker-1tiolw29,Local directory: /tmp/dask-scratch-space/worker-1tiolw29

0,1
Comm: tcp://127.0.0.1:41465,Total threads: 2
Dashboard: http://127.0.0.1:41187/status,Memory: 3.73 GiB
Nanny: tcp://127.0.0.1:41611,
Local directory: /tmp/dask-scratch-space/worker-ljs4tzdv,Local directory: /tmp/dask-scratch-space/worker-ljs4tzdv


2025-02-27 16:11:15,627 - distributed.protocol.core - CRITICAL - Failed to deserialize
Traceback (most recent call last):
  File "/home/tjs/miniforge3/envs/pavics-sdi/lib/python3.10/site-packages/zarr/core.py", line 202, in _load_metadata_nosync
    meta_bytes = self._store[mkey]
  File "/home/tjs/miniforge3/envs/pavics-sdi/lib/python3.10/site-packages/zarr/storage.py", line 1120, in __getitem__
    raise KeyError(key)
KeyError: 'heat_wave_total_length/.zarray'

The above exception was the direct cause of the following exception:

Traceback (most recent call last):
  File "/home/tjs/miniforge3/envs/pavics-sdi/lib/python3.10/site-packages/distributed/protocol/core.py", line 175, in loads
    return msgpack.loads(
  File "msgpack/_unpacker.pyx", line 194, in msgpack._cmsgpack.unpackb
  File "/home/tjs/miniforge3/envs/pavics-sdi/lib/python3.10/site-packages/distributed/protocol/core.py", line 172, in _decode_default
    return pickle.loads(sub_header["pickled-obj"], buffers=sub_frames)
Fi

KeyboardInterrupt: 

Alternatively, if the goal is to calculate heatwave indicators over a longer time period but for a smaller spatial extent, memory usage can be managed more efficiently by splitting the data along the time dimension. In this example, I calculate the indicator over a 20-year period so, after performing the heatwave calculations, I split the results into 5-year intervals, save each interval as a smaller Zarr file, and reassemble them into the final dataset in the last step. You can try different interval sizes depending on your period of analysis.

In addition, when initally loading the input dataset, I set the chunk size of the `time` dimension to 4 years and 1 day. This choice is based on the need to account for leap years, ensuring that each chunk contains a consistent number of days while spanning multiple years. Chunking by 4 years instead of 1 year improves computational efficiency and reduces excessive I/O operations for this specific case, as the heatwave indicator analysis spans multiple years. 

In [None]:
# NBVAL_IGNORE_OUTPUT

start_time = time.time()

url = "https://pavics.ouranos.ca/twitcher/ows/proxy/thredds/dodsC/datasets/reanalyses/day_ERA5-Land_NAM.ncml"

# set the chunk size of the time dimension to 4 years + 1 day
ds = xr.open_dataset(url, chunks={"time": (365 * 4) + 1, "lat": 50, "lon": 50})

# this time, let's focus our analysis on a smaller region and a longer time period
ds = ds.sel(
    time=slice("1981-01-01", "1990-12-31"),
    lat=slice(42.1, 65.0),
    lon=(slice(-80.0, -73.5)),
)

ds["tasmax"].attrs["cell_methods"] = "time: maximum"
ds["tasmin"].attrs["cell_methods"] = "time: minimum"

dsout = xr.Dataset(attrs=ds.attrs)

out = atmos.heat_wave_total_length(ds=ds, freq="YS")
dsout[out.name] = out

with TemporaryDirectory(prefix="output4") as tempdir4:
    outdir4 = Path(tempdir4.name).joinpath(f"heatwaves_ex3/temporal_ex")
    outdir4.mkdir(parents=True, exist_ok=True)
    outzarr4 = outdir4.joinpath(f"{out.name}_1981-1990.zarr")

    if not outzarr4.exists():

        # resample into 5-year intervals
        _, datasets = zip(*dsout.resample(time="5YS"))
        print(f"Number of individual datasets: {len(datasets)}")

        # export each 5-year chunk to a temporary location
        with TemporaryDirectory(prefix="output") as outtmp:
            for ii, dds in enumerate(datasets):
                dds = dds.chunk(dict(time=-1, lon=50, lat=50))
                filename = Path(outtmp).joinpath(f"{ii}.zarr")
                with Client(
                    n_workers=5, threads_per_worker=2, memory_limit="4GB"
                ) as client:
                    display(client)
                    save_to_zarr(
                        ds=dds,
                        filename=filename,
                        mode="o",
                    )
                    # clear the output for the next iteration
                    clear_output(wait=True)

            # reassemble the 5-year chunks into a single dataset
            inzarrs = sorted(list(filename.parent.glob(f"*.zarr")))

            # open the files as a combined multi-file dataset
            ds = xr.open_mfdataset(inzarrs, engine="zarr", decode_timedelta=False)

            # define the final chunking configuration
            final_chunks = dict(time=12 * 50, lon=50, lat=50)

            # save the final combined dataset
            tmpzarr = Path(outtmp).joinpath(outzarr4.name)
            with Client(n_workers=10) as c:
                display(client)
                save_to_zarr(
                    ds=ds.chunk(final_chunks),
                    filename=tmpzarr,
                    mode="o",
                )
            # move the final combined file to the output location
            outzarr4.parent.mkdir(exist_ok=True, parents=True)
            shutil.move(tmpzarr, outzarr4)

end_time = time.time()
print(f"Total computation time: {end_time - start_time:.2f} seconds")

### Key takeaways from this tutorial:
* Dask is a parallel computing library that enables efficient processing of large datasets and complex computations by distributing tasks across multiple cores or machines
* When opening a dataset, align in-memory chunks with the dataset's on-disk chunking to minimize memory usage and reduce unnecessary I/O operations
* Use delayed tasks to optimize the execution plan and save outputs in Zarr format, which supports parallel data writing
* When dealing with large task graphs, simplify the process by handling tasks sequentially (e.g., process one climate indicator at a time) to keep the computation managable 
* If the data is too large to write in a single step, split the output into smaller, temporary Zarr files based on spatial or temporal segments relevant to your analysis; after splitting, reassemble these smaller files into the final Zarr file
* Monitor memory usage and task progress using the Dask Dashboard, and adjust the number of workers, threads, and memory limits as needed to maintain balanced resource utilization