# Scaling Python with Azure ML and Dask

![Describe gif](media/describe.gif)

## Environment setup

This notebook assumes you are using an Azure ML Compute Instance with the default kernel `azureml_py36`. This kernel contains many unnecessary packages, so if you want to avoid a long image build time, you may want to create a new conda environment with only the packages your scenario needs.

It is important that the local environment matches the remote environment; otherwise version mismatches can cause errors when submitting work to the remote cluster. To help with this, we will use Azure ML Environments.
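One way to catch such mismatches is to compare package versions on both sides. Once a client is connected, Dask's `Client.get_versions(check=True)` does this against the cluster; the sketch below shows the same idea on plain dictionaries, using a hypothetical helper `version_mismatches` and made-up version numbers.

```python
def version_mismatches(local: dict, remote: dict) -> dict:
    """Return {package: (local_version, remote_version)} for packages whose versions differ.

    A package missing on one side is reported with None for that side.
    """
    mismatches = {}
    for pkg in sorted(set(local) | set(remote)):
        lv, rv = local.get(pkg), remote.get(pkg)
        if lv != rv:
            mismatches[pkg] = (lv, rv)
    return mismatches


# hypothetical version numbers, for illustration only
local = {'dask': '2.9.1', 'distributed': '2.9.1', 'lightgbm': '2.3.1'}
remote = {'dask': '2.9.1', 'distributed': '2.9.0'}
print(version_mismatches(local, remote))
# {'distributed': ('2.9.1', '2.9.0'), 'lightgbm': ('2.3.1', None)}
```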

In [None]:
%pip install --upgrade "dask[complete]" "dask-ml[complete]" tpot adlfs lz4 distributed fastparquet pyarrow "azureml-sdk[notebooks]" lightgbm

Uninstall two packages that cause errors in this environment.

In [None]:
%pip uninstall -y azureml-samples azureml-mlflow

## Important! 

Restart the kernel so that the upgraded packages are loaded.

In [None]:
# Workaround for a compute instance bug with the nginx websocket proxy rule:
# remove the trailing slash from 'websocket/|/ws/' in /etc/nginx/nginx.conf
import os

# copy the root-owned config into the working directory
os.system('sudo cp /etc/nginx/nginx.conf setup/temp.conf')

nginx = ''

with open('setup/temp.conf') as f:
    for line in f.readlines():
        if 'websocket/|/ws/' in line:
            nginx += line.replace('websocket/|/ws/', 'websocket/|/ws')
        else:
            nginx += line

with open('setup/temp2.conf', 'w') as f:
    f.write(nginx)

# install the patched config, restart nginx, and remove the temporary copy
os.system('sudo mv setup/temp2.conf /etc/nginx/nginx.conf')
os.system('sudo service nginx restart')
os.system('rm setup/temp.conf');
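The workaround above boils down to one string substitution: in any line containing `websocket/|/ws/`, the trailing slash after `/ws` is dropped. A minimal sketch of that step as a pure function (the name `fix_ws_location` and the sample line are hypothetical), which makes it easy to check before touching `/etc/nginx/nginx.conf`:

```python
def fix_ws_location(conf_text: str) -> str:
    """Apply the compute instance nginx workaround to a config string.

    Mirrors the cell above: every occurrence of 'websocket/|/ws/' becomes
    'websocket/|/ws'; all other text is left unchanged.
    """
    return conf_text.replace('websocket/|/ws/', 'websocket/|/ws')


# made-up config line for illustration; the real nginx.conf is not shown here
sample = 'location ~ /(terminals/websocket/|/ws/) {'
print(fix_ws_location(sample))  # location ~ /(terminals/websocket/|/ws) {
```

Because the replacement only removes a slash, running it twice is harmless: the second pass finds nothing to replace.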

## Imports

Import all packages used in this notebook.

In [1]:
import os
import sys
import dask
import glob
import time
import fsspec
import socket
import matplotlib

import pandas as pd
import dask.dataframe as dd
import matplotlib.pyplot as plt

from datetime import datetime
from dask.distributed import Client
from IPython.core.display import HTML

from azureml.widgets import RunDetails
from azureml.train.estimator import Estimator
from azureml.core.runconfig import MpiConfiguration
from azureml.core.compute import ComputeTarget, AmlCompute
from azureml.core.conda_dependencies import CondaDependencies 
from azureml.core import Workspace, Experiment, Dataset, Environment

%matplotlib inline 

Failure while loading azureml_run_type_providers. Failed to load entrypoint hyperdrive = azureml.train.hyperdrive:HyperDriveRun._from_run_dto with exception cannot import name '_DistributedTraining'.


## Azure ML setup

Get the workspace.

In [2]:
ws = Workspace.from_config()
ws

Workspace.create(name='ncus-azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copeters-rg')

### Data access setup

This is for an ADLS Gen2 account I have provisioned with data. It should be publicly readable.

You **should not** keep storage account keys in plain text, and you definitely should not upload them to a public GitHub repo.

Instead, use the Key Vault associated with the workspace (through the Python SDK or the Azure Portal) to store the account name and key for your storage account, then retrieve the secrets from the Key Vault and pass them through.

In [3]:
from getpass import getpass

keyvault = ws.get_default_keyvault()
# store the secrets once; getpass prompts for the key so it is never saved in the notebook
keyvault.set_secret('daskdataaccount', 'data4dask')
keyvault.set_secret('daskdatakey', getpass('Storage account key: '))

In [4]:
STORAGE_OPTIONS = {
    'account_name': keyvault.get_secret('daskdataaccount'), 
    'account_key' : keyvault.get_secret('daskdatakey')
}

### Create environment 

Create the environment to be used on the remote cluster. 

In [5]:
env_name = 'dask-ml'

if env_name not in ws.environments:
    env = Environment.from_existing_conda_environment(env_name, 'azureml_py36')
    env.python.conda_dependencies.add_pip_package('mpi4py') # needed for remote cluster
    env = env.register(ws)
else:
    env = ws.environments[env_name]
    
env.name, env.version

('dask-ml', '1')

### Create VM pool

Create an Azure ML VM pool to host the remote Dask cluster(s).

In [6]:
pool_name = 'raspberrypis'

if pool_name not in ws.compute_targets:
    # create config for Azure ML cluster
    # change properties as needed
    config = AmlCompute.provisioning_configuration(
             vm_size                       = 'STANDARD_D13_V2',   # 8 vCPUS 56 GB RAM 112 GB disk 
             max_nodes                     = 100,
             vnet_resourcegroup_name       = ws.resource_group,   # replace if needed
             vnet_name                     = 'dialup-network',    # replace if needed
             subnet_name                   = 'default',           # replace if needed
             idle_seconds_before_scaledown = 300
    )
    ct = ComputeTarget.create(ws, pool_name, config)
    ct.wait_for_completion(show_output=True)    
else:
    ct = ws.compute_targets[pool_name]
    
ct

AmlCompute(workspace=Workspace.create(name='ncus-azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copeters-rg'), name=raspberrypis, id=/subscriptions/6560575d-fa06-4e7d-95fb-f962e74efd7a/resourceGroups/copeters-rg/providers/Microsoft.MachineLearningServices/workspaces/ncus-azureml/computes/raspberrypis, type=AmlCompute, provisioning_state=Succeeded, location=northcentralus, tags=None)

## Startup cluster

Start the run now. The first run takes longer, because the environment image has to be built.

In [7]:
exp_name   = 'dask-lightgbm'

est = Estimator('../setup', 
                compute_target          = ct, 
                entry_script            = 'start.py',          # sets up Dask cluster
                environment_definition  = env,                 # use same env as local
                node_count              = 40,                  # 40 nodes x 8 vCPUs = 320 vCPUs, 40 x 56 GB = 2.24 TB RAM (D13_V2)
                distributed_training    = MpiConfiguration()
               )

#run = next(ws.experiments[exp_name].get_runs()) # use this to get existing run (if kernel restarted, etc)
run = Experiment(ws, exp_name).submit(est)
run



| Experiment | Id | Type | Status | Details Page | Docs Page |
|---|---|---|---|---|---|
| dask-lightgbm | dask-lightgbm_1578874759_ec12121e | azureml.scriptrun | Starting | Link to Azure Machine Learning studio | Link to Documentation |
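The cluster size follows from simple arithmetic on the node count and the VM size. A sketch, assuming the STANDARD_D13_V2 figures from the pool configuration comment (8 vCPUs and 56 GB RAM per node); `cluster_capacity` is a hypothetical helper:

```python
def cluster_capacity(node_count: int, vcpus_per_node: int, ram_gb_per_node: int) -> tuple:
    """Return (total vCPUs, total RAM in GB) for a pool of identical nodes."""
    return node_count * vcpus_per_node, node_count * ram_gb_per_node


# STANDARD_D13_V2 figures as given in the pool configuration comment
print(cluster_capacity(40, 8, 56))  # (320, 2240)
print(cluster_capacity(20, 8, 56))  # (160, 1120)
```

The client output further down reports 640 threads and 4.73 TB for 40 workers, which differs from these figures; this may mean the existing `raspberrypis` pool uses a different VM size (the pool cell only creates the pool if it does not already exist) or that `start.py` configures the workers differently.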


## Scale up with Dask and Azure ML

In [8]:
RunDetails(run).show()


### Connect to cluster

In [9]:
# port to forward the Dask dashboard to on the compute instance
# (8787 is not used because it is already in use there)
dashboard_port = 4242

# statuses after which the run will never report a scheduler address
ended = ('Canceled', 'Failed', 'Completed')

print("waiting for scheduler node's ip")
while run.get_status() not in ended and 'scheduler' not in run.get_metrics():
    print('.', end='')
    time.sleep(5)

if run.get_status() in ended:
    print(f'\nRun ended with status {run.get_status()}')
else:
    print('\nSetting up port forwarding...')
    os.system('killall socat')  # kill all socat processes to clean up previous port forwards
    os.system(f'setsid socat tcp-listen:{dashboard_port},reuseaddr,fork tcp:{run.get_metrics()["dashboard"]} &')
    print('Cluster is ready to use.')

    c = Client(f'tcp://{run.get_metrics()["scheduler"]}')
    print(f'\n\n{c}')
    c.restart()  # restart the workers so they start from a clean state

# build the dashboard link for this compute instance
dashboard_url = f'https://{socket.gethostname()}-{dashboard_port}.{ws.get_details()["location"]}.instances.azureml.net/status'
HTML(f'<a href="{dashboard_url}">Dashboard link</a>')

waiting for scheduler node's ip

Setting up port forwarding...
Cluster is ready to use.


<Client: 'tcp://10.2.0.5:8786' processes=40 threads=640, memory=4.73 TB>
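The connection cell polls the run until the scheduler address has been logged as a metric. A sketch of the same polling pattern with an explicit timeout, written against plain callables so it can be tried without a cluster; `wait_for_metric` and its parameters are hypothetical, and in the notebook `get_status` and `get_metrics` would be `run.get_status` and `run.get_metrics`:

```python
import time
from typing import Callable, Optional


def wait_for_metric(get_status: Callable[[], str],
                    get_metrics: Callable[[], dict],
                    key: str,
                    timeout: float = 1800.0,
                    interval: float = 5.0) -> Optional[str]:
    """Poll until `key` appears in the metrics, the run ends, or the timeout expires.

    Returns the metric value, or None if the run reached a terminal state
    first. Raises TimeoutError if neither happens within `timeout` seconds.
    """
    terminal = {'Canceled', 'Failed', 'Completed'}
    deadline = time.monotonic() + timeout
    while time.monotonic() < deadline:
        metrics = get_metrics()
        if key in metrics:
            return metrics[key]
        if get_status() in terminal:
            return None
        time.sleep(interval)
    raise TimeoutError(f'{key!r} not reported within {timeout} seconds')


# simulated run: the scheduler address shows up on the third poll
polls = iter([{}, {}, {'scheduler': '10.2.0.5:8786'}])
print(wait_for_metric(lambda: 'Running', lambda: next(polls), 'scheduler', interval=0.01))
# 10.2.0.5:8786
```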


In [10]:
protocol  = 'abfs'      # use 'adl' for Azure Data Lake Gen 1
container = 'datasets'  # storage container that holds the data

In [11]:
fs = fsspec.filesystem(protocol, **STORAGE_OPTIONS, container_name=container)

In [12]:
# each match is a year=YYYY/month=M partition directory; list the files inside it
files = []
for partition in fs.glob('noaa/isd/*/*'):
    files += fs.ls(f'{partition}/')

In [13]:
files = [f'{protocol}://{container}/{file}' for file in files if '2019' not in file]
files[-5:]

['abfs://datasets/noaa/isd/year=2018/month=5/part-00066-tid-9138739344806125380-ef942066-1d58-49f9-8ecb-3329cbe6e57e-383761.c000.snappy.parquet',
 'abfs://datasets/noaa/isd/year=2018/month=6/part-00049-tid-9138739344806125380-ef942066-1d58-49f9-8ecb-3329cbe6e57e-383744.c000.snappy.parquet',
 'abfs://datasets/noaa/isd/year=2018/month=7/part-00107-tid-9138739344806125380-ef942066-1d58-49f9-8ecb-3329cbe6e57e-383859.c000.snappy.parquet',
 'abfs://datasets/noaa/isd/year=2018/month=8/part-00103-tid-9138739344806125380-ef942066-1d58-49f9-8ecb-3329cbe6e57e-383807.c000.snappy.parquet',
 'abfs://datasets/noaa/isd/year=2018/month=9/part-00089-tid-9138739344806125380-ef942066-1d58-49f9-8ecb-3329cbe6e57e-383784.c000.snappy.parquet']
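The filter `'2019' not in file` matches the substring anywhere in the path, so a file whose name happened to contain `2019` (for example inside the `tid-...` identifier) would also be dropped. A sketch of a stricter filter that reads the `year=` partition from the path instead; `partition_value` is a hypothetical helper, and the example paths are shortened, made-up variants of the listing above:

```python
def partition_value(path: str, key: str):
    """Return the value of a Hive-style `key=value` path segment, or None if absent."""
    for segment in path.split('/'):
        name, sep, value = segment.partition('=')
        if sep and name == key:
            return value
    return None


# made-up paths: the first is a 2018 file whose name happens to contain '2019'
paths = [
    'noaa/isd/year=2018/month=5/part-00066-tid-2019.snappy.parquet',
    'noaa/isd/year=2019/month=1/part-00001.snappy.parquet',
]
kept = [f'abfs://datasets/{p}' for p in paths if partition_value(p, 'year') != '2019']
print(kept)
# ['abfs://datasets/noaa/isd/year=2018/month=5/part-00066-tid-2019.snappy.parquet']
```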

In [14]:
df = dask.delayed(dd.read_parquet)(files, engine='pyarrow', storage_options=STORAGE_OPTIONS).compute()

In [15]:
df = df.fillna(0) # optional but highly recommended

In [16]:
df['month'] = df['datetime'].dt.month

In [17]:
cols = list(df.columns)
cols = [col for col in cols if col not in ['version', 'datetime']]
cols

['usaf',
 'wban',
 'latitude',
 'longitude',
 'elevation',
 'windAngle',
 'windSpeed',
 'temperature',
 'seaLvlPressure',
 'cloudCoverage',
 'presentWeatherIndicator',
 'pastWeatherIndicator',
 'precipTime',
 'precipDepth',
 'snowDepth',
 'stationName',
 'countryOrRegion',
 'p_k',
 'year',
 'day',
 'month']

In [18]:
X = df[[col for col in cols if col not in ['temperature']]].persist()
y = df.temperature.persist()
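LightGBM works on numeric features (its scikit-learn interface also accepts pandas `category` columns), while several of the selected columns, such as `stationName` and `countryOrRegion`, are presumably strings; their dtypes are not shown here. One option, sketched below on a tiny made-up pandas frame, is to cast non-numeric columns to `category` before fitting; with Dask, `DataFrame.categorize()` serves a similar purpose. The helper `to_categorical` is hypothetical, and whether `dask_lightgbm` handles categorical columns is not confirmed by this notebook.

```python
import pandas as pd


def to_categorical(frame: pd.DataFrame) -> pd.DataFrame:
    """Return a copy of `frame` with every non-numeric column cast to 'category'."""
    out = frame.copy()
    for col in out.columns:
        if not pd.api.types.is_numeric_dtype(out[col]):
            out[col] = out[col].astype('category')
    return out


# tiny made-up frame using a few of the column names listed above
toy = pd.DataFrame({
    'stationName': ['A', 'B', 'A'],
    'countryOrRegion': ['US', 'US', 'CA'],
    'elevation': [10.0, 250.0, 31.0],
    'temperature': [3.1, -2.0, 7.4],
})
X_sample = to_categorical(toy.drop(columns=['temperature']))
y_sample = toy['temperature']
print(X_sample.dtypes)
```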

In [20]:
import dask_lightgbm.core as dlgbm

In [21]:
def listen_port():
    listen_port.port += 10
    return listen_port.port

In [22]:
listen_port.port = 13000
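`listen_port` keeps its counter as a function attribute, so each call returns a port 10 higher than the previous one (13010, 13020, ...). An equivalent sketch using `itertools.count`; the name `next_port` is hypothetical:

```python
import itertools

# start at 13010 and step by 10, matching listen_port.port = 13000 plus one increment
_ports = itertools.count(13010, 10)


def next_port() -> int:
    """Return a new port number on every call: 13010, 13020, 13030, ..."""
    return next(_ports)


print(next_port(), next_port())  # 13010 13020
```

Whether `dask_lightgbm` accepts a callable for `local_listen_port`, rather than a single integer, is not shown in this notebook; check its documentation before relying on it.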


In [23]:
lgbm = dlgbm.LGBMRegressor(local_listen_port=listen_port, seed=42)

In [24]:
lgbm = lgbm.fit(X, y)



ModuleNotFoundError: No module named 'dask_lightgbm'

In [None]:
# need to work around not being a pip package...

In [None]:
%time y_pred = lgbm.predict(X).compute()

In [None]:
rmse = (((y.to_dask_array().compute()-y_pred)**2).mean())**.5

In [None]:
print(rmse)
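The RMSE cell computes $\sqrt{\frac{1}{n}\sum_{i=1}^{n}(y_i - \hat{y}_i)^2}$ between the observed temperatures $y_i$ and the predictions $\hat{y}_i$. A plain-Python sketch of the same formula on made-up numbers; the function is named `rmse_score` (hypothetical) so it does not overwrite the notebook's `rmse` value:

```python
import math


def rmse_score(y_true, y_pred) -> float:
    """Root mean squared error between two equal-length sequences."""
    if len(y_true) != len(y_pred):
        raise ValueError('y_true and y_pred must have the same length')
    return math.sqrt(sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true))


# one error of 2 over four points: sqrt(4 / 4) = 1.0
print(rmse_score([1.0, 2.0, 3.0, 4.0], [3.0, 2.0, 3.0, 4.0]))  # 1.0
```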

## End the run

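Close the client and cancel the run. The VM pool then scales back down to 0 nodes once it has been idle for `idle_seconds_before_scaledown` (300 seconds in the pool configuration above).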

In [None]:
c.close()
run.cancel()