# XGBoost -- Distributed Computing with Dask and Rapids

In this notebook, we look at the wildly different results, as well as syntax, required when XGBoost is trained in a (simulated) distributing computing environment.

While it is possible to train XGBoost using distributed CPU resources only with vanilla Dask, in this notebook we will be demonstrating NVIDIA's RAPIDS. 

Just as Dask extends the Pandas dataframe into a distributed CPU environment, Rapids extends the Pandas dataframe into a distributed GPU environment by storing tabular data in columnar PyArrow format across an arbitrary number of GPUs.

Unlike PyTorch, XGBoost has no native support for this kind of distributed processing, and so it falls to these specialist frameworks to handle most of the heavy lifting.

RESOURCES AND DATASETS

[NVIDIA XGBoost/CuDF Walkthrough](https://developer.nvidia.com/blog/accelerating-xgboost-on-gpu-clusters-with-dask/)

[Dask-ml Documentation](https://ml.dask.org/)

# Environment Sanity Check #

Click the _Runtime_ dropdown at the top of the page, then _Change Runtime Type_ and confirm the instance type is _GPU_.

Check the output of `!nvidia-smi` to make sure you've been allocated a Tesla T4, P4, or P100.

In [None]:
!nvidia-smi

Tue Nov 30 13:54:23 2021       
+-----------------------------------------------------------------------------+
| NVIDIA-SMI 495.44       Driver Version: 460.32.03    CUDA Version: 11.2     |
|-------------------------------+----------------------+----------------------+
| GPU  Name        Persistence-M| Bus-Id        Disp.A | Volatile Uncorr. ECC |
| Fan  Temp  Perf  Pwr:Usage/Cap|         Memory-Usage | GPU-Util  Compute M. |
|                               |                      |               MIG M. |
|   0  Tesla P100-PCIE...  Off  | 00000000:00:04.0 Off |                    0 |
| N/A   44C    P0    29W / 250W |      0MiB / 16280MiB |      0%      Default |
|                               |                      |                  N/A |
+-------------------------------+----------------------+----------------------+
                                                                               
+-----------------------------------------------------------------------------+
| Proces

#Setup xgboost, dask-ml, RAPIDS

Unfortunately, setting up Dask and RAPIDS in Colab is quite a chore. That said, if you follow the instructions in the cells below, you'll be up and running in about a half an hour. 

Expect to have to monitor the first 5-10 minutes, as there will be multiple restarts required. You also lose access to the handy file browser included in Colab, because in order to run RAPIDS, you actually have to change the environment of the underlying cloud server to Conda, because RAPIDS doesn't support pip installation.

Procedure:
1. Update gcc in Colab
1. Install Conda
1. Install RAPIDS' current stable version of its libraries, as well as some external libraries including:
  1. cuDF
  1. cuML
  1. cuGraph
  1. cuSpatial
  1. cuSignal
  1. BlazingSQL
  1. xgboost
1. Copy RAPIDS .so files into current working directory, a neccessary workaround for RAPIDS+Colab integration.
5. Install additional modules (dask-ml, pytorch lightning) using pip to avoid extremely slow conda environment checks. The modules will be visible in conda, as will the native Colab modules.

Overall, expect the installs to take 20-30 minutes -- **they will also require multiple restarts of the Colab instance**, so if you are running this on your own, be prepared to monitor this process pretty closely. Once the conda installs are running, you should be OK, unless you have changed something elsewhere in the environment.

In [None]:
# This get the RAPIDS-Colab install files and test check your GPU.  Run this and the next cell only.
# Please read the output of this cell.  If your Colab Instance is not RAPIDS compatible, it will warn you and give you remediation steps.
!git clone https://github.com/rapidsai/rapidsai-csp-utils.git
!python rapidsai-csp-utils/colab/env-check.py

fatal: destination path 'rapidsai-csp-utils' already exists and is not an empty directory.
***********************************************************************
Woo! Your instance has the right kind of GPU, a Tesla P100-PCIE-16GB!
***********************************************************************



In [None]:
# This will update the Colab environment and restart the kernel.  Don't run the next cell until you see the session crash.
!bash rapidsai-csp-utils/colab/update_gcc.sh
import os
os._exit(00)

Updating your Colab environment.  This will restart your kernel.  Don't Panic!
Get:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease [3,626 B]
Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
Hit:3 http://ppa.launchpad.net/c2d4u.team/c2d4u4.0+/ubuntu bionic InRelease
Get:4 http://security.ubuntu.com/ubuntu bionic-security InRelease [88.7 kB]
Hit:5 http://archive.ubuntu.com/ubuntu bionic InRelease
Get:6 http://archive.ubuntu.com/ubuntu bionic-updates InRelease [88.7 kB]
Ign:7 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
Hit:8 http://ppa.launchpad.net/cran/libgit2/ubuntu bionic InRelease
Hit:9 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
Hit:10 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  Release
Hit:13 http://ppa.launchpad.net/deadsnakes/ppa/ubuntu bionic InRelease
Get:14 http://archive.ubu

In [None]:
!pip install dask-ml rasterio ipython-autotime pymongo[tls,srv]
%load_ext autotime

Collecting dask-ml
  Using cached dask_ml-2021.11.30-py3-none-any.whl (148 kB)
Collecting rasterio
  Using cached rasterio-1.2.10-cp37-cp37m-manylinux1_x86_64.whl (19.3 MB)
Collecting ipython-autotime
  Using cached ipython_autotime-0.3.1-py2.py3-none-any.whl (6.8 kB)
Collecting pymongo[srv,tls]
  Downloading pymongo-4.0-cp37-cp37m-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (452 kB)
[K     |████████████████████████████████| 452 kB 4.1 MB/s 
[?25hCollecting scikit-learn>=1.0.0
  Downloading scikit_learn-1.0.1-cp37-cp37m-manylinux_2_12_x86_64.manylinux2010_x86_64.whl (23.2 MB)
[K     |████████████████████████████████| 23.2 MB 1.3 MB/s 
Collecting dask-glm>=0.2.0
  Using cached dask_glm-0.2.0-py2.py3-none-any.whl (12 kB)
Collecting dnspython<3.0.0,>=1.16.0
  Using cached dnspython-2.1.0-py3-none-any.whl (241 kB)
Collecting snuggs>=1.4.1
  Using cached snuggs-1.4.7-py3-none-any.whl (5.4 kB)
Collecting affine
  Using cached affine-2.3.0-py2.py3-none-any.whl (15 kB)
Installing collect

In [None]:
# This will install CondaColab.  This will restart your kernel one last time.  Run this cell by itself and only run the next cell once you see the session crash.
import condacolab
condacolab.install()

✨🍰✨ Everything looks OK!
time: 12.1 ms (started: 2021-12-05 19:52:11 +00:00)


In [None]:
# you can now run the rest of the cells as normal
import condacolab
condacolab.check()

✨🍰✨ Everything looks OK!
time: 1.99 ms (started: 2021-12-05 19:54:36 +00:00)


In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
# Installing RAPIDS is now 'python rapidsai-csp-utils/colab/install_rapids.py <release> <packages>'
# The <release> options are 'stable' and 'nightly'.  Leaving it blank or adding any other words will default to stable.
# The <packages> option are default blank or 'core'.  By default, we install RAPIDSAI and BlazingSQL.  The 'core' option will install only RAPIDSAI and not include BlazingSQL, 
!python rapidsai-csp-utils/colab/install_rapids.py stable
import os
os.environ['NUMBAPRO_NVVM'] = '/usr/local/cuda/nvvm/lib64/libnvvm.so'
os.environ['NUMBAPRO_LIBDEVICE'] = '/usr/local/cuda/nvvm/libdevice/'
os.environ['CONDA_PREFIX'] = '/usr/local'

Installing RAPIDS Stable 21.10
Starting the RAPIDS install on Colab.  This will take about 15 minutes.
Collecting package metadata (current_repodata.json): ...working... done
done

# All requested packages already installed.

RAPIDS conda installation complete.  Updating Colab's libraries...
Copying /usr/local/lib/libcudf.so to /usr/lib/libcudf.so
Copying /usr/local/lib/libnccl.so to /usr/lib/libnccl.so
Copying /usr/local/lib/libcuml.so to /usr/lib/libcuml.so
Copying /usr/local/lib/libcugraph.so to /usr/lib/libcugraph.so
Copying /usr/local/lib/libxgboost.so to /usr/lib/libxgboost.so
Copying /usr/local/lib/libcuspatial.so to /usr/lib/libcuspatial.so
Copying /usr/local/lib/libgeos.so to /usr/lib/libgeos.so
Copying /usr/local/lib/libgeos_c.so to /usr/lib/libgeos_c.so
time: 47 s (started: 2021-12-05 19:52:18 +00:00)


In [None]:
import warnings
warnings.filterwarnings("ignore")

time: 1.12 ms (started: 2021-12-05 19:53:06 +00:00)


In [None]:
from pymongo import MongoClient
uri = "mongodb+srv://baffree.pm9ae.mongodb.net/myFirstDatabase?authSource=%24external&authMechanism=MONGODB-X509&retryWrites=true&w=majority"
client = MongoClient(uri,
                     tls=True,
                     tlsCertificateKeyFile='/content/drive/MyDrive/Vault/Mongo/X509-cert-7734489939085958648.pem')
db = client['bd']
collection = db['pt_xgb']

time: 285 ms (started: 2021-12-05 19:53:05 +00:00)


In [None]:
try:
  test_doc = {"name" : "test"}
  collection.insert_one(test_doc)
  print("insertion complete")
  query = {"name": "test"}
  cursor = collection.find(query)
  for document in cursor:
        print(document)
  d = collection.delete_many(query)
  print(d.deleted_count, " documents deleted !!")
except:
  print("check mongoDB connection")

In [None]:
import sys
import os
import random
import time
import argparse
from time import time
from datetime import datetime
import pickle
from typing import Tuple

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn import metrics
from sklearn.model_selection import cross_val_score

import cv2
import rasterio
from rasterio.plot import show
from imutils import paths
import tqdm.notebook as tq
import imutils

import dask
from dask.distributed import Client
import xgboost as xgb
from xgboost import dask as dxgb
from dask import dataframe as dd
import dask_cudf
from dask_cuda import LocalCUDACluster
from distributed import Client, wait
from dask_ml.model_selection import train_test_split
import pyarrow as pa
import cupy

time: 3.82 s (started: 2021-12-05 19:53:07 +00:00)


# Background Information on Dask and RAPIDS

At this point, it's worth taking a few moments to explain the interrelated data formats we're using for this demonstration. Although they appear redundant, and in many ways they're intended to BE redundant, each of these DataFrame formats is its own object with its own set of defined methods, its own use case, living on its own type of device.

[This](https://code-love.com/2020/12/06/rapids-introduction/) guide covers some, but not all, of the following comparisons --

* **PANDAS**: Full functionality, great documentation and support. DataFrame lives in RAM, all functions run on 1 CPU, limited to 1 node.
* **DASK-DF**: Lazy evaluation extension of a Pandas DataFrame.
* **DASK ARRAY**: Lazy evaluation extension of a Numpy ndarray.
Both support multiple nodes, and multiple CPUs. The dataframe lives in RAM.
* **cuDF**: Real-time evaluation. Limited functionality and support. DataFrame lives in VRAM, all functions run in parallel on a single GPU, limited to 1 node.
* **Dask-cuDF**: Lazy evaluation extension of cuDF. The dataframe lives in VRAM, divided across an arbitrary number of GPUs and nodes. Quite limited functionality. Can handle arbitrarily large datasets, and will intelligently make use of RAM when VRAM runs out.

The above is not a comprehensive list -- for instance, we aren't discussing Series, which are implemented in some degree on all of the above frameworks. In some of them, Series break down to ndarrays, and in others, they do not.

Managing the interplay between these formats can be extremely tricky -- not only do we have to keep track of lazy evaluation, we also have to keep track of which device our data is stored on. This is a particular problem if we wish to go beyond the very limited set of built-in functions available in Dask on the CuDF itself -- going back and forth is extremely expensive, and it's something we only want to do once if we can possibly manage it.

Like Spark, Dask uses the analogy of a computation graph, which it manages via Python's "with" context manager. This means that a Dask cluster is only 'alive' for a very limited period of time. This makes it challenging to use in an interactive environment, where we grow accustomed to having our variables 'hang around' for as long as we want them.

# Image Classification: Ground-Level Fire Imagery

For discussion of the datasets and models, please see the previous notebooks.

## Setup

Distributed systems gain theoretical speed benefits when working from data that supports partitioning, like parquet files -- therefore, we are using them in this example, even though on a single system, this will probably result in a performance loss, rather than gain.

The commented cells are for reference purposes only, to document how we create the parquet file from the original image data. We don't want these cells to run every time, because they take around 30 minutes.

In [None]:
class Config:
    # initialize the path to the fire and non-fire dataset directories
    FIRE_PATH = os.path.sep.join(["/content/drive/MyDrive/datasets/Robbery_Accident_Fire_Database2",
        "Fire"])
    NON_FIRE_PATH = "/content/drive/MyDrive/datasets/spatial_envelope_256x256_static_8outdoorcategories"
    
    # initialize the class labels in the dataset
    CLASSES = ["Non-Fire", "Fire"]

    # define the size of the training and testing split
    TRAIN_SPLIT = 0.75
    TEST_SPLIT = 0.25
    MAX_SAMPLES = 1300
    CROSSVAL = 0
    
    # define the initial learning rate, batch size, and number of epochs
    INIT_LR = .01
    BATCH_SIZE = 64
    NUM_EPOCHS = 50

    # set the path to the serialized model after training
    MODEL_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "fire_detection.model"])
    # set the path to the parquet file
    PARQUET_PATH = os.path.sep.join(["/content/drive/MyDrive/datasets/", "groundfire.parquet"])
    config.PARQUET_PATH
    # define the path to the output learning rate finder plot and
    # training history plot
    LRFIND_PLOT_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "lrfind_plot.png"])
    TRAINING_PLOT_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "training_plot.png"])

    # define the path to the output directory that will store our final
    # output with labels/annotations along with the number of images to
    # sample
    OUTPUT_IMAGE_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "examples"])
    SAMPLE_SIZE = 50

