# Random Forests Multi-node, Multi-GPU demo

The experimental cuML multi-node, multi-GPU (MNMG) implementation of random forests leverages Dask to do embarrassingly-parallel model fitting. For a random forest with `N` trees being fit by `W` workers, each worker will build `N / W` trees. During inference, predictions from all `N` trees will be combined.

The caller is responsible for partitioning the data efficiently via Dask. To build an accurate model, it's important to ensure that each worker has a representative chunk of the data. This can come by distributing the data evenly after ensuring that it is well shuffled. Or, given sufficient memory capacity, the caller can replicate the data to all workers. This approach will most closely simulate the single-GPU building approach.

**Note:** cuML 0.9 contains the first, experimental preview release of the MNMG random forest model. The API is subject to change in future releases, and some known limitations remain (listed in the documentation).

For more information on MNMG Random Forest models, see the documentation:
 * https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.ensemble.RandomForestClassifier
 * https://docs.rapids.ai/api/cuml/stable/api.html#cuml.dask.ensemble.RandomForestRegressor

In [2]:
import numpy as np
import sklearn

import pandas as pd
import cudf
import cuml

from sklearn import model_selection

from cuml import datasets
from cuml.metrics import accuracy_score
from cuml.dask.common import utils as dask_utils
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
import dask_cudf

from cuml.dask.ensemble import RandomForestClassifier as cumlDaskRF
from sklearn.ensemble import RandomForestClassifier as sklRF

## Start Dask cluster

In [3]:
# This will use all GPUs on the local host by default
cluster = LocalCUDACluster(threads_per_worker=1)
c = Client(cluster)

# Query the client for all connected workers
workers = c.has_what().keys()
n_workers = len(workers)
n_streams = 8 # Performance optimization

Debugging information
---------------------
old task state: released
old run_spec: DataNode(RandomForestClassifier-5a55a00089dab66bc43e0ab8fc1f183c, type=<class 'cuml.ensemble.randomforestclassifier.RandomForestClassifier'>, RandomForestClassifier())
new run_spec: DataNode(RandomForestClassifier-5a55a00089dab66bc43e0ab8fc1f183c, type=<class 'cuml.ensemble.randomforestclassifier.RandomForestClassifier'>, RandomForestClassifier())
old token: ('DataNode', 'd90c56e9272ac5c35b8f3d5a5567660a')
new token: ('DataNode', 'ce877e9237b21540a8733c49e330d627')
old dependencies: set()
new dependencies: set()



## Define Parameters

In addition to the number of examples, random forest fitting performance depends heavily on the number of columns in a dataset and (especially) on the maximum depth to which trees are allowed to grow. Lower `max_depth` values can greatly speed up fitting, though going too low may reduce accuracy.

In [4]:
# Data parameters
train_size = 100000
test_size = 1000
n_samples = train_size + test_size
n_features = 20

# Random Forest building parameters
max_depth = 12
n_bins = 16
n_trees = 1000

## Generate Data on host

In this case, we generate data on the client (initial process) and pass it to the workers. You could also load data directly onto the workers via, for example, `dask_cudf.read_csv()`. See also the k-means MNMG notebook (kmeans_mnmg_demo.ipynb) for an alternative method of generating data on the worker nodes.

In [14]:
X, y = datasets.make_classification(n_samples=n_samples, n_features=n_features,
                                 n_clusters_per_class=1, n_informative=int(n_features / 3),
                                 random_state=123, n_classes=5)
X = X.astype(np.float32)
y = y.astype(np.int32)
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=test_size)

## Distribute data to worker GPUs

In [18]:
n_partitions = n_workers

def distribute(X, y):
    # First convert to cudf (with real data, you would likely load in cuDF format to start)
    X_cudf = cudf.DataFrame(X)
    y_cudf = cudf.Series(y)

    # Partition with Dask
    # In this case, each worker will train on 1/n_partitions fraction of the data
    X_dask = dask_cudf.from_cudf(X_cudf, npartitions=n_partitions)
    y_dask = dask_cudf.from_cudf(y_cudf, npartitions=n_partitions)

    # Persist to cache the data in active memory
    X_dask, y_dask = \
      dask_utils.persist_across_workers(c, [X_dask, y_dask], workers=workers)
    
    return X_dask, y_dask

X_train_dask, y_train_dask = distribute(X_train, y_train)
X_test_dask, y_test_dask = distribute(X_test, y_test)

## Build a scikit-learn model (single node)

Dask does not currently have a simple wrapper for scikit-learn's RandomForest, but scikit-learn does offer multi-CPU support via joblib, which we'll use.

In [15]:
%%time

# Use all available CPU cores
skl_model = sklRF(max_depth=max_depth, n_estimators=n_trees, n_jobs=-1)
skl_model.fit(X_train.get(), y_train.get())

CPU times: user 11min 12s, sys: 4.17 s, total: 11min 16s
Wall time: 9.5 s


## Train the distributed cuML model

In [16]:
%%time

cuml_model = cumlDaskRF(max_depth=max_depth, n_estimators=n_trees, n_bins=n_bins, n_streams=n_streams)
cuml_model.fit(X_train_dask, y_train_dask)

wait(cuml_model.rfs) # Allow asynchronous training tasks to finish

  return func(**kwargs)
  return func(**kwargs)
  return func(**kwargs)
  return func(**kwargs)
  return func(**kwargs)
  return func(**kwargs)
  return func(**kwargs)
  return func(**kwargs)


CPU times: user 363 ms, sys: 261 ms, total: 624 ms
Wall time: 1.16 s


DoneAndNotDoneFutures(done={<Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-9c350013-4c5c-4578-b8d1-d55cc9f07547>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-e6d6e5ba-fd13-41f1-a606-e304c7ed42d4>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-6c6d4656-2c7f-43b2-8085-747db5c5f157>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-c3fb02a6-7778-4208-91ea-e33d214c7d0c>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-877be5cb-2acd-4cbb-8de7-47d6ba268cea>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-106e2f4f-aa1a-4031-9fc7-6fd2234c36a3>, <Future: finished, type: cuml.ensemble.randomforestclassifier.RandomForestClassifier, key: _construct_rf-48d

# Predict and check accuracy

In [17]:
skl_y_pred = skl_model.predict(X_test.get())
cuml_y_pred = cuml_model.predict(X_test_dask).compute().to_numpy()

# Due to randomness in the algorithm, you may see slight variation in accuracies
print("SKLearn accuracy:  ", accuracy_score(y_test, skl_y_pred))
print("CuML accuracy:     ", accuracy_score(y_test, cuml_y_pred))

This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.
This may cause some slowdown.
Consider loading the data with Dask directly
 or using futures or delayed objects to embed the data into the graph without repetition.
See also https://docs.dask.org/en/stable/best-practices.html#load-data-with-dask for more information.


SKLearn accuracy:   0.9649999737739563
CuML accuracy:      0.19900000095367432
