In [None]:
import glob
import os
import time

import numpy as np
import pandas as pd
import tqdm
import xgboost as xgb
from sklearn.datasets import make_classification, make_regression
from sklearn.model_selection import train_test_split
from xgboost.callback import TrainingCallback

def create_learnable_data(num_rows,
                          num_cols,
                          num_classes,
                          target_accuracy,
                          seed=1234,
                          split=True):
    """Create a synthetic dataset and return it as pandas DataFrame(s).

    Args:
        num_rows: Number of samples to generate.
        num_cols: Number of feature columns, named ``feature_0`` ...
            ``feature_{num_cols - 1}``.
        num_classes: Number of target classes. A value ``<= 0`` generates a
            regression target instead of a classification one.
        target_accuracy: For classification, ``1 - target_accuracy`` is passed
            to sklearn as ``flip_y`` (fraction of randomly assigned labels).
            Ignored for regression.
        seed: Seed for numpy's global RNG, the sklearn generator and the
            train/test splitter.
        split: If True, return ``(train_df, test_df)`` from a 75/25 split;
            otherwise return the full frame.

    Returns:
        A DataFrame with the feature columns plus a ``labels`` column, or a
        ``(train_df, test_df)`` pair of such frames when ``split`` is True.
        Binary labels are ``bool``. Multiclass labels keep their integer class
        ids, and regression targets stay floating point.
    """
    seed = int(seed)
    np.random.seed(seed)

    num_rows = int(num_rows)
    num_cols = int(num_cols)
    num_classes = int(num_classes)
    target = float(target_accuracy)
    is_classification = num_classes > 0

    if is_classification:
        x, y = make_classification(
            n_samples=num_rows,
            n_features=num_cols,
            n_informative=num_cols // 2,
            n_redundant=num_cols // 10,
            n_repeated=0,
            n_classes=num_classes,
            n_clusters_per_class=2,
            flip_y=1 - target,
            random_state=seed,
        )
    else:
        x, y = make_regression(
            n_samples=num_rows,
            n_features=num_cols,
            n_informative=num_cols // 2,
            n_targets=1,
            noise=0.1,
            random_state=seed,
        )

    data = pd.DataFrame(x, columns=[f"feature_{i}" for i in range(num_cols)])
    if num_classes == 2:
        # Binary targets keep the bool dtype used by the rest of the notebook.
        data["labels"] = pd.Series(y, dtype="bool")
    else:
        # Casting multiclass ids or continuous targets to bool would collapse
        # them to True/False, so keep the generator's native dtype.
        data["labels"] = pd.Series(y)

    if not split:
        return data
    # Stratification only makes sense for discrete class labels.
    return train_test_split(
        data, test_size=0.25, random_state=seed,
        stratify=data["labels"] if is_classification else None)


