# Running Dask on AzureML


In [1]:
import subprocess, socket
from azureml.core import Workspace, Experiment
from azureml.widgets import RunDetails
from azureml.core.runconfig import RunConfiguration, MpiConfiguration
from azureml.train.estimator import Estimator

In [2]:
class PortForwarder():
    '''A helper to forward ports from the Compute Instance to the Azure ML Cluster in the same VNet'''
    active_instances = set()
    
    def __init__(self, from_port, to_ip, to_port):
        self.from_port = from_port
        self.to_ip = to_ip
        self.to_port = to_port
        
    def start(self):
        self._socat = subprocess.Popen(["/usr/bin/socat", 
                             f"tcp-listen:{self.from_port},reuseaddr,fork", 
                             f"tcp:{self.to_ip}:{self.to_port}"],
                            stderr=subprocess.PIPE, stdout=subprocess.PIPE, universal_newlines=True)
        PortForwarder.active_instances.add(self)
        return self
        
    def stop(self):
        PortForwarder.active_instances.remove(self)
        return self._socat.terminate()
    
    def stop_all():
        for instance in list(PortForwarder.active_instances):
            instance.stop()

## Starting the cluster

In [3]:
ws = Workspace.from_config()
ws

Workspace.create(name='ncus-azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copetersrg')

In [4]:
ct = ws.compute_targets['dask-cluster']
ct

AmlCompute(workspace=Workspace.create(name='ncus-azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copetersrg'), name=dask-cluster, id=/subscriptions/6560575d-fa06-4e7d-95fb-f962e74efd7a/resourceGroups/copetersrg/providers/Microsoft.MachineLearningServices/workspaces/ncus-azureml/computes/dask-cluster, type=AmlCompute, provisioning_state=Succeeded, location=northcentralus, tags=None)

In [5]:
if 'weather-files' not in ws.datasets:
    ds = Dataset.File.from_files('https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather/*/*/*.parquet', validate=False)
    ds = ds.register(ws, 'weather-files')
else:
    ds = ws.datasets['weather-files']
    
ds

{
  "source": [
    "https://azureopendatastorage.blob.core.windows.net/isdweatherdatacontainer/ISDWeather/*/*/*.parquet"
  ],
  "definition": [
    "GetFiles"
  ],
  "registration": {
    "id": "c0fe2407-f46d-4462-b993-f23ed423bd15",
    "name": "weather-files",
    "version": 1,
    "workspace": "Workspace.create(name='ncus-azureml', subscription_id='6560575d-fa06-4e7d-95fb-f962e74efd7a', resource_group='copetersrg')"
  }
}

Starting the Dask cluster using an Estimator with MpiConfiguration. Make sure the cluster is able to scale up to 10 nodes or change the `node_count` below. 

In [6]:
est = Estimator('dask', 
                compute_target=ct, 
                entry_script='startDask.py', 
                conda_dependencies_file='environment.yml', 
                script_params={'--datastore': ws.get_default_datastore()},
                inputs=[ds.as_named_input('weather').as_download('/tmp/noaa')],
                node_count=10,
                distributed_training=MpiConfiguration())

run = Experiment(ws, 'dask').submit(est)

In [8]:
RunDetails(run).show()

_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'NOTSET',…

In [23]:
import time
from IPython.display import clear_output

print("waiting for scheduler node's ip")
while run.get_status() != 'Canceled' and 'headnode' not in run.get_metrics():
    print('.', end ="")
    time.sleep(5)

clear_output()

ci_port = 9798

if run.get_status() == 'Canceled':
    print('Run was canceled')
else:
    dashboard_url = 'https://{}-{}.{}.instances.azureml.net/status'.format(socket.gethostname(), ci_port, ws.get_details()['location'])
    run.log('dashboard_url', dashboard_url) if 'dashboard_url' not in run.get_metrics() else 0
    
    scheduler = run.get_metrics()['scheduler']
    headnode = run.get_metrics()['headnode']
    
    dashboard = PortForwarder(ci_port, headnode, scheduler.split(':')[1]).start()
    
    print('Setup complete, cluster is ready to use.')

