# Distributed Training of an XGBoost Model on Anyscale


<div align="left">
<a target="_blank" href="https://console.anyscale.com/"><img src="https://img.shields.io/badge/🚀 Run_on-Anyscale-9hf"></a>&nbsp;
<a href="https://github.com/anyscale/e2e-xgboost" role="button"><img src="https://img.shields.io/static/v1?label=&amp;message=View%20On%20GitHub&amp;color=586069&amp;logo=github&amp;labelColor=2f363d"></a>&nbsp;
</div>

In this tutorial, we'll run a distributed training workload that connects the following heterogeneous steps:
- preprocessing the dataset with Ray Data prior to training
- distributed training with Ray Train
- saving model artifacts to a model registry (MLflow)

**Note**: We won't be tuning our model in this tutorial, but be sure to check out [Ray Tune](https://docs.ray.io/en/latest/tune/index.html) for experiment execution and hyperparameter tuning at any scale.

<img src="https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/distributed_training.png" width=800>


To run this tutorial, we need to install the dependencies:

In [1]:
%load_ext autoreload
%autoreload all

In [2]:
# Ensure the requirements are installed
! pip install -qU -r ../requirements.txt

In [3]:
# enable importing from dist_xgboost module
import os
import sys

sys.path.append(os.path.abspath(".."))

In [4]:
# Enable Ray Train v2. This will be the default in an upcoming release.
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# now it's safe to import from ray.train

In [5]:
import ray

from dist_xgboost.constants import preprocessor_path, local_storage_path

In [6]:
# make ray data less verbose
ray.data.DataContext.get_current().enable_progress_bars = False
ray.data.DataContext.get_current().print_on_execution_start = False

Next, we define a function to load our train, validation, and test datasets. For this example, we are using the ["Breast Cancer Wisconsin (Diagnostic)"](https://archive.ics.uci.edu/dataset/17/breast+cancer+wisconsin+diagnostic) dataset.

We first split off a random 70% of the rows as the train subset, then divide the remaining 30% evenly between the validation set and the test set.

In [7]:
from ray.data import Dataset


def prepare_data() -> tuple[Dataset, Dataset, Dataset]:
    """Load and split the dataset into train, validation, and test sets."""
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    seed = 42
    train_dataset, rest = dataset.train_test_split(
        test_size=0.3, shuffle=True, seed=seed
    )
    # 15% for validation, 15% for testing
    valid_dataset, test_dataset = rest.train_test_split(
        test_size=0.5, shuffle=True, seed=seed
    )
    return train_dataset, valid_dataset, test_dataset
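
As a quick check on the proportions, the two chained splits can be worked out by hand. The sketch below is plain Python arithmetic rather than a Ray API, and `split_fractions` is a hypothetical helper introduced only for illustration.

```python
def split_fractions(
    first_test_size: float, second_test_size: float
) -> tuple[float, float, float]:
    """Return (train, validation, test) fractions for two chained splits."""
    train = 1.0 - first_test_size
    # The second split divides the held-out remainder of the first split.
    test = first_test_size * second_test_size
    valid = first_test_size - test
    return train, valid, test


# Same test sizes as in prepare_data(): 0.3 for the first split, 0.5 for the second.
train_frac, valid_frac, test_frac = split_fractions(0.3, 0.5)
print(f"train={train_frac:.2f}, valid={valid_frac:.2f}, test={test_frac:.2f}")
```

The exact row counts on a real dataset depend on how the split rounds, which this sketch doesn't model.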

In [None]:
# Load and split the dataset
train_dataset, valid_dataset, _test_dataset = prepare_data()
train_dataset.take(1)

We see from the output that we have a tabular dataset with features [characterizing cell nuclei in a breast mass](https://minds.wisconsin.edu/bitstream/handle/1793/59692/TR1131.pdf), such as radius, concavity, and symmetry.

## How to preprocess data for training?

Notice that the features have different magnitudes and ranges. This isn't strictly a problem for tree-based methods, but in some cases scaling the features can [improve numerical stability](https://stats.stackexchange.com/a/485681/80433).

Ray Data offers built-in preprocessors that simplify common feature preprocessing tasks, especially for tabular data.
These can be seamlessly integrated with Ray Datasets, allowing you to preprocess your data in a fault-tolerant and distributed way before training.

In this example, we use Ray's built-in StandardScaler to zero-center and z-score normalize the columns of our dataset. We fit the preprocessor using the train subset, and save it so that we can pre-process data when we deploy our model to production.

In [None]:
from ray.data.preprocessors import StandardScaler

# pick some dataset columns to scale
columns_to_scale = [c for c in train_dataset.columns() if c != "target"]

# Initialize the preprocessor
preprocessor = StandardScaler(columns=columns_to_scale)
# train the preprocessor on the training set
preprocessor.fit(train_dataset)

Now that we've fit the preprocessor, let's save it to a file. Later, we will register this artifact in MLflow so that we can reuse it in downstream pipelines.

In [10]:
import pickle

with open(preprocessor_path, "wb") as f:
    pickle.dump(preprocessor, f)
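
Downstream code can restore the artifact with `pickle.load`. The following is a minimal, self-contained sketch of that round trip. It assumes a plain dict as a stand-in for the fitted preprocessor and a temporary file instead of `preprocessor_path`; both are hypothetical and used only so the snippet runs on its own.

```python
import os
import pickle
import tempfile

# Hypothetical stand-in for the fitted preprocessor: any picklable object
# round-trips the same way.
fitted_state = {"columns": ["feature_a", "feature_b"], "fitted": True}

with tempfile.TemporaryDirectory() as tmp_dir:
    path = os.path.join(tmp_dir, "preprocessor.pkl")

    # Save, mirroring the cell above.
    with open(path, "wb") as f:
        pickle.dump(fitted_state, f)

    # Load. Only unpickle files you trust, since unpickling can execute code.
    with open(path, "rb") as f:
        restored = pickle.load(f)

print(restored == fitted_state)
```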

Now that we have our preprocessor fitted, we can use it to transform our data. Note that this `transform()` operation is lazy; it won't be applied to the data until it is required by the train workers.

In [None]:
train_dataset = preprocessor.transform(train_dataset)
valid_dataset = preprocessor.transform(valid_dataset)
train_dataset.take(1)

Using `take()`, we can see that the values are now zero-centered and rescaled to unit variance, with most values falling roughly between -1 and 1.
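
To make the transformation concrete, here is a small standard-library sketch of z-score scaling on a single column. It assumes the population standard deviation and uses made-up values; `fit_scaler` and `transform` are hypothetical helpers, not part of Ray Data. Note that the statistics are computed on the training values only and then reused for the validation values.

```python
import statistics


def fit_scaler(train_values: list[float]) -> tuple[float, float]:
    """Compute the mean and population standard deviation of a training column."""
    return statistics.fmean(train_values), statistics.pstdev(train_values)


def transform(values: list[float], mean: float, std: float) -> list[float]:
    """Apply z-score scaling using statistics that were fitted elsewhere."""
    return [(v - mean) / std for v in values]


train_column = [10.0, 12.0, 14.0, 16.0, 18.0]  # made-up training values
valid_column = [11.0, 20.0]  # made-up validation values

mean, std = fit_scaler(train_column)
scaled_train = transform(train_column, mean, std)
scaled_valid = transform(valid_column, mean, std)

print(scaled_train)
print(scaled_valid)
```

The scaled training column has zero mean and unit variance, while individual values (especially in the validation set) can still fall outside the range from -1 to 1.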

Optionally, at this stage we could run the preprocessing step and save the intermediates using `ds.write_parquet(output_path)`. For the purposes of this example, we will just process the dataset in-memory in a streaming fashion.

<div class="alert alert-block alert"> <b> Data Processing</b> 

Be sure to check out this extensive guide on [data loading and preprocessing](https://docs.ray.io/en/latest/train/user-guides/data-loading-preprocessing.html) for the last-mile preprocessing we'll need to do prior to training our models. Ray Data also supports performant joins, filters, aggregations, etc. for the more structured data processing your workloads may need.

## Save and load XGBoost checkpoints

Checkpointing periodically saves the model state during training.
It is particularly useful for long-running training sessions, as it enables you to resume training from the last checkpoint in case of interruptions.

[`XGBoostTrainer`](https://docs.ray.io/en/latest/train/api/doc/ray.train.xgboost.XGBoostTrainer.html#ray.train.xgboost.XGBoostTrainer) implements checkpointing out of the box. These checkpoints can be loaded into memory
using the static method [`XGBoostTrainer.get_model`](https://docs.ray.io/en/latest/train/api/doc/ray.train.xgboost.XGBoostTrainer.get_model.html#ray.train.xgboost.XGBoostTrainer.get_model).

The only required change is to configure [`CheckpointConfig`](https://docs.ray.io/en/latest/train/api/doc/ray.train.CheckpointConfig.html#ray.train.CheckpointConfig) to set the checkpointing frequency. For example, the following configuration
saves a checkpoint every 10 boosting rounds and keeps only the latest checkpoint.

In [12]:
from ray.train import CheckpointConfig, RunConfig, ScalingConfig, Result


# Configure checkpointing to save progress during training
run_config = RunConfig(
    checkpoint_config=CheckpointConfig(
        # Checkpoint every 10 iterations.
        checkpoint_frequency=10,
        # Only keep the latest checkpoint.
        num_to_keep=1,
    ),
    ## If running in a multi-node cluster, this is where you
    ## should configure the run's persistent storage that is accessible
    ## across all worker nodes with `storage_path="s3://..."`
    storage_path=local_storage_path,
)
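
To build intuition for how `checkpoint_frequency` and `num_to_keep` interact, the sketch below simulates the retention policy in plain Python. It models only the policy described above, not how Ray Train implements it, and Ray may additionally save a checkpoint when training ends. `simulate_checkpoints` is a hypothetical helper.

```python
from collections import deque


def simulate_checkpoints(num_rounds: int, frequency: int, num_to_keep: int) -> list[int]:
    """Return the boosting rounds whose checkpoints remain after training."""
    # A bounded deque evicts the oldest checkpoint once `num_to_keep` is exceeded.
    kept = deque(maxlen=num_to_keep)
    for round_idx in range(1, num_rounds + 1):
        if round_idx % frequency == 0:
            kept.append(round_idx)
    return list(kept)


# The settings used in this tutorial: 10 boosting rounds, frequency 10, keep 1.
print(simulate_checkpoints(num_rounds=10, frequency=10, num_to_keep=1))
# A longer hypothetical run, keeping one or two checkpoints.
print(simulate_checkpoints(num_rounds=50, frequency=10, num_to_keep=1))
print(simulate_checkpoints(num_rounds=50, frequency=10, num_to_keep=2))
```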

:::{tip}
Once you enable checkpointing, you can follow [this guide](https://docs.ray.io/en/latest/train/user-guides/fault-tolerance.html#train-fault-tolerance) to enable fault tolerance.
:::

## Basic training with tree-based models in Train

Just as in the original [`xgboost.train()`](https://xgboost.readthedocs.io/en/stable/parameter.html) function, the training parameters are passed as the `params` dictionary.

### XGBoost Example

In [13]:
import xgboost

from ray.train.xgboost import RayTrainReportCallback
from ray.train.xgboost import XGBoostTrainer


def train_fn_per_worker(config: dict):
    # Get this worker's shards of the train and validation datasets
    train_ds, val_ds = (
        ray.train.get_dataset_shard("train"),
        ray.train.get_dataset_shard("validation"),
    )

    train_ds = train_ds.materialize().to_pandas()
    val_ds = val_ds.materialize().to_pandas()

    # Separate the labels from the features
    train_X, train_y = train_ds.drop("target", axis=1), train_ds["target"]
    eval_X, eval_y = val_ds.drop("target", axis=1), val_ds["target"]

    # Convert the data into a DMatrix
    dtrain = xgboost.DMatrix(train_X, label=train_y)
    deval = xgboost.DMatrix(eval_X, label=eval_y)

    # Do distributed data-parallel training.
    # Ray Train sets up the necessary coordinator processes and
    # environment variables for your workers to communicate with each other.
    # It also handles checkpointing via the `RayTrainReportCallback`.
    _booster = xgboost.train(
        config["xgboost_params"],
        dtrain=dtrain,
        evals=[(dtrain, "train"), (deval, "validation")],
        num_boost_round=10,
        callbacks=[RayTrainReportCallback()],
    )


# Params that will be passed to the base XGBoost model.
model_config = {
    "xgboost_params": {
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }
}

trainer = XGBoostTrainer(
    train_fn_per_worker,
    train_loop_config=model_config,
    # Register the data subsets.
    datasets={"train": train_dataset, "validation": valid_dataset},
    # see "How to scale out training?" for more details
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=5,
        # Whether to use GPU acceleration. Set to True to schedule GPU workers.
        use_gpu=False,
    ),
    run_config=run_config,
)

<div class="alert alert-block alert"> <b> Minimal change to your training code</b> 

You'll notice that there isn't much new Ray Train code on top of our base XGBoost code. We specified how we want to scale out our training workload, loaded the Ray datasets, and then checkpointed on our main worker node... and that's it!

Check out this extensive list of [Ray Train user guides](https://docs.ray.io/en/latest/train/user-guides.html).

<div class="alert alert-block alert"> <b> Ray Train</b> 

**🎛️ Multi-node orchestration made easy**

- Ray Train automatically handles multi-node, multi-GPU setup with no manual SSH setup or hostfile configs.
- It also integrates with Ray's cluster launcher for cloud (AWS, GCP, K8s) and on-prem clusters.
- By contrast, solutions like PyTorch DDP require manually setting up your own process group, ranks, networking, etc.

**🩹 Built-in fault tolerance**
- Ray Train supports automatic retry of failed workers.
- It can continue training from the last checkpoint in case of failure.


**✂️ Flexible training strategies** (not just DDP)
- Ray Train supports Data Parallel, Model Parallel, Parameter Server, and even custom strategies.
- You can also use Torch DDP, FSDP, DeepSpeed, etc. under the hood if you want.
- [Ray Compiled graphs](https://docs.ray.io/en/latest/ray-core/compiled-graph/ray-compiled-graph.html) even allow us to define different parallelism strategies for jointly optimizing multiple models (Megatron, DeepSpeed, etc. only allow for one global setting).

**🔥 Better support for heterogeneous clusters**
- Ray Train lets you define per-worker resource requirements (e.g., 2 CPUs and 1 GPU per worker).
- It can run on heterogeneous machines and scale flexibly (e.g., CPU for preprocessing and GPU for training).

**🌍 Integrations**

<img src="https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/train_integrations.png" width=500>

[RayTurbo Train](https://docs.anyscale.com/rayturbo/rayturbo-train) offers even more improvement to the price-performance ratio, performance monitoring and more:
- **elastic training** to scale to a dynamic number of workers and continue training on fewer resources (even on spot instances).
- **purpose-built dashboard** designed to streamline the debugging of Ray Train workloads
    - Monitoring: View the status of training runs and train workers.
    - Metrics: See insights on training throughput and training system operation time.
    - Profiling: Investigate bottlenecks, hangs, or errors from individual training worker processes.

<img src="https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/train_dashboard.png" width=700>

Finally, we can train our model:

In [None]:
result: Result = trainer.fit()
result

Ray Train returns a [`ray.train.Result`](https://docs.ray.io/en/latest/train/api/doc/ray.train.Result.html) object, which contains a few useful properties such as `metrics`, `checkpoint`, `error`, `path`, `metrics_dataframe`, and `best_checkpoints`:

In [None]:
metrics = result.metrics
metrics

This should output something like:

```python
OrderedDict([('train-logloss', 0.05463397157248817),
             ('train-error', 0.00506329113924051),
             ('validation-logloss', 0.06741214815308066),
             ('validation-error', 0.01176470588235294)])
```

We see that Ray Train logged metrics based on the values we configured in `eval_metric` and `evals`.
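
The key names in the sample output above follow a simple pattern: the name given to each evaluation set in `evals`, a hyphen, then each entry in `eval_metric`. The plain-Python sketch below reconstructs those keys from the configuration used in this tutorial; it only illustrates the naming pattern and does not call XGBoost or Ray.

```python
# Names given to the evaluation sets in `evals` and the metrics in `eval_metric`.
eval_names = ["train", "validation"]
eval_metrics = ["logloss", "error"]

# Each reported key follows the pattern "<eval name>-<metric>".
expected_keys = [f"{name}-{metric}" for name in eval_names for metric in eval_metrics]
print(expected_keys)
```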

Ray Train also automatically stored model checkpoints at the path referenced by `result.checkpoint`. We can use `RayTrainReportCallback.get_model` to re-create our booster, which will come in handy in the next few guides.

In [None]:
booster = RayTrainReportCallback.get_model(result.checkpoint)
booster

## Model registry

We'll create a model registry in our [Anyscale user storage](https://docs.anyscale.com/configuration/storage/#user-storage) to save our model checkpoints to. We'll use open-source MLflow, but we can easily [set up other experiment trackers](https://docs.ray.io/en/latest/train/user-guides/experiment-tracking.html) with Ray.

In [None]:
import shutil
from tempfile import TemporaryDirectory

import mlflow

from dist_xgboost.constants import (
    experiment_name,
    model_fname,
    model_registry,
    preprocessor_fname,
)

# clean up old runs
if os.path.isdir(model_registry):
    shutil.rmtree(model_registry)
# mlflow.delete_experiment(experiment_name)
os.makedirs(model_registry, exist_ok=True)


# create a model registry in our user storage
mlflow.set_tracking_uri(f"file:{model_registry}")

# create a new experiment and log metrics and artifacts
mlflow.set_experiment(experiment_name)
with mlflow.start_run(description="xgboost breast cancer classifier on all features"):
    mlflow.log_params(model_config)
    mlflow.log_metrics(metrics)

    # Selectively log just the preprocessor and model weights
    with TemporaryDirectory() as tmp_dir:
        shutil.copy(
            os.path.join(result.checkpoint.path, model_fname),
            os.path.join(tmp_dir, model_fname),
        )
        shutil.copy(
            preprocessor_path,
            os.path.join(tmp_dir, preprocessor_fname),
        )

        mlflow.log_artifacts(tmp_dir)

We can view our experiment metrics and model artifacts in our model registry. We're using open-source MLflow, so we can run the server by pointing it to our model registry location:

In [None]:
! mlflow server -h 0.0.0.0 -p 8080 --backend-store-uri {model_registry}

We can view the dashboard by going to the **Overview tab** up top → **Open Ports** → `8080`.

<img src="https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/mlflow.png" width=685>

We also have our Ray Dashboard and Train workload-specific dashboards above.

<img src="https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/train_metrics.png" width=700>

In [None]:
from dist_xgboost.data import get_best_model_from_registry

best_model, artifacts_dir = get_best_model_from_registry()

We can also easily wrap our training workload as a production-grade [Anyscale Job](https://docs.anyscale.com/platform/jobs/) ([API ref](https://docs.anyscale.com/reference/job-api/)).

**Note**:
- We're using a `containerfile` to define our dependencies, but we could easily use a pre-built image as well.
- We can specify the compute as a [compute config](https://docs.anyscale.com/configuration/compute-configuration/) or inline in a [job config](https://docs.anyscale.com/reference/job-api#job-cli) file.
- When we don't specify compute while launching from a workspace, the job defaults to the compute configuration of the workspace.

In [None]:
%%bash
# Production batch job
# FIXME use relative paths
anyscale job submit --name=train-xboost-breast-cancer-model \
  --containerfile="../containerfile" \
  --working-dir="/home/ray/default" \
  --exclude="" \
  --max-retries=0 \
  -- python dist_xgboost/train.py

## How to scale out training?

One of the key advantages of using Ray Train is its ability to effortlessly scale your training workloads.
By adjusting the [`ScalingConfig`](https://docs.ray.io/en/latest/train/api/doc/ray.train.ScalingConfig.html#ray.train.ScalingConfig),
you can optimize resource utilization and reduce training time, making it ideal for large-scale machine learning tasks.

:::{note}
Ray Train doesn’t modify or otherwise alter the working of the underlying XGBoost or LightGBM distributed training algorithms. Ray only provides orchestration, data ingest and fault tolerance. For more information on GBDT distributed training, refer to [XGBoost documentation](https://xgboost.readthedocs.io/en/stable/) and [LightGBM documentation](https://lightgbm.readthedocs.io/en/latest/).
:::

### Multi-node CPU Example

Setup: 4 nodes with 8 CPUs each.

Use-case: To utilize all resources in multi-node training.

```python
scaling_config = ScalingConfig(
    num_workers=4,
    resources_per_worker={"CPU": 8},
)
```

### Single-node multi-GPU Example

Setup: 1 node with 8 CPUs and 4 GPUs.

Use-case: If you have a single node with multiple GPUs, you need to use
distributed training to leverage all GPUs.

```python
scaling_config = ScalingConfig(
    num_workers=4,
    use_gpu=True,
)
```

### Multi-node multi-GPU Example

Setup: 4 nodes with 8 CPUs and 4 GPUs each.

Use-case: If you have multiple nodes with multiple GPUs, you need to
schedule one worker per GPU.

```python
scaling_config = ScalingConfig(
    num_workers=16,
    use_gpu=True,
)
```

Note that you just have to adjust the number of workers. Ray handles everything else automatically.

:::{warning}
Specifying a *shared storage location* (such as cloud storage or NFS) is *optional* for single-node clusters, but it is **required for multi-node clusters**. Using a local path will [raise an error](https://docs.ray.io/en/latest/train/user-guides/persistent-storage.html#multinode-local-storage-warning) during checkpointing for multi-node clusters.

```python
trainer = XGBoostTrainer(
    ..., run_config=ray.train.RunConfig(storage_path="s3://...")
)
```
:::

## How many remote actors should you use?

This depends on your workload and your cluster setup. Generally there is no inherent benefit of running more than one remote actor per node for CPU-only training. This is because XGBoost can already leverage multiple CPUs with threading.

However, in some cases, you should consider starting more than one actor per node:

For **multi-GPU training**, each GPU should have a separate remote actor. Thus, if your machine has 24 CPUs and 4 GPUs, you want to start 4 remote actors with 6 CPUs and 1 GPU each.

In a **heterogeneous cluster**, you might want to find the [greatest common divisor](https://en.wikipedia.org/wiki/Greatest_common_divisor) for the number of CPUs. For example, for a cluster with three nodes of 4, 8, and 12 CPUs, respectively, you should set the number of actors to 6 and the CPUs per actor to 4.
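
The heterogeneous-cluster rule can be sketched with the standard library. `plan_cpu_actors` is a hypothetical helper that assumes every CPU in the cluster should be assigned to some actor.

```python
from functools import reduce
from math import gcd


def plan_cpu_actors(cpus_per_node: list[int]) -> tuple[int, int]:
    """Return (num_actors, cpus_per_actor) using the GCD of the node CPU counts."""
    cpus_per_actor = reduce(gcd, cpus_per_node)
    num_actors = sum(cpus_per_node) // cpus_per_actor
    return num_actors, cpus_per_actor


# Three nodes with 4, 8, and 12 CPUs, as in the example above.
num_actors, cpus_per_actor = plan_cpu_actors([4, 8, 12])
print(num_actors, cpus_per_actor)
```

Under these assumptions, the result maps to a configuration along the lines of `ScalingConfig(num_workers=6, resources_per_worker={"CPU": 4})`.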

## How to use GPUs for training?

Ray Train enables multi-GPU training for XGBoost and LightGBM. The core backends automatically leverage NCCL2 for cross-device communication. All you have to do is start one actor per GPU and set GPU-compatible parameters, for example by setting XGBoost’s `tree_method` to `gpu_hist`. See the XGBoost documentation for more details.

For instance, if you have 2 machines with 4 GPUs each, you want to start 8 workers, and set `use_gpu=True`. There is usually no benefit in allocating less (for example, 0.5) or more than one GPU per actor.

You should divide the CPUs evenly across actors per machine, so if your machines have 16 CPUs in addition to the 4 GPUs, each actor should have 4 CPUs to use.

```python
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(
        # Number of workers to use for data parallelism.
        num_workers=2,
        # Whether to use GPU acceleration.
        use_gpu=True,
    ),
    params={
        # XGBoost specific params
        "tree_method": "gpu_hist",
        "eval_metric": ["logloss", "error"],
    },
    ...
)
```
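
The sizing rules in this section (one worker per GPU, CPUs divided evenly among the workers on each machine) can be sketched as a small helper. `gpu_scaling_plan` is hypothetical; its dict keys are chosen to mirror `ScalingConfig` argument names, so the result could plausibly be passed as `ScalingConfig(**plan)`. The snippet also shows, as plain dicts, the `gpu_hist` spelling used in this guide next to the `device="cuda"` spelling introduced in XGBoost 2.0, where `gpu_hist` is deprecated. Which one applies depends on your installed XGBoost version.

```python
def gpu_scaling_plan(
    num_machines: int, gpus_per_machine: int, cpus_per_machine: int
) -> dict:
    """One worker per GPU, with each machine's CPUs split evenly among its workers."""
    return {
        "num_workers": num_machines * gpus_per_machine,
        "use_gpu": True,
        "resources_per_worker": {
            "CPU": cpus_per_machine // gpus_per_machine,
            "GPU": 1,
        },
    }


# 2 machines, each with 4 GPUs and 16 CPUs, as in the example above.
plan = gpu_scaling_plan(num_machines=2, gpus_per_machine=4, cpus_per_machine=16)
print(plan)

# GPU parameters in the older spelling used in this guide ...
legacy_gpu_params = {"tree_method": "gpu_hist"}
# ... and the spelling for XGBoost 2.0 and later.
xgb2_gpu_params = {"tree_method": "hist", "device": "cuda"}
```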


## How to optimize XGBoost memory usage?

XGBoost uses a compute-optimized data structure called `DMatrix` to store training data.
However, converting a dataset to a `DMatrix` involves storing a complete copy of the data
as well as intermediate conversions.
On a 64-bit system the format is 64-bit floats. Depending on the system and original dataset dtype, 
this matrix can thus occupy more memory than the original dataset.

The **peak memory usage** for CPU-based training is at least 3x the dataset size, assuming dtype `float32` on a 64-bit system, plus about **400,000 KiB** for other resources, like operating system requirements and storing of intermediate results.

### Example

- Machine type: AWS m5.xlarge (4 vCPUs, 16 GiB RAM)
- Usable RAM: ~15,350,000 KiB
- Dataset: 1,250,000 rows with 1024 features, dtype float32. Total size: 5,000,000 KiB
- XGBoost DMatrix size: ~10,000,000 KiB

This dataset fits exactly on this node for training.

Note that the DMatrix size might be lower on a 32-bit system.
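
The numbers in this example can be reproduced with a few lines of arithmetic. The sketch below applies the rule of thumb stated earlier (3x the dataset size plus about 400,000 KiB); the helper names are hypothetical, and the result is an estimate, not a measurement.

```python
def dataset_size_kib(rows: int, features: int, bytes_per_value: int) -> float:
    """Size of a dense numeric dataset in KiB."""
    return rows * features * bytes_per_value / 1024


def estimated_peak_kib(dataset_kib: float, overhead_kib: float = 400_000) -> float:
    """Rule of thumb from the text: 3x the dataset size plus a fixed overhead."""
    return 3 * dataset_kib + overhead_kib


# 1,250,000 rows x 1024 features stored as float32 (4 bytes per value).
dataset_kib = dataset_size_kib(rows=1_250_000, features=1024, bytes_per_value=4)
peak_kib = estimated_peak_kib(dataset_kib)
usable_kib = 15_350_000  # usable RAM on the m5.xlarge from the example

print(f"dataset: {dataset_kib:,.0f} KiB")
print(f"estimated peak: {peak_kib:,.0f} KiB")
print(f"peak / usable RAM: {peak_kib / usable_kib:.3f}")
```

The estimate lands within about 0.3% of the usable RAM, so this node has essentially no headroom for a larger dataset.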

### GPUs

Generally, the same memory requirements exist for GPU-based training. Additionally, the GPU must have enough memory to hold the dataset.

In the preceding example, the GPU must have at least 10,000,000 KiB (about 9.5 GiB) of memory. However, empirical data suggests that using a `DeviceQuantileDMatrix` increases peak GPU memory usage by about 10%, possibly for intermediate storage when loading data.

### Best practices

In order to reduce peak memory usage, consider the following suggestions:

- Store data as `float32` or smaller. You often don’t need more precision, and keeping data in a smaller format helps reduce peak memory usage for initial data loading.
- Pass the `dtype` when loading data from CSV. Otherwise, floating point values are loaded as `np.float64` by default, increasing peak memory usage by 33%.
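
As a minimal sketch of the second suggestion, the snippet below loads the same tiny, made-up CSV with pandas twice, once with the default dtype and once with `dtype=np.float32`, and compares the in-memory size of the column data. It assumes pandas and NumPy are installed; the in-memory `io.StringIO` buffer stands in for a real CSV file.

```python
import io

import numpy as np
import pandas as pd

csv_text = "a,b\n1.5,2.5\n3.5,4.5\n"  # tiny made-up CSV

# Default loading: floating point columns become float64.
df_default = pd.read_csv(io.StringIO(csv_text))
# Explicit dtype: columns are stored as float32 instead.
df_float32 = pd.read_csv(io.StringIO(csv_text), dtype=np.float32)

bytes_default = int(df_default.memory_usage(index=False).sum())
bytes_float32 = int(df_float32.memory_usage(index=False).sum())

print(df_default.dtypes.tolist(), bytes_default)
print(df_float32.dtypes.tolist(), bytes_float32)
```

The column data takes half the space as `float32`. The effect on overall peak memory during training is smaller than that, because other copies and intermediates are involved.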