# NVIDIA RAPIDS on Azure ML
## GTC 2020 DLI WORKSHOP

In this notebook we use NYC Taxi dataset to showcase how to scale computations to multiple nodes on Azure ML using Dask. We will also estimate a large XGBoost model.

**AUTHORS**
* Tom Drabas (Microsoft)
* Manuel Reyes Gomez (NVIDIA)

**GREATER TEAM**
* Joshua Patterson (NVIDIA)
* Keith Kraus (NVIDIA)
* Brad Rees (NVIDIA)
* John Zedlewski (NVIDIA)
* Paul Mahler (NVIDIA)
* Nick Becker (NVIDIA)
* Michael Beaumont (NVIDIA)
* Chau Dang (NVIDIA)


# Import modules

In [1]:
import dask
import dask_cudf
import cudf
import cuspatial
import os
import socket
import dask_xgboost as dxgb
import distributed
import pickle
import glob
from collections import OrderedDict

from dask.delayed import delayed

from azureml.core import Run

# Setup Dask cluster and communications

In [2]:
print("Setting dask settings...")
dask.config.set({'distributed.scheduler.work-stealing': False})
dask.config.set({'distributed.scheduler.bandwidth': 20})
print("Changes to dask settings")
print("-> Setting work-stealing to ", dask.config.get('distributed.scheduler.work-stealing'))
print("-> Setting scheduler bandwidth to ", dask.config.get('distributed.scheduler.bandwidth'))
print("Settings updates complete")

Setting dask settings...
Changes to dask settings
-> Setting work-stealing to  False
-> Setting scheduler bandwidth to  20
Settings updates complete


In [3]:
run = Run.get_context()
ip = socket.gethostbyname(socket.gethostname())
scheduler = run.get_metrics()["scheduler"]
client = distributed.Client(scheduler)
client.restart()

0,1
Client  Scheduler: tcp://10.2.0.54:8786  Dashboard: http://10.2.0.54:8787/status,Cluster  Workers: 2  Cores: 2  Memory: 0 B


In [4]:
!nvidia-smi

Thu Apr  2 06:02:07 2020       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 440.33.01    Driver Version: 440.33.01    CUDA Version: 10.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|   0  Tesla P40           On   | 000017BD:00:00.0 Off |                    0 |
| N/A   20C    P0    50W / 250W |    581MiB / 22919MiB |      0%      Default |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Processes:                                                       GPU Memory |
|  GPU       PID   Type   Process name                             Usage      |
+-------

# Read, clean and featurize data

In [5]:
STORAGE_OPTIONS = {
    'account_name': run.experiment.workspace.datastores['datafileshare'].account_name,
    'account_key' : run.experiment.workspace.datastores['datafileshare'].account_key
}

protocol  = 'abfs'      # change to 'adl' for gen 1
container = 'datafilestore'

In [9]:
data_path = '../../../../../../../datafileshare'
datafiles = glob.glob(data_path + '/data/nyctaxi/2016/yellow_tripdata_2016-0[1-3].csv')
datafiles = ['/'.join(f.split('/')[8:]) for f in datafiles[0:8]]

files = [f'{protocol}://{container}/{f}' for f in datafiles]
files

['abfs://datafilestore/data/nyctaxi/2016/yellow_tripdata_2016-01.csv',
 'abfs://datafilestore/data/nyctaxi/2016/yellow_tripdata_2016-02.csv',
 'abfs://datafilestore/data/nyctaxi/2016/yellow_tripdata_2016-03.csv']

In [10]:
# list of column names that need to be re-mapped
remap = {}
remap['tpep_pickup_datetime'] = 'pickup_datetime'
remap['tpep_dropoff_datetime'] = 'dropoff_datetime'
remap['ratecodeid'] = 'rate_code'

#create a list of columns & dtypes the df must have
must_haves = {
    'pickup_datetime': 'datetime64[ms]',
    'dropoff_datetime': 'datetime64[ms]',
    'passenger_count': 'int32',
    'trip_distance': 'float32',
    'pickup_longitude': 'float32',
    'pickup_latitude': 'float32',
    'rate_code': 'int32',
    'dropoff_longitude': 'float32',
    'dropoff_latitude': 'float32',
    'fare_amount': 'float32'
}

def run_etl(filename, remap, must_haves):
    print(os.getcwd())
    return cudf.read_csv(filename).map_partitions(clean, remap, must_haves)

