# Start Interactive Dask Cluster

In [None]:
pip install --upgrade git+https://github.com/drabastomek/dask-cloudprovider

**RESTART YOUR KERNEL** after the install finishes so that the upgraded package is picked up.

## Imports

Import all packages used in this notebook.

In [1]:
import os

from azureml.widgets import RunDetails
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core import Workspace, Experiment, Datastore, Dataset, Environment

## Azure ML setup

Get the workspace.

In [2]:
# Obtain the Azure ML workspace handle via Workspace.from_config(); later cells use `ws`.
ws = Workspace.from_config()
# Bare expression so the workspace repr is shown as the cell output.
ws

Workspace.create(name='azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='cody-eastus-rg')

### Enter your name

Enter your name and virtual network information.

In [3]:
# User-tunable settings for this notebook; later cells read these names directly.

### name
name        = 'cody'             # replace or not to replace

### vnet settings
vnet_rg     = ws.resource_group  # replace if needed
vnet_name   = 'wifi'             # replace if needed (plain literal: no placeholders, so no f-prefix)
subnet_name = '5GHz'             # replace if needed

### azure ml names
ct_name     = f'{name}-ct'       # replace if desired
env_name    = f'{name}-env'      # replace if desired

### trust but verify
# Echo the resolved settings so they can be checked before any resources are created.
verify = f'''
Name: {name}

vNET RG: {vnet_rg}
vNET name: {vnet_name}
vNET subnet name: {subnet_name}

Compute target: {ct_name}
Environment name: {env_name}
'''

print(verify)


Name: cody

vNET RG: cody-eastus-rg
vNET name: wifi
vNET subnet name: 5GHz

Compute target: cody-ct
Environment name: cody-env



### Create VM pool

Create Azure ML VM pool for creating remote dask cluster(s).

In [4]:
# Reuse the compute target named `ct_name` when the workspace already has one;
# otherwise provision a new AmlCompute cluster attached to the configured vNET.
if ct_name in ws.compute_targets:
    ct = ws.compute_targets[ct_name]
else:
    # Provisioning settings for the new cluster; change properties as needed.
    config = AmlCompute.provisioning_configuration(
        vm_size='STANDARD_DS13_V2',  # 8 core 56 GiB 112 SSD
        min_nodes=0,
        max_nodes=100,
        vnet_resourcegroup_name=vnet_rg,
        vnet_name=vnet_name,
        subnet_name=subnet_name,
        idle_seconds_before_scaledown=300,
    )
    ct = ComputeTarget.create(ws, ct_name, config)
    # Wait for provisioning to complete, with output shown in the cell.
    ct.wait_for_completion(show_output=True)

# Bare expression so the compute target repr is shown as the cell output.
ct

AmlCompute(workspace=Workspace.create(name='azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='cody-eastus-rg'), name=cody-ct, id=/subscriptions/6560575d-fa06-4e7d-95fb-f962e74efd7a/resourceGroups/cody-eastus-rg/providers/Microsoft.MachineLearningServices/workspaces/azureml/computes/cody-ct, type=AmlCompute, provisioning_state=Succeeded, location=eastus, tags=None)

### Mount Compute Instance code fileshare

This will register the compute instance code fileshare as a datastore. The fileshare's default name is `code-391ff5ac-6576-460f-ba4d-7e03433c68b6`, and it uses the same credentials as the default fileshare for the workspace. It will be mounted for easy access to notebooks on the cluster.

In [5]:
# Datastore name under which the code fileshare is registered in the workspace.
filesharename = 'codefileshare'

# Name of the compute instance code fileshare.
# NOTE(review): this value is hardcoded and looks workspace-specific — confirm it matches your workspace.
code_share_name = 'code-391ff5ac-6576-460f-ba4d-7e03433c68b6'

# Register the fileshare as a datastore only if it is not registered yet.
if filesharename not in ws.datastores:
    # Look up the workspace filestore once and reuse its credentials, instead of
    # resolving ws.datastores[...] separately for each field.
    workspace_filestore = ws.datastores['workspacefilestore']
    Datastore.register_azure_file_share(
        ws,
        filesharename,
        code_share_name,
        account_name=workspace_filestore.account_name,
        account_key=workspace_filestore.account_key,
    )

### Get data

This will get the NOAA ISD Weather data used in the demo. If you already have data in Blob storage or ADLS Gen1/Gen2 that you want to use, skip this step.

In [6]:
# Name the dataset is registered under, and the source URL of the parquet files.
dsetdata = 'noaa-isd-files'
data_url = 'https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather'

# Skip the whole download/upload when a dataset with this name is already registered.
if dsetdata not in ws.datasets:
    # NOTE(review): makes /mnt world-writable and ignores the command's exit status —
    # confirm this is acceptable on the machine running the notebook.
    os.system('sudo chmod 777 /mnt')
    # One download per year, 2008 through 2020 inclusive.
    for year in range(2008, 2020+1):
        ds = Dataset.File.from_files(f'{data_url}/year={year}/month=*/*.parquet', validate=False)
        print('Downloading...')
        # %time reports how long this year's download takes.
        %time ds.download(f'/mnt/data/isd/year={year}', overwrite=True)
    print('Uploading...')
    # Upload the local copy to the workspace's default datastore under /noaa-isd.
    %time ws.get_default_datastore().upload('/mnt/data/isd', '/noaa-isd', show_progress=False)
    # Re-point `ds` at the uploaded parquet files and register it under `dsetdata`.
    ds = Dataset.File.from_files((ws.get_default_datastore(), '/noaa-isd/**/*.parquet'))
    ds = ds.register(ws, dsetdata)

### Start cluster



In [7]:
from dask_cloudprovider import AzureMLCluster

In [8]:
# Pip packages added to the Azure ML environment used for the cluster.
packages = [
    'mpi4py',
    'distributed',
    'dask[complete]',
    'dask-ml[complete]',
    'fastparquet',
    'pyarrow',
    'jupyterlab',
    'joblib',
    'notebook',
    'adlfs',
    'fsspec',
    'azureml-sdk',
    'lz4',
]

env = Environment(name=env_name)

# Add each package, in list order, as a pip dependency of the environment.
pip_dependencies = env.python.conda_dependencies
for package in packages:
    pip_dependencies.add_pip_package(package)

In [9]:
# Start the Dask cluster on the Azure ML compute target with Jupyter enabled,
# passing every datastore registered in the workspace to the cluster.
cluster = AzureMLCluster(
    ws,
    ct,
    env,
    jupyter=True,
    # Read ws.datastores once and take its values, rather than re-reading the
    # property for every key.
    datastores=list(ws.datastores.values()),
)

############################## Setting up cluster ##############################
########################## Submitting the experiment ###########################




####################### Waiting for scheduler node's IP ########################
....................


########################### Scheduler: 10.0.0.5:8786 ###########################
############################### On the same VNET ###############################
########################### Connections established ############################
############################# Scaling to 1 workers #############################
############################### Scaling is done ################################


In [10]:
# Show the Azure ML run backing the cluster (experiment, id, type, status, links).
cluster.run

Experiment,Id,Type,Status,Details Page,Docs Page
dask-cloudprovider,dask-cloudprovider_1584241485_a696ee8f,azureml.scriptrun,Running,Link to Azure Machine Learning studio,Link to Documentation


In [11]:
# Ask the cluster to scale to 30 workers.
cluster.scale(30) # need more than default quota for this

In [12]:
# Display the cluster object; the saved output shows it rendering as a widget.
cluster

VBox(children=(HTML(value='<h2>AzureMLCluster</h2>'), HBox(children=(HTML(value='\n<div>\n  <style scoped>\n  …

In [13]:
#from dask.distributed import Client
#c = Client(cluster)

In [14]:
#cluster.close()

In [15]:
#help(AzureMLCluster)