# Dask and cuDF working together

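This notebook ports the cuDF/cuML lake-depth random-forest regression workflow so that it runs on a local Dask CUDA cluster (`LocalCUDACluster`).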

In [2]:
# Import system and python modules

import os
import time
import random
from pprint import pprint
import numpy as np
import cupy as cp

# Import RAPIDS specific modules

import cudf as df
import cuml
from cuml import train_test_split
from cuml.metrics.regression import r2_score as r2d2

# Import Dask specific modules
from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf

from cuml.dask.ensemble import RandomForestRegressor as cumlDaskRF

# Import sklearn specific modules
from sklearn.model_selection import KFold
from sklearn.inspection import permutation_importance
from sklearn.metrics import mean_absolute_error, r2_score as sk_r2_score

# Import data-visualization modules

import matplotlib.pyplot as plt

# Start Dask Cluster

In [3]:
# This will use all GPUs on the local host by default, with one thread per worker
cluster = LocalCUDACluster(threads_per_worker=1)
c = Client(cluster)  # client used later (via `distribute`) to persist data on the workers

# Query the client for all connected workers
workers = c.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization
# NOTE(review): n_streams is not used by any cell visible here; presumably it was meant
# for the random-forest constructor's `n_streams` argument. Confirm, or remove it.

Perhaps you already have a cluster running?
Hosting the HTTP server on port 41157 instead
  http_address["port"], self.http_server.port


# Data Preprocessing

In [4]:
# Declare some global variables and paths
FEATURES_PATH = '../data/pts_merged_final.csv'  # relative to the notebook's working directory
DEPTH = 'Depth_m'  # regression target column
DATE = 'Date'      # column excluded from the model inputs
FID = 'FID'        # column excluded from the model inputs

TEST_SIZE = 0.2       # fraction of rows held out for evaluation
RANDOM_STATE = 42     # seed for the train/test split

In [5]:
# Load everything into GPU-based DF
# (`df` is the cudf module here, not a DataFrame, so this reads the CSV with cuDF)
lakes_depth_df = df.read_csv(FEATURES_PATH)

In [6]:
# Drop identifier/date columns that should not be used as model inputs.
# Use the FID / DATE constants from the config cell so the column names live in one place.
lakes_depth_nd = lakes_depth_df.drop([FID, DATE], axis=1)
lakes_depth_nd.head(5)

Unnamed: 0,Depth_m,b1_LC8_075,b2_LC8_075,b3_LC8_075,b4_LC8_075,b5_LC8_075,b6_LC8_075,b7_LC8_075,b8_LC8_075,b9_LC8_075,...,b26_LC8_07,b27_LC8_07,b28_LC8_07,b29_LC8_07,b30_LC8_07,b31_LC8_07,b32_LC8_07,b33_LC8_07,b34_LC8_07,b35_LC8_07
0,0.63,164,271,199,42,27,16,605,824,3905,...,2625,165,100,136,643,98,59,80,381,593
1,0.672727,165,272,196,44,29,16,607,842,3750,...,2750,176,107,148,659,97,59,82,364,552
2,0.670588,154,260,193,40,32,19,592,798,3850,...,2105,208,123,166,800,123,73,98,475,594
3,0.822222,156,250,195,48,40,26,624,800,3250,...,1846,256,160,205,833,167,104,133,542,650
4,1.725,117,164,78,38,23,17,713,1500,3079,...,2235,197,140,295,605,145,104,218,447,739


In [7]:
# Inspect data for any anomalies or anything else odd-looking
# NOTE(review): the recorded describe() output shows several band columns with
# min -32768 and max 32767, which look like int16 nodata sentinels. No cell below
# filters them out; consider masking them before training.
lakes_depth_nd.describe()

Unnamed: 0,Depth_m,b1_LC8_075,b2_LC8_075,b3_LC8_075,b4_LC8_075,b5_LC8_075,b6_LC8_075,b7_LC8_075,b8_LC8_075,b9_LC8_075,...,b26_LC8_07,b27_LC8_07,b28_LC8_07,b29_LC8_07,b30_LC8_07,b31_LC8_07,b32_LC8_07,b33_LC8_07,b34_LC8_07,b35_LC8_07
count,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,...,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0,23177.0
mean,2.2214,424.288001,553.758079,427.147301,312.433188,175.490055,132.437373,558.585451,922.828753,1900.034603,...,4054.567977,473.428356,410.961686,544.808215,685.495103,332.496527,303.484877,394.607499,499.36588,768.926047
std,1.872953,755.932158,731.549284,630.296958,479.544589,309.819959,244.022402,402.546098,2850.09596,3253.968664,...,6661.9865,2223.950009,539.94768,2837.452024,879.992656,1868.261448,395.559938,2580.861184,801.365571,455.468874
min,0.25,-151.0,47.0,-39.0,-38.0,-3.0,-9.0,-3213.0,-32768.0,-32768.0,...,-32768.0,-32768.0,-8.0,-32768.0,-32000.0,-32768.0,-22.0,-32768.0,-31000.0,-9667.0
25%,1.40517,128.0,235.0,116.0,52.0,29.0,20.0,429.0,607.0,753.0,...,1413.0,126.0,88.0,130.0,391.0,87.0,63.0,94.0,265.0,651.0
50%,1.87075,189.0,324.0,224.0,118.0,51.0,37.0,621.0,933.0,1626.0,...,2126.0,246.0,155.0,271.0,661.0,173.0,105.0,189.0,453.0,734.0
75%,2.34877,262.0,441.0,379.0,337.0,164.0,120.0,789.0,1241.0,3294.0,...,3500.0,643.0,588.0,843.0,895.0,458.0,460.0,620.0,679.0,828.0
max,21.0375,5277.0,5442.0,4984.0,5370.0,2879.0,2568.0,4171.0,32767.0,32767.0,...,32767.0,32767.0,4128.0,32767.0,32767.0,32767.0,2771.0,32767.0,32767.0,32767.0


In [8]:
# Split the frame into the regression target (labels) and the model inputs (covariates).
# Use the DEPTH constant from the config cell rather than repeating the column name.
labels = lakes_depth_nd[DEPTH]
covariates = lakes_depth_nd.drop([DEPTH], axis=1)

# Check to ensure everything looks good
labels.head(5)

0    0.630000
1    0.672727
2    0.670588
3    0.822222
4    1.725000
Name: Depth_m, dtype: float64

In [9]:
# Preview the first rows of the feature matrix
covariates.head(5)

Unnamed: 0,b1_LC8_075,b2_LC8_075,b3_LC8_075,b4_LC8_075,b5_LC8_075,b6_LC8_075,b7_LC8_075,b8_LC8_075,b9_LC8_075,b10_LC8_07,...,b26_LC8_07,b27_LC8_07,b28_LC8_07,b29_LC8_07,b30_LC8_07,b31_LC8_07,b32_LC8_07,b33_LC8_07,b34_LC8_07,b35_LC8_07
0,164,271,199,42,27,16,605,824,3905,6074,...,2625,165,100,136,643,98,59,80,381,593
1,165,272,196,44,29,16,607,842,3750,5690,...,2750,176,107,148,659,97,59,82,364,552
2,154,260,193,40,32,19,592,798,3850,4813,...,2105,208,123,166,800,123,73,98,475,594
3,156,250,195,48,40,26,624,800,3250,3900,...,1846,256,160,205,833,167,104,133,542,650
4,117,164,78,38,23,17,713,1500,3079,5087,...,2235,197,140,295,605,145,104,218,447,739


In [10]:
# Cast both the target and the feature matrix to float32 in a single assignment
labels, covariates = labels.astype(cp.float32), covariates.astype(cp.float32)

In [11]:
# Hold out TEST_SIZE of the rows for evaluation; RANDOM_STATE fixes the shuffle
cv_train, cv_test, labels_train, labels_test = train_test_split(
    covariates,
    labels,
    test_size=TEST_SIZE,
    random_state=RANDOM_STATE,
)

In [12]:
# Ensure we have the right size and shapes on our split data
print('Training features shape:', cv_train.shape)
print('Testing features shape:', cv_test.shape)
print('Training labels shape:', labels_train.shape)
print('Testing labels shape:', labels_test.shape)

# Fail fast (instead of relying on eyeballing the prints) if the split dropped or
# duplicated rows, or if features and labels are misaligned.
assert len(cv_train) + len(cv_test) == len(covariates), "split lost or duplicated rows"
assert len(cv_train) == len(labels_train), "train features/labels length mismatch"
assert len(cv_test) == len(labels_test), "test features/labels length mismatch"
assert cv_train.shape[1] == cv_test.shape[1] == covariates.shape[1], "feature count changed"

Training features shape: (18542, 35)
Testing features shape: (4635, 35)
Training labels shape: (18542,)
Testing labels shape: (4635,)


## Distribute data to worker GPUs

In [13]:
# One Dask partition per connected worker
n_partitions = n_workers

def distribute(covariates, labels, npartitions=None, client=None, target_workers=None):
    """Partition cuDF covariates/labels with Dask and persist them on the workers.

    Parameters
    ----------
    covariates : cuDF DataFrame
        Feature rows to distribute.
    labels : cuDF Series
        Target values, row-aligned with ``covariates``.
    npartitions : int, optional
        Number of Dask partitions to create. Defaults to the notebook-level
        ``n_partitions``, read at call time.
    client : dask.distributed.Client, optional
        Client that performs the persist. Defaults to the notebook-level ``c``.
    target_workers : iterable, optional
        Workers to persist the data onto. Defaults to the notebook-level ``workers``.

    Returns
    -------
    tuple
        ``(cv_dask, labels_dask)``: the persisted Dask collections.
    """
    # Fall back to the notebook globals at call time, exactly as the original
    # two-argument version did, so existing calls behave identically.
    if npartitions is None:
        npartitions = n_partitions
    if client is None:
        client = c
    if target_workers is None:
        target_workers = workers

    # Partition with Dask. With npartitions equal to the number of workers,
    # each worker trains on roughly 1/npartitions of the data.
    cv_dask = dask_cudf.from_cudf(covariates, npartitions=npartitions)
    labels_dask = dask_cudf.from_cudf(labels, npartitions=npartitions)

    # Persist to cache the data in active memory on the chosen workers
    cv_dask, labels_dask = dask_utils.persist_across_workers(
        client, [cv_dask, labels_dask], workers=target_workers
    )
    return cv_dask, labels_dask

# Distribute and persist the train and test splits separately.
# NOTE(review): labels_test_dask is not used by any visible cell (scoring uses the
# host-side labels_test); keep it only if later analysis needs it.
cv_train_dask, labels_train_dask = distribute(cv_train, labels_train)
cv_test_dask, labels_test_dask = distribute(cv_test, labels_test)

## Train the distributed cuML model

In [14]:
# Declare some global variables for training phase

# Hyper-parameters (each one is passed to cumlDaskRF in the next cell)
N_ESTIMATORS = 1000
SPLIT_ALGO = 1          # NOTE(review): integer-coded cuML enum; confirm which split algorithm 1 selects in the installed version
SPLIT_CRITERION = 2     # NOTE(review): integer-coded cuML enum; presumably MSE, confirm against the installed version
BOOTSTRAP = True
BOOTSTRAP_FEATURES = False
ROWS_SAMPLE = 1.0
MAX_DEPTH = 16
MAX_LEAVES = -1         # NOTE(review): presumably -1 means "no leaf limit"; confirm in cuML docs
MAX_FEATURES = 'auto'
N_BINS = 8
MIN_ROWS_PER_NODE = 2
MIN_IMPURITY_DECREASE = 0.0
ACCURACY_METRIC = 'mean_ae' # alternatives: 'mse', 'r2', 'median_ae'
QUANTILEPT = False
SEED = 42
VERBOSE = False

In [15]:
# Construct the Dask (multi-worker) cuML random-forest regressor from the
# hyper-parameters declared above. This only builds the estimator; fit() runs in the next cell.
depth_rf_model_0 = cumlDaskRF(n_estimators = N_ESTIMATORS, 
                        split_algo = SPLIT_ALGO, 
                        split_criterion = SPLIT_CRITERION, 
                        bootstrap = BOOTSTRAP,
                        bootstrap_features = BOOTSTRAP_FEATURES, 
                        rows_sample = ROWS_SAMPLE,
                        max_depth = MAX_DEPTH, 
                        max_leaves = MAX_LEAVES, 
                        max_features = MAX_FEATURES,
                        n_bins = N_BINS,
                        min_rows_per_node = MIN_ROWS_PER_NODE,
                        min_impurity_decrease = MIN_IMPURITY_DECREASE,
                        accuracy_metric = ACCURACY_METRIC,
                        quantile_per_tree = QUANTILEPT,
                        seed = SEED,
                        verbose = VERBOSE)

In [16]:
%%time

# Train on the persisted Dask partitions. Training runs as asynchronous tasks on
# the workers, so block on the model's futures before timing ends.
depth_rf_model_0.fit(cv_train_dask, labels_train_dask)
wait(depth_rf_model_0.rfs) # Allow asynchronous training tasks to finish

CPU times: user 145 ms, sys: 32.6 ms, total: 178 ms
Wall time: 4.07 s


DoneAndNotDoneFutures(done={<Future: finished, type: cuml.RandomForestRegressor, key: _construct_rf-63fbe5e5-4ad9-4f01-adee-8bff43462bcc>}, not_done=set())

# Predict and check accuracy

In [18]:
# Predict on the distributed test features, gather the partitions with compute(),
# then copy the result to a host array for scoring.
# NOTE(review): Series.to_array() is deprecated in newer cuDF releases (to_numpy() replaces it).
cuml_y_pred = depth_rf_model_0.predict(cv_test_dask).compute().to_array()

In [27]:
# Score the predictions on the host with scikit-learn metrics.
# Distinct aliases (mean_absolute_error / sk_r2_score) are used so the cuML `r2d2`
# import from the top of the notebook is not silently rebound mid-notebook.
y_true_host = labels_test.to_array()  # copy device labels to host once, reuse for both metrics

mae_score = mean_absolute_error(y_true_host, cuml_y_pred)
# R^2 < 0 means the model does worse than always predicting the mean of y_true_host.
r2_score = sk_r2_score(y_true_host, cuml_y_pred)
print("Scores --")
print("MAE: ", mae_score)
print("r2: ", r2_score)

Scores --
MAE:  1.0630636
r2:  -0.04085518364929608


