First install dask-jobqueue:
conda install dask-jobqueue -c conda-forge

In [1]:
from dask_jobqueue import SLURMCluster
cluster = SLURMCluster(cores=8, processes=4, memory="8g",
                       project="notchpeak-shared-short",
                       queue="notchpeak-shared-short")

This defines a Dask cluster object whose jobs each run 4 worker processes with 2 threads per process (8 cores / 4 processes). Each SLURM job is submitted with one task and 8 CPUs per task. The call above only defines the job; the cell below submits one such job, which uses all 8 cores. Note that notchpeak-shared-short allows at most 2 running jobs per user, so if this notebook is itself running in notchpeak-shared-short via Open OnDemand, we can start only one more job (worker).
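
As a quick check of the mapping described above, the following sketch works out the per-process resources from the values passed to `SLURMCluster`. The variable names are illustrative and are not part of the dask-jobqueue API; the even split of cores and memory across processes is an assumption that is consistent with the `--nthreads 2` and `--memory-limit 2.00GB` flags in the generated job script.

```python
cores = 8        # cores requested per SLURM job
processes = 4    # Dask worker processes per job
memory_gb = 8    # memory requested per job, in GB

# Assumed even split of the job's resources across worker processes
threads_per_process = cores // processes
memory_per_process_gb = memory_gb / processes

print(threads_per_process, memory_per_process_gb)
```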

In [2]:
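from distributed import Client

# Submit one SLURM job. start_workers() belongs to the older dask-jobqueue
# release used here; newer releases use cluster.scale(jobs=1) instead.
cluster.start_workers(1)
client = Client(cluster)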

We can inspect the job script that Dask generates, which helps in working out how SLURM tasks and CPUs map to Dask workers.

In [3]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -p notchpeak-shared-short
#SBATCH -A notchpeak-shared-short
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=8G
#SBATCH -t 00:30:00
JOB_ID=${SLURM_JOB_ID%;*}



/uufs/chpc.utah.edu/common/home/u0101881/software/pkg/miniconda3/bin/python -m distributed.cli.dask_worker tcp://10.242.75.81:39045 --nthreads 2 --nprocs 4 --memory-limit 2.00GB --name dask-worker--${JOB_ID}-- --death-timeout 60
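
To read the resource request at a glance, the `#SBATCH` lines can be pulled out of the script text. The helper below is a minimal sketch: `sbatch_directives` is a hypothetical name, not part of dask-jobqueue, and the example script is a shortened copy of the output above.

```python
def sbatch_directives(job_script):
    """Return the arguments of every #SBATCH line in a job script string."""
    directives = []
    for line in job_script.splitlines():
        line = line.strip()
        if line.startswith("#SBATCH"):
            directives.append(line[len("#SBATCH"):].strip())
    return directives


# Shortened copy of the job script printed above
example_script = """#!/usr/bin/env bash
#SBATCH -J dask-worker
#SBATCH -n 1
#SBATCH --cpus-per-task=8
#SBATCH --mem=8G
"""

for directive in sbatch_directives(example_script):
    print(directive)
```

In the notebook, the same helper could be applied to the string returned by `cluster.job_script()`.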



In [4]:
client

Client
  Scheduler: tcp://10.242.75.81:39045
  Dashboard: http://10.242.75.81:8787/status
Cluster
  Workers: 4
  Cores: 8
  Memory: 8.00 GB


Now we'll run the same embarrassingly parallel example, but using the SLURM job that Dask started. Note that the code is the same as in the local Dask run.

In [5]:
import time
import random

def costly_simulation(list_param):
    time.sleep(random.random())
    return sum(list_param)

In [6]:
import pandas as pd
import numpy as np

input_params = pd.DataFrame(np.random.random(size=(500, 4)),
                            columns=['param_a', 'param_b', 'param_c', 'param_d'])
input_params.head()

    param_a   param_b   param_c   param_d
0  0.442653  0.474941  0.418876  0.273237
1  0.357813  0.531603  0.656388  0.311080
2  0.534062  0.904749  0.743362  0.367175
3  0.281190  0.763310  0.606051  0.872185
4  0.349374  0.107803  0.064185  0.179712


In [7]:
import dask
lazy_results = []

for parameters in input_params.values:
    lazy_result = dask.delayed(costly_simulation)(parameters)
    lazy_results.append(lazy_result)

futures = dask.persist(*lazy_results)  # trigger computation in the background

In [8]:
%time results = dask.compute(*futures)
results[:5]

CPU times: user 2.47 s, sys: 430 ms, total: 2.9 s
Wall time: 29.5 s


(1.6097077549704961,
 1.8568845125651061,
 2.5493481296713503,
 2.5227364817882205,
 0.7010738471783476)
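
As a rough sanity check on the wall time above, the sketch below estimates the ideal run time. It assumes that each task sleeps 0.5 s on average (the mean of `random.random()`), that all 8 threads stay busy, and that scheduling overhead is negligible. The variable names are illustrative.

```python
n_tasks = 500          # rows in input_params
mean_sleep_s = 0.5     # expected value of random.random()
n_threads = 4 * 2      # 4 worker processes x 2 threads each

serial_estimate_s = n_tasks * mean_sleep_s
ideal_parallel_s = serial_estimate_s / n_threads

print(f"serial estimate: {serial_estimate_s:.1f} s")
print(f"ideal on {n_threads} threads: {ideal_parallel_s:.2f} s")
```

The estimate of about 31 s is close to the measured 29.5 s. The measured value can fall below the estimate because `dask.persist` had already started the computation in the background before the timed cell ran, and because the actual sleep times are random.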

When we are done using Dask, we close the cluster, which cancels the SLURM job that runs the Dask workers.
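
The log printed after the closing cell in this section shows the client trying to reconnect to a scheduler that is already gone. One common way to get a quieter shutdown, sketched here as an assumption rather than as what this notebook did, is to close the client before the cluster. The helper name `shutdown` is hypothetical; it relies only on the `close()` methods of the client and cluster objects.

```python
def shutdown(client, cluster):
    """Close the client first, then the cluster (hypothetical helper)."""
    client.close()    # disconnect while the scheduler is still running
    cluster.close()   # stop the scheduler and the job running the workers
```

With the objects from this notebook, this would be called as `shutdown(client, cluster)`.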

In [12]:
cluster.close()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/uufs/chpc.utah.edu/common/home/u0101881/software/pkg/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/uufs/chpc.utah.edu/common/home/u0101881/software/pkg/miniconda3/lib/python3.7/site-packages/distributed/client.py", line 1276, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/uufs/chpc.utah.edu/common/home/u0101881/software/pkg/miniconda3/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/uufs/chpc.utah.edu/common/home/u0101881/software/pkg/miniconda3/lib/python3.7/site-packages/distributed/client.py", line 1005, in _reconnect
    await self._close()
  File "/uufs/chpc.utah.edu/com