In [1]:
import os
import os.path
import sys
import time

import numpy as np
import matplotlib.pyplot as plt
import dask
import dask.dataframe as dd
import dask_xgboost as dxgb
import xgboost as xgb
from dask.distributed import Client, wait
from dask_cuda import LocalCUDACluster
# import dask_cudf

# Per the Modin docs, the execution engine must be chosen *before*
# `import modin.pandas`, otherwise Modin may already have picked its default.
os.environ["MODIN_ENGINE"] = "dask"  # Modin will use Dask
# import pandas as pd
import modin.pandas as pd

%matplotlib inline



# Download Data

In [2]:
# Where the HIGGS data lives and how much of it the benchmark uses.
# Only the Python 3 `urllib.request.urlretrieve` is imported: dask_cuda and the
# xgboost.dask API used later in this notebook are Python 3 only.
from urllib.request import urlretrieve

data_url = "https://archive.ics.uci.edu/ml/machine-learning-databases/00280/HIGGS.csv.gz"
data_dir = "./data"  # every downloaded / cached file is stored here
dmatrix_train_filename = os.path.join(data_dir, "higgs_train.dmatrix")
dmatrix_test_filename = os.path.join(data_dir, "higgs_test.dmatrix")
csv_filename = os.path.join(data_dir, "HIGGS.csv.gz")
train_rows = 10500000  # first rows of the CSV -> training set
test_rows = 500000     # the rows right after them -> test set
num_round = 200  # boosting rounds for every run (originally 1000)

plot = True

In [3]:
def load_higgs():
    """Return the HIGGS train/test split as a ``(dtrain, dtest)`` pair of ``xgb.DMatrix``.

    If both binary caches (``dmatrix_train_filename`` / ``dmatrix_test_filename``)
    exist and hold exactly ``train_rows`` / ``test_rows`` rows, they are loaded
    and returned as-is.

    Otherwise:

    1. The gzipped CSV at ``csv_filename`` is downloaded from ``data_url``, but
       only if it is missing.
    2. The first ``train_rows`` rows become the training set and the next
       ``test_rows`` rows become the test set.
    3. Both sets are written back to the binary caches.

    Column 0 is the label and every later column is a feature.
    """
    if os.path.isfile(dmatrix_train_filename) and os.path.isfile(dmatrix_test_filename):
        dtrain = xgb.DMatrix(dmatrix_train_filename)
        dtest = xgb.DMatrix(dmatrix_test_filename)
        if dtrain.num_row() == train_rows and dtest.num_row() == test_rows:
            print("Loading cached dmatrix...")
            return dtrain, dtest
        # Cache has unexpected row counts: fall through and rebuild it.

    # urlretrieve and save_binary both fail when the target directory is
    # missing (e.g. a fresh checkout without the data directory).
    for path in (csv_filename, dmatrix_train_filename, dmatrix_test_filename):
        directory = os.path.dirname(path)
        if directory:
            os.makedirs(directory, exist_ok=True)

    if not os.path.isfile(csv_filename):
        print("Downloading higgs file...")
        # Download under a temporary name and rename only on success, so an
        # interrupted download never leaves a truncated CSV that later runs
        # would mistake for the complete file.
        partial_filename = csv_filename + ".part"
        urlretrieve(data_url, partial_filename)
        os.replace(partial_filename, csv_filename)

    df_higgs_train = pd.read_csv(csv_filename, dtype=np.float32,
                                 nrows=train_rows, header=None)
    dtrain = xgb.DMatrix(df_higgs_train.loc[:, 1:], df_higgs_train[0])
    dtrain.save_binary(dmatrix_train_filename)

    df_higgs_test = pd.read_csv(csv_filename, dtype=np.float32,
                                skiprows=train_rows, nrows=test_rows,
                                header=None)
    dtest = xgb.DMatrix(df_higgs_test.loc[:, 1:], df_higgs_test[0])
    dtest.save_binary(dmatrix_test_filename)

    return dtrain, dtest

In [4]:
# Build the train/test DMatrix pair (load_higgs reuses its on-disk cache when the row counts match).
dtrain, dtest = load_higgs()

In [5]:
# Peek at a handful of raw CSV rows (the file has no header row).
datasnap = pd.read_csv(
    csv_filename,
    header=None,
    nrows=10,
    dtype=np.float32,
)