# initialize the configuration object
config = Config()

In [None]:
# def load_dataset(datasetPath, shape, max_samples):
#   # grab the paths to all images in our dataset directory, then
#   # initialize our lists of images
#   imagePaths = list(paths.list_images(datasetPath))
#   data = []

#   # loop over the image paths, limiting size of dataset to 1300 samples per class
#   count = 0
#   for imagePath in tq.tqdm(imagePaths):
#     count += 1
#     if count > max_samples:
#       print("breaking at {}".format(max_samples))
#       break
#     # ignoring aspect ratio
#     image = cv2.imread(imagePath)
#     # load the image and resize it to be a fixed 128x128 pixels
#     image = cv2.resize(image, (128, 128))
#     if shape == "row":
#       image = np.asarray(image).flatten()
#     # add the image to the data lists
#     data.append(image)

#   # return the data list as a NumPy array
#   np_data = np.array(data, dtype="float32")
#   np_data /= 255
#   # Scale to [0,1]
#   return np_data

In [None]:
# fireData = load_dataset(config.FIRE_PATH, "row", config.MAX_SAMPLES)
# # print(fireData.shape)
# nonFireData = load_dataset(config.NON_FIRE_PATH, "row", config.MAX_SAMPLES)
# fireData = np.concatenate([fireData, np.ones((fireData.shape[0],1),dtype=fireData.dtype)], axis=1)
# nonFireData = np.concatenate([nonFireData, np.zeros((nonFireData.shape[0],1),dtype=nonFireData.dtype)], axis=1)
# data = np.vstack([fireData, nonFireData])
# # print(data.shape)
# #2600 rows, 49153 cols

