# Dask Quantumtinkerer

## Introduction: how to improve the efficiency of our code?

Life is short and despite that we often find ourselves running long, expensive calculations. To speed things up, we can do a few things:
* **Parallel** execution - use the cluster!
* Compile the code in C. 

`Numba`/`CPython` allow us to easily convert our python code into C code - we already had a couple of good talks about them. Also, the good old [HPC05](https://gitlab.kwant-project.org/qt/init_hpc05) package allows us to run thing on the cluster.

In [None]:
import numpy as np
from numba import jit

x = np.arange(100).reshape(10, 10)


def slow_f(a):
    trace = 0.0
    for i in range(a.shape[0]):
        trace += np.tanh(a[i, i])
    return a + trace


@jit(nopython=True)  # Set "nopython" mode for best performance, equivalent to @njit
def fast_f(a):
    trace = 0.0
    for i in range(a.shape[0]):  # Numba likes loops
        trace += np.tanh(a[i, i])  # Numba likes NumPy functions
    return a + trace

In [None]:
%%timeit -n 1000
slow_f(x)

In [None]:
%%timeit -n 1000
fast_f(x)

See more on [Numba's documentation](https://numba.readthedocs.io/en/stable/user/5minguide.html).

There are more ways to increase the efficiency of your calculation:
* Splitting data into chunks to efficiently utilize memory.
* Asynchronous and parallel I/O.
* Dynamic task scheduling.

<img src="ml-dimensions.png" alt="drawing" width="400"/>

All of these are cool, but require a lot of thinking when designing your code. However, we don't need to think about it - [Dask](https://tutorial.dask.org/00_overview.html) does it all for us!

## What is Dask?

### Parallelisation with `dask.delayed`

In [None]:
from time import sleep


def inc(x):
    sleep(1)
    return x + 1


def add(x, y):
    sleep(1)
    return x + y

In [None]:
%%time
x = inc(1)
y = inc(2)
z = add(x, y)

In [None]:
%%time
import dask
from dask import delayed

x = delayed(inc)(1)
y = delayed(inc)(2)
z = delayed(add)(x, y)

In [None]:
z

In [None]:
%%time
z.compute()

In [None]:
z.visualize()

In [None]:
data = [1, 2, 3, 4, 5, 6, 7, 8]

In [None]:
%%time
# Sequential code

results = []
for x in data:
    y = inc(x)
    results.append(y)

total = sum(results)

In [None]:
%%time
results = []

for x in data:
    y = delayed(inc)(x)
    results.append(y)

total = delayed(sum)(results)
print("Before computing:", total)  # Let's see what type of thing total is
result = total.compute()
print("After computing :", result)  # After it's computed

In [None]:
total.visualize()

### Data chunking with `dask.array`

In [None]:
import dask.array as da

In [None]:
x = da.random.normal(10, 0.1, size=(4, 4), chunks=(2, 2))
x

In [None]:
x[0, 0]

In [None]:
x.mean(axis=1).visualize()

In [None]:
y = da.random.normal(10, 0.1, size=(4, 4), chunks=(2, 2))

In [None]:
inv = da.linalg.inv(x @ y)
inv.visualize()

In [None]:
inv

### I/O with chunked data

Dask supports writing and reading chunks in parallel. This means that you never have to load the full data into memory during I/O.

The most common file format you'll find using are HDF5, netCDF, zarr.

In [None]:
inv.to_zarr("invere_matrix", overwrite=True)

In [None]:
loaded_zarr = da.from_zarr("invere_matrix")
loaded_zarr.visualize()

In [None]:
loaded_zarr.compute()

`zarr` works great on io, but it will **not work** when using the cluster (workers can't find the files created by local). For that, we use a messy `HDF5` solution which we will see later.

### Messy arrays with `Dask.bag` 

Some arrays cannot be converted to `dask.array` object, and that is where `dask.bag` comes in. Dask bags are often used to parallelize simple computations on unstructured or semi-structured data like text data, log files, JSON records, or user defined Python objects.

Lets take a look at a piece of code that I often use for applying a function over a mesh of input parameters:

In [None]:
import itertools as it
from random import random

import dask.bag as db


def f(x, y, calc_type="coarse"):
    x = float(x)
    y = float(y)
    sleep(random())
    if calc_type == "clean":
        a = 0.2
    if calc_type == "coarse":
        a = 0.6
    return x + np.exp(-((x ** 2 + y ** 2 - 0.75 ** 2) ** 2) / a ** 4), 1


N = 2
params = {
    "x": np.linspace(-1.0, 1.0, N),
    "y": np.linspace(-1.0, 1.0, N),
    "calc_type": ["coarse", "clean"],
}

In [None]:
values = list(params.values())
args = np.array(list(it.product(*values)))

In [None]:
values

In [None]:
args

In [None]:
def wrapped_fn(args):
    return f(*args)

In [None]:
args_db = db.from_sequence(args, partition_size=2)

In [None]:
args_db.visualize()

In [None]:
args_db.compute()

In [None]:
result = args_db.map(wrapped_fn)

In [None]:
result.visualize()

In [None]:
result.compute()

Dask can also convert things into panda's dataframe object which are dask compatible (i.e. lazy execution).

In [None]:
result.to_dataframe()

In [None]:
result.to_dataframe().compute()

I use the dask pandas dataframe to utilize its `to_hdf` function which writes files asynchronously and in parallel

In [None]:
result.to_dataframe().to_hdf("bag_demo/data*", "group")

In [None]:
result_loaded = dask.dataframe.read_hdf("bag_demo/data*", "group")

In [None]:
result_loaded.visualize()

In [None]:
result_loaded.compute()

## Using the cluster with `dask_quantumtinkerer`

### Prerequisites
1. You must be able to ssh hpc05 without a password ([detailed instructions here](https://gitlab.kwant-project.org/qt/cookbook/-/blob/master/hpc05/README.md)).
2. On IO, uncomment all lines in the file~/.config/dask/quantumtinkerer.yaml (it should appear automatically, if not -- restart your Jupyterhub notebook) and edit the gateway_port option. This is a TCP port number), that must be unique for you. A random number between 10000 and 40000 should be good.
3. Restart your notebook.

In [None]:
from dask_quantumtinkerer import Cluster, cluster_options

In [None]:
options = cluster_options()
options

In [None]:
options.worker_cores = 1  # you should always leave this to 1 (I think)
options.worker_memory = 2  # Need to atdjust this to meet your needs
options.extra_path = "/home/kostasvilkelis/work/qt-dask-demo/"  # Make sure to do this if you import any local modules!

To utilize local modules imported in this notebook, you need to specify the path in hpc05 to the import.
Make sure to sync your code on io and hpc05 (through git)!

In [None]:
cluster = Cluster(options)
cluster

We can use the dashboard to monitor the progress of the calculation and obtain valuable information (we shall see that later).

To access it, take the `8000/clusters/.../status` part from above dashboard link and combine it with io proxy link as shown below:

In [None]:
print(
    "http://io.quantumtinkerer.tudelft.nl/user/Kostas/proxy/"
    + cluster.dashboard_link[17:]
)

We have logged into the cluster, however, we still have no workers connected. There are two ways to connect/scale the workers:
* **Manual scaling**: you connect to a fixed number of workers right until you disconnect from the cluster.
* **Adaptive scaling**: workers scale adaptively based on the calculation you are running.

In [None]:
cluster.scale(10)

In [None]:
cluster.adapt(minimum=0, maximum=10)

In [None]:
import time


@delayed
def do_nothing(x):
    time.sleep(5)
    return x


x = da.concatenate(
    [da.from_delayed(do_nothing(i), shape=(1,), dtype=int) for i in range(10)]
)

In [None]:
x.visualize()

In [None]:
x.compute()

In order to utilize the cluster, we need to run the `cluster.get_client()` command. This makes it so that all subsequent calls for `compute()` or `persist()` get transferred over to the cluster

In [None]:
client = cluster.get_client()

In [None]:
x.compute()

The other command I mentioned, `persist()`, is the same as compute, only difference is that after calculations it leaves the data distributed all over the workers.

In [None]:
x_distr = x.persist()

In [None]:
x_distr

In [None]:
x_distr.visualize()

In [None]:
cluster.close()

Some things you **need** to make sure of before running the calculation:

* Worker Memory $>$ Single Chunk memory requirements (+account for memory usage from the task itself)

* #Cores  $\leq$ #Chunks 

*  Chunk Size s.t. Task on that chunk takes $\geq$ 100ms

## Walktrough example: Calculation, storage, analysis

### Adaptive Integration

In [None]:
import itertools as it
from random import random
from time import sleep

import dask
import dask.array as da
import dask.bag as db
import dask.dataframe as df
import numpy as np
from dask_quantumtinkerer import Cluster, cluster_options

In [None]:
import adaptive

adaptive.notebook_extension()

In [None]:
def ring(xy, wait=True):
    if wait:
        sleep(random() / 10)
    x, y = xy
    a = 0.2
    return x + np.exp(-((x ** 2 + y ** 2 - 0.75 ** 2) ** 2) / a ** 4)

In [None]:
options = cluster_options()
options.worker_memory = 0.25
options.extra_path = "/home/kostasvilkelis/work/qt-dask-demo/"


cluster = Cluster(options)  # ADAPTIVE REQUIRES MANUAL CORE SCALING!
cluster.scale(10)
print(
    "http://io.quantumtinkerer.tudelft.nl/user/Kostas/proxy/"
    + cluster.dashboard_link[17:]
)

In [None]:
client = cluster.get_client()

In [None]:
learner = adaptive.Learner2D(ring, bounds=[(-1, 1), (-1, 1)])

In [None]:
runner = adaptive.Runner(learner, executor=client, goal=lambda l: l.loss() < 0.01)

In [None]:
runner.live_info()

In [None]:
def plot(learner):
    plot = learner.plot(tri_alpha=0.2)
    return (plot.Image + plot.EdgePaths.I + plot).cols(2)


runner.live_plot(plotter=plot, update_interval=0.1)

### Sampling on a mesh and saving data

In [None]:
def f(x, y, a):
    x = float(x)
    y = float(y)
    a = float(a)
    sleep(0.5)
    return x + np.exp(-((x ** 2 + y ** 2 - 0.75 ** 2) ** 2) / a ** 4), 1.0


N = 20
params = {
    "x": np.linspace(-1.0, 1.0, N),
    "y": np.linspace(-1.0, 1.0, N),
    "a": np.array([0.2, 0.6]),
}


values = list(params.values())
args = np.array(list(it.product(*values)))


def wrapped_fn(args):
    return f(*args)

In [None]:
baged_input = db.from_sequence(args, partition_size=1)

In [None]:
client = cluster.get_client()
baged_input.map(wrapped_fn).to_dataframe().to_hdf(
    "/home/kostasvilkelis/work/qt-dask-demo/cluster_demo/temp_data*", "group", mode="w"
);

In [None]:
cluster.close()

### Same thing, but less code

In [None]:
from time import sleep

import numpy as np
from dask_io import dask_io

In [None]:
def f(x, y, a):
    sleep(0.5)
    return x + np.exp(-((x ** 2 + y ** 2 - 0.75 ** 2) ** 2) / a ** 4), 1


N = 20
params = {
    "x": np.linspace(-1.0, 1.0, N),
    "y": np.linspace(-1.0, 1.0, N),
    "a": np.array([0.2, 0.6]),
}

output_dir = "cluster_demo_clean/"

In [None]:
calc = dask_io(f, params, output_dir)
calc.cluster_calc(worker_memory=0.3, nodes=10, partition_size=2)

### Storing and viewing data with `xarray` and `holoviews`

`HDF5` format is nice, but is messy and doesn't provide a straightforward way to provide metadata/parameters. That is where [xarray](http://xarray.pydata.org/en/stable/) comes in! It is essentially pandas for multidimensional data.

In [None]:
import xarray as xr

In [None]:
xr_data = calc.to_xr()

In [None]:
xr_data

In [None]:
xr_computed = xr_data.compute()

In [None]:
xr_computed

For more on how to use xarray, I refer you to the tutorial notebook made by Bas [here](https://gist.github.com/basnijholt/a3a91fb919aa500b940f51e3d9890bd9).

The good thing about `xarray` is that is it compatible with [holoviews](https://holoviews.org/)!

In [None]:
%matplotlib inline
import holoviews as hv
from holoviews import opts

hv.extension("matplotlib")

In [None]:
hv_data = hv.Dataset(xr_computed)

In [None]:
dmap = (
    hv_data.to(hv.Image, kdims=["x", "y"], dynamic=True)
    .opts(cmap="viridis")
    .opts(plot=dict(colorbar=True, fig_size=200, aspect="square"))
)
dmap

In [None]:
dmap = hv_data.to(hv.Curve, kdims=["x"], dynamic=True)
dmap

### Sampling on a mesh and saving data (not working :| )

In [None]:
def f(x, y, a):
    sleep(5 * random())
    return x + np.exp(-((x ** 2 + y ** 2 - 0.75 ** 2) ** 2) / a ** 4)


N = 40
params = {
    "x": np.linspace(-1.0, 1.0, N),
    "y": np.linspace(-1.0, 1.0, N),
    "a": np.array([0.2, 0.6]),
}


values = list(params.values())
args = np.array(list(it.product(*values)))
shapes = [len(values[i]) for i in range(len(values))]


def wrapped_fn(args):
    return f(*args)

In [None]:
da_args = da.from_array(args.T, chunks=(3, 100))
results = da_args.map_blocks(wrapped_fn, dtype=float, drop_axis=0)

In [None]:
results

In [None]:
df.from_dask_array(results).to_hdf(
    "/home/kostasvilkelis/work/qt-dask-demo/ring/temp_data*", "key", mode="w"
)

In [None]:
cluster.close()

In [None]:
load_data = (
    dask.dataframe.read_hdf("ring/temp_data*", "key")
    .to_dask_array(lengths=True)
    .reshape(shapes)
)

In [None]:
# shaped_data = da.reshape(load_data, shapes)

In [None]:
# shaped_data

In [None]:
import xarray as xr

In [None]:
xr_array = xr.DataArray(
    data=shaped_data,
    dims=params.keys(),
    coords=params,
    attrs={"description": "Ring Example"},
)

In [None]:
xr_array.to_netcdf("xarray_results")

In [None]:
%matplotlib inline
import holoviews as hv
from holoviews import opts

hv.extension("matplotlib")

In [None]:
xr_dataset = xr.open_dataset("xarray_results")

In [None]:
hv_data = hv.Dataset(xr_dataset)

In [None]:
dmap = (
    hv_data.to(hv.Image, kdims=["x", "y"], dynamic=True)
    .opts(cmap="viridis")
    .opts(plot=dict(colorbar=True, fig_size=200, aspect="square"))
)
dmap

In [None]:
dmap = hv_data.to(hv.Curve, kdims=["x"], dynamic=True)
dmap