In [6]:
# Preview of the first rows; column 0 is the label that load_higgs trains on.
datasnap

Unnamed: 0,0,1,2,3,4,5,6,7,8,9,...,19,20,21,22,23,24,25,26,27,28
0,1.0,0.869293,-0.635082,0.22569,0.32747,-0.689993,0.754202,-0.248573,-1.092064,0.0,...,-0.010455,-0.045767,3.101961,1.35376,0.979563,0.978076,0.920005,0.721657,0.988751,0.876678
1,1.0,0.907542,0.329147,0.359412,1.49797,-0.31301,1.095531,-0.557525,-1.58823,2.173076,...,-1.13893,-0.000819,0.0,0.30222,0.833048,0.9857,0.978098,0.779732,0.992356,0.798343
2,1.0,0.798835,1.470639,-1.635975,0.453773,0.425629,1.104875,1.282322,1.381664,0.0,...,1.128848,0.900461,0.0,0.909753,1.10833,0.985692,0.951331,0.803252,0.865924,0.780118
3,0.0,1.344385,-0.876626,0.935913,1.99205,0.882454,1.786066,-1.646778,-0.942383,0.0,...,-0.678379,-1.360356,0.0,0.946652,1.028704,0.998656,0.728281,0.8692,1.026736,0.957904
4,1.0,1.105009,0.321356,1.522401,0.882808,-1.205349,0.681466,-1.070464,-0.921871,0.0,...,-0.373566,0.113041,0.0,0.755856,1.361057,0.98661,0.838085,1.133295,0.872245,0.808487
5,0.0,1.595839,-0.607811,0.007075,1.81845,-0.111906,0.84755,-0.566437,1.581239,2.173076,...,-0.654227,-1.274345,3.101961,0.823761,0.938191,0.971758,0.789176,0.430553,0.961357,0.957818
6,1.0,0.409391,-1.884684,-1.027292,1.672452,-1.604598,1.338015,0.055427,0.013466,2.173076,...,0.069496,1.37713,3.101961,0.869418,1.222083,1.000627,0.545045,0.698653,0.977314,0.828786
7,1.0,0.933895,0.62913,0.527535,0.238033,-0.966569,0.547811,-0.059439,-1.706866,2.173076,...,1.291248,-1.467454,0.0,0.901837,1.083671,0.979696,0.7833,0.849195,0.894356,0.774879
8,1.0,1.405144,0.536603,0.689554,1.179567,-0.110061,3.202405,-1.52696,-1.576033,0.0,...,-0.151202,1.163489,0.0,1.667071,4.039273,1.175828,1.045352,1.542972,3.534827,2.740754
9,1.0,1.176566,0.104161,1.397002,0.479721,0.265513,1.135563,1.534831,-0.253291,0.0,...,0.268541,0.530334,0.0,0.833175,0.773968,0.98575,1.103696,0.84914,0.937104,0.812364


# Train XGB with CPU

In [7]:
print("Training with CPU ...")
# 'verbosity': 0 replaces the deprecated 'silent' flag, which newer XGBoost
# releases warn about and ignore.
param = {'objective': 'binary:logitraw', 'eval_metric': 'error', 'verbosity': 0, 'tree_method': 'hist'}

start = time.perf_counter()  # monotonic clock, suited to measuring durations
cpu_res = {}
# Keep the trained booster; evals_result fills cpu_res with the per-round
# test error that the Results section plots.
cpu_bst = xgb.train(param, dtrain, num_round, evals=[(dtest, "test")], evals_result=cpu_res)
cpu_time = time.perf_counter() - start
print("CPU Training Time: %s seconds" % (str(cpu_time)))