Because of the way they are intended to function, parquet files have to be literal files -- we can't simply play around with them in memory as we can with, say, a Pandas DataFrame.

Assembling a parquet file row by row from a series of Numpy arrays, as we do here, is an unconventional use of the format, to say the least, but for the purposes of demonstration, it's fine.

In [None]:
# data_T = data.transpose()
# arrays = [
#   pa.array(col)  # Create one arrow array per column
#   for col in data_T
# ]
# table = pa.Table.from_arrays(
#     arrays,
#     names=['feature-{}'.format(i) for i in range(len(arrays)-1)]+["label"] # give names to each columns
# )
# pa.parquet.write_table(table, 'groundfire.parquet')

In [None]:
## Save the model

# from google.colab import files

# files.download('groundfire.parquet')

## Train and Test

If we were running a true simulation, we would not reuse our validation data as test data. However, for the purposes of demonstrating the methodology, this is not a concern.

Note that unlike our non-distributed training loop, this training loop makes extensive use of functions. This makes sense for distributed computing, which naturally lends itself to a functional programming paradigm. It also means we don't have to have a single ipynb cell with a zillion lines of unrelated code sitting inside of a context manager, thank goodness.

In the cells below, you'll see Dask's persist() and wait() methods. These are extremely important. Wait is a typical async-await paradigm similar to what one might encounter when dealing with semaphores, parallel processing, or certain Javascript APIs -- but in this case, we are waiting for the various Dask workers to return their portions of the data before we proceed.

