# Running a DASK cluster with RAPIDS

This notebook runs a DASK cluster with NVIDIA RAPIDS. RAPIDS uses NVIDIA CUDA for high-performance GPU execution, exposing GPU parallelism and high memory bandwidth through a user-friendly Python interface. It includes a dataframe library called cuDF, which will be familiar to Pandas users, as well as an ML library called cuML, which provides GPU versions of many of the machine learning algorithms available in Scikit-learn.

This notebook shows how RAPIDS, through DASK, can take advantage of multi-node, multi-GPU configurations on AzureML.

This notebook deploys the AzureML cluster to a VNet. Before running it, set up a VNet and DSVM according to [../setup-vnet.md](../setup-vnet.md). The names that identify the VNet and subnet are set in the configuration cell further down (`vnet_resourcegroup_name`, `vnet_name` and `subnet_name`).

In addition, you need to forward the following ports to the DSVM:

- port 8888 to port 8888 for the jupyter server running on the DSVM (see [../setup-vnet.md](../setup-vnet.md))
- port 9999 to port 9999 for the jupyter server running on the AML Cluster (will be explained below)
- port 9797 to port 9797 for the DASK dashboard running on the AML Cluster (will be explained below)

The easiest way to accomplish that is to log into the DSVM using ssh with the following flags (assuming `mydsvm.westeurope.cloudapp.azure.com` is the DNS name of your DSVM):

    ssh mydsvm.westeurope.cloudapp.azure.com -L 9797:localhost:9797 -L 9999:localhost:9999 -L 8888:localhost:8888
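
Once the ssh session is open, a quick check run on your local machine can confirm that the local end of each tunnel is listening. The sketch below is only an illustration: `port_is_open` is a hypothetical helper, not part of this notebook, and a successful connection shows only that ssh is listening locally, not that the service on the far end is already up.

```python
import socket


def port_is_open(host: str, port: int, timeout: float = 2.0) -> bool:
    """Return True if a TCP connection to host:port succeeds within `timeout`."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name resolution failures
        return False


# The three ports forwarded by the ssh command above
for port in (8888, 9797, 9999):
    state = "listening" if port_is_open("localhost", port) else "not reachable"
    print(f"localhost:{port} is {state}")
```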


In [1]:
import os
import json
import time

from azureml.core import Workspace, Experiment, Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.data.data_reference import DataReference
from azureml.core.runconfig import RunConfiguration, MpiConfiguration
from azureml.core import ScriptRunConfig
from azureml.train.estimator import Estimator
from azureml.exceptions import ComputeTargetException
from azureml.widgets import RunDetails

from subprocess import Popen, PIPE

class PortForwarder():
    '''A helper to forward ports from the Notebook VM to the AML Cluster in the same VNet'''
    active_instances = set()
    
    def __init__(self, from_port, to_ip, to_port):
        self.from_port = from_port
        self.to_ip = to_ip
        self.to_port = to_port
        
    def start(self):
        self._socat = Popen(["/usr/bin/socat", 
                             f"tcp-listen:{self.from_port},reuseaddr,fork", 
                             f"tcp:{self.to_ip}:{self.to_port}"],
                            stderr=PIPE, stdout=PIPE, universal_newlines=True)
        PortForwarder.active_instances.add(self)
        return self
        
    def stop(self):
        PortForwarder.active_instances.remove(self)
        return self._socat.terminate()
    
    @staticmethod
    def stop_all():
        for instance in list(PortForwarder.active_instances):
            instance.stop()

In [2]:
gpu_cluster_name = "nd12-vnet-clustr"
vnet_resourcegroup_name='demo'
vnet_name='myvnet'
subnet_name='default'

ws = Workspace.from_config()

### Deploy the AmlCompute cluster
The next cell deploys the AmlCompute cluster. The cluster is configured to scale down to 0 nodes after 2 minutes of idle time, so no cost is incurred while DASK is not running (and thus this cell does not yet spin up any nodes on the cluster). This cell only needs to be executed once; the cluster can be reused going forward.

In [3]:
try:
    gpu_cluster = ComputeTarget(workspace=ws, name=gpu_cluster_name)
    print('Found existing compute target')
    
except ComputeTargetException:
    print("Creating new cluster")

    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="Standard_ND12s", 
        min_nodes=0, 
        max_nodes=10,
        idle_seconds_before_scaledown=120,
        vnet_resourcegroup_name=vnet_resourcegroup_name,
        vnet_name=vnet_name,
        subnet_name=subnet_name
    )
    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)

    print("waiting for nodes")
    gpu_cluster.wait_for_completion(show_output=True)

Found existing compute target


### Copy the data to Azure Blob Storage

The next cell downloads the NYC taxi data set and then uploads it to the AzureML workspace's default datastore. All nodes of the DASK cluster created further down will then be able to access the data.

In [4]:
import io
import os
import sys
import urllib.request
from tqdm import tqdm
from time import sleep

cwd = os.getcwd()

data_dir = os.path.abspath(os.path.join(cwd, 'data'))
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

taxidir = os.path.join(data_dir, 'nyctaxi')
if not os.path.exists(taxidir):
    os.makedirs(taxidir)

filenames = []
local_paths = []
for i in range(1, 13):
    filename = "yellow_tripdata_2015-{month:02d}.csv".format(month=i)
    filenames.append(filename)
    
    local_path = os.path.join(taxidir, filename)
    local_paths.append(local_path)

for idx, filename in enumerate(filenames):
    url = "http://dask-data.s3.amazonaws.com/nyc-taxi/2015/" + filename
    print("- Downloading " + url)
    if not os.path.exists(local_paths[idx]):
        with open(local_paths[idx], 'wb') as file:
            with urllib.request.urlopen(url) as resp:
                length = int(resp.getheader('content-length'))
                blocksize = max(4096, length // 100)
                with tqdm(total=length, file=sys.stdout) as pbar:
                    while True:
                        buff = resp.read(blocksize)
                        if not buff:
                            break
                        file.write(buff)
                        pbar.update(len(buff))
    else:
        print("- File already exists locally")

print("- Uploading taxi data... ")
ws = Workspace.from_config()
ds = ws.get_default_datastore()

ds.upload(
    src_dir=taxidir,
    target_path='nyctaxi',
    show_progress=True)

print("- Data transfer complete")

- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-01.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-02.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-03.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-04.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-05.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-06.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-07.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-08.csv
- File already exists locally
- Downloading http://dask-data.s3.amazon
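
The download cell above skips any file that already exists locally, so an interrupted download can leave a partial file that is treated as complete on the next run. As a minimal sketch, the hypothetical helper below (`incomplete_downloads` is not part of this notebook) lists files that are missing or empty before you upload. It does not detect files that were truncated partway through.

```python
import os


def incomplete_downloads(directory, filenames):
    """Return the names in `filenames` that are missing or zero bytes in `directory`."""
    bad = []
    for name in filenames:
        path = os.path.join(directory, name)
        if not os.path.isfile(path) or os.path.getsize(path) == 0:
            bad.append(name)
    return bad


# Same file names and directory layout as in the download cell above
expected = [f"yellow_tripdata_2015-{month:02d}.csv" for month in range(1, 13)]
taxidir = os.path.join(os.getcwd(), "data", "nyctaxi")
print("Missing or empty files:", incomplete_downloads(taxidir, expected))
```

Deleting any file reported here and rerunning the download cell fetches it again.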

### Create the DASK Cluster

We now run a Python job on the AmlCompute cluster that starts a DASK cluster.

In [5]:
mpi_config = MpiConfiguration()
mpi_config.process_count_per_node = 2

est = Estimator(
    source_directory='./dask',
    compute_target=gpu_cluster,
    entry_script='init-dask.py',
    script_params={
        '--data': ws.get_default_datastore(),
        '--gpus': str(2)  # The number of GPUs available on each node
        },
    node_count=3,
    use_gpu=True,
    distributed_training=mpi_config,
    conda_dependencies_file='rapids-0.9.yaml')

run = Experiment(ws, "init-dask-jupyter").submit(est)



Let's use the widget to monitor how the DASK cluster spins up. When run for the first time on a workspace, the following steps happen:

1. The docker image is created, which takes about 20 minutes.
2. AzureML then scales the cluster up by provisioning the required number of nodes (`node_count` above), which takes another 5-10 minutes with the chosen Standard_ND12s.
3. The docker image is transferred to the compute nodes, which, given its size of about 8 GB, takes another 3-5 minutes.

Altogether, the process takes roughly 30 to 35 minutes when run for the first time.
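
Adding up the per-step estimates listed above gives a rough range for the first run. The numbers in this short sketch are the approximate figures from the list, not measurements.

```python
# (low, high) estimates in minutes, taken from the three steps listed above
steps = {
    "build docker image": (20, 20),
    "provision nodes": (5, 10),
    "transfer image to nodes": (3, 5),
}

low = sum(lo for lo, _ in steps.values())
high = sum(hi for _, hi in steps.values())
print(f"First run: roughly {low} to {high} minutes")
```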

In [8]:
from azureml.widgets import RunDetails
RunDetails(run).show()

### Wait for the cluster to come up

In [9]:
from IPython.display import clear_output
import time

it = 0
while "headnode" not in run.get_metrics():
    clear_output(wait=True)
    print("waiting for scheduler node's ip " + str(it) )
    time.sleep(1)
    it += 1

headnode = run.get_metrics()["headnode"]
jupyter_ip = run.get_metrics()["jupyter-ip"]
jupyter_port = run.get_metrics()["jupyter-port"]
jupyter_token = run.get_metrics()["jupyter-token"]
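
The loop above never ends if the run fails before it reports the `headnode` metric. One possible variant, sketched here with a hypothetical helper (`wait_for_metric` is not part of this notebook or of the AzureML SDK), gives up after a deadline. It takes any callable that returns a metrics dictionary, so it can be tried without a live run.

```python
import time


def wait_for_metric(get_metrics, key, timeout=1800.0, interval=5.0):
    """Poll get_metrics() until `key` appears and return the metrics dict.

    Raises TimeoutError if `key` has not appeared after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        metrics = get_metrics()
        if key in metrics:
            return metrics
        if time.monotonic() >= deadline:
            raise TimeoutError(f"{key!r} not reported within {timeout} seconds")
        time.sleep(interval)
```

With the run from this notebook, the call would look like `metrics = wait_for_metric(run.get_metrics, "headnode")`, after which `metrics["jupyter-token"]` and the other values can be read from a single dictionary. The 30-minute default timeout is an assumption based on the startup times mentioned earlier.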

### Establish port forwarding to the cluster

In [10]:
dashboard = PortForwarder(9797, headnode, 8787).start()
jupyter = PortForwarder(9999, headnode, 8888).start()

print("If you are forwarding the ports from your local machine as described at the top of this notebook,")
print("then you should now be able to connect to the Dashboard and Jupyter Server via the following URLs:")
print()
print(f"     Dashboard: http://localhost:9797")
print(f"     Jupyter on cluster: http://localhost:9999/notebooks/azure_taxi_on_cluster.ipynb?token={jupyter_token}")

If you are forwarding the ports from your local machine as described at the top of this notebook,
then you should now be able to connect to the Dashboard and Jupyter Server via the following URLs:

     Dashboard: http://localhost:9797
     Jupyter on cluster: http://localhost:9999/notebooks/azure_taxi_on_cluster.ipynb?token=0ed225c7db00699b10d80d86dc09d8149d6eae21e7200aac


## Shutting the cluster down

Once you are done with your interactive work, terminate the run to shut the cluster down, so that the AmlCompute cluster scales back down to 0 nodes.

In [None]:
# stop the run representing the cluster
run.cancel()
# shut down the port forwards
PortForwarder.stop_all()
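
If you drive the cluster from a script rather than interactively, the same cleanup can be guaranteed with a context manager. This is a sketch under that assumption: `cluster_session` is a hypothetical helper, and it only relies on the `cancel()` and `stop()` methods already used in this notebook.

```python
from contextlib import contextmanager


@contextmanager
def cluster_session(run, forwarders):
    """Yield the run, then cancel it and stop every forwarder on exit.

    Cleanup happens even if the body of the with-block raises.
    """
    try:
        yield run
    finally:
        try:
            run.cancel()
        finally:
            # Stop the forwards even if cancelling the run failed
            for forwarder in forwarders:
                forwarder.stop()
```

Usage would be along the lines of `with cluster_session(run, [dashboard, jupyter]): ...`, with the work on the cluster placed inside the block.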

### Useful for debugging

In [5]:
# get the last run
run = next(Experiment(ws, "init-dask-jupyter").get_runs())

In [23]:
run.get_metrics()

{'headnode': '172.17.0.6',
 'scheduler': '172.17.0.6:8786',
 'dashboard': '172.17.0.6:8787',
 'data': '/mnt/batch/tasks/shared/LS_root/jobs/vnettest/azureml/init-dask-jupyter_1570114867_699d20d4/mounts/workspaceblobstore',
 'jupyter-url': 'http://172.17.0.6:8888/?token=0f85e874d045185e175027bab126bd404ebe444c237a765a',
 'jupyter-port': 8888,
 'jupyter-token': '0f85e874d045185e175027bab126bd404ebe444c237a765a',
 'jupyter-ip': '172.17.0.6'}

In [8]:
run.status

'Running'
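
When a run is `Running` but the later cells fail, it can help to see which of the metrics this notebook reads have not been reported yet. The helper below is a hypothetical sketch (`missing_metrics` and `REQUIRED_KEYS` are not part of the notebook). The key names come from the cells and the sample output above, and the sample dictionary is a shortened copy of that output.

```python
# Metric keys that the cells in this notebook read from the run
REQUIRED_KEYS = ("headnode", "jupyter-ip", "jupyter-port", "jupyter-token")


def missing_metrics(metrics, required=REQUIRED_KEYS):
    """Return the required keys that are not yet present in `metrics`."""
    return [key for key in required if key not in metrics]


# Shortened copy of the sample output shown above
sample = {
    "headnode": "172.17.0.6",
    "scheduler": "172.17.0.6:8786",
    "dashboard": "172.17.0.6:8787",
    "jupyter-port": 8888,
    "jupyter-token": "0f85e874d045185e175027bab126bd404ebe444c237a765a",
    "jupyter-ip": "172.17.0.6",
}
print("Missing from sample:", missing_metrics(sample))
print("Missing from empty metrics:", missing_metrics({}))
```

With a live run, the call would be `missing_metrics(run.get_metrics())`.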