# helper function which takes a DataFrame partition
def clean(df_part, remap, must_haves):    
    # some col-names include pre-pended spaces remove & lowercase column names
    tmp = {col:col.strip().lower() for col in list(df_part.columns)}
    df_part = df_part.rename(tmp)
    
    # rename using the supplied mapping
    df_part = df_part.rename(remap)
    
    # iterate through columns in this df partition
    for col in df_part.columns:
        # drop anything not in our expected list
        if col not in must_haves:
            df_part = df_part.drop(col)
            continue

        if df_part[col].dtype == 'object' and col in ['pickup_datetime', 'dropoff_datetime']:
            df_part[col] = df_part[col].astype('datetime64[ms]')
            continue
            
        # if column was read as a string, recast as float
        if df_part[col].dtype == 'object':
            df_part[col] = df_part[col].str.fillna('-1')
            df_part[col] = df_part[col].astype('float32')
        else:
            # downcast from 64bit to 32bit types
            if 'int' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('int32')
            if 'float' in str(df_part[col].dtype):
                df_part[col] = df_part[col].astype('float32')
            df_part[col] = df_part[col].fillna(-1)
    
    return df_part

In [11]:
%%time
taxi_df = (
    dask_cudf
    .read_csv(files, storage_options=STORAGE_OPTIONS)
    .repartition(npartitions=32)
    .map_partitions(clean, remap, must_haves)
    .persist()
)

print(f' Number of records: {len(taxi_df):,} '.center(80, '#'))

######################## Number of records: 34,499,859 #########################
CPU times: user 688 ms, sys: 50.6 ms, total: 738 ms
Wall time: 2min 15s


# Remove bad data

In [12]:
# apply a list of filter conditions to throw out records with missing or outlier values
query_frags = [
    'fare_amount > 0 and fare_amount < 500',
    'passenger_count > 0 and passenger_count < 6',
    'pickup_longitude > -75 and pickup_longitude < -73',
    'dropoff_longitude > -75 and dropoff_longitude < -73',
    'pickup_latitude > 40 and pickup_latitude < 42',
    'dropoff_latitude > 40 and dropoff_latitude < 42'
]
taxi_df = taxi_df.query(' and '.join(query_frags))

# Add some additional features

In [13]:
import math
from math import cos, sin, asin, sqrt, pi
import numpy as np

def haversine_distance_kernel(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude, h_distance):
    for i, (x_1, y_1, x_2, y_2) in enumerate(zip(pickup_latitude, pickup_longitude, dropoff_latitude, dropoff_longitude)):
        x_1 = pi / 180 * x_1
        y_1 = pi / 180 * y_1
        x_2 = pi / 180 * x_2
        y_2 = pi / 180 * y_2
        
        dlon = y_2 - y_1
        dlat = x_2 - x_1
        a = sin(dlat / 2)**2 + cos(x_1) * cos(x_2) * sin(dlon / 2)**2
        
        c = 2 * asin(sqrt(a)) 
        r = 6371 # Radius of earth in kilometers
        
        h_distance[i] = c * r