persist() is a somewhat unconventional concept in Dask. A persist() object is not guaranteed to be available outside the context manager, but it is more likely to be available than it would be otherwise. It's cheaper than compute(), which is Dask's method for forcing evaluation of the graph, and serves as a kind of middle ground between no execution and complete execution of the graph.

In [None]:
def load_groundfire(
    path,
) -> Tuple[
    dask_cudf.DataFrame, dask_cudf.Series, dask_cudf.DataFrame, dask_cudf.Series
]:
    df = dask_cudf.read_parquet(path)

    y = df["label"]
    X = df[df.columns.difference(["label"])]

    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.25, shuffle=True
    )
    X_train, X_valid, y_train, y_valid = client.persist(
        [X_train, X_valid, y_train, y_valid]
    )
    wait([X_train, X_valid, y_train, y_valid])

    return X_train, X_valid, y_train, y_valid

A version of XGBoost is actually implemented within Dask itself -- that's the dxgb we see in the below function. Note that dxgb is responsible for storing the data, since it understands that it may be distributed, which conventional XGBoost would not, of course.

The train function is using a special tree method here, gpu_hist, designed to take advantage of the fact that the data is stored on a GPU.

In [None]:
def fit_model_customized_es(client, X, y, X_valid, y_valid):
    early_stopping_rounds = 10

    es = xgb.callback.EarlyStopping(rounds=early_stopping_rounds, save_best=True)

    Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)

    Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)

    booster = xgb.dask.train(
        client,
        {
            "objective": "binary:logistic",
            "eval_metric": "error",
            "tree_method": "gpu_hist",
        },
        Xy,
        evals=[(Xy_valid, "Valid")],
        num_boost_round=1000,
        callbacks=[es],
    )["booster"]
    return booster

