# <span style="color:#8735fb; font-size:24pt"> Multi Node Multi-GPU example on Azure using XGBoost and dask-cloudprovider </span>

[Dask Cloud Provider](https://cloudprovider.dask.org/en/latest/) is a native cloud integration library for Dask. It helps manage Dask clusters on different cloud platforms. In this notebook, we will look at how to use this package to set up an Azure cluster and run a multi-node, multi-GPU example with [RAPIDS](https://rapids.ai/). RAPIDS provides a suite of libraries that run data science pipelines entirely on the GPU, and, as we will see in this notebook, these pipelines can be scaled to multiple nodes using Dask.

For the purposes of this demo, we will use a part of the NYC Taxi Dataset (only trips from May 2014 are used here). The goal is to predict the fare amount of a trip from its pickup and dropoff times and coordinates. We will download the data from [Azure Open Datasets](https://azure.microsoft.com/en-us/services/open-datasets/), where Microsoft hosts the dataset publicly.

## <span style="color:#8735fb; font-size:22pt"> Step -1: Set up Azure credentials and CLI. </span>

Before running the notebook, run the following commands in a terminal to set up the Azure CLI:
```
curl -sL https://aka.ms/InstallAzureCLIDeb | sudo bash
az login
```
Then, follow the instructions on the prompt to finish setting up the account. If you are running the notebook from inside a Docker container, you can remove `sudo`.

## <span style="color:#8735fb; font-size:22pt"> Step 0: Import necessary packages. </span>

In [None]:
## Uncomment the following lines to install the required libraries.
# # If azureml is not present, install azureml-core.
# # azureml-opendatasets is in preview, so it is not part of the main SDK and must be installed separately.
# # Installing azureml-opendatasets also installs an old version of pandas (1.0.0), so we reinstall pandas 1.2.4 to work with RAPIDS.
# # We do the same for the numpy and scipy libraries, which azureml-opendatasets also downgrades.
# ! pip install "dask-cloudprovider[azure]" --upgrade
# ! pip install azureml-core
# ! pip install azureml-opendatasets
# ! pip install azureml-telemetry
# ! pip install pandas==1.2.4 # restore pandas 1.2.4
# ! pip install numpy==1.20.2 # restore numpy 1.20.2
# ! pip install scipy==1.6.0 # restore scipy 1.6.0

In [None]:
from dask.distributed import Client, wait, get_worker
from dask_cloudprovider.azure import AzureVMCluster
import dask_cudf
from azureml.opendatasets import NycTlcYellow
from dask_ml.model_selection import train_test_split
from cuml.dask.common import utils as dask_utils
from cuml.metrics import mean_squared_error
from cuml import ForestInference
import cudf
import xgboost as xgb
from datetime import datetime
from dateutil import parser
import numpy as np
from timeit import default_timer as timer
import dask

## <span style="color:#8735fb; font-size:22pt"> Step 1: Set up the Azure VM Cluster </span>

We will now set up an [Azure VM Cluster](https://cloudprovider.dask.org/en/latest/azure.html) using `AzureVMCluster` from Dask Cloud Provider, which creates a cluster of VMs to run our workload. To do this, you will first need to set up a Resource Group, a Virtual Network and a Security Group on Azure. [Learn more about how you can set this up](https://cloudprovider.dask.org/en/latest/azure.html#resource-groups). Note that you can also set it up using the Azure portal directly via the GUI.

Once you have set these up, plug the names of the resources you created into the cell below. Note that we run the RAPIDS Docker image on each VM and use `dask_cuda.CUDAWorker` as the worker class, so every Dask worker runs inside a CUDA-enabled container.

First, create a custom VM image with the newest kernel already installed in it. You can use this guide: https://github.com/MicrosoftDocs/azure-docs/blob/master/articles/virtual-machines/linux/tutorial-custom-images.md . If you do not start from an image with the newest kernel, creating an Azure VM installs the newest kernel along with the NVIDIA drivers, and the VM will not work until the machine is restarted; restarting it does not make the system work either.

In [None]:
dask.config.set({"logging.kubernetes": "info",
                 "logging.distributed": "info",
                 "cloudprovider.azure.azurevm.vm_image": {"id": "<id of the custom image version>"}})  # replace the placeholder with your image version ID
config = dask.config.get("cloudprovider.azure.azurevm", {})
config

In [None]:
location = "some location"
resource_group = "some resource group"
vnet = "some vnet"
security_group = "some security group"

vm_size = "Standard_NC12s_v3"
docker_image = "rapidsai/rapidsai:cuda11.2-runtime-ubuntu18.04-py3.8"
worker_class = "dask_cuda.CUDAWorker"
docker_args = '--shm-size=256m'
worker_options = {'rmm-managed-memory':True}
vm_image = {"id": "<id of the custom image version>"}  # replace the placeholder with your image version ID

In [None]:
vm_image

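The next cell only prints the cloud-init configuration that `AzureVMCluster` will use for the VMs, which is useful for debugging. The cell after it starts the VMs and downloads the Docker images, which may take several minutes. Sit tight!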

In [None]:
print(AzureVMCluster.get_cloud_init(
    resource_group=resource_group,
    location=location,
    vnet=vnet,
    security_group=security_group,
    n_workers=0,
    vm_size=vm_size,
    docker_image=docker_image,
    docker_args=docker_args,
    auto_shutdown=False,
    security=True,
    vm_image=vm_image,
    worker_class=worker_class))


In [None]:
%%time

cluster = AzureVMCluster(
    location=location,
    resource_group=resource_group,
    vnet=vnet,
    security_group=security_group,
    vm_size=vm_size,
    docker_image=docker_image,
    worker_class=worker_class,
    n_workers=0,
    security=True,
    docker_args=docker_args,
    worker_options=worker_options,
    debug=True,
    vm_image=vm_image
#    env_vars = env_vars
)

In [None]:
client = Client(cluster) 
client

In [None]:
def scale_workers(client, n_workers, timeout=300):
    """Scale the cluster to n_workers and block until they have joined, or raise after timeout seconds."""
    import time
    client.cluster.scale(n_workers)
    m = len(client.has_what().keys())
    start = end = time.perf_counter_ns()
    while (m < n_workers) and ((end - start) / 1e9 < timeout):
        time.sleep(5)
        m = len(client.has_what().keys())
        end = time.perf_counter_ns()

    # Only fail if the requested workers still have not joined when the loop exits.
    if m < n_workers:
        raise RuntimeError(f"Failed to rescale cluster in {timeout} sec. "
                           "Try increasing timeout for very large containers, and verify available compute resources.")

In [None]:
%%time
scale_workers(client, 2, timeout=600)
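`distributed` also provides `Client.wait_for_workers(n_workers, timeout=...)`, which blocks until the requested number of workers has joined. To make the polling logic of `scale_workers` easier to reason about and to test without a live cluster, here is a minimal pure-Python sketch of the same poll-until-target-or-timeout pattern. The helper `wait_until` and its injectable `clock` and `sleep` arguments are hypothetical and are not part of Dask.

```python
import time
from typing import Callable


def wait_until(
    count: Callable[[], int],
    target: int,
    timeout: float,
    poll_interval: float = 5.0,
    clock: Callable[[], float] = time.monotonic,
    sleep: Callable[[float], None] = time.sleep,
) -> int:
    """Poll count() until it reaches target; raise RuntimeError after timeout seconds.

    Hypothetical helper: count() would return the number of workers that have joined.
    """
    start = clock()
    current = count()
    while current < target:
        # Give up only while the target has still not been reached.
        if clock() - start >= timeout:
            raise RuntimeError(
                f"Only {current}/{target} workers after {timeout} s. "
                "Try increasing the timeout and verify available compute resources."
            )
        sleep(poll_interval)
        current = count()
    return current
```

In this notebook it could be called as `client.cluster.scale(2)` followed by `wait_until(lambda: len(client.scheduler_info()["workers"]), 2, timeout=600)`; injecting `clock` and `sleep` is what lets the logic be exercised with a fake clock instead of real waiting.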

In [None]:
def pretty_print(scheduler_dict):
    print(f"All workers for scheduler id: {scheduler_dict['id']}, address: {scheduler_dict['address']}")
    for worker in scheduler_dict['workers']:
        print(f"Worker: {worker} , gpu_machines: {scheduler_dict['workers'][worker]['gpu']}")

pretty_print(client.scheduler_info()) # prints one entry per Dask worker; CUDAWorker starts one worker per visible GPU

## <span style="color:#8735fb; font-size:22pt"> Step 2: Data Setup, Cleanup and Enhancement </span>

### <span style="color:#8735fb; font-size:18pt"> Step 2.a: Download the data from Azure Open Datasets </span>

In [None]:
tic = timer()
start_date = parser.parse('2014-05-01') # start at 1 May 2014
end_date = parser.parse('2014-05-31') # stop at 31 May 2014
nyc_tlc = NycTlcYellow(start_date=start_date, end_date=end_date)
nyc_tlc_df = nyc_tlc.to_pandas_dataframe()
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

In [None]:
print(nyc_tlc_df.shape) # approx. 14 million rows
print(type(nyc_tlc_df))
print(nyc_tlc_df.head())
# since we are going to send the data to the cluster, keep only the first 10 million rows
nyc_tlc_df = nyc_tlc_df.iloc[:10_000_000]
print(nyc_tlc_df.shape)

Let's look at the data locally to see what we're dealing with. We use the May 2014 data for the purposes of the demo. There are columns for the pickup and dropoff times, the trip distance, and the pickup and dropoff latitude and longitude, among others. This is the information we'll use to estimate the trip fare amount.

### <span style="color:#8735fb; font-size:18pt"> Step 2.b: Data Cleanup, Enhancement and Persisting Scripts </span>

The data needs to be cleaned up first. We remove the columns we are not interested in and define the datatype each remaining column should be cast to.

We also add some new features to our dataframe via some custom functions, namely:
1. Haversine distance: the great-circle distance between the pickup and dropoff points.

2. Day of the week: This can be useful information for determining the fare cost.

The `add_features` function combines the two, along with time-based features such as the pickup hour and the time between pickup and dropoff in seconds, to produce a new dataframe with the added features.
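The haversine kernel computes $a = \sin^2(\Delta\varphi/2) + \cos\varphi_1\cos\varphi_2\sin^2(\Delta\lambda/2)$ and $d = 2R\arcsin\sqrt{a}$, with latitudes $\varphi$, longitudes $\lambda$ in radians and $R = 6371$ km. As a minimal scalar sketch of that formula for checking values on the CPU (the function name `haversine_km` is illustrative, not part of the notebook):

```python
import math


def haversine_km(lat1: float, lon1: float, lat2: float, lon2: float,
                 radius: float = 6371.0) -> float:
    """Great-circle distance in km between two (lat, lon) points given in degrees."""
    # Degrees to radians, as the kernel does with pi/180.
    phi1, lam1 = math.radians(lat1), math.radians(lon1)
    phi2, lam2 = math.radians(lat2), math.radians(lon2)
    dphi = phi2 - phi1
    dlam = lam2 - lam1
    a = math.sin(dphi / 2) ** 2 + math.cos(phi1) * math.cos(phi2) * math.sin(dlam / 2) ** 2
    return 2 * radius * math.asin(math.sqrt(a))
```

For example, one degree of longitude along the equator, `haversine_km(0.0, 0.0, 0.0, 1.0)`, is about 111.19 km. Note that the notebook first rounds the coordinates down to 0.01 degrees, so its distances are approximations of this exact value.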

**NOTE**: We will also persist the test dataset in the workers. If `X_infer`, i.e. the test dataset, is small enough, we can call `compute()` on it to bring it to the local machine and run prediction there. In general, however, a large `X_infer` may not fit in the GPU(s) of the local machine, and moving a large amount of data around also adds to the prediction latency. It is therefore better to persist the test dataset on the Dask workers, call predict on the individual workers, and finally collect the prediction results from them.

#### Adding features functions

In [None]:
import math
from math import cos, sin, asin, sqrt, pi

def haversine_distance_kernel(pickup_latitude_r, pickup_longitude_r, dropoff_latitude_r, dropoff_longitude_r, h_distance, radius):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude_r, pickup_longitude_r, dropoff_latitude_r, dropoff_longitude_r,)):
        x_1 = pi/180 * x_1
        y_1 = pi/180 * y_1
        x_2 = pi/180 * x_2
        y_2 = pi/180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat/2)**2 + cos(x_1) * cos(x_2) * sin(dlon/2)**2
        
        c = 2 * asin(sqrt(a)) 
        # radius = 6371 # Radius of earth in kilometers # currently passed as input arguments
        
        h_distance[i] = c * radius

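def day_of_the_week_kernel(day, month, year, day_of_week):
    # Zeller's congruence (Gregorian calendar): 0 = Saturday, 1 = Sunday, ..., 6 = Friday
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        # January and February are treated as months 13 and 14 of the previous year
        if m_1 < 3:
            shift = 12
        else:
            shift = 0
        Y = y_1 - (m_1 < 3)
        y = Y - 2000  # year within the century
        c = 20        # century; assumes years 2000-2099
        d = d_1
        m = m_1 + shift + 1
        day_of_week[i] = (d + math.floor(m*2.6) + y + (y//4) + (c//4) - 2*c) % 7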
        
def add_features(df):
    df['hour'] = df['tpepPickupDateTime'].dt.hour
    df['year'] = df['tpepPickupDateTime'].dt.year
    df['month'] = df['tpepPickupDateTime'].dt.month
    df['day'] = df['tpepPickupDateTime'].dt.day
    df['diff'] = (df['tpepDropoffDateTime'] - df['tpepPickupDateTime']).dt.seconds # difference between dropoff and pickup, in seconds
    
    df['pickup_latitude_r'] = df['startLat']//.01*.01
    df['pickup_longitude_r'] = df['startLon']//.01*.01
    df['dropoff_latitude_r'] = df['endLat']//.01*.01
    df['dropoff_longitude_r'] = df['endLon']//.01*.01
    
    df = df.drop('tpepDropoffDateTime', axis=1)
    df = df.drop('tpepPickupDateTime', axis =1)
    
    
    df = df.apply_rows(haversine_distance_kernel,
                   incols=['pickup_latitude_r', 'pickup_longitude_r', 'dropoff_latitude_r', 'dropoff_longitude_r'],
                   outcols=dict(h_distance=np.float32),
                   kwargs=dict(radius=6371))
    
    
    df = df.apply_rows(day_of_the_week_kernel,
                      incols=['day', 'month', 'year'],
                      outcols=dict(day_of_week=np.float32),
                      kwargs=dict())
    
    
    df['is_weekend'] = (df['day_of_week']<2)
    return df
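The arithmetic in `day_of_the_week_kernel` follows the form of Zeller's congruence for the Gregorian calendar, with `c = 20` fixing the century, so it assumes years 2000 to 2099. Below is a minimal plain-Python sketch of Zeller's congruence for checking the encoding on the CPU; the function names are hypothetical. It uses the integer expression `13 * (month + 1) // 5`, which equals the kernel's `math.floor(m*2.6)` with `m = month + 1`. Checked against `datetime`, the encoding is 0 = Saturday, 1 = Sunday, ..., 6 = Friday, which is why `day_of_week < 2` marks weekends.

```python
import datetime


def zeller_day_of_week(day: int, month: int, year: int) -> int:
    """Zeller's congruence (Gregorian): 0 = Saturday, 1 = Sunday, ..., 6 = Friday."""
    # January and February count as months 13 and 14 of the previous year.
    if month < 3:
        month += 12
        year -= 1
    k = year % 100   # year within the century
    j = year // 100  # century
    # -2*j is congruent to 5*j modulo 7, matching the "- 2*c" term in the kernel.
    return (day + (13 * (month + 1)) // 5 + k + k // 4 + j // 4 - 2 * j) % 7


def zeller_from_date(d: datetime.date) -> int:
    """Convenience wrapper for a datetime.date."""
    return zeller_day_of_week(d.day, d.month, d.year)
```

For instance, `zeller_day_of_week(3, 5, 2014)` gives `0` because 3 May 2014 was a Saturday, and the encoding relates to Python's `date.weekday()` (Monday = 0) via `(weekday + 2) % 7`.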

#### Functions for cleaning and persisting the data in the workers.

In [None]:
def persist_train_infer_split(client, df, response_dtype, response_id, infer_frac=1.0, random_state=42, shuffle=True):
    workers = client.has_what().keys()
    X, y = df.drop([response_id], axis=1), df[response_id].astype(response_dtype)
    infer_frac = max(0, min(infer_frac, 1.0))
    X_train, X_infer, y_train, y_infer = train_test_split(X, y, shuffle=shuffle, random_state=random_state, test_size=infer_frac)
    
    with dask.annotate(workers=set(workers)):
        X_train, y_train = client.persist(
            collections=[X_train, y_train]) 
    
    if (infer_frac != 1.0):
        with dask.annotate(workers=set(workers)):
            X_infer, y_infer = client.persist(
                collections=[X_infer, y_infer])

        wait([X_train, y_train, X_infer, y_infer])
    else:
        X_infer = X_train
        y_infer = y_train

        wait([X_train, y_train])
    
    return X_train, y_train, X_infer, y_infer


def clean(df_part, must_haves):
    """
    This function performs the various clean up tasks for the data
    and returns the cleaned dataframe.
    """    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col, axis=1)
            continue

        # fixes datetime error found by Ty Mckercher and fixed by Paul Mahler
        if df_part[col].dtype == 'object' and col in ['tpepPickupDateTime', 'tpepDropoffDateTime']:
            df_part[col] = df_part[col].astype('datetime64[ms]')
            continue

        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].fillna('-1')
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64-bit to 32-bit types;
            # GPUs are generally faster on 32-bit ops
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
            
    return df_part

    
def taxi_data_loader(client, nyc_tlc_df, response_dtype=np.float32, infer_frac=1.0, random_state=0):
    #create a list of columns & dtypes the df must have
    must_haves = {
     'tpepPickupDateTime': 'datetime64[ms]',
     'tpepDropoffDateTime': 'datetime64[ms]',
     'passengerCount': 'int32',
     'tripDistance': 'float32',
     'startLon': 'float32',
     'startLat': 'float32',
     'rateCodeId': 'int32',
     'endLon': 'float32',
     'endLat': 'float32',
     'fareAmount': 'float32'
    }

    workers = client.has_what().keys()
    response_id = 'fareAmount'

    taxi_data = dask_cudf.from_cudf(cudf.from_pandas(nyc_tlc_df), npartitions=len(workers))
    taxi_data = clean(taxi_data, must_haves)
    taxi_data = taxi_data.map_partitions(add_features)
    # Drop rows with NaN values, convert to float32 and keep only the model fields
    taxi_data = taxi_data.dropna()
    fields = ['passengerCount', 'tripDistance', 'startLon', 'startLat', 'rateCodeId',
              'endLon', 'endLat', 'fareAmount', 'diff', 'h_distance', 'day_of_week', 'is_weekend']
    taxi_data = taxi_data.astype("float32")
    taxi_data = taxi_data[fields]
    # drop=True keeps the old index from becoming an extra feature column
    taxi_data = taxi_data.reset_index(drop=True)
    
    return persist_train_infer_split(client, taxi_data, response_dtype, response_id, infer_frac, random_state)

### <span style="color:#8735fb; font-size:18pt"> Step 2.c: Get the Split Data and persist across workers </span>

It takes a bit of time since the data has to be distributed across multiple nodes.

In [None]:
client.restart()
tic = timer()
X_train, y_train, X_infer, y_infer = taxi_data_loader(client, nyc_tlc_df, infer_frac=0.1, random_state=42)
toc = timer()
print(f"Wall clock time taken for ETL and persisting : {toc-tic} s")

In [None]:
X_train.head()

## <span style="color:#8735fb; font-size:22pt"> Step 3: Train an XGBoost Model </span>

We are now ready to train an XGBoost model on the data and then predict the fare for each trip.

Before we start the training process, let us take a quick look at the details of the GPUs in the worker VMs that we will be using.

In [None]:
pretty_print(client.scheduler_info()) # prints one entry per Dask worker; CUDAWorker starts one worker per visible GPU

### <span style="color:#8735fb; font-size:18pt"> Step 3.a: Set training Parameters </span>

In this training example, we will use RMSE as the evaluation metric. It is also worth noting that performing hyperparameter optimization (HPO) may yield better hyperparameters than the fixed values used here.

Refer to the notebook [HPO-RAPIDS](./HPO-RAPIDS.ipynb) in this repository for how to perform HPO on Azure.
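RMSE is the square root of the mean squared difference between the true and predicted values, $\mathrm{RMSE} = \sqrt{\tfrac{1}{n}\sum_i (y_i - \hat{y}_i)^2}$; the notebook later computes it as `np.sqrt(mean_squared_error(...))`. A minimal pure-Python sketch of the formula, where the `rmse` helper is illustrative and not used elsewhere in the notebook:

```python
import math
from typing import Sequence


def rmse(y_true: Sequence[float], y_pred: Sequence[float]) -> float:
    """Root mean squared error: sqrt(mean((y_true - y_pred) ** 2))."""
    if len(y_true) != len(y_pred) or len(y_true) == 0:
        raise ValueError("inputs must be non-empty and of equal length")
    mse = sum((t - p) ** 2 for t, p in zip(y_true, y_pred)) / len(y_true)
    return math.sqrt(mse)
```

Because the squared error is symmetric, swapping `y_true` and `y_pred` does not change the result, and RMSE is expressed in the same unit as the target (here, the fare amount).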

In [None]:
params = {
    'learning_rate': 0.15,
    'max_depth': 8,
    'objective': 'reg:squarederror',
    'subsample': 0.7,
    'colsample_bytree': 0.7,
    'min_child_weight': 1,
    'gamma': 1,
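    'verbosity': 0, # replaces the deprecated 'silent' flag
    'verbose_eval': True, # note: an xgb.dask.train() argument rather than a booster parameter
    'booster' : 'gbtree', # 'gblinear' not implemented in dask
    'debug_synchronize': True,
    'eval_metric': 'rmse',
    'tree_method':'gpu_hist',
    'num_boost_rounds': 100, # not a booster parameter; passed to xgb.dask.train as num_boost_round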
}

### <span style="color:#8735fb; font-size:18pt"> Step 3.b: Train XGBoost Model </span>

Since the data is already persisted in the Dask workers on the Azure VM cluster, the next steps should not take a lot of time.

In [None]:
data_train = xgb.dask.DaskDMatrix(client, X_train, y_train)
tic = timer()
xgboost_output = xgb.dask.train(client, params, data_train,
                                num_boost_round=params['num_boost_rounds'])
xgb_cuda_model = xgboost_output['booster']
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

### <span style="color:#8735fb; font-size:18pt"> Step 3.c: Save the trained model to disk locally </span>

In [None]:
model_filename = 'trained-model_nyctaxi.xgb'
xgb_cuda_model.save_model(model_filename)

## <span style="color:#8735fb; font-size:22pt"> Step 4: Predict & Score using vanilla XGBoost Predict </span>

Here we will use the `predict` and `inplace_predict` methods that the `xgboost.dask` module provides out of the box. Later we will also use the [Forest Inference Library (FIL)](https://docs.rapids.ai/api/cuml/stable/api.html?highlight=forestinference#cuml.ForestInference) to perform prediction.

In [None]:
_y_test = y_infer.compute() # compute() blocks until the result is gathered, so no wait() is needed

In [None]:
d_test = xgb.dask.DaskDMatrix(client, X_infer)
tic = timer()
y_pred = xgb.dask.predict(client, xgb_cuda_model, d_test)
y_pred = y_pred.compute()
toc = timer()
print(f"Wall clock time taken for xgb.dask.predict : {toc-tic} s")

#### Inference with the inplace predict method of dask XGBoost

In [None]:
tic = timer()
y_pred = xgb.dask.inplace_predict(client, xgb_cuda_model, X_infer)
y_pred = y_pred.compute()
toc = timer()
print(f"Wall clock time taken for inplace inference : {toc-tic} s")

In [None]:
tic = timer()
print("Calculating MSE")
score = mean_squared_error(_y_test, y_pred) # cuML expects (y_true, y_pred)
print("Workflow Complete - RMSE: ", np.sqrt(score))
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

## <span style="color:#8735fb; font-size:22pt"> Step 5: Predict & Score using FIL or Forest Inference Library </span>

[Forest Inference Library (FIL)](https://docs.rapids.ai/api/cuml/stable/api.html?highlight=forestinference#cuml.ForestInference) provides GPU-accelerated inference for tree models. We will import the FIL functionality from the cuML library.

It accepts a trained tree model in a Treelite-compatible format (currently LightGBM, XGBoost, and scikit-learn GBDT and random forest models are supported). In general, FIL speeds up inference when using a large number of workers, and the latency benefits become more pronounced as the dataset grows.

In [None]:
from cuml import ForestInference
from dask.distributed import get_worker

If `X_infer` is small enough, we can call `compute()` on it. In general, however, if `X_infer` is large, it is better to persist it on the different workers and call predict on each worker separately. We show both approaches.

### <span style="color:#8735fb; font-size:18pt"> Step 5.a: Predict using `compute` on the local machine in case the test dataset is small. </span>

In [None]:
tic = timer()
X_test_computed = X_infer.compute()
loaded_model = ForestInference.load(model_filename, model_type='xgboost')
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

In [None]:
tic = timer()
fil_pred = loaded_model.predict(X_test_computed)
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

In [None]:
tic = timer()
score = mean_squared_error(_y_test, fil_pred)
print("Final - RMSE: ", np.sqrt(score))
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

Compare this RMSE with the one obtained without FIL; the two should be approximately the same.

### <span style="color:#8735fb; font-size:18pt"> Step 5.b: Predict using `persist` on multiple workers in case the test dataset is huge. </span>

As noted in Step 2.b, in case the test dataset is huge, it makes sense to call predict individually on the dask workers instead of bringing the entire test dataset to the local machine.

To run prediction on the individual Dask workers, each worker needs to load the XGBoost model using FIL. However, the workers are remote and do not have access to the locally saved model, so we need to send the locally saved model to them.
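The helpers below zip the saved model, upload the archive with `client.upload_file`, and extract it into each worker's `local_directory`. The zip-and-extract round trip can be sketched and checked locally with the standard library; in this sketch a temporary directory stands in for `worker.local_directory`, and the helper names `zip_model` and `extract_into` are hypothetical.

```python
import os
import tempfile
import zipfile


def zip_model(model_path: str, zip_path: str) -> None:
    """Store a single model file in a zip archive under its base name."""
    with zipfile.ZipFile(zip_path, mode="w") as zf:
        zf.write(model_path, arcname=os.path.basename(model_path))


def extract_into(zip_path: str, target_dir: str) -> list:
    """Extract the archive into target_dir (created if missing); return the member names."""
    os.makedirs(target_dir, exist_ok=True)
    with zipfile.ZipFile(zip_path) as zf:
        zf.extractall(target_dir)
        return zf.namelist()


if __name__ == "__main__":
    with tempfile.TemporaryDirectory() as tmp:
        # Placeholder bytes stand in for a real trained model file.
        src = os.path.join(tmp, "trained-model_nyctaxi.xgb")
        with open(src, "wb") as f:
            f.write(b"placeholder model bytes")
        archive = os.path.join(tmp, "zipfile_write.zip")
        zip_model(src, archive)
        print(extract_into(archive, os.path.join(tmp, "worker-local-dir")))
```

Storing the file under its base name keeps the extracted path predictable, so `os.path.join(worker.local_directory, model_file)` in `workerModelInit` points at the extracted model.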

In [None]:
workers = client.has_what().keys()
print(workers)
n_workers = len(workers)
n_partitions = n_workers

In [None]:
def unzipFile(zipname):
    worker = get_worker()
    import zipfile
    import os
    with zipfile.ZipFile(os.path.join(worker.local_directory, zipname)) as zf:
        zf.extractall(worker.local_directory)

def checkOrMakeLocalDir():
    worker = get_worker()
    import os
    if not os.path.exists(worker.local_directory):
        os.makedirs(worker.local_directory)
    
def workerModelInit(model_file):   
    # this function will run in each worker and initialize the worker 
    import os
    worker = get_worker()
    worker.data["fil_model"] = ForestInference.load(filename=os.path.join(worker.local_directory, model_file),model_type='xgboost')
    
def predict(input_df):
    # this function will run in each worker and predict 
    worker = get_worker()
    return worker.data["fil_model"].predict(input_df)

def persistModelinWorkers(client, zip_file_name, model_file_name):
    import zipfile
    # zip the locally saved model so that it can be shipped with client.upload_file
    with zipfile.ZipFile(zip_file_name, mode='w') as zf:
        zf.write(f"./{model_file_name}")
    # check to see if local directory present in workers
    # if not present make it
    fut = client.run(checkOrMakeLocalDir)
    wait(fut)
    # upload the zip file in workers
    fut = client.upload_file(f"./{zip_file_name}")
    wait(fut)
    # unzip file in the workers
    fut = client.run(unzipFile, zip_file_name)
    wait(fut)
    # load model using FIL in workers
    fut = client.run(workerModelInit, model_file_name)
    wait(fut)
    
    

#### Persist the local model in the remote dask workers

In [None]:
%%time
persistModelinWorkers(client, "zipfile_write.zip", "trained-model_nyctaxi.xgb")

#### Inference with distributed predict with FIL

In [None]:
tic = timer()
predictions = X_infer.map_partitions(predict, meta="float") # predict runs on each partition, typically on the worker that holds it
y_pred = predictions.compute() # gather the per-partition predictions on the client
toc = timer()
print(f"Wall clock time taken for this cell : {toc-tic} s")

In [None]:
n_rows = X_infer.shape[0].compute()
print(f"It took {toc-tic} seconds to predict on {n_rows} rows using FIL distributed across the workers")

In [None]:
tic = timer()
score = mean_squared_error(_y_test, y_pred)
toc = timer()
print("Final - RMSE: ", np.sqrt(score))

## <span style="color:#8735fb; font-size:22pt"> Step 6: Clean up </span>

In [None]:
client.close()
cluster.close()

See the [Dask distributed limitations page](https://distributed.dask.org/en/latest/limitations.html) for known limitations of `dask.distributed`.