In [None]:
import cudf
import dask_cudf
import xgboost as xgb

import time

from dask_cuda import LocalCUDACluster
from dask.distributed import Client

In [None]:
# Parquet dataset locations used by the read_parquet cells below.
# NOTE(review): both variables point at the same directory, so the "test"
# metrics are computed on the data the model was trained on -- confirm this
# is intended or point test_data_dir at a held-out day.
train_data_dir = "/datasets/criteo/crit_orig_pq_1day"
test_data_dir = "/datasets/criteo/crit_orig_pq_1day"

In [None]:
# Settings for the local CUDA cluster, gathered in one place so they are easy
# to tweak: 16 workers, UCX protocol, JIT unspilling enabled, and the RMM pool /
# device memory limits given below.
cluster_options = dict(
    n_workers=16,
    protocol="ucx",
    jit_unspill=True,
    rmm_pool_size="28GiB",
    device_memory_limit="30GB",
)

cluster = LocalCUDACluster(**cluster_options)

# Attach a distributed client to the cluster; the bare name on the last line
# makes the client the cell's displayed output.
client = Client(cluster)
client

In [None]:
# Build the training dataframe from the parquet dataset at train_data_dir,
# with split_row_groups enabled.
train_ddf = dask_cudf.read_parquet(
    train_data_dir,
    split_row_groups=True,
)

In [None]:
def transform_df(ddf):
    """Cast the feature columns ``C1`` through ``C26`` to ``category`` dtype.

    All 26 casts are expressed as one ``astype`` call with a
    column-to-dtype mapping.  The frame passed in is left untouched; callers
    must use the returned frame.

    Missing values are not filled: no ``fillna`` is applied here.

    Parameters
    ----------
    ddf : dataframe
        Frame that contains columns named ``C1`` ... ``C26``.  Columns not in
        that set are passed through unchanged.

    Returns
    -------
    dataframe
        A new frame with the same columns in the same order, where
        ``C1`` ... ``C26`` have ``category`` dtype.
    """
    categorical_dtypes = {"C" + str(n): "category" for n in range(1, 27)}
    return ddf.astype(categorical_dtypes)

In [None]:
# Apply the categorical casts, then separate the "label" column (target)
# from the remaining columns (features).
train_ddf = transform_df(train_ddf)
X_train = train_ddf.drop("label", axis=1)
y_train = train_ddf["label"]

# The combined frame is no longer referenced after the split.
del train_ddf

In [None]:
# Wrap the training features and labels in a DaskDMatrix with categorical
# support switched on, then drop the notebook's references to the source
# collections.
dtrain = xgb.dask.DaskDMatrix(
    client,
    X_train,
    y_train,
    enable_categorical=True,
)
del X_train, y_train

In [None]:
# Booster parameters handed to xgb.dask.train.
params = dict(
    tree_method="gpu_hist",
    objective="binary:logistic",
    max_cat_to_onehot=1,
)

In [None]:
# Wall-clock timing around a 4-round training run. The training matrix is
# also supplied as the single evaluation set, labelled "train".
t0 = time.time()
output = xgb.dask.train(
    client, params, dtrain, num_boost_round=4, evals=[(dtrain, "train")]
)
t1 = time.time()

elapsed = t1 - t0
print("Training Time: {}".format(elapsed))

In [None]:
# Drop the notebook's reference to the training DMatrix; it is not used by
# the evaluation cells that follow.
del dtrain

In [None]:
# Build the evaluation dataframe from the parquet dataset at test_data_dir,
# with split_row_groups enabled.
test_ddf = dask_cudf.read_parquet(
    test_data_dir,
    split_row_groups=True,
)

In [None]:
# Same preparation as for the training frame: categorical casts first, then
# split the "label" column (target) from the remaining columns (features).
test_ddf = transform_df(test_ddf)
y_test = test_ddf["label"]
X_test = test_ddf.drop("label", axis=1)

# The combined frame is no longer referenced after the split.
del test_ddf

In [None]:
from cuml.metrics import accuracy_score, log_loss

# Distributed prediction on the test features, plus the thresholded class
# labels; both are kept under their original names as distributed collections.
y_test_pred_prob = xgb.dask.inplace_predict(client, output, X_test)
y_test_pred_val = y_test_pred_prob >= 0.5

# Materialise the labels and the predicted probabilities exactly once each.
# Calling .compute() separately for every metric would re-run the label
# pipeline and the prediction graph a second time for no benefit.
y_test_computed = y_test.compute()
y_prob_computed = y_test_pred_prob.compute()

# Class labels use the same >= 0.5 threshold, applied to the already-computed
# probabilities instead of triggering another distributed computation.
y_pred_computed = y_prob_computed >= 0.5

test_acc = accuracy_score(y_test_computed, y_pred_computed)
test_log_loss = log_loss(y_test_computed, y_prob_computed)

print("Test Accuracy: {}".format(test_acc))
print("Test Log Loss: {}".format(test_log_loss))