In [None]:
def explain(client, model, X):
   # Use array instead of dataframe in case of output dim is greater than 2.
   X_array = X.values
   contribs = dxgb.predict(
       client, model, X_array, pred_contribs=True, validate_features=False
   )
   # Use the result for further analysis
   return contribs

In [None]:
def predict(client, model, X):
    predt = dxgb.predict(client, model, X)
    assert isinstance(predt, dd.Series)
    return predt

In [None]:
def save_results(expected_y, predicted_y, contribs, start_time, config):
  # make predictions

  cur_time = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")

  keystr = "xgb_ground_dist_" + cur_time

  classif = metrics.classification_report(expected_y, predicted_y)

  conf = metrics.confusion_matrix(expected_y, predicted_y)

  rtime = time() - start_time

  results = {"name" : keystr, "classifier" : "xgboost", "dataset" : "ground", "distributed" : True, "metric" : classif, "confmatrix" : conf.tolist(), "runtime" : rtime, "train_test_split" : config.TEST_SPLIT, "cross_val" : config.CROSSVAL}

  #store in mongoDB

  collection.insert_one(results)

  #store to local pickle

  PICKLE_FILE = keystr + ".pkl"

  PICKLE_PATH = os.path.join(PATH, PICKLE_FILE)

  with open(PICKLE_PATH, 'wb') as handle:
      pickle.dump(results, handle, protocol=pickle.HIGHEST_PROTOCOL)

  # summarize the fit of the model
  print(); print(classif)
  print(); print(conf)

  print()
  print("Execution Time %s seconds: " % (rtime)) 

The cell below preserves pointers to the model, dataframes and results locally.

In a true distributed environment (IE, in production), we would perform all operations inside Dask's context manager so that the load could be distributed over multiple nodes. But in Colab, we only have one node, and doing this allows us to inspect and manipulate the data after the context manager has run (or even if it crashes!)

One thing we cannot do is alter the dataframes -- if we do, Dask will know and produce an error --

```
Inputs contain futures that were created by another client
```

A comment on timing -- it's interesting to note that the data-processing runs 4-5x slower in Dask/CuDF than it does in SKLearn/Numpy. Distributing data only saves time in a true distributed environment.

In [None]:
X_train, X_valid, y_train, y_valid, booster, preds, contribs = None, None, None, None, None, None, None

In [None]:
if __name__ == "__main__":
   start_time = time()
   with LocalCUDACluster() as cluster:
       print("dashboard:", cluster.dashboard_link)
       with Client(cluster) as client:
          print("Load data ...")
          X_train, X_valid, y_train, y_valid = load_groundfire(config.PARQUET_PATH)
          print("Load complete.")
          print("Begin model training...")
          booster = fit_model_customized_es(client, X_train, y_train, X_valid, y_valid)
          print("Training complete.")
          preds = predict(client, booster, X_valid)
          #print("type of preds and yvalid")
          #print(type(preds)) #dask.dataframe.core.series
          # print(type(y_valid)) #dask_cudf.core.series
          preds_np = cupy.asnumpy(preds.to_dask_array()) 
          preds_np = np.rint(preds_np)
          # print("type of preds_np and y_valid_df")
          #print(type(preds_np)) #numpy array
          #y_valid_df = y_valid.compute() #cudf.core.series.Series
          # print(type(y_valid_df)) 
          #y_valid_np = cupy.asnumpy(y_valid_df)
          #y_valid_np = y_valid.compute().values #cupy._core.core.ndarray
          y_valid_np = cupy.asnumpy(y_valid.compute().values)
          # print(type(y_valid_np))
          contribs = explain(client, booster, X_train)
          contribs_np = cupy.asnumpy(contribs)
          contribs_list = contribs_np.tolist()
          save_results(y_valid_np, preds_np, contribs_list, start_time, config)
          # print("---METRICS---"); print(metrics.classification_report(y_valid_np, preds_np))
          # print("---CONFUSION MATRIX---"); print(metrics.confusion_matrix(y_valid_np, preds_np))
          # print("---CONTRIBUTIONS TO PREDICTION---"); print(contribs)
   print("Execution Time %s seconds: " % (time() - start_time))

