# How to customize your Dask jobqueue cluster

This document covers how to
* choose a NEC Linux cluster queue
* adjust Dask jobqueue worker resources
* scale Dask clusters adaptively

## Choose a NEC Linux cluster queue

The [NEC Linux cluster system](https://www.rz.uni-kiel.de/de/angebote/hiperf/nec-linux-cluster) has a nominal total of 198 batch nodes with 24-32 CPUs and 128-384 GB of memory per execution host. Dask jobqueue workers are intended to be short-lived and would therefore fit within the walltime limit of the `clexpress` batch queue. However, that queue has very few nodes/execution hosts, which can make the start times of Dask worker jobs unpredictable (note the [spiky workload tendency](https://nbviewer.jupyter.org/github/ExaESM-WP4/nesh-monitoring/blob/v2020.01.24.1/analysis.ipynb)), so it is not a good choice for a default configuration. The batch class `clmedium` has therefore been chosen as the default. It shows a [relatively stable number of 200-400 idle CPUs](https://nbviewer.jupyter.org/github/ExaESM-WP4/nesh-monitoring/blob/v2020.01.24.1/analysis.ipynb) (at least during the considered log period!), so Dask jobqueue clusters that are not too big should start and connect immediately. However, available resources fluctuate with current user behaviour (which might change!). Below you find an example of how to get an overview of currently available CPU resources and how to set up your Dask jobqueue cluster in a batch class other than the default.

### Get details on available batch class resources

The [recommended way](https://www.rz.uni-kiel.de/de/angebote/hiperf/nec-linux-cluster) of getting an overview of available batch queue resources is the `qcl` command.

In [1]:
!qcl

 Batch class  Walltime[h]  Max.cores/node  Max.RAM[gb]  Total[*]  Used[*]  Avail[*]  Run.jobs/user
 -----------  -----------  --------------  -----------  --------  -------  --------  -------------
 clexpress              2              32          192         2        2         0              -
 clmedium              48              32          192       116      116         0             20
 cllong               100              32          192        49       49         0             10
 clbigmem             200              32          384         8        8         0              2
 clfo2                200              24          128        17       17         0              8
 feque                  1              32           64         1        1         0              -
 -----------  -----------  --------------  -----------  --------  -------  --------  -------------
 Sum                                                         193      193         0
 -----------  -----------

However, the NEC Linux cluster is a shared host/node system, and `qcl` does not show which resources are actually allocated on each host. The following `qstat` command lists the free resources of the currently active nodes/execution hosts, sorted alphabetically by batch queue class and filtered for nodes that have at least one unoccupied CPU. It gives an overview of theoretically available resources and might help with choosing a batch queue for your Dask jobqueue cluster. Please note that each batch class has its own limit on running jobs per user, which constrains the total number of Dask workers and hence your Dask cluster size.

In [2]:
%%bash
qstat -E -F fcpu,fmem1,ucpu,umem1,ehost,quenm | head -n2 # provide header
#qstat -E -F fcpu,fmem1,ucpu,umem1,ehost,quenm | awk '{ if ($1>0 && substr($5,1,6)=="neshcl") { print } }' | sort -k6 # raw output
qstat -E -F fcpu,fmem1,ucpu,umem1,ehost,quenm | awk '{ if ($1>0 && substr($5,1,6)=="neshcl") { printf "%9d%11.3f%10d%11.3f %-15s %-15s\n",$1,$2/2**8,$3,$4/2**8,$5,$6 } }' | sort -k6 # memory units in GB

Free_CPUs  Free_Mem1 Used_CPUs  Used_Mem1 ExecutionHost   QueueName      
--------- ---------- --------- ---------- --------------- ---------------
       16    334.586        16     48.125 neshcl219       clbigmem       
       30    363.465         2     19.246 neshcl224       clbigmem       
       30    373.582         2      9.129 neshcl225       clbigmem       
       31    376.113         1      6.598 neshcl220       clbigmem       
       31    377.371         1      5.340 neshcl221       clbigmem       
        3    110.418        21     17.477 neshcl216       clfo2          
        4    101.266        20     26.629 neshcl209       clfo2          
        4    101.629        20     26.266 neshcl211       clfo2          
        4    101.742        20     26.152 neshcl201       clfo2          
        4    102.043        20     25.852 neshcl207       clfo2          
        4    102.625        20     25.270 neshcl217       clfo2          
        4    105.160        20     22.
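
The per-host listing can be condensed into one line per batch class. The following is a minimal sketch, assuming the formatted table above has been captured as text (for example by copying the cell output). The helper name `free_cpus_per_queue` and the sample rows are illustrative and not part of any NEC or Dask tooling.

```python
from collections import defaultdict

# A few rows copied from the formatted `qstat` output above (illustrative sample).
SAMPLE_OUTPUT = """\
Free_CPUs  Free_Mem1 Used_CPUs  Used_Mem1 ExecutionHost   QueueName
--------- ---------- --------- ---------- --------------- ---------------
       16    334.586        16     48.125 neshcl219       clbigmem
       30    363.465         2     19.246 neshcl224       clbigmem
        3    110.418        21     17.477 neshcl216       clfo2
        4    101.266        20     26.629 neshcl209       clfo2
"""


def free_cpus_per_queue(table_text):
    """Sum free CPUs and free memory (GB) per batch class.

    Hypothetical helper: expects the six-column layout printed above and
    skips every line whose first field is not an integer (header, rulers,
    incomplete rows).
    """
    totals = defaultdict(lambda: {"hosts": 0, "free_cpus": 0, "free_mem_gb": 0.0})
    for line in table_text.splitlines():
        fields = line.split()
        if len(fields) != 6 or not fields[0].isdigit():
            continue
        free_cpus, free_mem, _used_cpus, _used_mem, _host, queue = fields
        totals[queue]["hosts"] += 1
        totals[queue]["free_cpus"] += int(free_cpus)
        totals[queue]["free_mem_gb"] += float(free_mem)
    return dict(totals)


summary = free_cpus_per_queue(SAMPLE_OUTPUT)
for queue, stats in sorted(summary.items()):
    print(f"{queue:<10} hosts={stats['hosts']} free_cpus={stats['free_cpus']} "
          f"free_mem_gb={stats['free_mem_gb']:.1f}")
```

The per-class totals are only a rough guide: a worker job presumably has to fit onto a single host, so a class with many hosts offering 3 or 4 free CPUs each may still be unable to place a larger worker.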

### Start Dask jobqueue cluster in a different batch class
To choose a batch queue manually, use the default configuration and pass the `queue` keyword argument to the `PBSCluster` constructor. This only changes the PBS header line that specifies the batch class in the Dask worker job script.

In [1]:
import os; os.environ['DASK_CONFIG']='.'
import dask.config; dask.config.get('jobqueue')

{'nesh-jobqueue-config': {'cores': 4,
  'memory': '24GB',
  'processes': 1,
  'queue': 'clmedium',
  'resource_spec': 'elapstim_req=00:45:00,cpunum_job=4,memsz_job=24gb',
  'interface': 'ib0',
  'local-directory': '/scratch',
  'walltime': None,
  'job-extra': ['-o dask_jobqueue_logs/dask-worker.o%s',
   '-e dask_jobqueue_logs/dask-worker.e%s'],
  'project': None,
  'name': 'dask-worker',
  'death-timeout': 60,
  'extra': [],
  'env-extra': [],
  'log-directory': None,
  'shebang': '#!/bin/bash'}}

In [2]:
import dask_jobqueue
custom_queue_cluster = dask_jobqueue.PBSCluster(config_name='nesh-jobqueue-config',queue='clbigmem')

In [3]:
print(custom_queue_cluster.job_script())

#!/bin/bash

#PBS -N dask-worker
#PBS -q clbigmem
#PBS -l elapstim_req=00:45:00,cpunum_job=4,memsz_job=24gb
#PBS -o dask_jobqueue_logs/dask-worker.o%s
#PBS -e dask_jobqueue_logs/dask-worker.e%s
JOB_ID=${PBS_JOBID%%.*}

/sfs/fs6/home-geomar/smomw260/miniconda3/envs/dask-minimal-20191218/bin/python -m distributed.cli.dask_worker tcp://192.168.31.10:32866 --nthreads 4 --memory-limit 24.00GB --name name --nanny --death-timeout 60 --local-directory /scratch --interface ib0
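
To check programmatically that the `queue` keyword ended up in the job script header, the `#PBS` lines can be extracted from the script text. This is a minimal sketch: `pbs_directives` is a hypothetical helper, and the sample script is an excerpt of the output above rather than a live `job_script()` call.

```python
def pbs_directives(job_script):
    """Return the `#PBS` option lines of a job script as (flag, value) pairs."""
    directives = []
    for line in job_script.splitlines():
        if line.startswith("#PBS "):
            # Split "-q clbigmem" into the flag and everything after it.
            flag, _, value = line[len("#PBS "):].partition(" ")
            directives.append((flag, value))
    return directives


# Excerpt of the job script printed above (illustrative sample).
SAMPLE_JOB_SCRIPT = """\
#!/bin/bash

#PBS -N dask-worker
#PBS -q clbigmem
#PBS -l elapstim_req=00:45:00,cpunum_job=4,memsz_job=24gb
"""

directives = dict(pbs_directives(SAMPLE_JOB_SCRIPT))
print(directives["-q"])
```

In a live session the same helper could be applied to `custom_queue_cluster.job_script()` instead of the sample string.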



## Adjust Dask jobqueue worker resources
Details on currently available batch class resources can also help you choose the resources of a single Dask worker. The default values allocate one eighth (a subjective choice!) of the resources of an average `clmedium` batch class host. However, depending on your dataset characteristics, the properties of your calculation/operation and the currently occupied Linux cluster resources, other values might be more useful.

Please note that available [resources at the NEC Linux cluster system are mostly CPU limited, with plenty of memory available](https://nbviewer.jupyter.org/github/ExaESM-WP4/nesh-monitoring/blob/v2020.01.24.1/analysis.ipynb). Below you therefore find an example of how to decrease single Dask worker resources in terms of CPU allocation. Such smaller Dask workers might occasionally squeeze better into the available NEC Linux cluster resources. To keep the total Dask cluster resources the same, you can compensate for the smaller single worker size by increasing the total number of Dask jobqueue workers, i.e. by using a larger value for the `jobs` keyword in `cluster.scale(jobs=4)`.

However, this approach might either increase or decrease the total execution time of your calculation, depending on the task types associated with your operation and the actual load on the node/execution host. If you experiment with jobqueue worker resources (and total worker numbers), keep in mind that your total Dask cluster size is limited not only by the available resources and the batch class limit on running jobs, but also by the resources demanded by other already queued jobs (which influence the starting times of your Dask jobqueue workers!).

In [1]:
import os; os.environ['DASK_CONFIG']='.'
import dask.config; dask.config.get('jobqueue')

{'nesh-jobqueue-config': {'cores': 4,
  'memory': '24GB',
  'processes': 1,
  'queue': 'clmedium',
  'resource_spec': 'elapstim_req=00:45:00,cpunum_job=4,memsz_job=24gb',
  'interface': 'ib0',
  'local-directory': '/scratch',
  'walltime': None,
  'job-extra': ['-o dask_jobqueue_logs/dask-worker.o%s',
   '-e dask_jobqueue_logs/dask-worker.e%s'],
  'project': None,
  'name': 'dask-worker',
  'death-timeout': 60,
  'extra': [],
  'env-extra': [],
  'log-directory': None,
  'shebang': '#!/bin/bash'}}

In [2]:
import dask_jobqueue
custom_worker_cluster = dask_jobqueue.PBSCluster(config_name='nesh-jobqueue-config',
                                                # Define smaller Linux cluster resources per Dask worker.
                                                resource_spec='elapstim_req=00:45:00,cpunum_job=2,memsz_job=12gb',
                                                # Adjust Dask worker resource limits to match the PBS request.
                                                cores=2, memory='12GB')

In [3]:
print(custom_worker_cluster.job_script())

#!/bin/bash

#PBS -N dask-worker
#PBS -q clmedium
#PBS -l elapstim_req=00:45:00,cpunum_job=2,memsz_job=12gb
#PBS -o dask_jobqueue_logs/dask-worker.o%s
#PBS -e dask_jobqueue_logs/dask-worker.e%s
JOB_ID=${PBS_JOBID%%.*}

/sfs/fs6/home-geomar/smomw260/miniconda3/envs/dask-minimal-20191218/bin/python -m distributed.cli.dask_worker tcp://192.168.31.10:38568 --nthreads 2 --memory-limit 12.00GB --name name --nanny --death-timeout 60 --local-directory /scratch --interface ib0
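
The PBS request (`resource_spec`) and the Dask worker limits (`cores`, `memory`) are separate settings that have to be kept in sync by hand. A small helper can derive all three from the same numbers. This is a sketch under assumptions: `worker_resources` is a hypothetical function, and the `resource_spec` format simply mirrors the one in the configuration shown above.

```python
def worker_resources(cores, memory_gb, walltime="00:45:00"):
    """Build matching PBSCluster keyword arguments for one Dask worker job.

    Hypothetical helper: the resource_spec layout is copied from the
    'nesh-jobqueue-config' entry shown above.
    """
    if cores < 1 or memory_gb < 1:
        raise ValueError("cores and memory_gb must both be at least 1")
    return {
        "cores": cores,
        "memory": f"{memory_gb}GB",
        "resource_spec": (
            f"elapstim_req={walltime},cpunum_job={cores},memsz_job={memory_gb}gb"
        ),
    }


kwargs = worker_resources(cores=2, memory_gb=12)
print(kwargs["resource_spec"])
```

The resulting dictionary could then be passed on as `dask_jobqueue.PBSCluster(config_name='nesh-jobqueue-config', **kwargs)`, so that the PBS header and the `--nthreads`/`--memory-limit` worker options cannot drift apart.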



## Scale Dask clusters adaptively
You can operate a Dask jobqueue cluster with fixed scaling, using the `cluster.scale(jobs=2)` method, or with adaptive scaling, using the `cluster.adapt(minimum=2, maximum=10)` method. For more examples of advanced scaling methods, see [this Dask jobqueue workshop materials notebook](https://nbviewer.jupyter.org/github/willirath/dask_jobqueue_workshop_materials/blob/v1.1.0/notebooks/03_tuning_adaptive_clusters.ipynb).
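
Because each batch class limits the number of running jobs per user, it can make sense to cap the adaptive maximum accordingly. The sketch below assumes one worker per job (`processes: 1`, as in the configuration shown earlier) and uses the `Run.jobs/user` values from the `qcl` output above, which may change. `adapt_within_limit` and `FakeCluster` are hypothetical names; the stand-in class only records calls so that the example runs without a batch system.

```python
# "Run.jobs/user" limits copied from the `qcl` output shown earlier (may change).
RUNNING_JOB_LIMITS = {"clmedium": 20, "cllong": 10, "clbigmem": 2, "clfo2": 8}


def adapt_within_limit(cluster, queue, minimum, maximum):
    """Call cluster.adapt() with bounds capped at the queue's running-job limit.

    Queues without a known limit are passed through unchanged.
    """
    limit = RUNNING_JOB_LIMITS.get(queue)
    if limit is not None:
        maximum = min(maximum, limit)
    minimum = min(minimum, maximum)
    cluster.adapt(minimum=minimum, maximum=maximum)
    return minimum, maximum


class FakeCluster:
    """Stand-in for a real cluster object; records adapt() calls only."""

    def __init__(self):
        self.calls = []

    def adapt(self, **kwargs):
        self.calls.append(kwargs)


demo = FakeCluster()
bounds = adapt_within_limit(demo, "clbigmem", minimum=2, maximum=10)
print(bounds)
```

With a real cluster, the first argument would be the `PBSCluster` instance. Note that other jobs you already have running in the same batch class presumably count towards the same limit.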