# Dask jobqueue example

## What is Dask jobqueue? (https://jobqueue.dask.org/)

* deploys Dask workers on typical HPC job queueing systems

## Monte-Carlo estimate with multiple Dask batch job workers
We define a Dask jobqueue cluster whose workers each have 16 CPU cores and 90 GB of memory.

In [1]:
import dask, dask.distributed, os
import dask_jobqueue

In [2]:
# look up further Dask configurations in local directory
additional_config = dask.config.collect(paths=['.'])
dask.config.update(dask.config.config, additional_config, priority='new');

In [3]:
dask.config.get('jobqueue.juwels-jobqueue-config')

{'cores': 96,
 'memory': '90000M',
 'processes': 1,
 'local-directory': '/tmp',
 'death-timeout': 60,
 'extra': ['--host ${SLURMD_NODENAME}.ib.juwels.fzj.de'],
 'interface': None,
 'shebang': '#!/usr/bin/env bash',
 'walltime': '00:15:00',
 'log-directory': 'dask_jobqueue_logs',
 'name': 'dask-worker',
 'queue': None,
 'project': None,
 'job-cpu': None,
 'job-mem': None,
 'job-extra': [],
 'env-extra': []}

In [4]:
cluster = dask_jobqueue.SLURMCluster(
    config_name='juwels-jobqueue-config',
    project='esmtst', # specify budget name associated with project
    queue='esm', # choose queue by available resources
    scheduler_options={"host": os.environ['HOSTNAME']}, # globally visible local scheduler network location
    cores=16  # one worker process with 16 threads per job (overrides the configured 96 cores)
)

client = dask.distributed.Client(cluster)
cluster.scale(jobs=1)
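
How the keyword arguments combine with the configuration can be sketched in plain Python. This is a simplified model and not dask-jobqueue's own code: it assumes that keyword arguments override configuration values and that the cores and memory of a job are split evenly across its worker processes. The helper name `effective_worker_resources` and the key `memory_mb` are hypothetical.

```python
# Simplified model (an assumption, not dask-jobqueue's implementation).
config = {"cores": 96, "memory_mb": 90_000, "processes": 1}  # values from the config shown above
overrides = {"cores": 16}  # keyword argument passed to SLURMCluster


def effective_worker_resources(config, overrides):
    """Merge overrides into the config and derive per-worker-process resources."""
    settings = {**config, **overrides}  # keyword arguments win over config values
    processes = settings["processes"]
    return {
        "worker_processes_per_job": processes,
        "threads_per_worker": settings["cores"] // processes,
        "memory_mb_per_worker": settings["memory_mb"] // processes,
    }


resources = effective_worker_resources(config, overrides)
print(resources)
```

With `processes=1`, `cores=16` yields a single worker process with 16 threads per job, which agrees with the `--nthreads 16` option in the generated job script.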

In [11]:
client

Client:  Scheduler: tcp://10.11.159.193:43091  Dashboard: http://10.11.159.193:8787/status
Cluster: Workers: 1  Cores: 16  Memory: 90.00 GB


### What is a jobqueue cluster?
The cluster definition above is all we need to run computations on Dask workers that live on compute nodes.
Let's look at what happens in the background.

In [9]:
!squeue -u {os.environ["USER"]}

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2524579       esm dask-wor    rath1 CF       0:03      1 jwc00n003


In [10]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e dask_jobqueue_logs/dask-worker-%J.err
#SBATCH -o dask_jobqueue_logs/dask-worker-%J.out
#SBATCH -p esm
#SBATCH -A esmtst
#SBATCH -n 1
#SBATCH --cpus-per-task=16
#SBATCH --mem=84G
#SBATCH -t 00:15:00

/p/home/jusers/rath1/juwels/PROJECT_training2005/2020-08_dask_intro/miniconda3/envs/py3_dask/bin/python -m distributed.cli.dask_worker tcp://10.11.159.193:43091 --nthreads 16 --memory-limit 90.00GB --name name --nanny --death-timeout 60 --local-directory /tmp --host ${SLURMD_NODENAME}.ib.juwels.fzj.de
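
The job script requests `--mem=84G` while the worker is started with `--memory-limit 90.00GB`. One reading that is consistent with both numbers, assuming the configured `90000M` is interpreted as decimal megabytes and the batch request is expressed in binary units rounded up, is the following arithmetic:

```python
import math

memory_bytes = 90_000 * 10**6      # '90000M', read as decimal megabytes (assumption)
memory_gb = memory_bytes / 10**9   # decimal gigabytes, as in the worker's memory limit
memory_gib = memory_bytes / 2**30  # binary gibibytes
slurm_mem = math.ceil(memory_gib)  # rounded up to whole units for the batch request

print(f"{memory_gb:.2f} GB = {memory_gib:.2f} GiB -> --mem={slurm_mem}G")
```

Under these assumptions both values describe the same amount of memory, expressed in different units.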



### Let's scale up the cluster

In [12]:
cluster.scale(jobs=5)

In [15]:
!squeue -u {os.environ["USER"]}

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2524581       esm dask-wor    rath1 PD       0:00      1 (None)
           2524582       esm dask-wor    rath1 PD       0:00      1 (None)
           2524583       esm dask-wor    rath1 PD       0:00      1 (None)
           2524580       esm dask-wor    rath1  R       0:07      1 jwc00n006
           2524579       esm dask-wor    rath1  R       0:20      1 jwc00n003


In [14]:
client

Client:  Scheduler: tcp://10.11.159.193:43091  Dashboard: http://10.11.159.193:8787/status
Cluster: Workers: 1  Cores: 16  Memory: 90.00 GB
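
The client summary is a snapshot: it still reports one worker, presumably because the additional jobs had not started or registered with the scheduler when the cell ran. In the `squeue` listing, `PD` marks pending jobs and `R` marks running ones. A minimal sketch for counting job states from such a listing (the helper name `count_job_states` is hypothetical, and the text is copied from the output above):

```python
from collections import Counter

# squeue output copied from the cell above
squeue_output = """\
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2524581       esm dask-wor    rath1 PD       0:00      1 (None)
           2524582       esm dask-wor    rath1 PD       0:00      1 (None)
           2524583       esm dask-wor    rath1 PD       0:00      1 (None)
           2524580       esm dask-wor    rath1  R       0:07      1 jwc00n006
           2524579       esm dask-wor    rath1  R       0:20      1 jwc00n003
"""


def count_job_states(text):
    """Count jobs per state code (the ST column) in squeue output."""
    lines = text.strip().splitlines()
    state_column = lines[0].split().index("ST")  # locate the state column in the header
    return Counter(line.split()[state_column] for line in lines[1:])


states = count_job_states(squeue_output)
print(dict(states))
```

Only jobs in state `R` can contribute workers, so the worker count in the client summary grows as pending jobs start.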


### From here everything is the same as with LocalCluster

In [16]:
import numpy, dask.array

def calculate_pi(size_in_bytes, number_of_chunks):
    
    """Calculate pi using a Monte Carlo method."""
    
    array_shape = (int(size_in_bytes / 8 / 2), 2) # tuple
    chunk_size = (int(array_shape[0] / number_of_chunks), 2) # tuple
    
    # 2D random positions array using dask.array
    xy = dask.array.random.uniform(
        low=0.0, high=1.0, size=array_shape,
        # specify chunk size, i.e. task number
        chunks=chunk_size )
  
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1 # boolean

    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size
    
    # start Dask calculation
    pi = pi.compute()

    print(f"\nfrom {xy.nbytes / 1e9} GB randomly chosen positions")
    print(f"   pi estimate: {pi}")
    print(f"   pi error: {abs(pi - numpy.pi)}\n")
    # display(xy)
    
    return pi
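
To see what `size_in_bytes` and `number_of_chunks` mean in practice, the shape arithmetic of `calculate_pi` can be reproduced without creating any array. The helper `describe_chunking` below is hypothetical and only mirrors the two shape computations above, assuming 8-byte floating point values.

```python
def describe_chunking(size_in_bytes, number_of_chunks):
    """Mirror the shape arithmetic of calculate_pi without allocating data."""
    bytes_per_value = 8  # 8-byte floats, as implied by the division by 8 above
    n_points = int(size_in_bytes / bytes_per_value / 2)  # two coordinates per point
    points_per_chunk = int(n_points / number_of_chunks)
    chunk_bytes = points_per_chunk * 2 * bytes_per_value
    return n_points, points_per_chunk, chunk_bytes


n_points, points_per_chunk, chunk_bytes = describe_chunking(10_000_000_000, 100)
print(f"{n_points} points, {points_per_chunk} points per chunk, "
      f"{chunk_bytes / 1e6} MB per chunk")
```

For 10 GB split into 100 chunks, each chunk holds 6.25 million points and occupies 100 MB, so every task fits comfortably into the memory of a single worker.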

### Let's calculate again...

In [17]:
%time pi = calculate_pi(size_in_bytes=10_000_000_000, number_of_chunks=100) # 10 GB


from 10.0 GB randomly chosen positions
   pi estimate: 3.1415992768
   pi error: 6.6232102069463394e-06

CPU times: user 319 ms, sys: 29.2 ms, total: 349 ms
Wall time: 1.49 s


In [18]:
%time pi = calculate_pi(size_in_bytes=100_000_000_000, number_of_chunks=250) # 100 GB


from 100.0 GB randomly chosen positions
   pi estimate: 3.14160130624
   pi error: 8.652650206997237e-06

CPU times: user 902 ms, sys: 71.7 ms, total: 973 ms
Wall time: 9.75 s


In [19]:
%time pi = calculate_pi(size_in_bytes=1_000_000_000_000, number_of_chunks=1000) # 1 TB


from 1000.0 GB randomly chosen positions
   pi estimate: 3.14160087136
   pi error: 8.21777020698633e-06

CPU times: user 8.35 s, sys: 631 ms, total: 8.98 s
Wall time: 57.8 s
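
How much accuracy can we expect from more samples? Each random point falls inside the quarter circle with probability p = pi / 4, so the estimate 4 * (fraction inside) has a standard deviation of 4 * sqrt(p * (1 - p) / N) for N points. The sketch below evaluates this for the three problem sizes; the helper name `expected_pi_error` is hypothetical, and it assumes 16 bytes per point as in `calculate_pi`.

```python
import math


def expected_pi_error(size_in_bytes):
    """Standard deviation of the Monte Carlo pi estimate for a given data size."""
    n_points = size_in_bytes // 16  # two 8-byte coordinates per point
    p = math.pi / 4                 # probability of landing inside the quarter circle
    return 4 * math.sqrt(p * (1 - p) / n_points)


for size in (10**10, 10**11, 10**12):
    print(f"{size / 1e9:6.0f} GB: expected error of about {expected_pi_error(size):.1e}")
```

The expected error only shrinks with the square root of the number of points: 100 times more data buys one more decimal digit. The errors printed above are of this order or smaller.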


### We can scale up the cluster whenever needed

In [20]:
cluster.scale(jobs=16)

In [21]:
!squeue -u {os.environ["USER"]}

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2524584       esm dask-wor    rath1 PD       0:00      1 (Resources)
           2524581       esm dask-wor    rath1  R       0:33      1 jwc00n009
           2524582       esm dask-wor    rath1  R       0:33      1 jwc00n012
           2524583       esm dask-wor    rath1  R       0:33      1 jwc00n015
           2524580       esm dask-wor    rath1  R       1:20      1 jwc00n006
           2524579       esm dask-wor    rath1  R       1:33      1 jwc00n003


In [22]:
client

Client:  Scheduler: tcp://10.11.159.193:43091  Dashboard: http://10.11.159.193:8787/status
Cluster: Workers: 5  Cores: 80  Memory: 450.00 GB


### Let's calculate again...

In [23]:
%time pi = calculate_pi(size_in_bytes=1_000_000_000_000, number_of_chunks=1000) # 1 TB


from 1000.0 GB randomly chosen positions
   pi estimate: 3.141598500224
   pi error: 5.846634206996271e-06

CPU times: user 6.32 s, sys: 591 ms, total: 6.91 s
Wall time: 42.3 s


In [24]:
# %time pi = calculate_pi(size_in_bytes=10_000_000_000_000, number_of_chunks=10000) # 10 TB

### Note, we could also adaptively scale the jobqueue cluster!

Dask jobqueue can adapt the number of workers to the current workload, within bounds that we choose. We can also specify a target duration, which controls how aggressively the cluster scales up.
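
The idea behind a target duration can be illustrated with a toy model. This is not the algorithm used by Dask, only a sketch under the assumption that the scheduler has an estimate of the outstanding work in seconds; the function `target_workers` and all of its parameters are hypothetical.

```python
import math


def target_workers(estimated_work_seconds, threads_per_worker,
                   target_duration_seconds, minimum, maximum):
    """Toy model: workers needed to finish the queued work within the target duration."""
    threads_needed = math.ceil(estimated_work_seconds / target_duration_seconds)
    workers_needed = math.ceil(threads_needed / threads_per_worker)
    return max(minimum, min(maximum, workers_needed))  # clamp to the allowed range


# 16 threads per worker, a 60 s target, and between 1 and 5 workers
for work in (30, 2_000, 10_000):
    print(f"{work:6d} s of work -> {target_workers(work, 16, 60, 1, 5)} worker(s)")
```

In this model a short computation never asks for more than the minimum, while a long one is capped at the maximum.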

In [25]:
cluster.adapt(
    minimum=1, maximum=5,  # between 1 and 5 workers (one worker per job here)
    target_duration="60s"  # try to be done after 60 seconds
)

<distributed.deploy.adaptive.Adaptive at 0x2b062ba7f9d0>

In [28]:
!squeue -u {os.environ["USER"]}  # will scale down to one job if idle

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2524579       esm dask-wor    rath1  R       2:48      1 jwc00n003


In [29]:
# short calculations don't lead to scaling up
%time pi = calculate_pi(size_in_bytes=10_000_000_000, number_of_chunks=100) # 10 GB


from 10.0 GB randomly chosen positions
   pi estimate: 3.1415473728
   pi error: 4.528078979326722e-05

CPU times: user 417 ms, sys: 29.6 ms, total: 447 ms
Wall time: 2.38 s


In [30]:
# longer calculations will try to increase cluster size:
%time pi = calculate_pi(size_in_bytes=1_000_000_000_000, number_of_chunks=1000) # 1 TB


from 1000.0 GB randomly chosen positions
   pi estimate: 3.141599735488
   pi error: 7.081898206973136e-06

CPU times: user 7.21 s, sys: 582 ms, total: 7.79 s
Wall time: 46.5 s


In [31]:
!squeue -u {os.environ["USER"]}

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           2524603       esm dask-wor    rath1  R       0:43      1 jwc00n006
           2524604       esm dask-wor    rath1  R       0:43      1 jwc00n009
           2524605       esm dask-wor    rath1  R       0:43      1 jwc00n012
           2524606       esm dask-wor    rath1  R       0:43      1 jwc00n015
           2524579       esm dask-wor    rath1  R       4:10      1 jwc00n003