dashboard: http://127.0.0.1:8787/status
Load data ...
Load complete.
Begin model training...
[0]	Valid-error:0.29100
[1]	Valid-error:0.27010
[2]	Valid-error:0.25723
[3]	Valid-error:0.25884
[4]	Valid-error:0.25402
[5]	Valid-error:0.24437
[6]	Valid-error:0.24598
[7]	Valid-error:0.25080
[8]	Valid-error:0.23633
[9]	Valid-error:0.24920
[10]	Valid-error:0.23794
[11]	Valid-error:0.24277
[12]	Valid-error:0.22669
[13]	Valid-error:0.22187
[14]	Valid-error:0.21383
[15]	Valid-error:0.21704
[16]	Valid-error:0.21543
[17]	Valid-error:0.21061
[18]	Valid-error:0.21543
[19]	Valid-error:0.20900
[20]	Valid-error:0.21383
[21]	Valid-error:0.21383
[22]	Valid-error:0.20900
[23]	Valid-error:0.20579
[24]	Valid-error:0.21383
[25]	Valid-error:0.20579
[26]	Valid-error:0.19936
[27]	Valid-error:0.19453
[28]	Valid-error:0.19293
[29]	Valid-error:0.20739
[30]	Valid-error:0.20097
[31]	Valid-error:0.20257
[32]	Valid-error:0.20257
[33]	Valid-error:0.20257
[34]	Valid-error:0.20579
[35]	Valid-error:0.19936
[36]	Valid-error:

# Image Classification: LANDSAT-8 Imagery

## Setup

The commented cells are for reference purposes only, to document how I created the parquet file from the original image data. We don't want these cells to run every time, because they take around 30 minutes.

In [None]:
class Config:
    # initialize the path to the fire and non-fire dataset directories
    FIRE_PATH = os.path.sep.join(["/content/drive/MyDrive/datasets/landsat_mini/Training",
        "Fire"])
    NON_FIRE_PATH = os.path.sep.join(["/content/drive/MyDrive/datasets/landsat_mini/Training", "No_Fire"])
    
    # initialize the class labels in the dataset
    CLASSES = ["Non-Fire", "Fire"]

    # define the size of the training and testing split
    TRAIN_SPLIT = 0.75
    TEST_SPLIT = 0.25
    MAX_SAMPLES = 1300
    CROSSVAL = 0

    # set the path to the serialized model after training
    MODEL_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "fire_detection_xgb_landsat.model"])

    PARQUET_PATH = os.path.sep.join(["/content/drive/MyDrive/datasets/", "landsatfire.parquet"])

    PATH = r"/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project"
    
    # define the path to the output learning rate finder plot and
    # training history plot
    LRFIND_PLOT_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "lrfind_plot_xgb_landsat.png"])
    TRAINING_PLOT_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "training_plot_xgb_landsat.png"])

    # define the path to the output directory that will store our final
    # output with labels/annotations along with the number of images to
    # sample
    OUTPUT_IMAGE_PATH = os.path.sep.join(["/content/drive/MyDrive/School/NYU/Big Data Fall 2021/Project/", "examples"])
    SAMPLE_SIZE = 50

# initialize the configuration object
config = Config()

time: 19.2 ms (started: 2021-12-05 20:13:18 +00:00)


In [None]:
# def load_dataset(datasetPath, shape, max_samples):
#   # grab the paths to all images in our dataset directory, then
#   # initialize our lists of images
#   imagePaths = list(paths.list_images(datasetPath))
#   data = []

#   # loop over the image paths, limiting size of dataset to 1300 samples per class
#   count = 0
#   for imagePath in tq.tqdm(imagePaths):
#     count += 1
#     if count > max_samples:
#       print("breaking at {}".format(max_samples))
#       break
#     # ignoring aspect ratio
#     img = rasterio.open(imagePath)
#     image = rasterio.plot.reshape_as_image(img.read((2,3,7)))
#     if shape == "row":
#       image = image.flatten()
#     # add the image to the data lists
#     data.append(image)

#   # return the data list as a NumPy array
#   np_data = np.array(data, dtype="float32")
#   # print(np_data.shape)
#   # Scale to [0,1]
#   np_data /= (np_data.max()-np_data.min())
#   return np_data

