# Running a DASK cluster with RAPIDS

This notebook runs a DASK cluster with NVIDIA RAPIDS. RAPIDS uses NVIDIA CUDA for high-performance GPU execution, exposing GPU parallelism and high memory bandwidth through a user-friendly Python interface. It includes a dataframe library called cuDF, which will be familiar to pandas users, as well as an ML library called cuML that provides GPU versions of many of the machine learning algorithms available in scikit-learn.

This notebook shows how RAPIDS, through DASK, can take advantage of multi-node, multi-GPU configurations on AzureML.

Note: This notebook deploys the AzureML cluster into a VNet. The following names identify the VNet and subnet:

In [1]:
vnet_resourcegroup_name='demo'
vnet_name='myvnet'
subnet_name='default'

In [2]:
import os
import json
import time

from azureml.core import Workspace, Experiment, Environment
from azureml.core.conda_dependencies import CondaDependencies
from azureml.core.compute import AmlCompute, ComputeTarget
from azureml.data.data_reference import DataReference
from azureml.core.runconfig import RunConfiguration, MpiConfiguration
from azureml.core import ScriptRunConfig
from azureml.train.estimator import Estimator
from azureml.exceptions import ComputeTargetException
from azureml.widgets import RunDetails

In [3]:
gpu_cluster_name = "nd12-vnet-clustr"

ws = Workspace.from_config()

### Deploy the AmlCompute cluster
The next cell deploys the AmlCompute cluster. The cluster is configured to scale down to 0 nodes after 2 minutes of idle time, so no cost is incurred while DASK is not running (and thus this cell does not spin up any nodes on the cluster yet). This cell only needs to be executed once; the cluster can be reused going forward.

In [4]:
try:
    gpu_cluster = ComputeTarget(workspace=ws, name=gpu_cluster_name)
    print('Found existing compute target')
    
except ComputeTargetException:
    print("Creating new cluster")

    provisioning_config = AmlCompute.provisioning_configuration(
        vm_size="Standard_ND12s", 
        min_nodes=0, 
        max_nodes=10,
        idle_seconds_before_scaledown=120,
        vnet_resourcegroup_name=vnet_resourcegroup_name,
        vnet_name=vnet_name,
        subnet_name=subnet_name
    )
    gpu_cluster = ComputeTarget.create(ws, gpu_cluster_name, provisioning_config)

    print("waiting for nodes")
    gpu_cluster.wait_for_completion(show_output=True)

Found existing compute target


### Copy the data to Azure Blob Storage

The next cell downloads the NYC taxi data set and then uploads it to the AzureML workspace's default datastore. All nodes of the DASK cluster we create further down will then be able to access the data.

In [5]:
import io
import os
import sys
import urllib.request
from tqdm import tqdm
from time import sleep

cwd = os.getcwd()

data_dir = os.path.abspath(os.path.join(cwd, 'data'))
if not os.path.exists(data_dir):
    os.makedirs(data_dir)

taxidir = os.path.join(data_dir, 'nyctaxi')
if not os.path.exists(taxidir):
    os.makedirs(taxidir)

filenames = []
local_paths = []
for i in range(1, 13):
    filename = "yellow_tripdata_2015-{month:02d}.csv".format(month=i)
    filenames.append(filename)
    
    local_path = os.path.join(taxidir, filename)
    local_paths.append(local_path)

for idx, filename in enumerate(filenames):
    url = "http://dask-data.s3.amazonaws.com/nyc-taxi/2015/" + filename
    print("- Downloading " + url)
    if not os.path.exists(local_paths[idx]):
        with open(local_paths[idx], 'wb') as file:
            with urllib.request.urlopen(url) as resp:
                length = int(resp.getheader('content-length'))
                blocksize = max(4096, length // 100)
                with tqdm(total=length, file=sys.stdout) as pbar:
                    while True:
                        buff = resp.read(blocksize)
                        if not buff:
                            break
                        file.write(buff)
                        pbar.update(len(buff))
    else:
        print("- File already exists locally")

print("- Uploading taxi data... ")
ws = Workspace.from_config()
ds = ws.get_default_datastore()

ds.upload(
    src_dir=taxidir,
    target_path='nyctaxi',
    show_progress=True)

print("- Data transfer complete")
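The download loop above picks its block size as `max(4096, length // 100)`, so a large file is read in roughly 100 blocks and the progress bar advances in about 1% steps. The following is a minimal sketch of that rule on an in-memory stream; `copy_in_blocks` is a hypothetical helper, not part of the notebook, and a real network stream may return fewer bytes per read than requested.

```python
import io

def copy_in_blocks(src, dst, length):
    """Copy a stream of `length` bytes from src to dst in roughly 100 blocks."""
    # Same rule as the download loop: at least 4 KiB, otherwise 1% of the file.
    blocksize = max(4096, length // 100)
    reads = 0
    while True:
        buff = src.read(blocksize)
        if not buff:
            break
        dst.write(buff)
        reads += 1
    return blocksize, reads

# In-memory stand-in for a 1,000,000 byte download.
payload = b"x" * 1_000_000
source, target = io.BytesIO(payload), io.BytesIO()
blocksize, reads = copy_in_blocks(source, target, len(payload))
print(blocksize, reads)
```

For very small files the 4096 byte floor applies, so the whole file arrives in a single read.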

- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-01.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-02.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-03.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-04.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-05.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-06.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-07.csv
- File already exists locally
- Downloading http://dask-data.s3.amazonaws.com/nyc-taxi/2015/yellow_tripdata_2015-08.csv
- File already exists locally
- Downloading http://dask-data.s3.amazon

### Create the DASK Cluster

On the AmlCompute cluster we now submit a Python job that starts a DASK cluster.

In [6]:
mpi_config = MpiConfiguration()
mpi_config.process_count_per_node = 2

est = Estimator(
    source_directory='./dask',
    compute_target=gpu_cluster,
    entry_script='init-dask.py',
    script_params={
        '--data': ws.get_default_datastore(),
        '--gpus': str(2)  # The number of GPUs available on each node
        },
    node_count=3,
    use_gpu=True,
    distributed_training=mpi_config,
    conda_dependencies_file='rapids-0.9.yaml')

run = Experiment(ws, "init-dask-env").submit(est)
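As a quick sanity check on the configuration above, the sketch below works out how many MPI processes and GPUs the job requests. It assumes that `init-dask.py` (not shown in this notebook) starts one DASK worker per MPI process; under that assumption the result matches the 6 workers reported by the client further down.

```python
# Values copied from the Estimator cell above.
node_count = 3               # node_count passed to the Estimator
process_count_per_node = 2   # mpi_config.process_count_per_node
gpus_per_node = 2            # value passed to the script via --gpus

# Assumption: one DASK worker per MPI process, one GPU per worker.
total_processes = node_count * process_count_per_node
total_gpus = node_count * gpus_per_node

print("MPI processes:", total_processes)
print("GPUs in use:  ", total_gpus)
```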



Let's use the widget to monitor how the DASK cluster spins up. When run for the first time on a workspace, the following will happen:

1. The docker image will be created, which takes about 20 minutes.
2. AzureML will then scale the cluster up by provisioning the required number of nodes (`node_count` above), which takes another 5-10 minutes with the chosen Standard_ND12s.
3. The docker image is transferred to the compute nodes, which, given its size of about 8 GB, takes another 3-5 minutes.

So altogether the process will take roughly 30 minutes, possibly a little longer, when run for the first time.
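To see where the overall estimate comes from, the three steps can simply be added up. This is a small sketch that treats each estimate from the list above as a (min, max) range in minutes; the step names are only labels chosen for this example.

```python
# (min, max) minutes per step, taken from the list above.
steps = {
    "build docker image": (20, 20),
    "provision nodes": (5, 10),
    "transfer image to nodes": (3, 5),
}

low = sum(lo for lo, _ in steps.values())
high = sum(hi for _, hi in steps.values())
print(f"first run: about {low} to {high} minutes")
```

On later runs the image is already built, so only the provisioning and transfer steps should remain.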

In [9]:
from azureml.widgets import RunDetails
RunDetails(run).show()

_UserRunWidget(widget_settings={'childWidgetDisplay': 'popup', 'send_telemetry': False, 'log_level': 'INFO', '…

In [10]:
from IPython.display import clear_output
import time

it = 0
while "headnode" not in run.get_metrics():
    clear_output(wait=True)
    print("waiting for scheduler node's ip " + str(it) )
    time.sleep(1)
    it += 1

headnode = run.get_metrics()["headnode"]
print("headnode has ip: ", headnode)

headnode has ip:  172.17.0.5
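The loop above waits indefinitely, which is fine for an interactive session. If the job fails before it reports the head node, a timeout avoids a hanging cell. Below is a minimal sketch of that idea; `wait_for_metric` is a hypothetical helper and `fake_get_metrics` merely stands in for `run.get_metrics` so the example runs without a cluster.

```python
import time

def wait_for_metric(get_metrics, key, timeout_s=1800.0, poll_s=1.0):
    """Poll get_metrics() until `key` appears; raise TimeoutError otherwise."""
    deadline = time.monotonic() + timeout_s
    while True:
        metrics = get_metrics()
        if key in metrics:
            return metrics[key]
        if time.monotonic() >= deadline:
            raise TimeoutError(f"metric {key!r} not reported within {timeout_s} s")
        time.sleep(poll_s)

# Stand-in for run.get_metrics: reports the head node on the third call.
calls = {"n": 0}

def fake_get_metrics():
    calls["n"] += 1
    return {"headnode": "172.17.0.5"} if calls["n"] >= 3 else {}

headnode_ip = wait_for_metric(fake_get_metrics, "headnode", timeout_s=5, poll_s=0.01)
print("headnode has ip:", headnode_ip)
```

With a real run, the call would look like `wait_for_metric(run.get_metrics, "headnode")`.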


In [None]:
print("ssh -vvv -N -L 0.0.0.0:8786:{headnode}:8786 -L 0.0.0.0:8787:{headnode}:8787 dask@{nodeip} -p {nodeport}".format(
    headnode=headnode,
    nodeip=gpu_cluster.list_nodes()[0]['ipAddress'],
    nodeport=gpu_cluster.list_nodes()[0]['port']))
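The printed command forwards two local ports through one of the compute nodes to the head node: 8786 for the DASK scheduler and 8787 for the dashboard. The sketch below builds the same kind of command from its parts; `ssh_tunnel_command` is a hypothetical helper, the node address and port are placeholder values, and the `-vvv` debug flag is left out.

```python
def ssh_tunnel_command(headnode, nodeip, nodeport, ports=(8786, 8787), user="dask"):
    """Build an ssh command that forwards each local port to the head node."""
    # One -L option per port: local 0.0.0.0:port -> headnode:port
    forwards = " ".join(f"-L 0.0.0.0:{p}:{headnode}:{p}" for p in ports)
    # -N: do not run a remote command, only forward ports
    return f"ssh -N {forwards} {user}@{nodeip} -p {nodeport}"

# Placeholder node address and port, for illustration only.
cmd = ssh_tunnel_command("172.17.0.5", "203.0.113.10", 50000)
print(cmd)
```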

In [11]:
import distributed
client = distributed.Client('tcp://{}:8786'.format(run.get_metrics()["headnode"]))
client.restart()
client

0,1
Client  Scheduler: tcp://172.17.0.5:8786  Dashboard: http://172.17.0.5:8787/status,Cluster  Workers: 6  Cores: 6  Memory: 0 B


In [12]:
import dask

print("- setting dask settings")
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 1})

print("-- Changes to dask settings")
print("--- Setting work-stealing to ", dask.config.get('distributed.scheduler.work-stealing'))
print("--- Setting scheduler bandwidth to ", dask.config.get('distributed.scheduler.bandwidth'))
print("-- Settings updates complete")

- setting dask settings
-- Changes to dask settings
--- Setting work-stealing to  False
--- Setting scheduler bandwidth to  1
-- Settings updates complete


In [13]:
# helper function that cleans a single DataFrame partition
def clean(df_part, remap, must_haves):
    # some column names include leading spaces: strip them and lowercase all names
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(tmp)
    
    # rename using the supplied mapping
    df_part = df_part.rename(remap)
    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col)
            continue
        
        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].str.fillna('-1')
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64-bit to 32-bit types
            # GPUs such as the Tesla T4 are faster on 32-bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
    
    return df_part
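The column handling in `clean` boils down to three steps: normalize the name, apply the rename mapping, and drop anything that is not expected. Here is a sketch of just that logic on a plain list of names, so it runs without a GPU; `normalize_columns` and the sample names are made up for illustration and are not part of the notebook.

```python
def normalize_columns(columns, remap, must_haves):
    """Return the cleaned names of the columns that clean() would keep."""
    kept = []
    for col in columns:
        name = col.strip().lower()       # remove stray spaces, lowercase
        name = remap.get(name, name)     # rename using the supplied mapping
        if name in must_haves:           # drop anything not in the expected list
            kept.append(name)
    return kept

# Made-up sample header, including a leading space and mixed case.
example_columns = ["VendorID", "tpep_pickup_datetime", " pickup_longitude", "RateCodeID", "extra"]
example_remap = {"tpep_pickup_datetime": "pickup_datetime", "ratecodeid": "rate_code"}
example_must_haves = {"pickup_datetime", "pickup_longitude", "rate_code"}

kept_columns = normalize_columns(example_columns, example_remap, example_must_haves)
print(kept_columns)
```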

In [14]:
import os
import cudf

def read_csv(path):
    import cudf
    # list of column names that need to be re-mapped
    remap = {}
    remap['tpep_pickup_datetime'] = 'pickup_datetime'
    remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
    remap['ratecodeid'] = 'rate_code'

    #create a list of columns & dtypes the df must have
    must_haves = {
     'pickup_datetime': 'datetime64[ms]',
     'dropoff_datetime': 'datetime64[ms]',
     'passenger_count': 'int32',
     'trip_distance': 'float32',
     'pickup_longitude': 'float32',
     'pickup_latitude': 'float32',
     'rate_code': 'int32',
     'dropoff_longitude': 'float32',
     'dropoff_latitude': 'float32',
     'fare_amount': 'float32'
    }
    
    df = cudf.read_csv(path)
    return clean(df, remap, must_haves)

paths = [os.path.join(run.get_metrics()["data"], "nyctaxi/") + filename for filename in filenames]
data_paths = client.scatter(paths)
dfs = [client.submit(read_csv, data_path) for data_path in data_paths]

In [15]:
import dask_cudf

taxi_df = dask_cudf.from_delayed(dfs)

ERROR - Call to cuInit results in CUDA_ERROR_NO_DEVICE
distributed.protocol.pickle - INFO - Failed to deserialize b"\x80\x04\x95\xa0\x06\x00\x00\x00\x00\x00\x00\x8c\x18cudf.dataframe.dataframe\x94\x8c\tDataFrame\x94\x93\x94)\x81\x94}\x94(\x8c\x06_index\x94\x8c\x14cudf.dataframe.index\x94\x8c\nRangeIndex\x94\x93\x94)\x81\x94}\x94(\x8c\x06_start\x94K\x00\x8c\x05_stop\x94K\x00\x8c\x04name\x94N\x8c\x0e_cached_values\x94Nub\x8c\x05_size\x94K\x00\x8c\x05_cols\x94\x8c\x0bcollections\x94\x8c\x0bOrderedDict\x94\x93\x94)R\x94(\x8c\x0fpickup_datetime\x94\x8c\x15cudf.dataframe.series\x94\x8c\x06Series\x94\x93\x94)\x81\x94}\x94(\x8c\x07_column\x94\x8c\x18cudf.dataframe.numerical\x94\x8c\x0fNumericalColumn\x94\x93\x94)\x81\x94}\x94(\x8c\x05_data\x94\x8c\x15cudf.dataframe.buffer\x94\x8c\x06Buffer\x94\x93\x94\x8c\x15numpy.core.multiarray\x94\x8c\x0c_reconstruct\x94\x93\x94\x8c\x05numpy\x94\x8c\x07ndarray\x94\x93\x94K\x00\x85\x94C\x01b\x94\x87\x94R\x94(K\x01K\x00\x85\x94h(\x8c\x05dtype\x94\x93\x94\x8c\

CudaSupportError: Error at driver init: 
[100] Call to cuInit results in CUDA_ERROR_NO_DEVICE:

In [None]:
import numpy as np
import numba, xgboost, socket
import dask, dask_cudf
from dask.distributed import Client, wait

In [None]:
taxi_df.columns

In [None]:
# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
taxi_df = taxi_df.query(' and '.join(query_frags))

# inspect the results of cleaning
taxi_df.head().to_pandas()
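The filter is one long boolean expression built by joining the fragments with `and`. To make the semantics concrete, the sketch below evaluates a shortened version of that expression against single records held in plain dictionaries. This only illustrates what the expression means; it is not how `query` is implemented, and `keep` is a hypothetical helper.

```python
# Two of the fragments from the cell above, joined the same way.
example_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
]
example_expr = ' and '.join(example_frags)

def keep(row):
    """Evaluate the joined expression against one record (a dict of column values)."""
    # The expression happens to be valid Python, so eval works for illustration.
    return bool(eval(example_expr, {"__builtins__": {}}, row))

print(example_expr)
print(keep({"fare_amount": 12.5, "passenger_count": 2}))
print(keep({"fare_amount": -3.0, "passenger_count": 2}))
```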

In [None]:
import math
from math import cos, sin, asin, sqrt, pi
import numpy as np

def haversine_distance_kernel(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, h_distance):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        h_distance[i] = c * r

def day_of_the_week_kernel(day, month, year, day_of_week):
    # Zeller's congruence: 0 = Saturday, 1 = Sunday, ..., 6 = Friday
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        # January and February count as months 13 and 14 of the previous year
        if month[i] < 3:
            shift = 12
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m*2.6) + y + (y//4) + (c//4) -2*c)%7
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    df['diff'] = df['dropoff_datetime'].astype('int32') - df['pickup_datetime'].astype('int32')
    
    df['pickup_latitude_r'] = df['pickup_latitude']//.01*.01
    df['pickup_longitude_r'] = df['pickup_longitude']//.01*.01
    df['dropoff_latitude_r'] = df['dropoff_latitude']//.01*.01
    df['dropoff_longitude_r'] = df['dropoff_longitude']//.01*.01
    
    df = df.drop('pickup_datetime')
    df = df.drop('dropoff_datetime')
    
    
    df = df.apply_rows(haversine_distance_kernel,
                   incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                   outcols=dict(h_distance=np.float32),
                   kwargs=dict())
    
    
    df = df.apply_rows(day_of_the_week_kernel,
                      incols=['day', 'month', 'year'],
                      outcols=dict(day_of_week=np.float32),
                      kwargs=dict())
    
    
    df['is_weekend'] = (df['day_of_week']<2)
    return df
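The haversine kernel computes the great-circle distance in kilometers between pickup and dropoff. A scalar version of the same formula is handy for checking it on the CPU; `haversine_km` below is a hypothetical stand-alone function that mirrors the kernel's arithmetic for a single pair of points.

```python
from math import asin, cos, pi, sin, sqrt

def haversine_km(lat1, lon1, lat2, lon2, radius_km=6371.0):
    """Great-circle distance between two points given in degrees."""
    # Convert degrees to radians, as the kernel does with pi/180.
    lat1, lon1, lat2, lon2 = (pi / 180 * v for v in (lat1, lon1, lat2, lon2))
    dlat = lat2 - lat1
    dlon = lon2 - lon1
    a = sin(dlat / 2) ** 2 + cos(lat1) * cos(lat2) * sin(dlon / 2) ** 2
    return 2 * radius_km * asin(sqrt(a))

# One degree of latitude along the same meridian is about 111.2 km.
one_degree = haversine_km(40.0, -74.0, 41.0, -74.0)
print(round(one_degree, 1))
```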

In [None]:
# actually add the features
taxi_df = taxi_df.map_partitions(add_features).persist()
# inspect the result
taxi_df.head().to_pandas()
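The `is_weekend` feature relies on the day-of-week encoding of Zeller's congruence, where 0 is Saturday and 1 is Sunday, so `day_of_week < 2` selects the weekend. The sketch below is a plain-Python version of the congruence, written with integer arithmetic and checked against the standard library for every day of 2015. `zeller_day_of_week` is a hypothetical helper, not the GPU kernel itself.

```python
import datetime

def zeller_day_of_week(day, month, year):
    """Zeller's congruence: 0 = Saturday, 1 = Sunday, ..., 6 = Friday."""
    if month < 3:
        # January and February count as months 13 and 14 of the previous year.
        month += 12
        year -= 1
    k = year % 100   # year within the century
    j = year // 100  # zero-based century
    return (day + (13 * (month + 1)) // 5 + k + k // 4 + j // 4 - 2 * j) % 7

# Cross-check against datetime, whose weekday() uses 0 = Monday ... 6 = Sunday.
mismatches = 0
current = datetime.date(2015, 1, 1)
while current.year == 2015:
    expected = (current.weekday() + 2) % 7
    if zeller_day_of_week(current.day, current.month, current.year) != expected:
        mismatches += 1
    current += datetime.timedelta(days=1)

print("mismatches in 2015:", mismatches)
```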

In [None]:
%matplotlib inline
taxi_df.groupby('hour').fare_amount.mean().compute().to_pandas().sort_index().plot();

In [None]:
%%time
X_train = taxi_df.query('day < 25').persist()

# create a Y_train ddf with just the target variable
Y_train = X_train[['fare_amount']].persist()
# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]

# this won't return until all data is in GPU memory
done = wait([X_train, Y_train])

## Notes on training with XGBoost with Azure

* Because Dask-XGBoost parses the `client` for the raw IP address, it passes `"localhost"` to RABIT if the `client` was configured to use `"localhost"` with SSH forwarding. This means Dask-XGBoost, as it exists, does not support Azure with this method.
* There are several bugs and issues with the Dask submodule of XGBoost:
    1. Data co-locality is not enforced (labels and data may not be on the same worker)
    2. Data locality is not enforced (a data partition, x, may not be assigned to the worker, n, on which it originally resides, so data may need to be shuffled).

The issues with the Dask submodule of XGBoost are being fixed in this PR: https://github.com/dmlc/xgboost/pull/4819

As a result, neither approach is expected to work in this setup: not the Dask submodule of XGBoost, and not Dask-XGBoost, which the cell below calls.
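To illustrate the first note: when the client connects through an SSH tunnel, the scheduler address it knows is `localhost`, not the head node's IP. The sketch below shows how a host name extracted from such an address differs between the two cases. `scheduler_host` is a hypothetical helper for illustration and is not the code Dask-XGBoost actually uses.

```python
from urllib.parse import urlparse

def scheduler_host(address):
    """Extract the host part from a scheduler address such as tcp://host:port."""
    return urlparse(address).hostname

# Direct connection inside the VNet: the real head node IP is visible.
direct = scheduler_host("tcp://172.17.0.5:8786")

# Connection through an SSH tunnel: only "localhost" is visible, which is
# meaningless to workers running on other machines.
tunneled = scheduler_host("tcp://localhost:8786")

print(direct, tunneled)
```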

In [None]:
import dask_xgboost  # needed for dask_xgboost.train below; not imported earlier in this notebook

params = {
  'num_rounds':   100,
  'max_depth':    8,
  'max_leaves':   2**8,
  'tree_method':  'gpu_hist',
  'objective':    'reg:squarederror',
  'grow_policy':  'lossguide'
}

bst = dask_xgboost.train(client, params, X_train, Y_train, num_boost_round=params['num_rounds'])

In [16]:
run.cancel()

distributed.client - ERROR - Failed to reconnect to scheduler after 10.00 seconds, closing client
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/data/anaconda/envs/rapids-0.9/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/data/anaconda/envs/rapids-0.9/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
    await gen.with_timeout(timedelta(seconds=2), list(coroutines))
concurrent.futures._base.CancelledError
distributed.utils - ERROR - 
Traceback (most recent call last):
  File "/data/anaconda/envs/rapids-0.9/lib/python3.7/site-packages/distributed/utils.py", line 666, in log_errors
    yield
  File "/data/anaconda/envs/rapids-0.9/lib/python3.7/site-packages/distributed/client.py", line 998, in _reconnect
    await self._close()
  File "/data/anaconda/envs/rapids-0.9/lib/python3.7/site-packages/distributed/client.py", line 1268, in _close
    await gen.with_timeout(timedelta(seconds=2), list(co