def get_parquet_files(path, size=10):
    """Return ``size * 10`` parquet file paths matching a glob pattern.

    The matches are sorted. If fewer than ``size * 10`` files exist, the sorted
    list is repeated in order until it is long enough, which lets the benchmark
    scale beyond the files actually on disk.

    Args:
        path: Glob pattern passed to ``glob.glob`` (non-recursive, so ``**``
            behaves like ``*``).
        size: Number of 10-file units to return.

    Returns:
        List of paths of length ``size * 10`` (empty when ``size`` is 0).

    Raises:
        FileNotFoundError: If the pattern matches no files. Without this check,
            an empty match could never be extended to the requested length.
    """
    num_files = size * 10
    files = sorted(glob.glob(path))
    if not files:
        raise FileNotFoundError(f"No files match pattern: {path}")
    if num_files > len(files):
        # Ceiling division: the fewest whole repeats covering num_files.
        repeats = -(-num_files // len(files))
        files = files * repeats
    return files[:num_files]

def load_parquet_dataset(files, batch_size=50, max_gb=12):
    """Load parquet files into a single pandas DataFrame.

    The first file is read on its own. The remaining paths are read in batches
    of ``batch_size`` per ``pd.read_parquet`` call. After each batch the
    cumulative in-memory size is printed and compared against ``max_gb``.

    Args:
        files: Non-empty sequence of parquet file paths.
        batch_size: Number of paths passed to each ``pd.read_parquet`` call.
        max_gb: Abort once the loaded data exceeds this many gigabytes
            (1e9 bytes, summed over the loaded chunks).

    Returns:
        One DataFrame containing the rows of all files, in order.

    Raises:
        IndexError: If ``files`` is empty.
        MemoryError: If the cumulative size exceeds ``max_gb``.
    """
    frames = [pd.read_parquet(files[0])]
    total_bytes = frames[0].memory_usage(deep=True).sum()
    for start in tqdm.tqdm(range(1, len(files), batch_size)):
        chunk = pd.read_parquet(files[start:start + batch_size])
        frames.append(chunk)
        total_bytes += chunk.memory_usage(deep=True).sum()
        memory_usage = total_bytes / 1e9
        tqdm.tqdm.write(f"Dataset size: {memory_usage} GB")
        if memory_usage > max_gb:
            raise MemoryError("Dataset too big to fit into memory!")
    # Concatenate once at the end. Concatenating on every batch re-copied the
    # whole growing frame each time (quadratic in the number of batches).
    return pd.concat(frames)

class TqdmCallback(TrainingCallback):
    """XGBoost training callback that shows a tqdm progress bar.

    Only the worker whose collective rank is 0 creates and updates the bar, so
    distributed training (e.g. xgboost_ray actors) shows a single bar.
    """

    def __init__(self, num_samples: int) -> None:
        # Expected number of boosting rounds; used as the bar's total.
        self.num_samples = num_samples
        # Created in before_training on rank 0 only.
        self.pbar = None
        super().__init__()

    @staticmethod
    def _rank() -> int:
        """Return this worker's rank across old and new XGBoost releases."""
        # `xgboost.rabit` was deprecated in favour of `xgboost.collective` and
        # is missing from newer releases; use rabit only as a fallback.
        collective = getattr(xgb, "collective", None)
        if collective is not None:
            return collective.get_rank()
        return xgb.rabit.get_rank()

    def before_training(self, model):
        if self._rank() == 0:
            self.pbar = tqdm.tqdm(total=self.num_samples)
        return model

    def after_iteration(self, model, epoch, evals_log):
        if self.pbar is not None:
            self.pbar.update(1)
        # Returning True would ask XGBoost to stop training; this never does.
        return False

    def after_training(self, model):
        if self.pbar is not None:
            self.pbar.close()
            self.pbar = None
        return model

# Glob pattern for the parquet parts used by the inference benchmarks.
# Override it with the XGB_DATA_PATH environment variable instead of editing
# this hard-coded absolute path.
data_path = os.environ.get(
    "XGB_DATA_PATH",
    "/home/ubuntu/data/classification.parquet/**/*.parquet")

In [None]:
# Synthetic binary-classification data, already split into train/test frames.
train_df, test_df = create_learnable_data(
    num_rows=1000000,
    num_cols=40,
    num_classes=2,
    target_accuracy=0.8,
    seed=1234,
)

# Shared XGBoost configuration: approximate split finding, logistic loss for
# the binary target, and both logloss and error tracked on the eval set.
xgboost_params = dict(
    tree_method="approx",
    objective="binary:logistic",
    eval_metric=["logloss", "error"],
)

from xgboost import DMatrix, train

def train_xgboost(config, train_df, test_df, progress_bar=True,
                  num_boost_round=25, model_path="model.xgb"):
    """Train a single-process XGBoost model and report validation error.

    Args:
        config: XGBoost parameter dict. Its ``eval_metric`` must include
            ``"error"``, because the final validation error is printed.
        train_df: Training frame with feature columns and a ``labels`` column.
        test_df: Evaluation frame with the same columns.
        progress_bar: Show a tqdm bar over the boosting rounds.
        num_boost_round: Number of boosting rounds.
        model_path: File the trained model is saved to.

    Returns:
        The trained ``xgb.Booster``.
    """
    target_column = "labels"

    # Use the module-qualified names: a later cell runs
    # `from xgboost_ray import ..., train, ...`, which rebinds the bare global
    # `train` and would make this function call xgboost_ray on re-runs.
    train_set = xgb.DMatrix(train_df.drop(target_column, axis=1),
                            train_df[target_column])
    test_set = xgb.DMatrix(test_df.drop(target_column, axis=1),
                           test_df[target_column])

    evals_result = {}

    start_time = time.time()
    bst = xgb.train(params=config,
                    dtrain=train_set,
                    evals=[(test_set, "eval")],
                    evals_result=evals_result,
                    verbose_eval=False,
                    num_boost_round=num_boost_round,
                    callbacks=[TqdmCallback(num_boost_round)] if progress_bar else [])
    print(f"Total time taken: {time.time()-start_time}")

    bst.save_model(model_path)
    print("Final validation error: {:.4f}".format(
        evals_result["eval"]["error"][-1]))
    return bst

In [None]:
# Single-process baseline run; trailing `;` keeps the cell output to the prints.
train_xgboost(config=xgboost_params, train_df=train_df, test_df=test_df);

In [None]:
from xgboost_ray import RayDMatrix, train, RayParams

def train_xgboost_ray(config, train_df, test_df, ray_params, progress_bar=True,
                      num_boost_round=25, model_path="model.xgb"):
    """Train XGBoost across Ray actors with xgboost_ray and report validation error.

    ``tune_xgboost`` also passes this to ``tune.run`` via
    ``tune.with_parameters``; there ``config`` is the per-trial parameter dict.

    Args:
        config: XGBoost parameter dict. Its ``eval_metric`` must include
            ``"error"``, because the final validation error is printed.
        train_df: Training frame with feature columns and a ``labels`` column.
        test_df: Evaluation frame with the same columns.
        ray_params: ``xgboost_ray.RayParams`` controlling the actors.
        progress_bar: Show a tqdm bar over the boosting rounds.
        num_boost_round: Number of boosting rounds.
        model_path: File the trained model is saved to.
    """
    # Import here instead of relying on the global `train`. The notebook binds
    # that name to xgboost.train in one cell and to xgboost_ray.train in
    # another, so the global depends on which cell ran last.
    from xgboost_ray import RayDMatrix, train as xgboost_ray_train

    target_column = "labels"

    train_set = RayDMatrix(train_df, target_column)
    test_set = RayDMatrix(test_df, target_column)

    evals_result = {}

    start_time = time.time()
    bst = xgboost_ray_train(params=config,
                            dtrain=train_set,
                            evals=[(test_set, "eval")],
                            evals_result=evals_result,
                            verbose_eval=False,
                            num_boost_round=num_boost_round,
                            callbacks=[TqdmCallback(num_boost_round)] if progress_bar else [],
                            ray_params=ray_params)
    print(f"Total time taken: {time.time()-start_time}")

    bst.save_model(model_path)
    print("Final validation error: {:.4f}".format(
        evals_result["eval"]["error"][-1]))
    # NOTE: deliberately returns None. In tune_xgboost this is a Tune function
    # trainable, and Tune may reject non-dict/non-numeric return values.
    # Confirm before returning the booster here.

In [None]:
# Local xgboost_ray run with two actors.
local_ray_params = RayParams(num_actors=2)
train_xgboost_ray(xgboost_params, train_df, test_df, ray_params=local_ray_params)

In [None]:
import ray

# Drop any existing (local or remote) Ray session before connecting.
ray.shutdown()
# Ray Client address of the remote cluster. Set RAY_ADDRESS to target another
# cluster instead of editing the hard-coded IP.
ray.init(address=os.environ.get("RAY_ADDRESS", "ray://13.52.180.135:10001"))

In [None]:
@ray.remote
def train_xgboost_ray_cluster():
    """Ray task: generate data, train with 8 xgboost_ray actors, return the model.

    ``train_xgboost_ray`` writes ``model.xgb`` in the task's working directory;
    that file is loaded back into a Booster and returned to the caller.
    """
    # Both values are pandas DataFrames.
    train_df, test_df = create_learnable_data(
        num_rows=1000000, num_cols=40, num_classes=2,
        target_accuracy=0.8, seed=1234)
    params = dict(
        tree_method="approx",
        objective="binary:logistic",
        eval_metric=["logloss", "error"],
    )
    train_xgboost_ray(params, train_df, test_df, RayParams(num_actors=8))
    return xgb.Booster(model_file="model.xgb")

In [None]:
# Train on the cluster, then persist the returned booster locally.
cluster_task = train_xgboost_ray_cluster.remote()
bst = ray.get(cluster_task)
bst.save_model("model.xgb")

In [None]:
def infer_xgboost(inference_df, bst):
    """Run single-process XGBoost prediction on a feature-only DataFrame.

    Args:
        inference_df: Frame holding only feature columns (presumably in the
            same order/names as used for training).
        bst: Trained ``xgb.Booster``.

    Returns:
        The predictions from ``bst.predict``. These were previously computed
        and then discarded.
    """
    # Module-qualified so this works even if the cell importing the bare
    # `DMatrix` name has not been run.
    dmatrix = xgb.DMatrix(inference_df)
    return bst.predict(dmatrix)

start_time = time.time()

# Single-process baseline: load the model and every parquet part, then predict.
bst = xgb.Booster(model_file="model.xgb")
inference_files = get_parquet_files(data_path, size=30)
inference_df = load_parquet_dataset(inference_files).drop(["labels", "partition"], axis=1)
infer_xgboost(inference_df, bst)

elapsed = time.time() - start_time
print(f"Total time taken: {elapsed}")

In [None]:
from xgboost_ray import predict

def infer_xgboost_ray(inference_df, bst, num_actors=8):
    """Run distributed prediction with xgboost_ray.

    Args:
        inference_df: Data accepted by ``RayDMatrix``. In this notebook it is
            a list of parquet paths; the ``labels`` and ``partition`` columns
            are ignored.
        bst: Trained ``xgb.Booster``. It is not modified; a copy is relabeled.
        num_actors: Number of Ray actors used for prediction.

    Returns:
        The predictions returned by ``xgboost_ray.predict``.
    """
    # Relabel a copy so the caller's booster keeps its original feature names.
    bst = bst.copy()
    # NOTE(review): this renames the model's features in lexicographic order
    # (feature_0, feature_1, feature_10, ...) without reordering the features
    # the trees split on. Predictions are only meaningful if the loaded data's
    # j-th column is the j-th training feature. If the parquet columns are
    # really in lexicographic order, features are silently mismatched.
    # TODO verify against the parquet schema.
    bst.feature_names = sorted(bst.feature_names)
    inference_df = RayDMatrix(inference_df, ignore=["labels", "partition"])

    return predict(bst, inference_df, ray_params=RayParams(num_actors=num_actors))

In [None]:
@ray.remote
def infer_xgboost_ray_cluster(bst):
    """Ray task: distributed inference over 300 parquet paths (size=30 units of 10)."""
    infer_xgboost_ray(get_parquet_files(data_path, size=30), bst)

start_time = time.time()
ray.get(infer_xgboost_ray_cluster.remote(bst))
elapsed = time.time() - start_time
print(f"Total time taken: {elapsed}")

In [None]:
from ray import tune

@ray.remote
def tune_xgboost():
    """Ray task: hyperparameter search for XGBoost with Ray Tune.

    Generates the synthetic dataset and runs 16 Tune trials of
    ``train_xgboost_ray`` (8 actors with 2 CPUs each per trial). Each trial
    samples ``eta``, ``subsample`` and ``max_depth``, and trials are ranked by
    the lowest ``eval-error``.

    Returns:
        The best trial's config dict.
    """
    train_df, test_df = create_learnable_data(num_rows=1000000,
                                              num_cols=40,
                                              num_classes=2,
                                              target_accuracy=0.8,
                                              seed=1234)

    # XGBoost config; the tune.* entries are search spaces sampled per trial.
    config = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "eta": tune.loguniform(1e-4, 1e-1),
        "subsample": tune.uniform(0.5, 1.0),
        # randint's upper bound is exclusive: depths 1..8.
        "max_depth": tune.randint(1, 9)
    }

    ray_params = RayParams(
        max_actor_restarts=1,
        gpus_per_actor=0,
        cpus_per_actor=2,
        num_actors=8)

    analysis = tune.run(
        tune.with_parameters(train_xgboost_ray, train_df=train_df, test_df=test_df,
                             ray_params=ray_params, progress_bar=False),
        # Use the `get_tune_resources` helper function to set the resources.
        resources_per_trial=ray_params.get_tune_resources(),
        config=config,
        num_samples=16,
        metric="eval-error",
        mode="min",
        # `tune.run` takes `verbose`; `verbosity` is not a parameter and raises
        # a TypeError.
        verbose=1)

    accuracy = 1. - analysis.best_result["eval-error"]
    print(f"Best model parameters: {analysis.best_config}")
    print(f"Best model total accuracy: {accuracy:.4f}")
    return analysis.best_config

In [None]:
# Run the Tune sweep on the cluster and display the winning configuration.
best_config = ray.get(tune_xgboost.remote())
best_config