time: 3.84 ms (started: 2021-12-05 19:54:49 +00:00)


In [None]:
# fireData = load_dataset(config.FIRE_PATH, "row", config.MAX_SAMPLES)
# # print(fireData.shape)
# nonFireData = load_dataset(config.NON_FIRE_PATH, "row", config.MAX_SAMPLES)
# fireData = np.concatenate([fireData, np.ones((fireData.shape[0],1),dtype=fireData.dtype)], axis=1)
# nonFireData = np.concatenate([nonFireData, np.zeros((nonFireData.shape[0],1),dtype=nonFireData.dtype)], axis=1)
# data = np.vstack([fireData, nonFireData])
# # print(data.shape)

time: 1.55 ms (started: 2021-12-05 19:54:49 +00:00)


In [None]:
# data_T = data.transpose()
# arrays = [
#   pa.array(col)  # Create one arrow array per column
#   for col in data_T
# ]
# table = pa.Table.from_arrays(
#     arrays,
#     names=['feature-{}'.format(i) for i in range(len(arrays)-1)]+["label"] # give names to each columns
# )
# pa.parquet.write_table(table, 'landsatfire.parquet')

time: 1.7 ms (started: 2021-12-05 19:54:49 +00:00)


In [None]:
## Debug cells

# !ls -lh groundfire.parquet
# df = dask_cudf.read_parquet('groundfire.parquet')

time: 1.08 ms (started: 2021-12-05 19:54:49 +00:00)


In [None]:
# # Save the model

# from google.colab import files

# files.download('landsatfire.parquet')

time: 1.19 ms (started: 2021-12-05 19:54:49 +00:00)


## Train and Test

If we were running a true simulation, we would not reuse our validation data as test data. However, for the purposes of demonstrating the methodology, this is not a concern, so long as we remember that the accuracy values here are definitely NOT accurate.

In [None]:
def load_groundfire(
    path,
) -> Tuple[
    dask_cudf.DataFrame, dask_cudf.Series, dask_cudf.DataFrame, dask_cudf.Series
]:
    df = dask_cudf.read_parquet(path)

    y = df["label"]
    X = df[df.columns.difference(["label"])]

    X_train, X_valid, y_train, y_valid = train_test_split(
        X, y, test_size=0.25, shuffle=True
    )
    X_train, X_valid, y_train, y_valid = client.persist(
        [X_train, X_valid, y_train, y_valid]
    )
    wait([X_train, X_valid, y_train, y_valid])

    return X_train, X_valid, y_train, y_valid

time: 8.75 ms (started: 2021-12-05 20:13:45 +00:00)


In [None]:
def fit_model_customized_es(client, X, y, X_valid, y_valid):
    early_stopping_rounds = 10

    es = xgb.callback.EarlyStopping(rounds=early_stopping_rounds, save_best=True)

    Xy = dxgb.DaskDeviceQuantileDMatrix(client, X, y)

    Xy_valid = dxgb.DaskDMatrix(client, X_valid, y_valid)

    booster = xgb.dask.train(
        client,
        {
            "objective": "binary:logistic",
            "eval_metric": "error",
            "tree_method": "gpu_hist",
        },
        Xy,
        evals=[(Xy_valid, "Valid")],
        num_boost_round=1000,
        callbacks=[es],
    )["booster"]
    return booster

time: 7.12 ms (started: 2021-12-05 20:13:45 +00:00)


In [None]:
def explain(client, model, X):
   # Use array instead of dataframe in case of output dim is greater than 2.
   X_array = X.values
   contribs = dxgb.predict(
       client, model, X_array, pred_contribs=True, validate_features=False
   )
   # Use the result for further analysis
   return contribs

time: 3 ms (started: 2021-12-05 20:13:45 +00:00)


In [None]:
def predict(client, model, X):
    predt = dxgb.predict(client, model, X)
    assert isinstance(predt, dd.Series)
    return predt

time: 2 ms (started: 2021-12-05 20:13:45 +00:00)


