# Dask jobqueue example

## What is Dask jobqueue? (<https://jobqueue.dask.org/>)

* deploys Dask workers on typical HPC job queueing systems

## Monte-Carlo estimate with multiple Dask batch job workers

We define a Dask jobqueue cluster with Dask workers that each have 8 CPUs and 48 GB of memory.

In [1]:
import dask, dask.distributed
import dask_jobqueue

In [2]:
cluster = dask_jobqueue.SLURMCluster(

    # Dask worker size
    cores=8, memory='48GB',
    processes=1, # Dask workers per job
    
    # SLURM job script things
    queue='cluster', walltime='00:15:00',
    
    # Dask worker network and temporary storage
    interface='ib0', local_directory='$TMPDIR',
)

client = dask.distributed.Client(cluster)
cluster.adapt(minimum=16, maximum=16)

<distributed.deploy.adaptive.Adaptive at 0x151d8ea44160>

In [3]:
client

Client   Scheduler: tcp://172.18.4.11:38827   Dashboard: http://172.18.4.11:8787/status
Cluster  Workers: 0   Cores: 0   Memory: 0 B
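The display above reports zero workers, presumably because no batch job had started yet. As a quick arithmetic sketch, using only the numbers from the cluster definition and the `adapt()` call, these are the totals to expect once all requested jobs are running:

```python
# Numbers taken from the SLURMCluster definition and the adapt() call above.
cores_per_worker = 8
memory_per_worker_gb = 48
number_of_workers = 16  # adapt(minimum=16, maximum=16)

total_cores = cores_per_worker * number_of_workers
total_memory_gb = memory_per_worker_gb * number_of_workers

print(f"{number_of_workers} workers: {total_cores} cores, {total_memory_gb} GB")
```

These totals are useful for judging how the array sizes used later compare with the memory available across all workers.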


### From here everything is (almost) the same

We'll return the Dask array for `pi` and handle computation more explicitly.

In [4]:
import numpy, dask.array

def calculate_pi(size_in_bytes, number_of_chunks):
    
    """Calculate pi using a Monte Carlo method."""
    
    array_shape = (int(size_in_bytes / 8 / 2), 2)
    chunk_size = (int(array_shape[0] / number_of_chunks), 2)
    
    # 2D random positions array using dask.array
    xy = dask.array.random.uniform(
        low=0.0, high=1.0, size=array_shape,
        # specify chunk size, i.e. task number
        chunks=chunk_size )
  
    xy_inside_circle = (xy ** 2).sum(axis=1) < 1 # boolean

    pi = 4 * xy_inside_circle.sum() / xy_inside_circle.size
        
    return pi
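To make the arithmetic inside `calculate_pi` concrete, here is a minimal sketch that uses only the standard library. The helper names `array_and_chunk_shape` and `estimate_pi` are hypothetical and not part of the notebook. The first mirrors the shape computation (8 bytes per float64, 2 columns), and the second runs the same inside-the-quarter-circle test on a small, seeded sample.

```python
import random

def array_and_chunk_shape(size_in_bytes, number_of_chunks):
    """Mirror the shape arithmetic of calculate_pi (8 bytes per float64, 2 columns)."""
    rows = int(size_in_bytes / 8 / 2)
    rows_per_chunk = int(rows / number_of_chunks)
    return (rows, 2), (rows_per_chunk, 2)

def estimate_pi(number_of_points, seed=0):
    """Serial Monte Carlo estimate: fraction of points inside the unit quarter circle, times 4."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(number_of_points):
        x, y = rng.random(), rng.random()
        if x * x + y * y < 1:
            inside += 1
    return 4 * inside / number_of_points

# 10 GB split into 100 chunks, as in the first timing run
array_shape, chunk_shape = array_and_chunk_shape(10_000_000_000, 100)
print(array_shape, chunk_shape)

pi_estimate = estimate_pi(200_000)
print(pi_estimate)
```

The Dask version does the same thing, but evaluates each chunk as a separate task and sums the per-chunk counts at the end.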

### Let's calculate again...

Note the `.compute()`: `calculate_pi` now returns a lazy Dask array, so nothing is computed until we ask for it.

In [5]:
%time pi = calculate_pi(size_in_bytes=10_000_000_000, number_of_chunks=100).compute() # 10 GB

CPU times: user 341 ms, sys: 52.8 ms, total: 394 ms
Wall time: 786 ms


In [6]:
%time pi = calculate_pi(size_in_bytes=100_000_000_000, number_of_chunks=250).compute() # 100 GB

CPU times: user 816 ms, sys: 44.8 ms, total: 861 ms
Wall time: 3.08 s


### Alternative way for handling computation

In [7]:
pi = calculate_pi(size_in_bytes=100_000_000_000, number_of_chunks=250)
pi = client.compute(
    pi
)
print(pi)
print(pi.result())

<Future: pending, key: finalize-c6ccfe955bda2fb4210039cdebc6dafd>
3.14158823232


## What happens if a worker dies?

We'll find out all "our" job ids, mark a few of them non-preemptible, filter for the preemptible jobs, and define a function to kill one randomly selected preemptible job.

In [8]:
def get_current_jobs():
    current_jobs = !squeue | grep R | grep $USER | grep dask | awk '{print $1}'
    return current_jobs
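The shell pipeline above matches `R` and `dask` anywhere on a line. As a hedged alternative, the sketch below parses `squeue` text column by column. The function `running_dask_job_ids` is hypothetical, and it assumes the default column layout shown in the `squeue` output of this notebook; it does not filter by user, so it expects the output of `squeue -u $USER`.

```python
def running_dask_job_ids(squeue_output, name_prefix="dask"):
    """Return ids of jobs whose state column is exactly R and whose name starts with name_prefix."""
    job_ids = []
    for line in squeue_output.splitlines():
        fields = line.split()
        # Assumed columns: JOBID PARTITION NAME USER ST TIME NODES NODELIST(REASON)
        if len(fields) < 5 or fields[0] == "JOBID":
            continue  # skip blank lines and the header
        job_id, name, state = fields[0], fields[2], fields[4]
        if state == "R" and name.startswith(name_prefix):
            job_ids.append(job_id)
    return job_ids

sample = """\
             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
             47934   cluster dask-wor smomw122 PD       0:00      1 (Priority)
             47918   cluster dask-wor smomw122  R       1:44      1 neshcl203
             47919   cluster dask-wor smomw122  R       1:44      1 neshcl203
"""
job_ids = running_dask_job_ids(sample)
print(job_ids)
```

In the notebook, the text could come from `!squeue -u $USER`, joined into one string with `"\n".join(...)`.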

In [9]:
non_preemptible_jobs = get_current_jobs()[:8]
non_preemptible_jobs

['47918', '47919', '47920', '47921', '47922', '47923', '47924', '47925']

In [10]:
def get_preemptible_jobs():
    return list(filter(lambda j: j not in non_preemptible_jobs, get_current_jobs()))

In [11]:
get_preemptible_jobs()

['47926', '47927', '47928', '47929', '47930', '47931', '47932', '47933']

In [12]:
import random

def kill_random_preemptible_job():
    preemptible_jobs = get_preemptible_jobs()
    if preemptible_jobs:
        worker_to_kill = random.choice(preemptible_jobs)
        print(f"will cancel job {worker_to_kill}")
        !scancel {worker_to_kill}

In [13]:
from time import sleep

In [14]:
print(get_preemptible_jobs())
kill_random_preemptible_job()
sleep(1)
print(get_preemptible_jobs())

['47926', '47927', '47928', '47929', '47930', '47931', '47932', '47933']
will cancel job 47932
['47926', '47927', '47928', '47929', '47930', '47931', '47933']


In [17]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON) 
             47934   cluster dask-wor smomw122 PD       0:00      1 (Priority) 
             47918   cluster dask-wor smomw122  R       1:44      1 neshcl203 
             47919   cluster dask-wor smomw122  R       1:44      1 neshcl203 
             47920   cluster dask-wor smomw122  R       1:44      1 neshcl203 
             47921   cluster dask-wor smomw122  R       1:44      1 neshcl214 
             47922   cluster dask-wor smomw122  R       1:44      1 neshcl214 
             47923   cluster dask-wor smomw122  R       1:44      1 neshcl214 
             47924   cluster dask-wor smomw122  R       1:44      1 neshcl216 
             47925   cluster dask-wor smomw122  R       1:44      1 neshcl216 
             47926   cluster dask-wor smomw122  R       1:44      1 neshcl216 
             47927   cluster dask-wor smomw122  R       1:44      1 neshcl216 
             47928   cluster dask-wor smomw1

## Let's start a computation with disappearing workers

In [29]:
pi = calculate_pi(
    size_in_bytes=1_000_000_000_000, number_of_chunks=10_000
)
display(pi)

| | Array | Chunk |
|---|---|---|
| Bytes | 8 B | 8 B |
| Shape | () | () |
| Count | 63338 Tasks | 1 Chunks |
| Type | float64 | numpy.ndarray |


In [30]:
pi = client.compute(pi)
print(pi)

<Future: pending, key: finalize-7463e029451ffcef396af73f3510904c>


In [None]:
sleep(5)

while not pi.done():
    kill_random_preemptible_job()
    sleep(5)

will cancel job 47941
will cancel job 47928
will cancel job 47933
will cancel job 47926
will cancel job 47943
will cancel job 47929
will cancel job 47936
will cancel job 47940


In [None]:
print(pi)

## And get the result

In [23]:
print(pi.result())

3.14160183008


## What happened?

The Dask scheduler keeps a suspiciousness counter for each task it manages. Whenever a worker dies, every task assigned to that worker at the time of its death has its suspiciousness increased by one. The scheduler has no way of telling which task was actually responsible for the worker's death, so it flags all of them.

All tasks with suspiciousness `>= 3` (default) are considered bad and won't be rescheduled.
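The bookkeeping described above can be sketched as a toy model. This is not the code of the Dask scheduler: `ToyScheduler` and its methods are hypothetical, and the `>=` comparison simply follows the description in the text, so the exact rule in your installed version of distributed may differ.

```python
from collections import defaultdict

class ToyScheduler:
    """Toy model of the suspiciousness counter (an illustration, not Dask's scheduler)."""

    def __init__(self, allowed_failures=3):
        self.allowed_failures = allowed_failures
        self.suspicious = defaultdict(int)  # task key -> number of worker deaths seen
        self.bad_tasks = set()              # tasks that will not be rescheduled

    def worker_died(self, tasks_on_worker):
        # Every task on the dead worker is blamed, since the culprit is unknown.
        for key in tasks_on_worker:
            self.suspicious[key] += 1
            if self.suspicious[key] >= self.allowed_failures:
                self.bad_tasks.add(key)

scheduler = ToyScheduler(allowed_failures=3)
for _ in range(3):
    scheduler.worker_died(["chunk-0", "chunk-1"])  # the same tasks are hit three times
scheduler.worker_died(["chunk-2"])                 # hit only once

print(sorted(scheduler.bad_tasks))
```

In this model, a task only turns bad if it happens to sit on a dying worker several times, which becomes more likely the longer the computation runs and the more often workers are killed.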

## Make Dask more resilient

We can increase the number of allowed failures. Let's effectively disable the threshold and redo the calculation.

In [None]:
cluster.scheduler.allowed_failures = 1000

_(Note that `cluster.scheduler.allowed_failures` is internal API, which we need for now to raise the number of allowed failures. The current Dask.distributed release lets you change this through the Dask configuration at runtime, but we can't use that release with Dask jobqueue yet.)_
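A minimal sketch of the configuration route mentioned in the note, assuming a Dask/distributed version that reads the `distributed.scheduler.allowed-failures` key. The value has to be set before the scheduler is created, and whether it takes effect with your Dask jobqueue setup is an assumption to verify.

```python
import dask

# Assumption: the installed distributed version reads this key when the scheduler starts.
dask.config.set({"distributed.scheduler.allowed-failures": 1000})

allowed = dask.config.get("distributed.scheduler.allowed-failures")
print(allowed)
```

Setting the value through the configuration avoids touching scheduler attributes directly, at the cost of having to do it before the cluster object is created.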

In [None]:
pi = calculate_pi(
    size_in_bytes=1_000_000_000_000, number_of_chunks=10_000
)

In [None]:
pi = client.compute(pi)
print(pi)

In [None]:
sleep(5)

while not pi.done():
    kill_random_preemptible_job()
    sleep(10)

In [None]:
print(pi)

In [None]:
print(pi.result())