# Dask Extension

This notebook gives a short introduction to the Dask Extension on JURECA. It allows you to run jobs on the compute nodes, even if your JupyterLab is running interactively on the login node.  
First, define the project and partition the jobs should run on.

In [1]:
queue = "batch" #  batch, gpus, develgpus, etc.
project = "cstvs" # your project: zam, training19xx, etc.

# Monte-Carlo Estimate of $\pi$

We want to estimate the number $\pi$ using a [Monte-Carlo method](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods). The area of a quarter circle of unit radius is $\pi/4$, so the probability that a randomly chosen point in the unit square lies inside a unit circle centered at a corner of that square is $\pi/4$ as well. For $N$ randomly chosen pairs $(x, y)$ with $x\in[0, 1)$ and $y\in[0, 1)$, we count the number $N_{circ}$ of pairs that also satisfy $(x^2 + y^2) < 1$ and estimate $\pi \approx 4 \cdot N_{circ} / N$.

[<img src="https://upload.wikimedia.org/wikipedia/commons/8/84/Pi_30K.gif" 
     width="50%" 
     align=top
     alt="PI monte-carlo estimate">](https://en.wikipedia.org/wiki/Pi#Monte_Carlo_methods)
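
As a minimal sketch of the estimator described above, the following plain-Python version (standard library only, no Dask) draws random pairs and counts those that fall inside the quarter circle. The function name `estimate_pi` and the fixed seed are illustrative choices and not part of this notebook.

```python
import random


def estimate_pi(num_pairs, seed=0):
    """Estimate pi from num_pairs random points in the unit square."""
    rng = random.Random(seed)  # fixed seed only to make the sketch reproducible
    in_circle = 0
    for _ in range(num_pairs):
        x = rng.random()  # x in [0, 1)
        y = rng.random()  # y in [0, 1)
        if x * x + y * y < 1.0:
            in_circle += 1
    # The fraction of points inside the quarter circle approximates pi / 4.
    return 4.0 * in_circle / num_pairs


print(f"pi is approximately {estimate_pi(200_000):.4f}")
```

The Dask version used later in this notebook follows the same logic, but generates the points as a chunked array so that the work can be spread over many workers.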

## Core Lessons

- Setting up SLURM (and other jobqueue) clusters
- Scaling clusters
- Adaptive clusters

## Set up a Slurm cluster

We'll create a SLURM cluster and look at the job script used to start workers on the HPC scheduler.

In [2]:
import dask
from dask.distributed import Client
from dask_jobqueue import SLURMCluster
import os

cluster = SLURMCluster(
    cores=24,
    processes=2,
    memory="100GB",
    shebang='#!/usr/bin/env bash',
    queue=queue,
    dashboard_address=":56755",
    walltime="00:30:00",
    local_directory='/tmp',
    death_timeout="15s",
    interface="ib0",
    log_directory=f'{os.environ["HOME"]}/dask_jobqueue_logs/',
    project=project)

In [3]:
print(cluster.job_script())

#!/usr/bin/env bash

#SBATCH -J dask-worker
#SBATCH -e /p/home/jusers/kreuzer1/jureca/dask_jobqueue_logs//dask-worker-%J.err
#SBATCH -o /p/home/jusers/kreuzer1/jureca/dask_jobqueue_logs//dask-worker-%J.out
#SBATCH -p batch
#SBATCH -A cstvs
#SBATCH -n 1
#SBATCH --cpus-per-task=24
#SBATCH --mem=94G
#SBATCH -t 00:30:00

JOB_ID=${SLURM_JOB_ID%;*}

/usr/local/software/jureca/Stages/Devel-2019a/software/Python/3.6.8-GCCcore-8.3.0/bin/python -m distributed.cli.dask_worker tcp://10.80.32.31:35250 --nthreads 12 --nprocs 2 --memory-limit 50.00GB --name name --nanny --death-timeout 15s --local-directory /tmp --interface ib0
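
The per-worker numbers in the job script above can be reproduced from the `SLURMCluster` arguments with a little arithmetic. The sketch below assumes that cores and memory are split evenly across the `processes`, that "GB" means $10^9$ bytes, and that the `--mem` directive is the total memory rounded up to whole GiB. The rounding rule is inferred from the output shown here, not taken from the dask-jobqueue documentation.

```python
import math

# Values passed to SLURMCluster in this notebook.
cores = 24
processes = 2
memory_bytes = 100e9  # "100GB", assuming 1 GB = 10**9 bytes

# Each worker process gets an equal share of the threads and memory.
threads_per_process = cores // processes                 # 12
memory_per_process_gb = memory_bytes / processes / 1e9   # 50.0

# Assumption: the SLURM memory request is expressed in whole GiB, rounded up.
mem_directive_gib = math.ceil(memory_bytes / 2**30)      # 94

print(f"--nthreads {threads_per_process} --nprocs {processes} "
      f"--memory-limit {memory_per_process_gb:.2f}GB")
print(f"#SBATCH --mem={mem_directive_gib}G")
```

Under these assumptions the result matches `--nthreads 12`, `--memory-limit 50.00GB`, and `--mem=94G` in the generated script.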



In [4]:
client = Client(cluster)
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://10.80.32.31:35250 | Workers: 0 |
| Dashboard: http://10.80.32.31:56755/status | Cores: 0 |
| | Memory: 0 B |


## You can visit the Dask Dashboard at the following URL:
```
https://jupyter-jsc.fz-juelich.de/user/<user_name>/<lab_name>/proxy/<port>/status
```

## You can integrate it into your JupyterLab environment by putting the link into the Dask Extension

!["Dask"](https://zam10183.zam.kfa-juelich.de/hub/static/images/dask2.png "dask")

Afterwards, click the orange buttons to open a new tab in your JupyterLab environment.

## Scale the cluster to two nodes

A look at the Dashboard reveals that there are no workers in the cluster.  Let's start 4 workers (in 2 SLURM jobs).

For the distinction between _workers_ and _jobs_, see [the Dask jobqueue docs](https://jobqueue.dask.org/en/latest/howitworks.html#workers-vs-jobs).
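
To make the relation between workers and jobs concrete, here is a small sketch. It assumes that every SLURM job starts exactly `processes` worker processes, as configured in the cluster above (`processes=2`). The helper `jobs_needed` is hypothetical and only illustrates the arithmetic.

```python
import math


def jobs_needed(num_workers, processes_per_job):
    """Number of SLURM jobs required to host num_workers worker processes."""
    return math.ceil(num_workers / processes_per_job)


# With processes=2, scaling to 4 workers corresponds to 2 jobs.
print(jobs_needed(4, 2))
# Doubling to 8 workers corresponds to 4 jobs.
print(jobs_needed(8, 2))
```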

In [5]:
cluster.scale(4)  # scale to 4 _workers_

## The Monte Carlo Method

In [6]:
import dask.array as da
import numpy as np


def calc_pi_mc(size_in_bytes, chunksize_in_bytes=200e6):
    """Calculate PI using a Monte Carlo estimate."""

    size = int(size_in_bytes / 8)
    chunksize = int(chunksize_in_bytes / 8)

    # Use integer division: array shapes and chunk sizes must be integers.
    xy = da.random.uniform(0, 1, size=(size // 2, 2), chunks=(chunksize // 2, 2))

    in_circle = (xy ** 2).sum(axis=-1) < 1
    pi = 4 * in_circle.mean()

    return pi


def print_pi_stats(size, pi, time_delta, num_workers):
    """Print pi, calculate offset from true value, and print some stats."""
    print(
        f"{size / 1e9} GB\n"
        f"\tMC pi: {pi : 13.11f}"
        f"\tErr: {abs(pi - np.pi) : 10.3e}\n"
        f"\tWorkers: {num_workers}"
        f"\t\tTime: {time_delta : 7.3f}s"
    )

## The actual calculations

We loop over different volumes of double-precision random numbers and estimate $\pi$ as described above.
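
To judge the errors reported by the runs in this notebook, it helps to know how large a purely statistical error to expect. Each point lands inside the quarter circle with probability $p = \pi/4$, so the standard error of the estimate is $4\sqrt{p(1-p)/N}$ for $N$ pairs. The sketch below evaluates this for the data volumes used here, assuming 8 bytes per number and 2 numbers per pair as in `calc_pi_mc`. The helper name `expected_pi_std_error` is hypothetical.

```python
import math


def expected_pi_std_error(size_in_bytes):
    """Standard error of the Monte Carlo pi estimate for a given data volume."""
    num_pairs = size_in_bytes / 8 / 2  # 8 bytes per double, 2 doubles per pair
    p = math.pi / 4                    # probability of hitting the quarter circle
    return 4 * math.sqrt(p * (1 - p) / num_pairs)


for size in (1e9, 10e9, 100e9):
    print(f"{size / 1e9:6.1f} GB  expected std error: {expected_pi_std_error(size):.2e}")
```

The error shrinks with the square root of the data volume, so 100 times more data yields only about one additional correct digit.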

In [7]:
from time import time, sleep

In [8]:
for size in (1e9 * n for n in (1, 10, 100)):

    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(
        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)
    )

1.0 GB
	MC pi:  3.14163532800	Err:  4.267e-05
	Workers: 4		Time:  14.035s
10.0 GB
	MC pi:  3.14159999360	Err:  7.340e-06
	Workers: 4		Time:   1.755s
100.0 GB
	MC pi:  3.14160576512	Err:  1.311e-05
	Workers: 4		Time:   9.881s


## Scaling the Cluster to twice its size

We double the number of workers and then re-run the experiments.

In [9]:
new_num_workers = 2 * len(cluster.scheduler.workers)

print(f"Scaling from {len(cluster.scheduler.workers)} to {new_num_workers} workers.")

cluster.scale(new_num_workers)

sleep(10)

Scaling from 4 to 8 workers.
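
A fixed `sleep(10)` may be too short or too long, depending on how quickly the scheduler starts the new jobs. One alternative is to poll until the desired number of workers has arrived. The sketch below is a generic helper written with the standard library only. The name `wait_until` and its default values are illustrative assumptions.

```python
import time


def wait_until(predicate, timeout=60.0, interval=0.5):
    """Poll predicate() until it returns True or timeout seconds have passed."""
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        if predicate():
            return True
        time.sleep(interval)
    # One final check so that a condition met right at the deadline still counts.
    return bool(predicate())
```

In this notebook it could be used as `wait_until(lambda: len(cluster.scheduler.workers) >= new_num_workers, timeout=300)`. The `distributed` client also offers `client.wait_for_workers(n_workers)`, which blocks until that many workers have connected.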


In [10]:
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://10.80.32.31:35250 | Workers: 4 |
| Dashboard: http://10.80.32.31:56755/status | Cores: 48 |
| | Memory: 200.00 GB |


## Re-run same experiments with doubled cluster

In [11]:
for size in (1e9 * n for n in (1, 10, 100)):

    start = time()
    pi = calc_pi_mc(size).compute()
    elaps = time() - start

    print_pi_stats(
        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)
    )

1.0 GB
	MC pi:  3.14156992000	Err:  2.273e-05
	Workers: 8		Time:   0.863s
10.0 GB
	MC pi:  3.14159750400	Err:  4.850e-06
	Workers: 8		Time:   2.203s
100.0 GB
	MC pi:  3.14156159168	Err:  3.106e-05
	Workers: 8		Time:   5.774s


## Automatically Scaling the Cluster

We want each calculation to take only a few seconds.  Dask will try to add more workers to the cluster when workloads are high and remove workers when idling.

_**Watch** how the cluster will scale down to the minimum a few seconds after being made adaptive._

In [12]:
ca = cluster.adapt(minimum=4, maximum=100)

sleep(4)  # Allow for scale-down

In [13]:
client

| Client | Cluster |
|---|---|
| Scheduler: tcp://10.80.32.31:35250 | Workers: 4 |
| Dashboard: http://10.80.32.31:56755/status | Cores: 48 |
| | Memory: 200.00 GB |


## Repeat the calculation from above with larger workloads

(And watch the dashboard!)

In [15]:
for size in (n * 1e9 for n in (1, 10, 100)):

    start = time()
    pi = calc_pi_mc(size, min(size / 1000, 500e6)).compute()
    elaps = time() - start

    print_pi_stats(
        size, pi, time_delta=elaps, num_workers=len(cluster.scheduler.workers)
    )

    sleep(20)  # allow for scale-down time

1.0 GB
	MC pi:  3.14140582400	Err:  1.868e-04
	Workers: 4		Time:   2.682s
10.0 GB
	MC pi:  3.14159994880	Err:  7.295e-06
	Workers: 4		Time:   3.225s
100.0 GB
	MC pi:  3.14156351552	Err:  2.914e-05
	Workers: 20		Time:  31.772s
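
The loop above passes `min(size / 1000, 500e6)` as the chunk size instead of relying on the default of `200e6` bytes. The sketch below counts how many chunks, and therefore how many independent random-number tasks, each choice produces. It mirrors the size arithmetic in `calc_pi_mc`. The helper `num_chunks` is hypothetical.

```python
import math


def num_chunks(size_in_bytes, chunksize_in_bytes):
    """Number of row chunks of the (N, 2) array built in calc_pi_mc."""
    rows = int(size_in_bytes / 8) // 2                 # pairs of doubles
    rows_per_chunk = int(chunksize_in_bytes / 8) // 2  # pairs per chunk
    return math.ceil(rows / rows_per_chunk)


for size in (1e9, 10e9, 100e9):
    fixed = num_chunks(size, 200e6)
    scaled = num_chunks(size, min(size / 1000, 500e6))
    print(f"{size / 1e9:6.1f} GB  default chunks: {fixed:4d}  scaled chunks: {scaled:4d}")
```

With the default chunk size, the 1 GB case is split into only 5 chunks, fewer than the 48 threads of the initial 4 workers. The scaled chunk size yields 1000 chunks for every volume, which gives the scheduler many more tasks to distribute and plausibly makes pending work visible to the adaptive scaling.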