Setup complete, cluster is ready to use.


In [24]:
print(run.get_metrics()['dashboard_url'])

https://dask-instance-9797.northcentralus.instances.azureml.net/status


In [22]:
dashboard

<__main__.PortForwarder at 0x7f48893201d0>

In [15]:
import dask
import pandas as pd
import dask.dataframe as dd
from dask.distributed import Client

c = Client(f'tcp://{scheduler}')
c.restart()
c

0,1
Client  Scheduler: tcp://10.1.0.5:8786  Dashboard: http://10.1.0.5:8787/status,Cluster  Workers: 10  Cores: 200  Memory: 1.48 TB


In [16]:
import glob

files = dask.delayed(glob.glob)('/tmp/noaa/**/*.parquet', recursive=True).compute()
files

['/tmp/noaa/year=2017/month=5/part-00002-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-120.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=10/part-00004-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-122.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=7/part-00006-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-124.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=6/part-00010-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-128.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=12/part-00005-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-123.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=1/part-00001-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-119.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=4/part-00007-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-125.c000.snappy.parquet',
 '/tmp/noaa/year=2017/month=3/part-00000-tid-1321158002197267978-8e3eb092-4b7a-42de-97ee-e23297ed8955-

In [17]:
df = dd.from_delayed([dask.delayed(pd.read_parquet)(file) for file in files])

In [18]:
df = df.persist()

In [19]:
len(df)

KeyboardInterrupt: 

In [25]:
run.cancel()

## Establish the port-forwarding from Compute Instance to Dask Dashboard

## Run some jobs on the cluster
If you are able to see the Bokeh app, it is time to use the cluster. Thanks to the port forward, the scheduler appears to the notebook VM at `tcp://localhost:8786`. You should see 10 workers.

In [None]:
import dask
import dask.dataframe as dd

from azureml.opendatasets import NoaaIsdWeather

from datetime import datetime, timedelta

In [None]:
start = datetime(2010, 1, 1)

In [None]:
fns = [dask.delayed(NoaaIsdWeather(start+timedelta(days=i), start+timedelta(days=i+1)).to_pandas_dataframe) for i in range(365*10)]

In [None]:
sample_df = NoaaIsdWeather(start, start+timedelta(days=1)).to_pandas_dataframe()
sample_df.head()

In [None]:
fns[0].compute()

In [None]:
df = dd.from_delayed(fns)

In [None]:
c

In [None]:
help(dset._dataflow)

In [None]:
exp = Experiment(ws, 'dask')
exp

In [None]:
runs = exp.get_runs()
run = next(runs)
run

In [None]:
import dask
import dask.dataframe as dd

In [None]:
import matplotlib
import matplotlib.pyplot as plt

from datetime import datetime

%matplotlib inline

In [None]:
from dask.distributed import Client

c = Client('tcp://localhost:8786')
c.restart()
c

In [None]:
ds = run.get_metrics()['datastore']
ds

In [None]:
path = ds + ''
path

In [None]:
def load_data(path):
    df = dd.read_csv(path+'/datasets/isd/*data.csv', dtype={'usaf': 'object'})
    return df

In [None]:
NoaaIsd().to_dask_dataframe()

In [None]:
df = dask.delayed(load_data)(path).compute()

In [None]:
df.head()

In [None]:
df.npartitions

In [None]:
%time len(df)

In [None]:
df.datetime = dd.to_datetime(df.datetime).dt.floor('d')

In [None]:
df = df.repartition(npartitions=150)

In [None]:
df = df.set_index(df.datetime, sorted=True).persist()

In [None]:
df.head()

In [None]:
%time len(df)

In [None]:
df.npartitions

In [None]:
df2 = df.persist()

In [None]:
df.describe().compute()

In [None]:
means = df.groupby(df.index).mean().compute()
means.head()

In [None]:
df = df.drop(['datetime'], axis=1)

In [None]:
df.index

In [None]:
def write_data(path):
    df.to_parquet(path)

In [None]:
a = dask.delayed(write_data)(ds+'/dask/outputs/isd').compute()

In [None]:
counts = df.groupby([df.index.month, df.index.year]).day.count().compute()

In [None]:
cs = [counts[month][2015] for month in range(1, 13)]
cs

In [None]:
for col in list(means.columns):
    fig = plt.figure(figsize=(16, 8))
    plt.style.use('dark_background')
    means[col].plot(color='b')
    plt.title('Average of {}'.format(col))
    plt.xlim([datetime(2015, 1, 1), datetime(2015, 12, 1)])
    plt.grid()
    
    run.log_image(col, plot=plt)

In [None]:
df.memory_usage(index=True, deep=True).sum().compute()

In [None]:
df.info()

See if the cluster works

In [None]:
import time
import numpy as np
from dask import delayed, visualize

def inc(x):
    time.sleep(abs(np.random.normal(5, 2)))
    return x + 1

fut = []
for i in range(10):
    fut.append( c.submit(delayed(inc), i) )

fut

In [None]:
for i in fut:
    print(i.result())

In [None]:
def sum(a):
    x = 0
    for y in a:
        x += y
    return x

results = []
for f in fut:
    results.append(f.result())
    
fut2 = c.submit(sum, results)
fut2

In [None]:
fut2.result().compute()

In [None]:
visualize(fut2.result())

# Training on Large Datasets
(from https://github.com/dask/dask-tutorial)

Sometimes you'll want to train on a larger than memory dataset. `dask-ml` has implemented estimators that work well on dask arrays and dataframes that may be larger than your machine's RAM.

In [None]:
from dask.distributed import Client
import joblib
import dask.array as da
import dask.delayed
from sklearn.datasets import make_blobs
import numpy as np

We'll make a small (random) dataset locally using scikit-learn.

In [None]:
n_centers = 12
n_features = 20

X_small, y_small = make_blobs(n_samples=1000, centers=n_centers, n_features=n_features, random_state=0)

centers = np.zeros((n_centers, n_features))

for i in range(n_centers):
    centers[i] = X_small[y_small == i].mean(0)
    
centers[:4]

The small dataset will be the template for our large random dataset.
We'll use `dask.delayed` to adapt `sklearn.datasets.make_blobs`, so that the actual dataset is being generated on our workers. 

In [None]:
n_samples_per_block = 200000
n_blocks = 500

delayeds = [dask.delayed(make_blobs)(n_samples=n_samples_per_block,
                                     centers=centers,
                                     n_features=n_features,
                                     random_state=i)[0]
            for i in range(n_blocks)]
arrays = [da.from_delayed(obj, shape=(n_samples_per_block, n_features), dtype='float64')
          for obj in delayeds]
X = da.concatenate(arrays)
X

In [None]:
# Check the size of the array
X.nbytes / 1e9

In [None]:
# Only run this on the cluster.
X = X.persist()  

The algorithms implemented in Dask-ML are scalable. They handle larger-than-memory datasets just fine.

They follow the scikit-learn API, so if you're familiar with scikit-learn, you'll feel at home with Dask-ML.

In [None]:
from dask_ml.cluster import KMeans
clf = KMeans(init_max_iter=3, oversampling_factor=10)

In [None]:
%time clf.fit(X)

In [None]:
clf.labels_

In [None]:
clf.labels_[:10].compute()

## Shut cluster down
To shut the cluster down, cancel the job that runs the cluster. 

In [None]:
for run in ws.experiments['dask'].get_runs():
    if run.get_status() == "Running":
        print(f'cancelling run {run.id}')
        run.cancel()

### Just for convenience, get the latest running Run

In [None]:
for run in ws.experiments['dask'].get_runs():
    if run.get_status() == "Running":
        print(f'latest running run is {run.id}')
        break