In [None]:
def save_results(expected_y, predicted_y, contribs, start_time, config):
  # make predictions

  cur_time = datetime.now().strftime("%d_%m_%Y_%H_%M_%S")

  keystr = "xgb_sat_dist_" + cur_time

  classif = metrics.classification_report(expected_y, predicted_y)

  conf = metrics.confusion_matrix(expected_y, predicted_y)

  rtime = time() - start_time

  results = {"name" : keystr, "classifier" : "xgboost", "dataset" : "satellite", "distributed" : True, "metric" : classif, "confmatrix" : conf.tolist(), "runtime" : rtime, "train_test_split" : config.TEST_SPLIT, "cross_val" : config.CROSSVAL}

  #store in mongoDB

  collection.insert_one(results)

  #store to local pickle

  PICKLE_FILE = keystr + ".pkl"

  PICKLE_PATH = os.path.join(config.PATH, PICKLE_FILE)

  with open(PICKLE_PATH, 'wb') as handle:
      pickle.dump(results, handle, protocol=pickle.HIGHEST_PROTOCOL)

  # summarize the fit of the model
  print(); print(classif)
  print(); print(conf)

  print()
  print("Execution Time %s seconds: " % (rtime)) 

time: 24.4 ms (started: 2021-12-05 20:13:45 +00:00)


The cell below preserves pointers to the model, dataframes and results locally.

In a true distributed environment (IE, in production), we would perform all operations inside Dask's context manager so that the load could be distributed over multiple nodes. But in Colab, we only have one node, and doing this allows us to inspect and manipulate the data after the context manager has run (or even if it crashes!)

One thing we cannot do is alter the dataframes -- if we do, Dask will know and produce an error --

```
Inputs contain futures that were created by another client
```

A comment on timing -- it's interesting to note that the data-processing runs 4-5x slower in Dask/CuDF than it does in SKLearn/Numpy. Distributing data only saves time in a true distributed environment.

In [None]:
X_train, X_valid, y_train, y_valid, booster, preds, contribs = None, None, None, None, None, None, None

time: 829 µs (started: 2021-12-05 20:13:45 +00:00)


In [None]:
if __name__ == "__main__":
   start_time = time()
   with LocalCUDACluster() as cluster:
       print("dashboard:", cluster.dashboard_link)
       with Client(cluster) as client:
          print("Load data ...")
          X_train, X_valid, y_train, y_valid = load_groundfire(config.PARQUET_PATH)
          print("Load complete.")
          print("Begin model training...")
          booster = fit_model_customized_es(client, X_train, y_train, X_valid, y_valid)
          print("Training complete.")
          preds = predict(client, booster, X_valid)
          #print("type of preds and yvalid")
          #print(type(preds)) #dask.dataframe.core.series
          #print(type(y_valid)) #dask_cudf.core.series
          preds_np = cupy.asnumpy(preds.to_dask_array()) 
          preds_np = np.rint(preds_np) #round predictions to nearest integer (0,1)
          #print("type of preds_np and y_valid_df")
          #print(type(preds_np)) #numpy array
          y_valid_np = cupy.asnumpy(y_valid.compute().values)
          # print(type(y_valid_np)) #numpy array
          contribs = explain(client, booster, X_train)
          # contribs_np = cupy.asnumpy(contribs)
          # contribs_list = contribs_np.tolist()
          save_results(y_valid_np, preds_np, contribs, start_time, config)
          # contribs = contribs.compute().as_matrix()
          # print("---METRICS---"); print(metrics.classification_report(y_valid_np, preds_np))
          # print("---CONFUSION MATRIX---"); print(metrics.confusion_matrix(y_valid_np, preds_np))
          # print("---CONTRIBUTIONS TO PREDICTION---"); print(contribs)
   print("Execution Time %s seconds: " % (time() - start_time))

dashboard: http://127.0.0.1:8787/status
Load data ...
Load complete.
Begin model training...
[0]	Valid-error:0.16667
[1]	Valid-error:0.05556
[2]	Valid-error:0.10000
[3]	Valid-error:0.03333
[4]	Valid-error:0.03333
[5]	Valid-error:0.04444
[6]	Valid-error:0.03333
[7]	Valid-error:0.03333
[8]	Valid-error:0.03333
[9]	Valid-error:0.03333
[10]	Valid-error:0.03333
[11]	Valid-error:0.03333
[12]	Valid-error:0.03333
Training complete.

              precision    recall  f1-score   support

         0.0       0.93      0.96      0.95        27
         1.0       0.98      0.97      0.98        63

    accuracy                           0.97        90
   macro avg       0.96      0.97      0.96        90
weighted avg       0.97      0.97      0.97        90


[[26  1]
 [ 2 61]]

Execution Time 335.193058013916 seconds: 
Execution Time 336.52417731285095 seconds: 
time: 5min 36s (started: 2021-12-05 20:13:45 +00:00)


# Conclusions

While distributed training of XGBoost models on Dask is not quite as intuitive as the vanilla framework, its deep integration with Dask's architecture and ample documentation make it a very reasonable learning curve.

Even in this artificial setting, it's easy to see how the performance gains enabled by distributed computing will quickly become very desirable, and probably even commonplace at some point.