# Dask Job Queue on Summit

**Emily Costa**

07/31/2019
***

This tutorial uses the Python packages pyUSID and pycroscopy as examples. Because the parallel_compute() function in pyUSID does not scale up to multi-node machines, dask-jobqueue can be used to deploy computations on clusters and supercomputers. This tutorial uses Summit at Oak Ridge National Laboratory, but it applies to other HPC systems that use LSF to submit and deploy jobs.

It is strongly recommended that Jupyter Lab is used on the remote machine to facilitate the interactive session.

**Note**: The contents of this tutorial can be run in an interactive session on Summit. Please open this Jupyter Lab from a Summit login node.

## Jupyter Lab on Summit 
In order to properly complete and run this tutorial, follow these instructions to open Jupyter Lab on the remote machine, Summit.
***
**Open this Jupyter Notebook using Jupyter Lab on a Summit login node.**

Note: If the full repo containing this Jupyter Notebook does not exist in your current Summit path on the login node, it will automatically download. If it does, it will make sure the repo is up-to-date.
***
1. On a Summit login node, run this script and leave the terminal running:
> `source /gpfs/alpine/gen011/scratch/ecost020/jupyterlab.sh`

2. Open a new terminal and run this on your local machine:
> `ssh -L 8887:localhost:8887 user@login##.summit.olcf.ornl.gov`

3. Now, open `localhost:8887` on your local browser and navigate to this Jupyter Notebook.
***
Now, let us begin the tutorial.

Import all necessary modules

In [None]:
from dask_jobqueue import LSFCluster
from dask.distributed import Client, LocalCluster
import pyUSID as usid


The configuration options for the cluster are listed below in the following format:
> `parameter: default     # Option flag and description`
    
    name: dask-worker           # -J option, name of job.

***Dask worker options***
    
    cores: null                 # Total number of cores per job.
    memory: null                # Total amount of memory per job.
    processes: 1                # Number of Python processes per job

    interface: null             # Network interface to use like eth0 or ib0
    death-timeout: 60           # Number of seconds to wait if a worker can not find a scheduler
    local-directory: null       # Fast local storage location (/scratch, $TMPDIR)

***LSF resource manager options***
    
    shebang: "#!/usr/bin/env bash"    # Command language
    queue: null                       # -q option
    project: null                     # -P option
    walltime: '00:30'                 # -W option, hh:mm
    extra: []                         # Additional arguments to pass to `dask-worker`
    env-extra: []                     # Other commands to add to script before launching worker.
    job-extra: []                     # Additional commands
    log-directory: null               # Directory to use for job scheduler logs.
    lsf-units: null                   # Unit system for large units in resource usage set by the LSF_UNIT_FOR_LIMITS in the lsf.conf file of a cluster.
    kwargs: null                      # Additional keyword arguments to pass to `LocalCluster`
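
The defaults above are written with hyphens (`job-extra`, `local-directory`), while the matching Python keyword arguments use underscores (`job_extra`, `local_directory`). The following is a minimal sketch of that precedence, assuming that an explicitly passed keyword argument overrides the configured default. `resolve_options` and `DEFAULTS` are hypothetical names used for illustration only; they are not part of dask-jobqueue.

```python
# A subset of the defaults from the listing above, keyed as in the config file.
DEFAULTS = {
    "name": "dask-worker",
    "cores": None,
    "memory": None,
    "processes": 1,
    "death-timeout": 60,
    "local-directory": None,
    "job-extra": [],
}


def resolve_options(defaults, **kwargs):
    """Hypothetical helper: overlay keyword arguments on the config defaults."""
    resolved = dict(defaults)
    for key, value in kwargs.items():
        # Keyword arguments use underscores; config keys use hyphens.
        config_key = key.replace("_", "-")
        if config_key not in defaults:
            raise KeyError(f"unknown option: {key}")
        if value is not None:
            resolved[config_key] = value
    return resolved


options = resolve_options(
    DEFAULTS,
    name="HelloWorld",
    job_extra=["-nnodes 2"],
    local_directory=".",
)
print(options)
```

Options that are not passed, such as `processes` and `death-timeout` here, keep the values from the configuration.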
    
***Summit Specific Flags***

These can be added to job-extra, a list of additional flags.

To accommodate an interactive session, let us add a few more flags. Create a list of additional flags, then pass it as the job_extra parameter.

In [None]:
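job_extra = []
# Flag to begin an interactive session, which we will open in a terminal later.
job_extra.append('-Is')
# Job name
job_extra.append('-J dask-jq')
# Compute specs: request two nodes
job_extra.append('-nnodes 2')
# Local SSD (NVMe) storage request, as in the bsub command below
job_extra.append('-alloc_flags nvme')
# Hardware threads: left at the system default here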
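
To see where these flags end up, the sketch below mimics how the cluster options turn into `#BSUB` directives at the top of the generated job script. It is an illustration under the assumption that each `job_extra` entry becomes its own `#BSUB` line; `render_bsub_header` is a hypothetical helper, not dask-jobqueue's own code.

```python
def render_bsub_header(name, project, walltime, job_extra):
    """Hypothetical helper: build the directive header of an LSF job script."""
    lines = [
        "#!/usr/bin/env bash",
        f"#BSUB -J {name}",      # job name
        f"#BSUB -P {project}",   # project to charge
        f"#BSUB -W {walltime}",  # walltime
    ]
    # Each extra flag is written as its own directive line.
    lines.extend(f"#BSUB {flag}" for flag in job_extra)
    return "\n".join(lines)


header = render_bsub_header(
    name="HelloWorld",
    project="gen011",
    walltime="00:01",
    job_extra=["-nnodes 2", "-alloc_flags nvme"],
)
print(header)
```

On a real cluster object, `print(cluster.job_script())` shows the script that dask-jobqueue actually generates, which is the authoritative check.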

Set up the nodes and the LSF script for requesting and then running a job on Summit.

The job request will mimic the following one:
> `bsub -P project_name -J job_name -W 1:30 -nnodes 2 -B -alloc_flags nvme -Is /bin/bash`
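
The request above can be read flag by flag. The following sketch assembles the same command from named parameters so that each flag's role is explicit; `build_bsub_command` is a hypothetical helper written for this tutorial, and the values are the placeholders from the command above.

```python
import shlex


def build_bsub_command(project, job_name, walltime, nnodes, alloc_flags,
                       shell="/bin/bash"):
    """Hypothetical helper: assemble an interactive bsub request as one string."""
    args = [
        "bsub",
        "-P", project,               # project to charge
        "-J", job_name,              # job name
        "-W", walltime,              # walltime as hours:minutes
        "-nnodes", str(nnodes),      # number of compute nodes
        "-B",                        # send mail when the job begins
        "-alloc_flags", alloc_flags, # allocation flags, e.g. NVMe storage
        "-Is", shell,                # interactive session running this shell
    ]
    # Quote each argument so the string is safe to paste into a terminal.
    return " ".join(shlex.quote(arg) for arg in args)


command = build_bsub_command("project_name", "job_name", "1:30", 2, "nvme")
print(command)
```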

In [None]:
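local_directory = '.'  # Current directory; feel free to change.

# Note: LSFCluster also needs `cores` and `memory`, either passed here
# or set in the jobqueue configuration described above.
cluster = LSFCluster(name='HelloWorld',
                     project='gen011',
                     walltime='00:01',  # hh:mm, the format expected by -W
                     job_extra=job_extra,
                     local_directory=local_directory)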

## Dask-jobqueue Scaling
Now, let us mess around with some of the features of dask-jobqueue.
***


In [None]:
cluster.scale(2)  # Number of workers, or jobs, you wish to schedule
# Alternatively, scale adaptively:
# cluster.adapt(minimum=18, maximum=360)  # Scale between 18 and 360 workers
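
The difference between `scale` and `adapt` is that `scale` requests a fixed number of workers, while `adapt` lets the number of workers follow the workload within the given bounds. The sketch below shows only the bounding step, as a simplified illustration; `clamp_workers` is a hypothetical helper, and the real decision about how many workers are wanted is made by the Dask scheduler.

```python
def clamp_workers(requested, minimum, maximum):
    """Hypothetical helper: keep a requested worker count within the bounds."""
    if minimum > maximum:
        raise ValueError("minimum must not exceed maximum")
    return max(minimum, min(maximum, requested))


# With bounds of 18 and 360 workers:
for requested in (5, 100, 1000):
    print(requested, "->", clamp_workers(requested, minimum=18, maximum=360))
```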

Instantiate the Client class, which will be how you interact with the cluster. 

In [None]:
client = Client(cluster)

Now the rest of the code will resemble dask’s local schedulers.
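
As a rough picture of what that code looks like, the sketch below shows the submit-then-collect pattern. It uses the standard library's `ThreadPoolExecutor` as a stand-in so that it runs without a cluster; this is a substitution for illustration, not Dask itself. With a Dask `client`, the analogous calls would be `client.submit(square, n)` and `future.result()`. The function `square` is a hypothetical example task.

```python
from concurrent.futures import ThreadPoolExecutor


def square(x):
    """Example task; on a cluster this would run on a worker."""
    return x * x


# ThreadPoolExecutor stands in for dask.distributed.Client here.
with ThreadPoolExecutor(max_workers=2) as executor:
    # Submit each task and keep the returned futures.
    futures = [executor.submit(square, n) for n in range(5)]
    # Block until every result is available.
    results = [future.result() for future in futures]

print(results)
```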