Training with CPU ...
[0]	test-error:0.529014
[1]	test-error:0.404738
[2]	test-error:0.36437
[3]	test-error:0.34097
[4]	test-error:0.331918
[5]	test-error:0.32475
[6]	test-error:0.319754
[7]	test-error:0.31552
[8]	test-error:0.311322
[9]	test-error:0.308886
[10]	test-error:0.307236
[11]	test-error:0.304846
[12]	test-error:0.303586
[13]	test-error:0.30259
[14]	test-error:0.301056
[15]	test-error:0.3004
[16]	test-error:0.29817
[17]	test-error:0.297258
[18]	test-error:0.29687
[19]	test-error:0.296264
[20]	test-error:0.29559
[21]	test-error:0.294236
[22]	test-error:0.293642
[23]	test-error:0.292816
[24]	test-error:0.29212
[25]	test-error:0.290954
[26]	test-error:0.290696
[27]	test-error:0.290382
[28]	test-error:0.289936
[29]	test-error:0.289404
[30]	test-error:0.28939
[31]	test-error:0.288796
[32]	test-error:0.288328
[33]	test-error:0.288092
[34]	test-error:0.287584
[35]	test-error:0.28711
[36]	test-error:0.287028
[37]	test-error:0.286914
[38]	test-error:0.286666
[39]	test-error:0.286422
[

# Train XGB with a single GPU

In [None]:
print("Training with Single GPU ...")
# Derive the GPU parameters from a copy instead of mutating `param` in place,
# so `param` still describes the CPU run after this cell executes and the
# cell can be re-run without depending on earlier state.
gpu_param = dict(param, tree_method='gpu_hist')
start = time.perf_counter()  # monotonic clock, suited to measuring durations
gpu_res = {}
# Keep the trained booster; evals_result fills gpu_res with the per-round test error.
gpu_bst = xgb.train(gpu_param, dtrain, num_round, evals=[(dtest, "test")], evals_result=gpu_res)
gpu_time = time.perf_counter() - start
print("GPU Training Time: %s seconds" % (str(gpu_time)))

Training with Single GPU ...


# Train XGB with a Dask multi-GPU cluster

In [None]:
def load_higgs_for_dask(client, npartitions=8):
    """Build Dask-backed train/test ``DaskDMatrix`` objects for the HIGGS data.

    Reads the same row ranges as ``load_higgs`` from ``csv_filename``: the first
    ``train_rows`` rows for training and the next ``test_rows`` rows for testing.
    Each frame is split into ``npartitions`` Dask partitions. Column 0 is the
    label and the remaining columns are the features.

    Args:
        client: ``dask.distributed.Client`` the matrices are created on.
        npartitions: number of partitions per Dask DataFrame (default 8).

    Returns:
        ``(ddtrain, ddtest)`` tuple of ``xgb.dask.DaskDMatrix``.
    """
    # dd.from_pandas only accepts real pandas objects, and the module-level
    # `pd` is Modin, so parse the CSV with plain pandas here.
    import pandas as native_pd

    def _to_features_and_label(frame):
        # Wrap a parsed frame as Dask collections: (features, label column 0).
        ddf = dd.from_pandas(frame, npartitions=npartitions)
        return ddf.drop(columns=[0]), ddf[0]

    # 1. Read the CSV. All columns are kept, so no column selection is needed.
    df_higgs_train = native_pd.read_csv(csv_filename, dtype=np.float32,
                                        nrows=train_rows, header=None)
    df_higgs_test = native_pd.read_csv(csv_filename, dtype=np.float32,
                                       skiprows=train_rows, nrows=test_rows,
                                       header=None)

    # 2. Convert to Dask collections and separate label from features.
    ddf_x_train, ddf_y_train = _to_features_and_label(df_higgs_train)
    ddf_x_test, ddf_y_test = _to_features_and_label(df_higgs_test)

    # 3. Build the distributed DMatrix objects on the cluster.
    ddtrain = xgb.dask.DaskDMatrix(client, ddf_x_train, ddf_y_train)
    ddtest = xgb.dask.DaskDMatrix(client, ddf_x_test, ddf_y_test)
    return ddtrain, ddtest

In [None]:
# Start a local dask_cuda cluster on this machine and connect a Dask client to it.
# NOTE(review): LocalCUDACluster presumably starts one worker per visible GPU — confirm for this setup.
cluster = LocalCUDACluster()
client = Client(cluster)

In [None]:
# Parse the CSV into Dask collections and build DaskDMatrix objects on `client`.
ddtrain, ddtest = load_higgs_for_dask(client)

In [None]:
# XGBoost parameters for the Dask multi-GPU run; objective and eval metric
# match the CPU / single-GPU runs. 'verbosity': 0 replaces the misspelled
# 'silence' key, which is not an XGBoost parameter and was silently ignored.
param = {
    'objective': 'binary:logitraw',
    'eval_metric': 'error',
    'verbosity': 0,
    'tree_method': 'gpu_hist',
    # NOTE(review): presumably limits CPU threads per Dask worker — confirm intent.
    'nthread': 1,
}

In [None]:
print("Training with Multiple GPUs ...")
start = time.perf_counter()  # monotonic clock, suited to measuring durations
# Train for the same `num_round` rounds as the CPU / single-GPU runs, so the
# timings are comparable and the per-round history has the same length as
# the `num_round`-based time axes used in the Results section.
output = xgb.dask.train(client, param, ddtrain, num_boost_round=num_round,
                        evals=[(ddtest, 'test')])
multigpu_time = time.perf_counter() - start
print("Multi GPU Training Time: %s seconds" % (str(multigpu_time)))

In [None]:
# xgb.dask.train returns a dict: the trained booster plus the per-round eval history.
bst, multigpu_res = output['booster'], output['history']

In [None]:
# Per-round evaluation history of the multi-GPU run, indexed as multigpu_res['test']['error'].
multigpu_res

# Results

In [None]:


def _cumulative_round_times(total_seconds, n_rounds):
    """Approximate elapsed time at which each boosting round finished.

    Assumes every round takes the same time. Round ``i`` (0-based) therefore
    ends at ``(i + 1) / n_rounds * total_seconds``, and the last entry equals
    ``total_seconds``. Returns an empty list when ``n_rounds`` is 0.
    """
    return [(i + 1) * total_seconds / n_rounds for i in range(n_rounds)]


gpu_iteration_time = _cumulative_round_times(gpu_time, num_round)
cpu_iteration_time = _cumulative_round_times(cpu_time, num_round)
# Size the multi-GPU axis from its own eval history so x and y always have the
# same length, whatever number of rounds that run used.
multigpu_iteration_time = _cumulative_round_times(
    multigpu_time, len(multigpu_res['test']['error']))


In [None]:
import plotly.graph_objects as go

# Compare test error vs. training time for the CPU, single-GPU and multi-GPU
# runs. All three histories are indexed with the same metric key.
metric = param['eval_metric']

# Lowest test error reached by any run; drawn as a horizontal reference line.
min_error = min(min(gpu_res["test"][metric]),
                min(cpu_res["test"][metric]),
                min(multigpu_res["test"][metric]))

fig = go.Figure()
fig.add_trace(go.Scatter(x=cpu_iteration_time, y=cpu_res['test'][metric],
                         mode='lines', name='Intel(R) Core(TM) i9-9920X CPU (12 cores)'))
fig.add_trace(go.Scatter(x=gpu_iteration_time, y=gpu_res['test'][metric],
                         mode='lines', name='Titan RTX'))
fig.add_trace(go.Scatter(x=multigpu_iteration_time, y=multigpu_res['test'][metric],
                         mode='lines', name='2X Titan RTX'))
fig.add_trace(go.Scatter(x=cpu_iteration_time, y=[min_error] * len(cpu_iteration_time),
                         mode='lines', name='MinError'))
# Training durations were measured in seconds.
fig.update_xaxes(title_text='Time (s)')
fig.update_yaxes(title_text='Error', range=[.23, .35])
fig.update_layout(title='HIGGS test error vs. training time')
fig.show()

In [None]:
import plotly.graph_objects as go

# Zoom in on the two GPU runs. Both histories are indexed with the same metric
# key; `min_error` (lowest error across all runs) comes from the previous cell.
metric = param['eval_metric']

fig = go.Figure()
fig.add_trace(go.Scatter(x=gpu_iteration_time, y=gpu_res['test'][metric],
                         mode='lines', name='Titan RTX'))
fig.add_trace(go.Scatter(x=multigpu_iteration_time, y=multigpu_res['test'][metric],
                         mode='lines', name='2X Titan RTX'))
fig.add_trace(go.Scatter(x=gpu_iteration_time, y=[min_error] * len(gpu_iteration_time),
                         mode='lines', name='MinError'))
# Training durations were measured in seconds.
fig.update_xaxes(title_text='Time (s)')
fig.update_yaxes(title_text='Error', range=[.23, .35])
fig.update_layout(title='HIGGS test error vs. training time (GPU runs)')
fig.show()