# Running Dask on AzureML

This notebook shows how to run a Dask cluster on an AzureML Compute cluster. 
For setup instructions for your Python environment, please see the [Readme](./README.md).

## Starting the cluster

In [None]:
import mlflow
from azureml.core import Workspace, Experiment, Environment, Datastore, Dataset, ScriptRunConfig
from azureml.core.runconfig import PyTorchConfiguration
from azureml.widgets import RunDetails
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.compute_target import ComputeTargetException
from IPython.display import clear_output
import time

In [None]:
# Access to Azure Machine Learning Service
ws = Workspace.from_config()

## Create AML Compute Cluster

Because you will connect to the compute cluster over SSH from a Compute Instance, make sure you create an __SSH-enabled__ AML Compute cluster. Remember the admin user name and the SSH key you configure here: you need them later for port-forwarding from your local host (PC/Mac) to the Dask scheduler.


In [None]:
# Choose a name for your CPU cluster
cpu_cluster_name = "dask-inter-cpu-1"
VMSIZE='STANDARD_DS3_V2'
USERNAME=''
ADMINUSERSSHKEY = ''
MAXNODE=2
VNETRGNAME=''
VNETNAME=''
SUBNETNAME=''


# Both the admin user name and the SSH key are needed for the SSH login later on
if USERNAME != '' and ADMINUSERSSHKEY != '':
    # Verify that cluster does not exist already
    try:
        dask_cluster = ComputeTarget(workspace=ws, name=cpu_cluster_name)
        print('Found existing cluster, use it.')
    except ComputeTargetException:
        compute_config = AmlCompute.provisioning_configuration(vm_size=VMSIZE,
                                                            min_nodes=0, max_nodes=MAXNODE,
                                                            remote_login_port_public_access='Enabled',
                                                            admin_username=USERNAME, 
                                                            admin_user_ssh_key=ADMINUSERSSHKEY,
                                                            vnet_resourcegroup_name=VNETRGNAME,
                                                            vnet_name=VNETNAME,
                                                            subnet_name=SUBNETNAME)
        dask_cluster = ComputeTarget.create(ws, cpu_cluster_name, compute_config)

    dask_cluster.wait_for_completion(show_output=True)
else:
    print('Check your user name and SSH key')
    

Create Environment

In [None]:
daskEnv = Environment.from_conda_specification('interactieDask', './conda.yml')

Register the sample storage account with the workspace

If you want to use your own container in a storage account, update the following account name and `sas_token` accordingly.

In [None]:
datastore_name = 'mtcseattle'
container_name = 'azure-service-classifier'
account_name = 'mtcseattle'
sas_token = '?sv=2020-04-08&st=2021-05-26T04%3A39%3A46Z&se=2022-05-27T04%3A39%3A00Z&sr=c&sp=rl&sig=CTFMEu24bo2X06G%2B%2F2aKiiPZBzvlWHELe15rNFqULUk%3D'

datastore = Datastore.register_azure_blob_container(workspace=ws, 
                                                    datastore_name=datastore_name, 
                                                    container_name=container_name,
                                                    account_name=account_name, 
                                                    sas_token=sas_token,
                                                    overwrite=True)
datastore = Datastore.get(ws, datastore_name)

inputDataset = Dataset.File.from_files(path=(datastore, 'data'))

# This is optional 
# inputDataset = inputDataset.register(workspace=ws,
#                                        name='Azure Services Dataset',
#                                        description='Dataset containing azure related posts on Stackoverflow',
#                                        create_new_version=True)

# inputDataset = Dataset.get_by_name(ws, 'Azure Services Dataset')
# inputDataset
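
If you swap in your own container, it can help to check that the SAS token is still valid before registering the datastore. The sketch below reads the expiry (`se`) field out of a token using only the standard library. The helper names `sas_expiry` and `sas_is_expired` are hypothetical, the example token carries a placeholder signature, and the parsing assumes the full timestamp form used by the token in the cell above.

```python
from datetime import datetime, timezone
from urllib.parse import parse_qs


def sas_expiry(sas_token: str) -> datetime:
    """Return the expiry time (the `se` field) of a SAS token as a UTC datetime."""
    # Drop the leading '?' and decode the query string; parse_qs also
    # un-escapes percent-encoded characters such as %3A -> ':'.
    fields = parse_qs(sas_token.lstrip('?'))
    expiry_text = fields['se'][0]
    # Assumption: the expiry uses the full timestamp form, e.g. 2022-05-27T04:39:00Z
    expiry = datetime.strptime(expiry_text, '%Y-%m-%dT%H:%M:%SZ')
    return expiry.replace(tzinfo=timezone.utc)


def sas_is_expired(sas_token: str, now: datetime = None) -> bool:
    """Return True if the token's expiry time is not later than `now` (default: current time)."""
    now = now or datetime.now(timezone.utc)
    return now >= sas_expiry(sas_token)


# Hypothetical token with a placeholder signature, for illustration only
example_token = '?sv=2020-04-08&se=2022-05-27T04%3A39%3A00Z&sr=c&sp=rl&sig=PLACEHOLDER'
print(sas_expiry(example_token))
```

Calling `sas_is_expired(sas_token)` before `Datastore.register_azure_blob_container` gives an early hint when a mount failure is caused by an outdated token rather than by the cluster setup.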

Set ScriptRunConfig

In [None]:
src = ScriptRunConfig(source_directory='./source',
                       script='startDask.py',
                       arguments=[inputDataset.as_mount(path_on_compute='data')],
                       environment=daskEnv,
                       compute_target=dask_cluster,
                       distributed_job_config=PyTorchConfiguration(node_count=MAXNODE)
                       )

Run the ScriptRunConfig in an Experiment

In [None]:
expName = 'Interactive Dask Cluster'
run = Experiment(ws, expName).submit(src)

In [None]:
RunDetails(run).show()

Get IP address of headnode

In [None]:
mlflow.set_tracking_uri(ws.get_mlflow_tracking_uri())
mlflow.set_experiment(expName)

print("waiting for scheduler node's ip")
# mlflow.get_run returns a snapshot of the run, so it has to be fetched again
# on every iteration for newly logged params to become visible
mlflowrun = mlflow.get_run(run.id)
while 'headnode' not in mlflowrun.data.params:
    print('.', end="")
    time.sleep(5)
    mlflowrun = mlflow.get_run(run.id)

clear_output()
headnode_private_ip = mlflowrun.data.params['headnode']
print('Headnode has IP:', headnode_private_ip)
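
The polling pattern used here waits forever if the run fails before it logs the parameter. A minimal sketch of the same idea with a timeout follows; `wait_for_param` is a hypothetical helper, not part of MLflow or the AzureML SDK, and it takes any zero-argument callable that returns the current params dict.

```python
import time


def wait_for_param(fetch_params, key, interval=5.0, timeout=600.0):
    """Poll fetch_params() until `key` appears, then return its value.

    fetch_params is any zero-argument callable returning a dict of params.
    Raises TimeoutError if the key has not appeared after `timeout` seconds.
    """
    deadline = time.monotonic() + timeout
    while True:
        params = fetch_params()
        if key in params:
            return params[key]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"param '{key}' not logged within {timeout} s")
        time.sleep(interval)
```

In this notebook it could be called as `wait_for_param(lambda: mlflow.get_run(run.id).data.params, 'headnode')`, which fetches the run again on every poll.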

In [None]:
# let's find the public IP and ssh port of the head node
headnode_public_ip = None
headnode_ssh_port = None
for node in dask_cluster.list_nodes():
    if node['privateIpAddress'] == headnode_private_ip:
        headnode_public_ip = node['publicIpAddress']
        headnode_ssh_port = node['port']
        break
        
if headnode_public_ip is None:
    print('Headnode not found in cluster')
else:
    print(f'Headnode is at {headnode_public_ip}:{headnode_ssh_port}')
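
The lookup above can also be written as a small function, which makes it easy to try out without a running cluster. This is only a sketch: `find_head_node` is a hypothetical name, and the sample records are made up, using the same three keys the cell above reads from `list_nodes()`.

```python
def find_head_node(nodes, private_ip):
    """Return (public_ip, ssh_port) of the node with the given private IP, or None."""
    for node in nodes:
        if node.get('privateIpAddress') == private_ip:
            return node['publicIpAddress'], node['port']
    return None


# Made-up node records, for illustration only
sample_nodes = [
    {'privateIpAddress': '10.0.0.4', 'publicIpAddress': '203.0.113.10', 'port': 50000},
    {'privateIpAddress': '10.0.0.5', 'publicIpAddress': '203.0.113.10', 'port': 50001},
]
print(find_head_node(sample_nodes, '10.0.0.5'))
```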

## Establish the port-forwarding from your localhost to Dask Scheduler
Since the Notebook VM does not yet support VNets, you need to set up SSH port forwarding by logging in to the head node over SSH.

In the prior cell we looked up the public IP and SSH port of the head node of the cluster.

Now open a terminal on the Notebook VM and run the command that the following cell prints.


In [None]:
print(f'ssh {USERNAME}@{headnode_public_ip} -p {headnode_ssh_port} -L 8786:localhost:8786 -L 8788:{headnode_private_ip}:8787 -L 9999:localhost:8888')

Make sure to leave the terminal tab open to keep the port-forward running.

![ssh](./img/port-forwarding.png)

As you can see, you are forwarding three ports:

1. 8786 is for the scheduler and will be used to connect the client to the cluster.
2. 8788 is for the Bokeh app that shows the activity on the cluster (it listens on port 8787 on the head node; we map it to local port 8788 to avoid a conflict with the RStudio Server running on the Notebook VM).
3. 9999 is for a Jupyter instance running on the head node (port 8888 there). You can connect to the scheduler from the Jupyter running on your Notebook VM or from this Jupyter instance on the head node.
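
Each `-L local_port:host:host_port` option tells `ssh` to listen on `local_port` and forward connections to `host:host_port` as seen from the machine you log in to. The sketch below assembles such a command from a list of forwards; `build_ssh_forward_command` is a hypothetical helper, and the user name and addresses in the example are made up.

```python
def build_ssh_forward_command(user, host, ssh_port, forwards):
    """Build an ssh command line with one -L option per forward.

    forwards is a list of (local_port, target_host, target_port) tuples.
    """
    parts = [f'ssh {user}@{host} -p {ssh_port}']
    for local_port, target_host, target_port in forwards:
        parts.append(f'-L {local_port}:{target_host}:{target_port}')
    return ' '.join(parts)


# Made-up values, for illustration only
command = build_ssh_forward_command(
    'azureuser', '203.0.113.10', 50000,
    [(8786, 'localhost', 8786), (8788, '10.0.0.4', 8787), (9999, 'localhost', 8888)],
)
print(command)
```

Keeping the forwards in a list makes it simple to pick a different local port if one of them is already taken on your machine.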

To access the Bokeh app, change the URL of your Notebook VM by adding `-8788` right after the machine name. If you are running this notebook on a Notebook VM, you can create the URLs by executing the next cell:

In [None]:
print("waiting for jupyter token")
# Fetch the run again on every iteration so newly logged params become visible
mlflowrun = mlflow.get_run(run.id)
while 'jupyter-token' not in mlflowrun.data.params:
    print('.', end="")
    time.sleep(5)
    mlflowrun = mlflow.get_run(run.id)

clear_output()
jupyterToken = mlflowrun.data.params['jupyter-token']
print('Notebook url:')
print(f'http://localhost:9999/notebooks?token={jupyterToken}')
print('Bokeh url:')
print('http://localhost:8788')

If everything worked, you should see the following after you click the Bokeh link and select 'Status':

![Bokeh](./img/bokeh.png)

If you are wondering what all this port business is accomplishing, the diagram below illustrates who talks to whom and how.

![Network](./img/network.png)

## Run some jobs on the cluster
If you can see the Bokeh app, it is time to use the cluster. Thanks to the port forward, the scheduler is reachable from the Notebook VM at `tcp://localhost:8786`. The number of workers you see depends on the node count of the run (`MAXNODE`, which is 2 in this notebook).
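
A minimal sketch of connecting to the scheduler and running a trivial task follows. It assumes `dask.distributed` is installed in the notebook's environment and the port-forward from the previous section is still running. `port_is_open` and `connect_and_square` are hypothetical helper names; the port check uses only the standard library, so it can be used to verify the tunnel before a client is created.

```python
import socket


def port_is_open(host, port, timeout=2.0):
    """Return True if a TCP connection to host:port can be established."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        return False


def connect_and_square(address='tcp://localhost:8786'):
    """Connect to the scheduler and run a trivial task on the cluster."""
    # Imported here so the port check above works without dask installed
    from dask.distributed import Client

    client = Client(address)
    print('workers:', len(client.scheduler_info()['workers']))
    # Submit one function call to the cluster and wait for its result
    future = client.submit(lambda x: x * x, 7)
    return future.result()
```

A typical call would be `if port_is_open('localhost', 8786): print(connect_and_square())`. If the check returns `False`, the SSH session with the port-forward has most likely been closed.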

## Shut cluster down
To shut the cluster down, cancel the job that runs the cluster. 

In [None]:
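# Use a separate loop variable so the `run` object from above is not overwritten
for active_run in ws.experiments[expName].get_runs():
    if active_run.get_status() == "Running":
        print(f'cancelling run {active_run.id}')
        active_run.cancel()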