# Dask deployment on Grid'5000

This notebook deploys a Dask cluster on Grid'5000 and launches a simple computation.

Requirements: 
  - A conda[[1]] environment set up on the Grid'5000 frontend, with Dask and EnOSlib installed.
  - The same environment can be used to run this notebook from your local machine.

[1]: https://docs.conda.io/en/latest/miniconda.html#linux-installers


## Initial imports
    

In [21]:
from enoslib import *
import logging

# Enable INFO-level logging to follow the reservation and deployment steps
logging.basicConfig(level=logging.INFO)

## Get some resources on Grid'5000

This reserves two nodes on the `parapide` cluster in Rennes: one for the Dask scheduler and one for a worker. The Dask cluster is deployed on them in the next step.

In [22]:
prod = G5kNetworkConf(id="prod", roles=["network"], type="prod", site="rennes")
conf = (
    G5kConf.from_settings(job_name="dask", job_type="allow_classic_ssh")
    .add_machine(roles=["scheduler"], cluster="parapide", nodes=1, primary_network=prod)
    .add_machine(roles=["worker"], cluster="parapide", nodes=1, primary_network=prod)
    .add_network_conf(prod)
).finalize()
provider = G5k(conf)
roles, _ = provider.init()

INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from grenoble
INFO:enoslib.infra.enos_g5k.g5k_api_utils:Reloading dask from lille
{'roles': ['scheduler'], 'primary_network': 'prod', 'secondary_networks': [], 'cluster': 'parapide', 'nodes': 1}
{'roles': ['worker'], 'primary_network': 'prod', 'secondary_networks': [], 'cluster': 'parapide', 'nodes': 1}
{
    "dhcp": true,
    "force_deploy": false,
    "env_name": "debian10-x64-nfs",
    "job_name": "dask",
    "job_type": "allow_classic_ssh",
    "key": "/home/msimonin/.ssh/id_rsa.pub",
    "queue": "default",
    "walltime": "02:00:00",
    "resources": {
        "machines": [
            {
                "roles": [
                    "scheduler"
                ],
                "primary_network": "prod",
                "secondary_networks": [],
                "cluster": "parapide",
                "nodes": 1
            },
            {
                "roles": [
                    "worker"
                ],
         

## Deploy Dask on the nodes

This assumes that the conda environment (`dask-base`) lives in a Miniconda installation located at `/home/<user>/miniconda3`.

If the installation path differs, you can specify it using the `conda_prefix` parameter. 
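
Judging by the command echoed in the deployment log, the conda prefix appears to be used to locate `etc/profile.d/conda.sh` before the environment is activated. The sketch below only rebuilds a string of that shape to show how the prefix and the environment name fit together. `scheduler_command` is a hypothetical helper written for this illustration, not part of EnOSlib, and the path and user name are made up.

```python
def scheduler_command(conda_prefix, env_name):
    """Hypothetical helper: build a command shaped like the one in the log."""
    # conda.sh is assumed to sit under <prefix>/etc/profile.d/, as in the log.
    conda_sh = f"{conda_prefix}/etc/profile.d/conda.sh"
    return f"source {conda_sh} && conda activate {env_name} && dask-scheduler"


# Example with an assumed non-default installation path.
print(scheduler_command("/home/alice/opt/miniconda3", "dask-base"))
```

If the printed path to `conda.sh` does not exist on the nodes, the activation step would presumably fail, which is why the prefix has to match your actual installation.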

In [23]:
username = g5k_api_utils.get_api_username()
dask = Dask("dask-base", scheduler=roles["scheduler"][0], workers=roles["worker"], run_as=username)
dask.deploy()

INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpc775senv with vars:
{}
source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler

PLAY [scheduler] ***************************************************************

TASK [(tmux ls | grep dask-scheduler )|| tmux new-session -s dask-scheduler -d 'source /home/msimonin/miniconda3/etc/profile.d/conda.sh && conda activate andromak && dask-scheduler'] ***
Tuesday 12 January 2021  14:14:40 +0100 (0:13:33.402)       0:28:23.917 ******* 
changed: [parapide-12.rennes.grid5000.fr]

TASK [__calling__ wait_for] ****************************************************
Tuesday 12 January 2021  14:14:42 +0100 (0:00:01.219)       0:28:25.136 ******* 
ok: [parapide-12.rennes.grid5000.fr]

PLAY RECAP *********************************************************************
parapide-12.rennes.grid5000.fr : ok=2    changed=1    unreachable=0    failed=0    skipped=0    rescued=

## Using Dask

We start with a simple computation made of three tasks: `inc` and `dec` are independent of each other, and `add` depends on both of their results.
The tunnels needed to reach the Dask dashboard and the scheduler are created in the launch step further down.

In [24]:
from dask import delayed
import time

def inc(x):
    time.sleep(5)
    return x + 1

def dec(x):
    time.sleep(3)
    return x - 1

def add(x, y):
    time.sleep(7)
    return x + y

x = delayed(inc)(1)
y = delayed(dec)(2)
total = delayed(add)(x, y)
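
To make the dependency structure concrete, here is a rough stand-in that uses only the standard library (`concurrent.futures`) instead of Dask. It runs `inc` and `dec` concurrently, feeds both results to `add`, and works out the ideal run time from the `sleep()` durations used above. The sleeps themselves are left out so the sketch finishes instantly, and the timing assumes the worker can run two tasks at once.

```python
from concurrent.futures import ThreadPoolExecutor


def inc(x):
    return x + 1


def dec(x):
    return x - 1


def add(x, y):
    return x + y


# Task durations in seconds, taken from the sleep() calls in the cell above.
durations = {"inc": 5, "dec": 3, "add": 7}

# inc and dec do not depend on each other, so they can run concurrently;
# add needs both results and starts once the slower of the two is done.
with ThreadPoolExecutor(max_workers=2) as pool:
    fx = pool.submit(inc, 1)
    fy = pool.submit(dec, 2)
    result = add(fx.result(), fy.result())

serial_time = sum(durations.values())
parallel_time = max(durations["inc"], durations["dec"]) + durations["add"]

print(f"result={result}")
print(f"serial: {serial_time}s, ideal parallel: {parallel_time}s")
```

With the durations above, running the tasks one after another would take 15 s, while the critical path (`inc` then `add`) takes 12 s.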

## Launch the computation

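The cell below opens tunnels to the dashboard (port 8787) and to the scheduler (port 8786), then runs the computation. While it runs, you can check the web dashboard; its URL is printed first.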

In [25]:
from dask.distributed import Client
# Tunnel to the dashboard
addr, port, tunnel = G5kTunnel(dask.scheduler.address, 8787).start()
print(f"dashboard: http://{addr}:{port}")
with G5kTunnel(dask.scheduler.address, 8786) as (addr, port, _):
    print(f"Scheduler address: {addr}:{port}")
    client = Client(f"tcp://{addr}:{port}")
    # launch a computation
    print(f"result={total.compute()}")



INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4p1)
INFO:paramiko.transport:Authentication (publickey) successful!
INFO:paramiko.transport:Connected (version 2.0, client OpenSSH_7.4p1)
dashboard: http://0.0.0.0:38383
INFO:paramiko.transport:Authentication (publickey) successful!
Scheduler address: 0.0.0.0:35945
result=3


In [26]:
# Stop the tunnel to the dashboard, then tear down the Dask cluster.
if tunnel is not None:
    tunnel.stop(force=True)
dask.destroy()

INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpt2mbeu_y with vars:
{}

PLAY [scheduler] ***************************************************************

TASK [Killing the dask scheduler] **********************************************
Tuesday 12 January 2021  14:14:57 +0100 (0:00:13.913)       0:28:40.736 ******* 
changed: [parapide-12.rennes.grid5000.fr]

PLAY RECAP *********************************************************************
parapide-12.rennes.grid5000.fr : ok=1    changed=1    unreachable=0    failed=0    skipped=0    rescued=0    ignored=0   

Tuesday 12 January 2021  14:14:57 +0100 (0:00:00.203)       0:28:40.940 ******* 
Killing the dask scheduler ---------------------------------------------- 0.21s
INFO:enoslib.api:Running playbook /home/msimonin/workspace/repos/enoslib/docs/apidoc/examples/tmpr0joxpnq with vars:
{}
{'code': 0, 'result': [{'parapide-12.rennes.grid5000.fr': {'ok': 1, 'failures': 0, 'unreachable': 0, 'chang