# Preemptive Dask jobqueue cluster
Sets up an adaptive Dask jobqueue cluster together with a very simple job preemption mechanism, which simulates running Dask jobqueue clusters on preemptible cluster nodes.

Developed for use on JUWELS, but it can also be used to "benchmark" other machines by adjusting the Dask jobqueue cluster node specification and the workflow parameters. In particular, `workload_chunk_size_in_megabytes` should be chosen to match the worker memory, threads, and processes that are actually specified.

## Workflow parameters
These parameters affect the total runtime of the Jupyter notebook and of the workload defined below.

In [1]:
workload_size_in_terabytes = 2
workload_chunk_size_in_megabytes = 160 # ~memory/cpu, i.e. Dask worker specific!
print(f"there will be {(workload_size_in_terabytes*1e12)/(workload_chunk_size_in_megabytes*1e6)} chunks to process")

there will be 12500.0 chunks to process
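
The chunk count printed above is a float and assumes decimal units (1 TB = 10^12 bytes, 1 MB = 10^6 bytes). A minimal sketch of the same arithmetic that returns an integer and rounds up when the sizes do not divide evenly; the helper name `chunk_count` is hypothetical and not part of the notebook:

```python
import math


def chunk_count(size_in_terabytes, chunk_size_in_megabytes):
    """Number of chunks needed to cover the workload (hypothetical helper)."""
    total_bytes = size_in_terabytes * 10**12       # decimal terabytes, as in the cell above
    chunk_bytes = chunk_size_in_megabytes * 10**6  # decimal megabytes
    # Round up so that a partially filled last chunk is still counted.
    return math.ceil(total_bytes / chunk_bytes)


print(chunk_count(2, 160))  # same parameters as in the cell above
```

Rounding up only matters when the workload size is not a multiple of the chunk size; for the values used here the division is exact.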


In [2]:
wait_for_jobqueue_cluster_startup_in_seconds = 60
target_cluster_size_min = 7 # jobs and/or nodes
target_cluster_size_max = 8 # needs to be greater than min!
cluster_resilience = 1000 # worker deaths a task may survive before it is marked as failed

In [3]:
repeat_workloads = 3
sleep_between_repeat_workloads_in_seconds = 10

Specify preemption "aggressiveness" of the utilized node resources.

In [4]:
preemption_timeout_in_seconds=30
jobs_to_preempt = 2

Set necessary environment variables for use in the preemption simulator shell script.

In [5]:
%env preemption_timeout_in_seconds={preemption_timeout_in_seconds}
%env jobs_to_preempt={jobs_to_preempt}

env: preemption_timeout_in_seconds=30
env: jobs_to_preempt=2
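
Outside of IPython the `%env` magic is not available. A rough equivalent in plain Python, assuming the shell script is later started as a child of the same process so that it inherits the environment; `export_for_shell` is a hypothetical helper name:

```python
import os


def export_for_shell(**settings):
    """Copy keyword arguments into the process environment (hypothetical helper)."""
    for name, value in settings.items():
        # Environment variables are always strings.
        os.environ[name] = str(value)


export_for_shell(preemption_timeout_in_seconds=30, jobs_to_preempt=2)
print(os.environ["preemption_timeout_in_seconds"], os.environ["jobs_to_preempt"])
```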


## Setup Python environment

In [6]:
%run tech-preamble.py

## Manually kill obsolete Dask workers
Make sure existing/pending Dask worker jobs don't interfere with the scheduler load produced below.

In [7]:
!squeue -u $USER | grep dask | awk '{print $1}' | xargs -I {} scancel {}

In [8]:
time.sleep(5)

In [9]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)


## Dask jobqueue cluster

In [10]:
!sinfo -t idle --format="%9P %.5a %.5D %.5t"

PARTITION AVAIL NODES STATE
batch*       up  1567  idle
devel        up    18  idle
mem192       up   236  idle
esm          up     5  idle
large      down  1567  idle
gpus         up    21  idle
develgpus    up     8  idle
maint        up  1619  idle


In [11]:
jobqueue_cluster = dask_jobqueue.SLURMCluster(
    project="esmtst", queue="devel", walltime="00:15:00",
    cores=96, memory='79GiB', processes=1,
    interface="ib0", scheduler_options={},
    #local_directory="$SCRATCH_cesmtst/hoeflich1",
    local_directory="/tmp",
    log_directory="dask-jobqueue-logs",
)

In [12]:
jobqueue_cluster.scheduler.allowed_failures = cluster_resilience

In [13]:
print(jobqueue_cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e dask-jobqueue-logs/dask-worker-%J.err
#SBATCH -o dask-jobqueue-logs/dask-worker-%J.out
#SBATCH -p devel
#SBATCH -A esmtst
#SBATCH -n 1
#SBATCH --cpus-per-task=96
#SBATCH --mem=79G
#SBATCH -t 00:15:00

/gpfs/software/juwels/stages/Devel-2019a/software/Python/3.6.8-GCCcore-8.3.0/bin/python -m distributed.cli.dask_worker tcp://10.13.0.157:43509 --nthreads 96 --memory-limit 84.83GB --name name --nanny --death-timeout 60 --local-directory /tmp --interface ib0
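
The cluster was specified with `memory='79GiB'`, while the generated worker command shows `--memory-limit 84.83GB`. Both describe the same amount in binary and decimal units. The sketch below reproduces the conversion and derives a rough per-thread memory budget against which the chunk size can be compared; the helper name and the even split across threads are assumptions made for illustration:

```python
GIB = 2**30  # bytes per gibibyte
GB = 10**9   # bytes per gigabyte


def gib_to_gb(gibibytes):
    """Convert binary gibibytes to decimal gigabytes (hypothetical helper)."""
    return gibibytes * GIB / GB


memory_limit_gb = gib_to_gb(79)
print(f"{memory_limit_gb:.2f} GB")  # 84.83 GB, matching --memory-limit

# Assumption: memory is shared evenly by the 96 worker threads.
memory_per_thread_mb = memory_limit_gb * 1000 / 96
print(f"{memory_per_thread_mb:.0f} MB per thread")
```

With the values used here, a 160 MB chunk is several times smaller than the per-thread share, which leaves headroom for intermediate results.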



In [14]:
client = dask_distributed.Client(jobqueue_cluster)

In [15]:
client

Client
  Scheduler: tcp://10.13.0.157:43509
  Dashboard: /user/khoeflich@geomar.de/jupyterlab_1/proxy/8787/status
Cluster
  Workers: 0
  Cores: 0
  Memory: 0 B


In [16]:
jobqueue_cluster.adapt(
    minimum_jobs=target_cluster_size_min,
    maximum_jobs=target_cluster_size_max
)

<distributed.deploy.adaptive.Adaptive at 0x7f08c8394400>

In [17]:
time.sleep(wait_for_jobqueue_cluster_startup_in_seconds)

In [18]:
!squeue -u $USER

             JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)
           3028208     devel dask-wor hoeflich PD       0:00      1 (None)
           3028209     devel dask-wor hoeflich PD       0:00      1 (None)
           3028210     devel dask-wor hoeflich PD       0:00      1 (None)
           3028211     devel dask-wor hoeflich PD       0:00      1 (None)
           3028212     devel dask-wor hoeflich PD       0:00      1 (None)
           3028213     devel dask-wor hoeflich PD       0:00      1 (None)
           3028214     devel dask-wor hoeflich PD       0:00      1 (None)


## Start preemption simulator

In [19]:
%%script bash --bg --out stdout --err stderr

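# Echo the settings exported above so that they show up in the captured stdout.
echo $preemption_timeout_in_seconds
echo $jobs_to_preempt

# Every $preemption_timeout_in_seconds seconds, cancel up to $jobs_to_preempt
# randomly chosen running Dask worker jobs.
while true; do

    sleep $preemption_timeout_in_seconds

    scancel $(squeue -u $USER | grep R | grep dask | awk '{print $1}' | \
              shuf --random-source=/dev/urandom -n $jobs_to_preempt)

done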
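
The selection step of the simulator can also be sketched in Python. The version below picks at most `jobs_to_preempt` random job IDs and skips the `scancel` call when nothing is running, which is the situation that makes `scancel` complain about a missing job identification. The function names are hypothetical, and the command is only built here, not executed:

```python
import random


def pick_jobs_to_preempt(running_job_ids, jobs_to_preempt, rng=None):
    """Randomly choose up to `jobs_to_preempt` job IDs (hypothetical helper)."""
    if rng is None:
        rng = random.Random()
    # Never ask for more jobs than are currently running.
    count = min(jobs_to_preempt, len(running_job_ids))
    return rng.sample(list(running_job_ids), count)


def build_scancel_command(job_ids):
    """Return the scancel argument list, or None if there is nothing to cancel."""
    if not job_ids:
        return None
    return ["scancel", *job_ids]


victims = pick_jobs_to_preempt(["3028208", "3028210", "3028211"], 2)
print(build_scancel_command(victims))
```

The returned list could be passed to `subprocess.run` on a machine where SLURM is available.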

## Workload example

In [20]:
%run define-pi-workload.py
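
`define-pi-workload.py` is not reproduced in this notebook, so the actual `calculate_pi` implementation is not shown here. Purely as an illustration of what a pi-estimating workload can compute, the following sketch uses a Monte Carlo estimate with the standard library only. The function name `estimate_pi_monte_carlo` and the method itself are assumptions, not the notebook's code:

```python
import random


def estimate_pi_monte_carlo(samples, seed=None):
    """Estimate pi from random points in the unit square (illustrative only)."""
    rng = random.Random(seed)
    inside = 0
    for _ in range(samples):
        x, y = rng.random(), rng.random()
        # Count points that fall inside the quarter circle of radius 1.
        if x * x + y * y < 1.0:
            inside += 1
    # The quarter circle covers pi/4 of the unit square.
    return 4.0 * inside / samples


print(estimate_pi_monte_carlo(200_000, seed=1))
```

A distributed variant would draw the random points chunk by chunk on the workers and only combine the per-chunk counts at the end, which is why the chunk size matters for memory use.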

In [21]:
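# Variant 1: ask Dask jobqueue for the job status.
# The worker status seems broken; this may need a fix in Dask jobqueue.
# Note: this definition is overridden by the variant of the same name below.
def get_running_worker_jobs(cluster):
    worker_jobs = [
        cluster.workers[worker_id].job_id
        for worker_id in cluster.workers.keys()
        if (cluster.workers[worker_id].status == 'running')
    ]
    return sorted(worker_jobs)

# Variant 2 (the one actually used): ask SLURM directly via squeue.
# Assumes that running Dask worker jobs are already/still connected to the cluster scheduler.
# The `cluster` argument is unused; the result is [''] when no matching job is running.
def get_running_worker_jobs(cluster):
    worker_jobs = os.popen("squeue -u $USER | grep dask | grep R | awk '{ print $1 }'").read().strip('\n').split('\n')
    return sorted(worker_jobs)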
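
The `grep R` filter matches any line containing an uppercase "R", not only jobs in state `R`. A possible stricter variant is sketched below: it parses the default `squeue` column layout shown in this notebook and compares the `ST` column explicitly. Both function names are hypothetical, and the column positions are an assumption that breaks if a custom `squeue` format is configured:

```python
import getpass
import subprocess


def parse_running_dask_jobs(squeue_output):
    """Extract IDs of running Dask jobs from default squeue output (hypothetical helper)."""
    job_ids = []
    for line in squeue_output.splitlines():
        fields = line.split()
        if len(fields) < 5:
            continue  # skip blank or malformed lines
        job_id, _partition, name, _user, state = fields[:5]
        # squeue truncates the job name, so only the prefix is compared.
        if name.startswith("dask") and state == "R":
            job_ids.append(job_id)
    return sorted(job_ids)


def get_running_worker_jobs_via_squeue():
    """Query squeue for the current user; requires SLURM to be installed."""
    result = subprocess.run(
        ["squeue", "-u", getpass.getuser()],
        stdout=subprocess.PIPE, universal_newlines=True, check=True,
    )
    return parse_running_dask_jobs(result.stdout)
```

Unlike the shell pipeline, the parser returns an empty list when no job is running, so callers do not have to special-case `['']`.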

In [22]:
for workload_cycle in range(repeat_workloads):

    start = time.time()
    worker_jobs_begin = get_running_worker_jobs(jobqueue_cluster)
    pi = calculate_pi(
        size_in_terabytes=workload_size_in_terabytes,
        chunk_size_in_megabytes=workload_chunk_size_in_megabytes
    ).compute()
    worker_jobs_end = get_running_worker_jobs(jobqueue_cluster)
    elapse = time.time() - start

    print(f"workload cycle: {workload_cycle}")
    print(f"jobs before: {worker_jobs_begin}")
    print(f"pi estimate: {pi}")
    print(f"pi error: {abs(pi - numpy.pi)}")
    print(f"wall time: {elapse : 7.3f}s")
    print(f"jobs after: {worker_jobs_end}")
    !squeue -u $USER | grep dask | awk '{print $1, $2, $3, $4, $5, $6, $7, $8}'
    print(f"")

    time.sleep(sleep_between_repeat_workloads_in_seconds)

workload cycle: 0
jobs before: ['']
pi estimate: 3.141588778848
pi error: 3.874741793197245e-06
wall time:  91.651s
jobs after: ['3028208', '3028210', '3028211']

3028208 devel dask-wor hoeflich CG 1:08 1 jwc00n002
3028210 devel dask-wor hoeflich CG 1:08 1 jwc00n005
3028216 devel dask-wor hoeflich PD 0:00 1 (None)
3028211 devel dask-wor hoeflich R 1:09 1 jwc00n007



KeyboardInterrupt: 
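
The "jobs before" and "jobs after" lists can be compared to see how many worker jobs were lost or newly started during a workload cycle. A small sketch using set operations; `summarize_job_turnover` is a hypothetical helper that ignores the empty string returned when no job was running:

```python
def summarize_job_turnover(jobs_before, jobs_after):
    """Classify job IDs as survived, lost, or new (hypothetical helper)."""
    before = {job for job in jobs_before if job}  # drop '' placeholders
    after = {job for job in jobs_after if job}
    return {
        "survived": sorted(before & after),
        "lost": sorted(before - after),
        "new": sorted(after - before),
    }


# Values taken from workload cycle 0 above.
print(summarize_job_turnover([""], ["3028208", "3028210", "3028211"]))
```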

## Stop preemption simulator

In [23]:
%killbgscripts

All background processes were killed.


In [24]:
print(stdout.read().decode("ascii"))

30
2



In [25]:
print(stderr.read().decode("ascii"))

scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided
scancel: error: No job identification provided



## Jupyter kernel env

In [26]:
!module --redirect list


Currently Loaded Modules:
  1) Stages/Devel-2019a                       (S)
  2) GCCcore/.8.3.0                           (H)
  3) binutils/.2.32                           (H)
  4) imkl/.2019.3.199                         (H)
  5) bzip2/.1.0.6                             (H)
  6) zlib/.1.2.11                             (H)
  7) ncurses/.6.1                             (H)
  8) libreadline/.8.0                         (H)
  9) Tcl/8.6.9
 10) SQLite/.3.27.2                           (H)
 11) expat/.2.2.6                             (H)
 12) libpng/.1.6.36                           (H)
 13) freetype/.2.10.0                         (H)
 14) gperf/.3.1                               (H)
 15) util-linux/.2.33.1                       (H)
 16) fontconfig/.2.13.1                       (H)
 17) X11/20190311
 18) Tk/.8.6.9                                (H)
 19) GMP/

In [27]:
!pip list

Package                            Version  
---------------------------------- ---------
absl-py                            0.8.1    
aiohttp                            3.6.2    
alabaster                          0.7.12   
alembic                            1.0.8    
altair                             3.3.0    
ansi2html                          1.5.2    
ansiwrap                           0.8.4    
apipkg                             1.5      
appdirs                            1.4.3    
appmode                            0.6.0    
argcomplete                        1.9.5    
arviz                              0.5.1    
asn1crypto                         0.24.0   
astroid                            2.3.3    
async-generator                    1.10     
async-timeout                      3.0.1    
atomicwrites                       1.3.0    
attrs                              19.1.0   
autobahn                           19.10.1  
Automat                            0.8.0    
autopep8  