# Dask at CHTC Demo

We are currently playing with what it takes to spawn Dask workers on the CHTC pool. This notebook (and repository) encapsulate a prototype/demo of doing just that, running from a CHTC submit node and spawning Dask workers in the CHTC pool. We'll do some basic Dask work with CPU workers, then get a few GPU workers and prove we can talk to their GPUs.

Warning! Things will probably go wrong at some point.

This is not ready for production! There's a lot of hacks, and there's a lot of out-of-band system administration going on behind the scenes to make it work for this demo. Do not try this at home... but please get in contact with me (Josh Karpel) or Brian Bockelman if you're interested!

In [None]:
%matplotlib inline

## Creating a Cluster and Client

Dask is a Python library for flexible parallel computing. It is *flexible* in two ways:
- It supports both low-level and high-level parallelism, and anywhere in-between. In this demo, we'll see examples along that entire spectrum.
- User code generally does not need to change much when the size/shape of the Dask cluster you are connected to changes.

Our first order of business is to create a "cluster" to manage worker jobs on the pool. We feed the cluster to a "client", which is an object that we can use to ask Dask to do work. Many operations on Dask-provided objects, like data arrays, implicitly use the client to do work.

In [None]:
import dask
from dask_chtc import CHTCCluster
from dask.distributed import Client

WORKER_IMAGE = "maventree/dask-worker:demo"

In [None]:
cluster = CHTCCluster(worker_image = WORKER_IMAGE)
cluster.scale(10)
cluster

In [None]:
client = Client(cluster)
client

## Low-Level Parallelism

Dask is capable of parallelizing low-level operations on arrays. All we need to tell Dask is how big the smallest "chunk" of the array it should operate on is. Dask will perform operations on individual chunks out on the workers.

In [None]:
import dask
import dask.array as da

In [None]:
x = da.ones((15, 15), chunks=5)
x

In [None]:
y = x + x.T
y

In [None]:
y.visualize()

Note that whoever wrote Dask's transpose algorithm has realized that the chunks along the diagonal don't need to know about any other chunks, while the off-diagonal chunks do. Dask has figured out an efficient workflow automatically, without you needing to be clever yourself.

In [None]:
z = y.compute()
z

Dask implicitly does this kind of work by overriding normal operations on arrays, as well as implementing most of the `numpy` interface. It has equivalents of pandas dataframes as well. Most of the time, you can write "natural" scientific Python code, and Dask will handle the parallelization.

In [None]:
x = da.random.random((10000, 10000), chunks=1000)
x

In [None]:
y = ((x ** 2) + 1).sum() ** 2
y

In [None]:
z = y.compute()
z

This scales up to big operations as well. For example, Dask provides an implemention of SVD, built on top of its other functionality. This combines many operations and has a much more complex task graph, but you don't need to know any of that - just call `svd`! If you need to do other operations on the inputs or results, Dask will join them all together into one big task graph.

In [None]:
x = da.ones((10_000, 1_000), chunks=(1_000, 1_000))
u, s, v = da.linalg.svd(x)

In [None]:
s

In [None]:
s.visualize()

In [None]:
u, s, v = dask.compute(u, s, v)

## Machine Learning and High-Level Parallelism

Dask-ML provides Dask-based implemenations of various tools commonly used in ML, like clustering:

In [None]:
import dask_ml.datasets
import dask_ml.cluster
import matplotlib.pyplot as plt

In [None]:
X, y = dask_ml.datasets.make_blobs(
    n_samples=10_000_000,
    chunks=1_000_000,
    centers=5,
    center_box=(-10, 10),
    random_state=11
)
X = X.persist()
X

In [None]:
DENSITY = 1000

fig, ax = plt.subplots()
ax.scatter(X[::DENSITY, 0], X[::DENSITY, 1], 
           marker='.');

In [None]:
km = dask_ml.cluster.KMeans(n_clusters=5, init_max_iter=2, oversampling_factor=10)
km

In [None]:
km.fit(X)

In [None]:
fig, ax = plt.subplots()
ax.scatter(X[::DENSITY, 0], X[::DENSITY, 1], 
           marker='.', 
           c=km.labels_[::DENSITY],
           cmap='viridis', alpha=0.25);

In [None]:
client.cancel(X)