def day_of_the_week_kernel(day, month, year, day_of_week):
    for i, (d_1, m_1, y_1) in enumerate(zip(day, month, year)):
        if month[i] < 3:
            shift = month[i]
        else:
            shift = 0
        Y = year[i] - (month[i] < 3)
        y = Y - 2000
        c = 20
        d = day[i]
        m = month[i] + shift + 1
        day_of_week[i] = (d + math.floor(m * 2.6) + y + (y // 4) + (c // 4) - 2 * c) % 7
        
def add_features(df):
    df['hour'] = df['pickup_datetime'].dt.hour
    df['year'] = df['pickup_datetime'].dt.year
    df['month'] = df['pickup_datetime'].dt.month
    df['day'] = df['pickup_datetime'].dt.day
    df['diff'] = df['dropoff_datetime'].astype('int32') - df['pickup_datetime'].astype('int32')
    
    #### BUG -- the below can be done on Titan V and Titan RTX but not P40 (Pascal)
#     df['pickup_latitude_r'] = df['pickup_latitude'] // .01 * .01
#     df['pickup_longitude_r'] = df['pickup_longitude'] // .01 * .01
#     df['dropoff_latitude_r'] = df['dropoff_latitude'] // .01 * .01
#     df['dropoff_longitude_r'] = df['dropoff_longitude'] // .01 * .01

    #### WORKAROUND
    df['pickup_latitude_r']   = (df['pickup_latitude']   / .01).astype('int') / 100.0
    df['pickup_longitude_r']  = (df['pickup_longitude']  / .01).astype('int') / 100.0
    df['dropoff_latitude_r']  = (df['dropoff_latitude']  / .01).astype('int') / 100.0
    df['dropoff_longitude_r'] = (df['dropoff_longitude'] / .01).astype('int') / 100.0

    
    df = df.drop('pickup_datetime')
    df = df.drop('dropoff_datetime')

    df = df.apply_rows(haversine_distance_kernel,
                       incols=['pickup_latitude', 'pickup_longitude', 'dropoff_latitude', 'dropoff_longitude'],
                       outcols=dict(h_distance=np.float32),
                       kwargs=dict())

    df = df.apply_rows(day_of_the_week_kernel,
                       incols=['day', 'month', 'year'],
                       outcols=dict(day_of_week=np.float32),
                       kwargs=dict())


    df['is_weekend'] = (df['day_of_week']<2).astype("int32")
    return df

In [14]:
%%time
# actually add the features
taxi_df = taxi_df.map_partitions(add_features).persist()
done = distributed.wait(taxi_df)

CPU times: user 2.08 s, sys: 31.5 ms, total: 2.11 s
Wall time: 22.9 s


In [15]:
%%time
print(f'Filtered number of rows: {len(taxi_df):,}')

Filtered number of rows: 32,755,964
CPU times: user 20.6 ms, sys: 567 µs, total: 21.2 ms
Wall time: 3.22 s


# Do some data science!!

In [17]:
def drop_empty_partitions(df):
    lengths = df.map_partitions(len).compute()
    nonempty = [length > 0 for length in lengths]
    return df.partitions[nonempty]

In [18]:
%%time
X_train = taxi_df.query('day < 25').persist()

# create a Y_train ddf with just the target variable
Y_train = X_train[['fare_amount']].persist()

# drop the target variable from the training ddf
X_train = X_train[X_train.columns.difference(['fare_amount'])]

# this wont return until all data is in GPU memory
done = distributed.wait([X_train, Y_train])

CPU times: user 562 ms, sys: 5.24 ms, total: 567 ms
Wall time: 1.42 s


In [20]:
print(f' Training examples: {len(X_train):,} '.center(80, '#'))

######################## Training examples: 25,895,465 #########################


In [21]:
%%time
params = {
    'max_depth'      : 3,
    'objective'      : 'reg:squarederror',
    'subsample'      : 0.5,
    'max_leaves'     : 2**4,
    'gamma'          : 1,
    'n_gpus'         : 1,
    'tree_method'    :'gpu_hist'
}

trained_model = dxgb.train(client, params, X_train, Y_train, num_boost_round=40)

CPU times: user 33.8 ms, sys: 1.14 ms, total: 35 ms
Wall time: 9.64 s


# Test the model

In [22]:
%%time
X_test = taxi_df.query('day >= 25').persist()
X_test = drop_empty_partitions(X_test)

# Create Y_test with just the fare amount
Y_test = X_test[['fare_amount']].persist()

# Drop the fare amount from X_test
X_test = X_test[X_test.columns.difference(['fare_amount'])]

CPU times: user 673 ms, sys: 21.4 ms, total: 694 ms
Wall time: 3.5 s


In [23]:
del taxi_df
print(f' Testing examples: {len(X_test):,} '.center(80, '#'))

######################### Testing examples: 6,860,499 ##########################


In [24]:
%%time
# generate predictions on the test set
Y_test['prediction'] = dxgb.predict(client, trained_model, X_test)

CPU times: user 212 ms, sys: 8.45 ms, total: 221 ms
Wall time: 228 ms


In [25]:
%%time
Y_test['squared_error'] = (Y_test['prediction'] - Y_test['fare_amount'])**2

# inspect the results to make sure our calculation looks right
Y_test.head().to_pandas()

CPU times: user 251 ms, sys: 8.33 ms, total: 259 ms
Wall time: 812 ms


In [26]:
%%time
# compute the actual RMSE over the full test set
RMSE = Y_test.squared_error.mean().compute()
print(math.sqrt(RMSE))

2.288085251263267
CPU times: user 62.1 ms, sys: 7.93 ms, total: 70.1 ms
Wall time: 5.55 s


# Save the model

In [28]:
pickle.dump(trained_model, open('../saved_models/trained_model_xgboost.mdl', 'wb'))