# Running Dask on AzureML in a VNet

This notebook shows how to run a Dask cluster on an AzureML Compute cluster from a VM in the same VNet. 
For setup, please see [these instructions](setup-vnet.md).

## Starting the cluster

In [48]:
from azureml.core import Workspace, Experiment
from azureml.train.estimator import Estimator
from azureml.widgets import RunDetails
from azureml.core.runconfig import MpiConfiguration
from azureml.core import VERSION
import time
VERSION

'1.0.62'

In [49]:
ws = Workspace.from_config()

In [50]:
dask_cluster = ws.compute_targets['daskcluster']

Start the Dask cluster using an Estimator with an `MpiConfiguration`. Make sure the cluster can scale up to 10 nodes, or change `node_count` below. This example launches 2 processes per node (assuming 2-core machines); if your machines have more cores, increase `process_count_per_node` accordingly.

In [51]:
mpi_configuration = MpiConfiguration()
mpi_configuration.process_count_per_node = 2

est = Estimator('dask', 
                compute_target=dask_cluster, 
                entry_script='startDask.py', 
                conda_dependencies_file_path='environment.yml', 
                script_params={'--data': ws.get_default_datastore()},
                node_count=10,
                distributed_training=mpi_configuration)

run = Experiment(ws, 'dask').submit(est)



In [54]:
RunDetails(run).show()

_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', '…

In [59]:
while 'headnode' not in run.get_metrics():
    print("waiting for scheduler node's ip")
    time.sleep(5)

print('Headnode has IP:', run.get_metrics()['headnode'])

Headnode has IP: 172.17.0.5
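The loop above waits indefinitely if the scheduler never reports its IP (for example, if the job fails before startup). A minimal sketch of a bounded wait follows; the helper name `wait_for_metric` and its default timeout and interval are illustrative assumptions, not part of the AzureML SDK.

```python
import time


def wait_for_metric(get_metrics, key, timeout=600.0, interval=5.0, sleep=time.sleep):
    """Poll get_metrics() until `key` appears, or raise TimeoutError.

    get_metrics is a zero-argument callable returning a dict, such as
    run.get_metrics. timeout and interval are in seconds; the defaults
    are arbitrary choices for illustration.
    """
    deadline = time.monotonic() + timeout
    while True:
        metrics = get_metrics()
        if key in metrics:
            return metrics[key]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"metric {key!r} not reported within {timeout} s")
        sleep(interval)


# Usage in this notebook (not executed here):
#   headnode_ip = wait_for_metric(run.get_metrics, 'headnode')
```

Passing the callable rather than the `run` object keeps the helper independent of AzureML, so it can be tried out with a stub.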


## Run some jobs on the cluster
Because the DSVM and the cluster share the same VNet, you can now connect to the cluster directly.

In [60]:
from dask.distributed import Client

c = Client(run.get_metrics()['scheduler'])
c

Client: Scheduler tcp://172.17.0.5:8786, Dashboard http://172.17.0.5:8787/status
Cluster: Workers 19, Cores 19, Memory 138.86 GB
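The client reports 19 workers even though the job was submitted with 10 nodes and 2 processes per node. One plausible reading, which is an assumption here since `startDask.py` is not shown, is that one MPI process hosts the scheduler and the remaining processes run workers. Under that assumption the arithmetic can be sketched as follows (`expected_workers` is a hypothetical helper):

```python
def expected_workers(node_count, process_count_per_node, scheduler_processes=1):
    """Worker processes left after reserving MPI processes for the scheduler.

    Assumes (not confirmed by the notebook) that the scheduler occupies
    `scheduler_processes` of the launched MPI processes.
    """
    total = node_count * process_count_per_node
    if scheduler_processes > total:
        raise ValueError("more scheduler processes than MPI processes")
    return total - scheduler_processes


# 10 nodes x 2 processes per node, one process assumed to host the scheduler
print(expected_workers(10, 2))  # 19
```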


To also use the Bokeh dashboard, add another port forward to the ssh command you use to log in to the DSVM:

In [65]:
print("ssh -L 8888:localhost:8888 -L 8787:{}:8787 <dns-name of your DSVM>".format(run.get_metrics()['headnode']))

ssh -L 8888:localhost:8888 -L 8787:172.17.0.5:8787 <dns-name of your DSVM>
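Before opening the dashboard in a browser, it can help to confirm that the forwarded port accepts connections. The sketch below uses only the standard library; `port_is_open` is a hypothetical helper, and a successful connection only shows that something (typically the ssh tunnel) is listening locally, not that the dashboard behind it is healthy.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to (host, port) succeeds within `timeout` seconds."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts, and unreachable hosts.
        return False


# Usage on the machine where the ssh tunnel runs (not executed here):
#   port_is_open('localhost', 8787)
```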


You should now see the Bokeh dashboard at http://localhost:8787. Next, check that the cluster works:

In [62]:
def inc(x):
    return x + 1

fut = c.submit(inc, 1)
fut

In [63]:
fut.result()

2

# Training on Large Datasets
(from https://github.com/dask/dask-tutorial)

Sometimes you'll want to train on a larger-than-memory dataset. `dask-ml` implements estimators that work well on Dask arrays and dataframes that may be larger than your machine's RAM.

In [64]:
from dask.distributed import Client
import joblib
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

We'll make a small (random) dataset locally using scikit-learn.

In [36]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
    
centers[:4]

array([[ 1.00796679,  4.34582168,  2.15175661,  1.04337835, -1.82115164,
         2.81149666, -1.18757701,  7.74628882,  9.36761449, -2.20570731,
         5.71142324,  0.41084221,  1.34168817,  8.4568751 , -8.59042755,
        -8.35194302, -9.55383028,  6.68605157,  5.34481483,  7.35044606],
       [ 9.49283024,  6.1422784 , -0.97484846,  5.8604399 , -7.61126963,
         2.86555735, -7.25390288,  8.89609285,  0.33510318, -1.79181328,
        -4.66192239,  5.43323887, -0.86162507,  1.3705568 , -9.7904172 ,
         2.3613231 ,  2.20516237,  2.20604823,  8.76464833,  3.47795068],
       [-2.67206588, -1.30103177,  3.98418492, -8.88040428,  3.27735964,
         3.51616445, -5.81395151, -7.42287114, -3.73476887, -2.89520363,
         1.49435043, -1.35811028,  9.91250767, -7.86133474, -5.78975793,
        -6.54897163,  3.08083281, -5.18975209, -0.85563107, -5.06615534],
       [-6.85980599, -7.87144648,  3.33572279, -7.00394241, -5.97224874,
        -2.55638942,  6.36329802, -7.97988653,  

The small dataset will be the template for our large random dataset.
We'll use `dask.delayed` to wrap `sklearn.datasets.make_blobs`, so that the actual dataset is generated on the workers rather than on the local machine.

In [37]:
n_samples_per_block = 200000
n_blocks = 500

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype='float64')
          for obj in delayeds]
X = da.concatenate(arrays)
X

| | Array | Chunk |
|---|---|---|
| Bytes | 16.00 GB | 32.00 MB |
| Shape | (100000000, 20) | (200000, 20) |
| Count | 2000 Tasks | 500 Chunks |
| Type | float64 | numpy.ndarray |


In [38]:
# Check the size of the array
X.nbytes / 1e9

16.0
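The sizes reported above follow directly from the block parameters: each `float64` value takes 8 bytes, so the chunk and array sizes can be checked by hand. A short worked calculation, using the same values as the cell that builds `X`:

```python
n_samples_per_block = 200_000
n_blocks = 500
n_features = 20
bytes_per_float64 = 8

# One chunk holds one block of samples with all features.
chunk_bytes = n_samples_per_block * n_features * bytes_per_float64
# The full array is the concatenation of all blocks.
total_bytes = chunk_bytes * n_blocks

print(chunk_bytes / 1e6)  # chunk size in MB: 32.0
print(total_bytes / 1e9)  # array size in GB: 16.0
```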

In [39]:
# Only run this on the cluster.
X = X.persist()  
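`persist` keeps the computed chunks in worker memory, so it is worth comparing the array size with the cluster memory reported by the client before calling it. The check below is a rough sketch: `fits_in_cluster_memory` is a hypothetical helper, and the 50% threshold is an arbitrary assumption meant to leave room for intermediate results.

```python
def fits_in_cluster_memory(array_gb, cluster_gb, max_fraction=0.5):
    """Return True if the array uses at most `max_fraction` of total cluster memory.

    max_fraction is an illustrative safety margin, not a Dask setting.
    """
    if not 0 < max_fraction <= 1:
        raise ValueError("max_fraction must be in (0, 1]")
    return array_gb <= cluster_gb * max_fraction


# 16 GB array against the 138.86 GB reported by the client above
print(fits_in_cluster_memory(16.0, 138.86))  # True
```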

The algorithms implemented in Dask-ML are designed to scale and can handle larger-than-memory datasets.

They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML.

In [40]:
from dask_ml.cluster import KMeans
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [41]:
%time clf.fit(X)

CPU times: user 19.4 s, sys: 664 ms, total: 20.1 s
Wall time: 58.5 s


KMeans(algorithm='full', copy_x=True, init='k-means||', init_max_iter=3,
       max_iter=300, n_clusters=8, n_jobs=1, oversampling_factor=10,
       precompute_distances='auto', random_state=None, tol=0.0001)

In [42]:
clf.labels_

| | Array | Chunk |
|---|---|---|
| Bytes | 400.00 MB | 800.00 kB |
| Shape | (100000000,) | (200000,) |
| Count | 3000 Tasks | 500 Chunks |
| Type | int32 | numpy.ndarray |


In [43]:
clf.labels_[:10].compute()

array([4, 7, 4, 5, 5, 5, 2, 7, 6, 0], dtype=int32)

## Shut cluster down
To shut the cluster down, cancel the job that runs the cluster. 

In [47]:
run.cancel()
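When the same steps run as a script rather than interactively, an exception partway through could leave the cluster running. One way to guard against that, sketched here with a hypothetical `cancel_on_exit` helper, is a context manager that calls `cancel()` on exit. It only assumes that the object passed in has a `cancel()` method, as `run` does above.

```python
from contextlib import contextmanager


@contextmanager
def cancel_on_exit(run):
    """Yield `run` and call run.cancel() when the block exits, even on error."""
    try:
        yield run
    finally:
        run.cancel()


# Usage in a script (not executed here):
#   with cancel_on_exit(Experiment(ws, 'dask').submit(est)) as run:
#       ...  # connect a Client and submit work
```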