It also has equivalents of scikit-learn's hyperparameter optimization tools... as well as some fancier ones, like Scott Sievert's hyperband implementation! (https://examples.dask.org/machine-learning/hyperparam-opt.html)

In [None]:
from demo import make_hyperparameter_optimization_problem, make_train_test

X, y = make_hyperparameter_optimization_problem()
X_train, X_test, y_train, y_test = make_train_test(X, y)

In [None]:
import numpy as np
from sklearn.neural_network import MLPClassifier

model = MLPClassifier()

params = {
    "hidden_layer_sizes": [
        (24,),
        (12, 12),
        (6, 6, 6, 6),
        (4, 4, 4, 4, 4, 4),
        (12, 6, 3, 3),
    ],
    "activation": ["relu", "logistic", "tanh"],
    "alpha": np.logspace(-6, -3, num=1000),
    "batch_size": [16, 32, 64, 128, 256, 512],
}

In [None]:
n_examples = 10 * len(X_train)
n_params = 20

max_iter = n_params  # number of times partial_fit will be called
chunks = n_examples // n_params  # number of examples each call sees

X_train2 = da.from_array(X_train, chunks=chunks)
y_train2 = da.from_array(y_train, chunks=chunks)
X_train2

In [None]:
from dask_ml.model_selection import HyperbandSearchCV

search = HyperbandSearchCV(
    model,
    params,
    max_iter=max_iter,
    patience=True,
)
search

In [None]:
search.fit(X_train2, y_train2, classes=[0, 1, 2, 3])

In [None]:
search.best_estimator_

Dask-ML does not provide its own model description language. Instead, it can work with anything obeying the scikit-learn API, namely scikit-learn itself and PyTorch with the Skorch wrapper layer, which we'll see in the final example.

## GPUs

Nowe we'll shut down the "CPU" client we made earlier, and start up a new "GPU" client. The only difference is that the underlying HTCondor jobs now request GPUs.

You may need to poke the Dask Dashboard to get it to register the new client.

In [None]:
client.shutdown()

In [None]:
import dask
from dask_chtc import CHTCCluster
from dask.distributed import Client

WORKER_IMAGE = "maventree/dask-worker:demo"

cluster = CHTCCluster(worker_image = WORKER_IMAGE, gpus = True)
cluster.scale(2)

client = Client(cluster)
client

We'll use PyTorch, a popular Python machine learning framework. PyTorch can can tell use whether it can has access to GPUs:

In [None]:
import socket
import torch
import os

def probe():
    if torch.cuda.is_available():
        device = torch.device('cuda')
    else:
        device = torch.device('cpu')
        
    return (socket.gethostname(), device)

probe()

`Client.run` executes a function on each worker and send us back the results. This is not `Client.map`! `Client.run` is mainly used for diagnostics.

In [None]:
client.run(probe)

Let's prove we can actually do some GPU math:

In [None]:
def tensor_addition():
    device = torch.device("cuda")
    
    a = torch.tensor([1., 2.], device = device)
    
    a = a.add(1.0)
    
    a = a.cpu()
    
    return a

In [None]:
dask.delayed(tensor_addition)().compute()

### Training a PyTorch model on GPUs

And now, for the grand finale, we'll perform hyperparameter optimization on a PyTorch model using our GPUs. We'll need to use the wrapper library Skorch, which overlays PyTorch with the scikit-learn API that Dask-ML expects to use. Then we just just feed the Skorch model specification to Dask-ML and it takes over.

In [None]:
import numpy as np
from sklearn.datasets import make_classification
from torch import nn
import torch.nn.functional as F
from skorch import NeuralNetClassifier

X, y = make_classification(1000, 20, n_informative=10, random_state=0)
X = X.astype(np.float32)
y = y.astype(np.int64)

class MyModule(nn.Module):
    def __init__(self, num_units=10, nonlin=F.relu):
        super(MyModule, self).__init__()

        self.dense0 = nn.Linear(20, num_units)
        self.nonlin = nonlin
        self.dropout = nn.Dropout(0.5)
        self.dense1 = nn.Linear(num_units, 10)
        self.output = nn.Linear(10, 2)

    def forward(self, X, **kwargs):
        X = self.nonlin(self.dense0(X))
        X = self.dropout(X)
        X = F.relu(self.dense1(X))
        X = F.softmax(self.output(X), dim=-1)
        return X

In [None]:
net = NeuralNetClassifier(
    MyModule,
    max_epochs=10,
    lr=0.1,
    iterator_train__shuffle=True,
    device="cuda",
)
net

In [None]:
from dask_ml.model_selection import GridSearchCV

params = {
    'lr': [0.01, 0.02, 0.03],
    'max_epochs': [10, 20, 20],
    'module__num_units': [10, 20, 30],
}

gs = GridSearchCV(net, params, refit=False, cv=3, scoring='accuracy')

In [None]:
gs.fit(X, y)

In [None]:
net = NeuralNetClassifier(
    MyModule,
    max_epochs=10,
    lr=0.1,
    iterator_train__shuffle=True,
    device="cuda",
)
net

In [None]:
from dask_ml.model_selection import GridSearchCV

params = {
    'lr': [0.01, 0.02, 0.03],
    'max_epochs': [10, 20, 20],
    'module__num_units': [10, 20, 30],
}

gs = GridSearchCV(net, params, refit=False, cv=3, scoring='accuracy')

In [None]:
gs.fit(X, y)

In [None]:
gs.cv_results_