# Dask Workflow - Local

Demo notebooks 02a and 02b will show you the recommended way to work with Dask on a cluster.

Requirements:
 - Cluster has been added and configured, as shown in notebook 01.
 - `idact` is installed on the cluster using pip.

## Initial setup

Add `idact` to path and import it:

In [1]:
import sys
import os
import bitmath
import logging
import subprocess

sys.path.append('../')  # For running in repo. Unnecessary with pip install.
from idact import *

## Load cluster

In [2]:
load_environment()
cluster = show_cluster("pro")  # replace with your cluster name if necessary
cluster

Cluster(pro.cyfronet.pl, 22, plggarstka, auth=AuthMethod.PUBLIC_KEY, key='C:\\Users\\Maciej/.ssh\\id_rsa_zp', install_key=False, disable_sshd=False)

Make sure it's properly configured. Replace the following with correct values for your cluster, or skip if already configured:

In [3]:
cluster.config.setup_actions.jupyter = ['module load plgrid/tools/python-intel/3.6.2']
cluster.config.setup_actions.dask = cluster.config.setup_actions.jupyter
cluster.config.scratch = '$SCRATCH'

save_environment()

Get access node:

In [4]:
node = cluster.get_access_node()
node

Node(pro.cyfronet.pl:22, None)

Make sure authentication is set up correctly:

In [5]:
node.connect()
save_environment()

In [6]:
node.run('whoami')

'plggarstka'

In [7]:
node.run('hostname')

'login01.pro.cyfronet.pl'

## Push environment

If this is your first time working on the cluster from a remote notebook, you may want to push the current environment. Alternatively, call `add_cluster` and perform the other configuration steps in the remote notebook, as demonstrated in demo notebook 01.

In [8]:
push_environment(cluster)

2018-11-18 16:12:39 INFO: Pushing the environment to cluster.


## Allocate nodes

You will need to allocate compute nodes for the Jupyter notebook and Dask:

In [9]:
nodes = cluster.allocate_nodes(nodes=2,
                               cores=2,
                               memory_per_node=bitmath.GiB(10),
                               walltime=Walltime(minutes=20),
                               native_args={
                                   '--partition': 'plgrid-testing',
                                   '--account': 'intdata'
                               })

2018-11-18 16:12:46 INFO: Creating the ssh directory.


In [10]:
nodes

Nodes([Node(NotAllocated),Node(NotAllocated)], SlurmAllocation(job_id=14237582))

Wait until the nodes are allocated:

In [11]:
nodes.wait()
nodes

Nodes([Node(p0649:58077, 2018-11-18 15:32:54.142725+00:00),Node(p0662:55811, 2018-11-18 15:32:54.142725+00:00)], SlurmAllocation(job_id=14237582))

### Push the nodes to cluster (optional)

It may be useful to access the allocated nodes from another notebook.

Push the nodes to the cluster. You will be able to access them later by calling `pull_deployments`.

In [12]:
cluster.push_deployment(nodes)

2018-11-18 16:12:59 INFO: Pushing deployment: Nodes([Node(p0649:58077, 2018-11-18 15:32:54.142725+00:00),Node(p0662:55811, 2018-11-18 15:32:54.142725+00:00)], SlurmAllocation(job_id=14237582))


## Deploy notebook

You will be working from a remote Jupyter Notebook deployed on the cluster:

In [13]:
nb = nodes[0].deploy_notebook()
nb

JupyterDeployment(8080 -> Node(p0649:58077, 2018-11-18 15:32:54.142725+00:00)

Open the remote notebook in a new tab:

In [14]:
nb.open_in_browser()

### Push the notebook to cluster (optional)

You can push the notebook deployment as well:

In [15]:
cluster.push_deployment(nb)

2018-11-18 16:13:15 INFO: Pushing deployment: JupyterDeployment(8080 -> Node(p0649:58077, 2018-11-18 15:32:54.142725+00:00)


## Deploy Dask

Deploy Dask using the following command:

In [16]:
dd = deploy_dask(nodes)
dd

2018-11-18 16:13:22 INFO: Deploying Dask on 2 nodes.
2018-11-18 16:13:22 INFO: Connecting to p0649:58077 (1/2).
2018-11-18 16:13:22 INFO: Connecting to p0662:55811 (2/2).
2018-11-18 16:13:24 INFO: Deploying scheduler on the first node: p0649.
2018-11-18 16:13:41 INFO: Checking scheduler connectivity from p0649 (1/2).
2018-11-18 16:13:41 INFO: Checking scheduler connectivity from p0662 (2/2).
2018-11-18 16:13:41 INFO: Deploying workers.
2018-11-18 16:13:41 INFO: Deploying worker 1/2.
2018-11-18 16:14:27 INFO: Deploying worker 2/2.
2018-11-18 16:14:43 INFO: Validating worker 1/2.
2018-11-18 16:14:43 INFO: Validating worker 2/2.


DaskDeployment(scheduler=tcp://localhost:44303/tcp://172.20.66.139:44303, workers=2)

Get the Dask client:

In [17]:
client = dd.get_client()
client

Client:  Scheduler: tcp://localhost:44303  Dashboard: http://localhost:56991/status
Cluster:  Workers: 2  Cores: 4  Memory: 21.47 GB
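
The `Memory: 21.47 GB` figure can look surprising next to the `bitmath.GiB(10)` requested per node. A minimal sketch of the arithmetic, assuming the client reports the combined worker memory in decimal gigabytes while the allocation was requested in binary gibibytes (the variable names are illustrative, not part of `idact` or Dask):

```python
# Unit sizes in bytes.
GIB = 2**30  # one gibibyte (binary unit, as in bitmath.GiB)
GB = 10**9   # one gigabyte (decimal unit, assumed for the client summary)

# Values taken from the allocation request above.
node_count = 2
memory_per_node_gib = 10

# Total memory across both nodes, converted to decimal gigabytes.
total_bytes = node_count * memory_per_node_gib * GIB
total_gb = total_bytes / GB

print(f"{total_gb:.2f} GB")
```

Under that assumption, the two 10 GiB nodes together account for the value shown in the client summary.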


Dask provides dashboards for the scheduler and each worker:

In [18]:
dd.diagnostics.addresses

['http://localhost:56991/status',
 'http://localhost:59470/main',
 'http://localhost:59700/main']

To open all dashboards, execute the line below. You can also click the scheduler dashboard link in the `get_client` output above.

In [19]:
dd.diagnostics.open_all()

### Performing computations with Dask

Avoid using the Dask client from your local computer: the local Python and library versions are likely to differ from those on the cluster.

Even if your local Python environment matched the cluster exactly, the amount of data transferred to your local computer could prove overwhelming.

Instead, push the Dask deployment to the cluster and use the Dask client from there:

In [20]:
cluster.push_deployment(dd)

2018-11-18 16:14:44 INFO: Pushing deployment: DaskDeployment(scheduler=tcp://localhost:44303/tcp://172.20.66.139:44303, workers=2)


## Copy the next notebook to the cluster

Drag and drop `02b-DaskWorkflow-Remote.ipynb` to the deployed notebook, and open it there.

## Follow the instructions in notebook 02b

Follow the instructions until you are referred back to this notebook.

## Monitor node resources

While working with Dask, it may be useful to monitor resource usage on nodes:

In [21]:
nodes[0].resources.memory_total

GiB(10.0)

In [22]:
nodes[0].resources.cpu_cores

2

In [23]:
nodes[0].resources.memory_usage

GiB(0.3787040710449219)

In [24]:
nodes[1].resources.memory_usage

GiB(0.20428466796875)

In [25]:
nodes[0].resources.cpu_usage

4.0

In [26]:
nodes[1].resources.cpu_usage

5.0
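
To put the raw memory readings in context, it can help to express them as a share of the node total. A minimal sketch using the values printed above, copied by hand as plain floats in GiB; the helper `usage_percent` is hypothetical and not part of `idact`, and it assumes both nodes have the 10 GiB requested in the allocation:

```python
# Values copied from the outputs above, as plain floats in GiB.
memory_total_gib = 10.0
memory_usage_gib = [0.3787040710449219, 0.20428466796875]


def usage_percent(used, total):
    """Return used/total as a percentage."""
    return 100.0 * used / total


# One percentage per node, in the same order as the readings.
percents = [usage_percent(used, memory_total_gib) for used in memory_usage_gib]

for index, percent in enumerate(percents):
    print(f"node {index}: {percent:.1f}% of memory in use")
```

Both nodes are nearly idle at this point, which is expected before any computation has been submitted.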

## Cancel Jupyter and Dask deployment (optional)

Cancelling the node allocation kills these deployments anyway, though not gracefully.

In [27]:
nb.cancel()

2018-11-18 16:16:22 INFO: Cancelling Jupyter deployment.


In [28]:
dd.cancel()

2018-11-18 16:16:28 INFO: Cancelling worker deployment on p0662.
2018-11-18 16:16:35 INFO: Cancelling worker deployment on p0649.
2018-11-18 16:16:43 INFO: Cancelling scheduler deployment on p0649.


## Cancel the allocation

If the nodes are still running, make sure to cancel their allocation to save CPU time.

In [29]:
nodes.running()

True

In [30]:
nodes.cancel()

2018-11-18 16:16:51 INFO: Cancelling job 14237582.


In [31]:
nodes.running()

False

In [32]:
node.run('squeue')

'JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)\n          14237582 plgrid-te     wrap plggarst CG       3:57      2 p[0649,0662]'
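
The `squeue` output is returned as a single string, so the job state is easier to check after splitting it into fields. A rough sketch that parses the exact string shown above; it assumes a single job line, no spaces inside any field, and an elapsed time in `M:SS` form, so it is not a general `squeue` parser. In Slurm, the state code `CG` stands for COMPLETING, meaning the job is shutting down after the cancellation.

```python
# The string returned by node.run('squeue') above, copied verbatim.
squeue_output = (
    'JOBID PARTITION     NAME     USER ST       TIME  NODES NODELIST(REASON)\n'
    '          14237582 plgrid-te     wrap plggarst CG       3:57      2 p[0649,0662]'
)

# The first line holds the column names, the second the only job.
header_line, job_line = squeue_output.splitlines()
job = dict(zip(header_line.split(), job_line.split()))

# Convert the M:SS elapsed time to seconds (longer formats are not handled).
minutes, seconds = job['TIME'].split(':')
elapsed_seconds = int(minutes) * 60 + int(seconds)

print(job['JOBID'], job['ST'], elapsed_seconds)
```

A job that stays in the `CG` state for a short while after `nodes.cancel()` is normal; it disappears from the queue once cleanup finishes.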