These instructions describe how to use dask with SLURM on the CSU INCUS servers (downdraft + microburst[1-3]). 

Last updated: June 11, 2025 

For questions, contact Bee Leung (gabrielle.leung@colostate.edu)

# Prerequisites

If you don't have them yet, install dask, dask distributed, and dask jobqueue. Something like this on your command line:

    mamba install -n incus -c conda-forge dask distributed dask-jobqueue

# Set up dask on SLURM

In [18]:
from dask_jobqueue import SLURMCluster
from dask.distributed import Client

In [19]:
# spin up SLURM cluster
cluster = SLURMCluster(cores=10, # this is cpus-per-task
                       processes=10, # number of processes for each job; if not specified, this is usually sqrt(cores) 
                       memory='20GB', # total memory per job, divided among that job's workers
                       account='incus',
                       walltime='08:00:00', # hours:minutes:seconds
                       scheduler_options={'dashboard_address':':10101'}, # change this to a port you want to use for monitoring
                       job_extra_directives=['--partition=all',
                                             '--job-name=dask-test']) # change this to your job name

# set up our dask client and tell it to connect to the SLURM cluster
# this is where all our tasks get submitted to
client = Client(cluster)

Tips:

* Do some trial and error to find out how many cores and how much memory you need to request for your purposes.

* You can also explicitly specify how many processes and how many threads you request, which can help with optimization (I am not an expert in this). 

* If you choose a port that is already in use (e.g., if you try to use my favorite port, 10101 🙂), dask prints a warning telling you which port the dashboard is being served on instead.

SLURMCluster will create a job script and submit it for us under the hood. To make sure what it's submitting makes sense, run the cell below. It should output something similar to the sample SLURM submission script Peter sent out via email.

If something is wrong (e.g., you're missing an important SLURM parameter), the cell may raise an error instead of printing the script.

In [20]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -A incus
#SBATCH -n 1
#SBATCH --cpus-per-task=10
#SBATCH --mem=19G
#SBATCH -t 08:00:00
#SBATCH --partition=all
#SBATCH --job-name=dask-test

/home/gleung/miniforge3/envs/incusres/bin/python -m distributed.cli.dask_worker tcp://129.82.20.217:46411 --name dummy-name --nthreads 1 --memory-limit 1.86GiB --nworkers 10 --nanny --death-timeout 60
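
The numbers in the worker command above follow from the arguments we gave SLURMCluster. The sketch below redoes that arithmetic by hand; the helper name `per_worker_resources` is made up for illustration and is not part of dask. It assumes '20GB' means 20 x 10^9 bytes and that the `--mem` request is rounded up to whole GiB, which is an inference from the output above rather than something stated in these notes.

```python
import math

def per_worker_resources(cores, processes, memory_bytes):
    """Hypothetical helper: split one job's resources evenly among its worker processes."""
    threads_per_worker = cores // processes
    memory_per_worker_gib = memory_bytes / processes / 2**30
    return threads_per_worker, memory_per_worker_gib

# values from the SLURMCluster call above: cores=10, processes=10, memory='20GB'
threads, mem_gib = per_worker_resources(cores=10, processes=10, memory_bytes=20e9)
print(threads)            # 1    -> matches --nthreads 1
print(round(mem_gib, 2))  # 1.86 -> matches --memory-limit 1.86GiB

# total job memory in GiB, rounded up (assumed rounding rule)
job_mem_gib = math.ceil(20e9 / 2**30)
print(job_mem_gib)        # 19   -> matches #SBATCH --mem=19G
```

If you lower `processes` (say, to 2), each worker gets more threads and a larger share of the job's memory, which is one knob to try when tuning.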



At this point, you can check that you can see the dask dashboard a.k.a. the *dask*board and make sure port forwarding is working. You can use the daskboard to monitor progress with the tasks you submit, as well as how much memory you’re using.

On your local machine, go to “downdraft.atmos.colostate.edu:[PORT]/status”, where [PORT] is the number we set above when we created the cluster. So far, we haven’t set up any tasks, so the page should load but there isn't much on it yet. 

Depending on how you’ve set up your SSH configuration, the above might not work immediately. In that case, go to your terminal and run the following (remember to substitute your port!):
    
    ssh -L 1111:localhost:[PORT] downdraft
    
You can then use your browser to go to “localhost:1111/status” to see the daskboard. Feel free to change the port on your localhost accordingly.
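
If you switch ports often, it can help to generate the forwarding command and the local URL from the two port numbers. The helper below is a hypothetical convenience (the name `port_forward_info` is not from dask or SSH); the host alias `downdraft` and local port 1111 are simply the values used in the text above.

```python
def port_forward_info(remote_port, local_port=1111, host="downdraft"):
    """Hypothetical helper: build the ssh command and local dashboard URL."""
    # forward local_port on your machine to remote_port on the server
    command = f"ssh -L {local_port}:localhost:{remote_port} {host}"
    # where to point your browser once the tunnel is open
    url = f"http://localhost:{local_port}/status"
    return command, url

command, url = port_forward_info(remote_port=10101)
print(command)  # ssh -L 1111:localhost:10101 downdraft
print(url)      # http://localhost:1111/status
```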

Now we are ready to start some workers. 

In [21]:
cluster.scale(n=10) # requesting 10 workers, which will be split up into however many jobs are needed to fit into your cluster defined above

# You can also put in how many cores, jobs, or memory here, as described: https://jobqueue.dask.org/en/latest/clusters-howitworks.html.

After running the above cell, you should see some bars pop up on your daskboard under "Bytes stored per worker". The workers are ready to go!

Note that the cluster attributes we specified above apply to each job. So if you start 4 jobs here, you will actually request 4 x 10 = 40 cores (and 4 x 20 GB = 80 GB of memory).
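
To see what a given scale request costs in SLURM terms, you can work it out by hand. The sketch below assumes that the number of jobs is the number of workers divided by the processes per job, rounded up; the function name `slurm_request` is made up for illustration, and the defaults are the values from the cluster defined above.

```python
import math

def slurm_request(n_workers, cores_per_job=10, processes_per_job=10, memory_gb_per_job=20):
    """Hypothetical helper: total SLURM resources implied by asking for n_workers."""
    # each job hosts processes_per_job workers, so round up to whole jobs
    jobs = math.ceil(n_workers / processes_per_job)
    return {"jobs": jobs,
            "cores": jobs * cores_per_job,
            "memory_gb": jobs * memory_gb_per_job}

print(slurm_request(10))  # {'jobs': 1, 'cores': 10, 'memory_gb': 20}
print(slurm_request(40))  # {'jobs': 4, 'cores': 40, 'memory_gb': 80}
```

So `cluster.scale(n=10)` with this cluster definition should fit in a single job, while asking for 40 workers would request four jobs' worth of cores and memory.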

At this point, check that your job is running on SLURM. In the terminal (while logged onto downdraft), run:

    squeue

If you can see your job on squeue, everything is working!

# Simple example

First, start with a very simple example of reading in a RAMS output file and doing a computation. Similar examples: https://tutorial.xarray.dev/intermediate/xarray_and_dask.html

In [22]:
import xarray as xr

cp = 1004 # specific heat of dry air at constant pressure [J kg^-1 K^-1] in RAMS

In [23]:
# read in one RAMS file
ds = xr.open_dataset(
        '/monsoon/MODEL/LES_MODEL_DATA/V1/DRC1.1-R-V1/G3/out_30s/a-L-2016-12-30-130000-g1.h5',
        phony_dims="access",
        engine="h5netcdf",
        chunks="auto", # this is needed so xarray uses dask under the hood
    )

# just keep the variables we need for simplicity
ds = ds[['PI', # Exner function x cp 
       'THETA', # potential temperature [K]
       ]]

# rename dimensions to correspond with RAMS output
ds = ds.rename_dims({'phony_dim_3':'z',
                     'phony_dim_1':'y',
                     'phony_dim_2':'x'})

In [24]:
ds

[Output: xarray Dataset summary. PI and THETA are each dask arrays with shape (232, 962, 746), chunk shape (136, 564, 437), total size 635.13 MiB (127.87 MiB per chunk), 8 chunks in 2 graph layers, data type float32.]


Notice that the variables in our dataset are dask arrays! They are already split into chunks, so operations are applied chunk by chunk, and only when we explicitly ask for the result (i.e., "lazy evaluation"). This is good for memory management.

In [25]:
# very simple computation, let's calculate air temperature

ds = ds.assign(TEMP = ds.THETA * ds.PI/cp)
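
As a sanity check on the formula, here is the same calculation for single values in plain Python. Since PI is stored as the Exner function times cp, dividing by cp recovers the Exner function, and temperature is potential temperature times that. The gas constant (287 J kg^-1 K^-1) and reference pressure (1000 hPa) below are assumed textbook values, not numbers read from RAMS.

```python
CP = 1004.0    # specific heat of dry air at constant pressure [J kg^-1 K^-1]
RD = 287.0     # gas constant for dry air [J kg^-1 K^-1] (assumed value)
P0 = 100000.0  # reference pressure [Pa] (assumed value)

def exner_times_cp(pressure_pa):
    """Exner function multiplied by cp, i.e. the quantity stored as PI."""
    return CP * (pressure_pa / P0) ** (RD / CP)

def temperature(theta, pi):
    """Air temperature [K] from potential temperature [K] and PI."""
    return theta * pi / CP

# at the reference pressure the Exner function is 1, so T equals theta
t_ref = temperature(300.0, exner_times_cp(100000.0))
print(t_ref)  # 300.0

# at lower pressure the same theta corresponds to a colder temperature
t_850 = temperature(300.0, exner_times_cp(85000.0))
print(t_850 < t_ref)  # True
```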

In [26]:
ds

[Output: xarray Dataset summary. PI and THETA are unchanged (dask arrays with shape (232, 962, 746), chunk shape (136, 564, 437), 635.13 MiB total, 127.87 MiB per chunk, 8 chunks in 2 graph layers, float32). The new variable TEMP has the same shape, chunking, size, and data type, with 8 chunks in 6 graph layers.]


If you are looking at your daskboard, you should see that nothing is happening! We haven't actually executed any computations (yet).

In [27]:
# subsetting so this example runs quickly
surf_temp = ds.TEMP.sel(z=1)

When you run the cell below, you will actually send the computation to our dask client for execution. You should see bars moving around on the daskboard.

In [28]:
surf_temp.mean().compute()

The above was probably not the best example because the calculation is fairly simple: the overhead introduced by the dask scheduler is similar to, if not greater than, the actual computation time.

# Better example

Let's do a more realistic use case. Say I want to parallelize the above calculation over multiple times!

In [None]:
# define series of functions that do what we did above for easier reading

# RAMS reader
def read_rams_output(path: str, 
                     variables: list[str]=['PI', # Exner function x cp 
                                            'THETA', # potential temperature [K]
                                            ]
                    ):
    """
    Basic function to read in RAMS output from a given path

    Arguments:
        path -- path to file

    Keyword Arguments:
        variables -- list of RAMS variables to keep (default: {['PI','THETA']})
    """    

    # read in one RAMS file
    ds = xr.open_dataset(
            path,
            phony_dims="access",
            engine="h5netcdf",
            chunks="auto", # this is needed so xarray uses dask under the hood
        )

    # just keep the variables we need for simplicity
    ds = ds[variables]

    # rename dimensions to correspond with RAMS output
    ds = ds.rename_dims({'phony_dim_3':'z',
                        'phony_dim_1':'y',
                        'phony_dim_2':'x'})
    
    return(ds)

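# surf temp calculator
def calculate_mean_surf_temp(ds):
    """Return the mean air temperature [K] at the z=1 level (still lazy)."""
    # subset to a single level first so less data has to be processed
    ds = ds.sel(z=1)
    # PI is the Exner function x cp, so divide by cp to get temperature
    ds = ds.assign(TEMP=ds.THETA * ds.PI / cp)
    return ds.TEMP.mean()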

In [54]:
# define the list of paths for which I want to do the calculation

paths = ['/monsoon/MODEL/LES_MODEL_DATA/V1/DRC1.1-R-V1/G3/out_30s/a-L-2016-12-30-121500-g1.h5',
         '/monsoon/MODEL/LES_MODEL_DATA/V1/DRC1.1-R-V1/G3/out_30s/a-L-2016-12-30-123000-g1.h5',
         '/monsoon/MODEL/LES_MODEL_DATA/V1/DRC1.1-R-V1/G3/out_30s/a-L-2016-12-30-124500-g1.h5',
         '/monsoon/MODEL/LES_MODEL_DATA/V1/DRC1.1-R-V1/G3/out_30s/a-L-2016-12-30-130000-g1.h5']

We will submit each task to our client using "client.map". The first argument is the function you want to run, followed by a list (or other iterable) of inputs it will iterate over. Keyword arguments that should be the same for every call are passed by name in the call to client.map.

In [58]:
ds = client.map(read_rams_output, paths, variables=['PI','THETA'])
ds = client.map(calculate_mean_surf_temp, ds)

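This returns a list of dask 'Futures', one per input. Each Future is a reference to a task that starts running on the cluster as soon as it is submitted, so its status may already read "finished" by the time you look at it. Note that our functions only build lazy, dask-backed xarray objects; the actual number crunching happens when we call compute() later on.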

In [59]:
ds 

[<Future: finished, type: xarray.core.dataarray.DataArray, key: calculate_mean_surf_temp-26a7520becedb1fc27335dee465b0a00>,
 <Future: finished, type: xarray.core.dataarray.DataArray, key: calculate_mean_surf_temp-8385c5cf4a32d16a5d8ac163d5459dc9>,
 <Future: finished, type: xarray.core.dataarray.DataArray, key: calculate_mean_surf_temp-22a07964037763cf96da19b8e96cb124>,
 <Future: finished, type: xarray.core.dataarray.DataArray, key: calculate_mean_surf_temp-f835c838614a175f158ae68d6397531d>]

We then need to "gather" those futures into a list (in this case, this is a list of dataarrays, since that's what our function "calculate_mean_surf_temp" returns).

In [60]:
ds = client.gather(ds)
ds

[<xarray.DataArray 'TEMP' ()>
 dask.array<mean_agg-aggregate, shape=(), dtype=float32, chunksize=(), chunktype=numpy.ndarray>,
 <xarray.DataArray 'TEMP' ()>
 dask.array<mean_agg-aggregate, shape=(), dtype=float32, chunksize=(), chunktype=numpy.ndarray>,
 <xarray.DataArray 'TEMP' ()>
 dask.array<mean_agg-aggregate, shape=(), dtype=float32, chunksize=(), chunktype=numpy.ndarray>,
 <xarray.DataArray 'TEMP' ()>
 dask.array<mean_agg-aggregate, shape=(), dtype=float32, chunksize=(), chunktype=numpy.ndarray>]
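
If you want to get a feel for the submit-then-gather pattern without a cluster, Python's built-in concurrent.futures works in a similar way (dask's futures interface extends it). The sketch below is only an analogy using a thread pool and a toy function (`scaled_mean` is made up for illustration); it is not dask code, and unlike client.map, the shared keyword is passed through `submit` here.

```python
from concurrent.futures import ThreadPoolExecutor

def scaled_mean(values, scale=1.0):
    """Toy stand-in for a per-file calculation."""
    return scale * sum(values) / len(values)

inputs = [[1, 2, 3], [4, 5, 6], [7, 8, 9]]

with ThreadPoolExecutor(max_workers=2) as pool:
    # roughly like client.map(scaled_mean, inputs, scale=2.0): one future per input
    futures = [pool.submit(scaled_mean, values, scale=2.0) for values in inputs]
    # roughly like client.gather(futures): wait and collect results in input order
    results = [future.result() for future in futures]

print(results)  # [4.0, 10.0, 16.0]
```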

In [None]:
# concatenate the list
ds = xr.concat(ds,dim='time')
ds

[Output: a dask-backed DataArray with shape (4,), chunk shape (1,), total size 16 B (4 B per chunk), 4 chunks in 45 graph layers, data type float32.]


Now we finally compute the values and see the output. Remember to assign the result (ds = ds.compute()) if you want to use it later (e.g., to plot or save it).

In [62]:
ds.compute()

# Closing up

In [63]:
client.close()
cluster.close()