# Ray AI Libraries: An example of using Ray Data, Ray Train, Ray Tune, and Ray Serve to implement an XGBoost regression model



<b> Here is the roadmap for this notebook:</b>
<ul>
    <li>Overview of the Ray AI Libraries</li>
    <li>Quick end-to-end example</li>
    <ul>
      <li>Vanilla XGBoost code</li>
      <li>Hyperparameter tuning with Ray Tune</li>
      <li>Distributed training with Ray Train</li>
      <li>Serving an ensemble model with Ray Serve</li>
      <li>Batch inference with Ray Data</li>
    </ul>
</ul>
</div>

In [None]:
# Install the OpenMP development package (-y answers the install prompt automatically)
!apt-get update
!apt-get install -y libomp-dev


In [None]:
!python -m pip install "ray[data,train,tune,serve]"


In [None]:
import asyncio
import fastapi
import pandas as pd
import requests
# macOS: if you get an XGBoostError at import, you might have to `brew install libomp` before importing xgboost again
import xgboost
from pydantic import BaseModel
from sklearn.model_selection import train_test_split

import ray
import ray.tune
import ray.train
from ray.train.xgboost import XGBoostTrainer as RayTrainXGBoostTrainer
from ray.train import RunConfig
import ray.data
import ray.serve

## 1. Overview of the Ray AI Libraries

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_AI_Libraries/Ray+AI+Libraries.png" width="700px" loading="lazy">

Built on top of Ray Core, the Ray AI Libraries inherit the performance and scalability benefits of Core while providing a convenient abstraction layer for machine learning. These Python-native libraries let ML practitioners distribute individual workloads and end-to-end applications, and build custom use cases, in a unified framework.

The Ray AI Libraries bring together an ever-growing ecosystem of integrations with popular machine learning frameworks to create a common interface for development.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Introduction_to_Ray_AIR/e2e_air.png" width="100%" loading="lazy">|
|:-:|
|Ray AI Libraries enable end-to-end ML development and provide multiple options for integrating with other tools and libraries from the MLOps ecosystem.|



## 2. Quick end-to-end example

For this regression task, we will apply a simple [XGBoost](https://xgboost.readthedocs.io/en/stable/) (a gradient-boosted trees framework) model to the March 2021 [New York City Taxi & Limousine Commission's Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

The full dataset contains millions of samples of yellow cab rides, and the goal is to predict the tip amount.

**Dataset features**
* **`passenger_count`**
    * Float (whole number) representing the number of passengers.
* **`trip_distance`**
    * Float representing trip distance in miles.
* **`fare_amount`**
    * Float representing the time-and-distance fare calculated by the meter; taxes, tips, tolls, and surcharges are recorded in separate columns.
* **`tolls_amount`**
    * Float representing the total paid on tolls if any.

**Target**
* **`tip_amount`**
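    * Float representing the total amount paid as tips. Per the TLC data dictionary, this field is populated automatically for credit card tips; cash tips are not included.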

In [None]:
features = [
    "passenger_count",
    "trip_distance",
    "fare_amount",
    "tolls_amount",
]

label_column = "tip_amount"

In [None]:
def load_data():
    path = "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet"
    df = pd.read_parquet(path, columns=features + [label_column])
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest
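`load_data` holds out 20% of the rows for evaluation with `train_test_split(test_size=0.2, random_state=42)`. The pure-Python sketch below illustrates the idea of a seeded random hold-out split with a hypothetical `holdout_split` helper and made-up toy rows; its shuffle is not scikit-learn's, so the rows it picks will differ from the notebook's split.

```python
import math
import random


def holdout_split(rows, labels, test_size=0.2, seed=42):
    """Seeded random hold-out split (illustrative; not scikit-learn's shuffle).

    Returns X_train, X_test, y_train, y_test, the same order that
    sklearn.model_selection.train_test_split uses.
    """
    if len(rows) != len(labels):
        raise ValueError("rows and labels must have the same length")
    indices = list(range(len(rows)))
    random.Random(seed).shuffle(indices)       # same seed -> same split
    n_test = math.ceil(len(rows) * test_size)  # size of the held-out part
    test_idx, train_idx = indices[:n_test], indices[n_test:]
    X_train = [rows[i] for i in train_idx]
    X_test = [rows[i] for i in test_idx]
    y_train = [labels[i] for i in train_idx]
    y_test = [labels[i] for i in test_idx]
    return X_train, X_test, y_train, y_test


# Hypothetical toy trips using two of the notebook's feature columns
rows = [{"trip_distance": float(i), "fare_amount": 5.0 + i} for i in range(10)]
tips = [0.2 * r["fare_amount"] for r in rows]
X_train, X_test, y_train, y_test = holdout_split(rows, tips)
print(len(X_train), len(X_test))  # prints: 8 2
```

Because features and labels are indexed with the same shuffled positions, each label stays aligned with its row, which is the property `DMatrix(X_train, label=y_train)` relies on.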

In [None]:
storage_folder = "/content/cluster_storage"

In [None]:
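from pathlib import Path

# Make sure the storage folder exists before the model is saved into it
Path(storage_folder).mkdir(parents=True, exist_ok=True)
model_path = Path(storage_folder) / "model.ubj"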

def my_xgboost_func(params):
    evals_result = {}
    dtrain, dtest = load_data()
    bst = xgboost.train(
        params,
        dtrain,
        num_boost_round=10,
        evals=[(dtest, "eval")],
        evals_result=evals_result,
    )
    # Save the trained booster; the .ubj extension selects XGBoost's UBJSON format
    bst.save_model(model_path)
    print(f"{evals_result['eval']}")
    return {"eval-rmse": evals_result["eval"]["rmse"][-1]}

params = {
    "objective": "reg:squarederror",
    "eval_metric": "rmse",
    "tree_method": "hist",
    "max_depth": 6,
    "eta": 0.1,
}
my_xgboost_func(params)

[0]	eval-rmse:2.18114
[1]	eval-rmse:2.13805
[2]	eval-rmse:2.10221
[3]	eval-rmse:2.07294
[4]	eval-rmse:2.04855
[5]	eval-rmse:2.02852
[6]	eval-rmse:2.01225
[7]	eval-rmse:1.99868
[8]	eval-rmse:1.98771
[9]	eval-rmse:1.97872
OrderedDict({'rmse': [2.1811370920777606, 2.1380522744942168, 2.1022143627953036, 2.072936825276888, 2.048545721269399, 2.028522863406997, 2.0122461934067273, 1.9986807853230097, 1.9877117047436583, 1.9787180742813586]})


{'eval-rmse': 1.9787180742813586}
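The `eval-rmse` numbers printed above are the root-mean-square error of the predicted tips on the 20% hold-out split, in the same units as `tip_amount`; lower is better, which is why the Tune run below uses `mode="min"`. A minimal pure-Python version of the metric, assuming unweighted samples (the notebook sets no sample weights), looks like this:

```python
import math


def rmse(y_true, y_pred):
    """Root-mean-square error: sqrt(mean((y_true - y_pred) ** 2))."""
    if not y_true or len(y_true) != len(y_pred):
        raise ValueError("inputs must be non-empty and have equal length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical tips (actual vs. predicted): the errors are 0, 0 and 2
print(rmse([1.0, 2.0, 3.0], [1.0, 2.0, 5.0]))  # sqrt(4 / 3), about 1.1547
```

Squaring before averaging means a few large misses (for example, a big tip predicted as zero) weigh more than many small ones.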

In [None]:
ray.init(num_cpus=9, ignore_reinit_error=True, include_dashboard=False)


2025-11-01 04:19:35,204	INFO worker.py:2012 -- Started a local Ray instance.


Python version: 3.12.12
Ray version: 2.51.1


In [None]:
ray.cluster_resources()


{'CPU': 9.0,
 'memory': 9190551552.0,
 'node:172.28.0.12': 1.0,
 'object_store_memory': 3938807808.0,
 'node:__internal_head__': 1.0}

### Hyperparameter tuning with Ray Tune

Let's use Ray Tune to run distributed hyperparameter tuning for the XGBoost model.

In [None]:
# Tune's own RunConfig (ray.air.RunConfig is deprecated)
from ray.tune import RunConfig

tuner = ray.tune.Tuner(
    my_xgboost_func,
    param_space={
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "tree_method": "hist",
        "max_depth": 6,
        "eta": ray.tune.uniform(0.01, 0.3),
    },
    run_config=RunConfig(
        storage_path=storage_folder
    ),
    tune_config=ray.tune.TuneConfig(
        metric="eval-rmse",
        mode="min",
        num_samples=10,
    ),
)

results = tuner.fit()
print(results.get_best_result().config)

+------------------------------------------------------------------------+
| Configuration for experiment     my_xgboost_func_2025-11-01_04-19-57   |
+------------------------------------------------------------------------+
| Search algorithm                 BasicVariantGenerator                 |
| Scheduler                        FIFOScheduler                         |
| Number of trials                 10                                    |
+------------------------------------------------------------------------+

View detailed results here: /content/cluster_storage/my_xgboost_func_2025-11-01_04-19-57
To visualize your results with TensorBoard, run: `tensorboard --logdir /tmp/ray/session_2025-11-01_04-19-25_916625_803/artifacts/2025-11-01_04-19-57/my_xgboost_func_2025-11-01_04-19-57/driver_artifacts`

Trial status: 10 PENDING
Current time: 2025-11-01 04:20:06. Total running time: 0s
Logical resource usage: 0/9 CPUs, 0/0 GPUs
+----------------------------------------------------+


2025-11-01 04:22:02,878	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/content/cluster_storage/my_xgboost_func_2025-11-01_04-19-57' in 0.0166s.



Trial my_xgboost_func_0670e_00009 completed after 1 iterations at 2025-11-01 04:22:02. Total running time: 1min 56s
+------------------------------------------------------+
| Trial my_xgboost_func_0670e_00009 result             |
+------------------------------------------------------+
| checkpoint_dir_name                                  |
| time_this_iter_s                             5.69158 |
| time_total_s                                 5.69158 |
| training_iteration                                 1 |
| eval-rmse                                     1.9453 |
+------------------------------------------------------+
(my_xgboost_func pid=3741) [9]	eval-rmse:1.94530

Trial status: 10 TERMINATED
Current time: 2025-11-01 04:22:02. Total running time: 1min 56s
Logical resource usage: 1.0/9 CPUs, 0/0 GPUs
Current best trial: 0670e_00001 with eval-rmse=1.9344382573515202 and params={'objective': 'reg:squarederror', 'eval_metric': 'rmse', 'tree_method': 'hist', 'max_depth': 6, '

The diagram below shows what Tune does: it schedules many trials, each training the model with a different sampled configuration, and returns the best-performing one.

<img src="https://bair.berkeley.edu/static/blog/tune/tune-arch-simple.png" width="700px" loading="lazy">
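Conceptually, the Tuner above runs a random search: its `BasicVariantGenerator` samples `eta` from `ray.tune.uniform(0.01, 0.3)` for each of the `num_samples=10` trials, runs the trainable, and `get_best_result()` picks the trial with the lowest `eval-rmse`. The sketch below reproduces that loop sequentially in plain Python; `random_search` and `toy_objective` are hypothetical names, and the parabola only stands in for `my_xgboost_func`. Tune itself runs the trials concurrently on the cluster and also handles scheduling, fault tolerance, and result storage.

```python
import random


def random_search(objective, low, high, num_samples, seed=0):
    """Sample `num_samples` values uniformly from [low, high], score each one,
    and return (best_trial, all_trials), where lower scores are better."""
    rng = random.Random(seed)
    trials = []
    for _ in range(num_samples):
        eta = rng.uniform(low, high)          # plays the role of ray.tune.uniform(low, high)
        trials.append((eta, objective(eta)))  # one "trial": (config, metric)
    best = min(trials, key=lambda trial: trial[1])  # mode="min"
    return best, trials


def toy_objective(eta):
    # Hypothetical stand-in for eval-rmse: a parabola with its minimum at eta = 0.2
    return 1.9 + (eta - 0.2) ** 2


best, trials = random_search(toy_objective, 0.01, 0.3, num_samples=10)
print(f"best eta={best[0]:.3f}, score={best[1]:.4f}")
```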

### Distributed training with Ray Train

If the training data is large, training on a single worker can take a long time to complete.

To speed it up, shard the dataset across training workers and perform distributed XGBoost training.

Let's redefine `load_data` so that each worker loads a different slice of the data (a different monthly file) based on its worker index, or rank.

In [None]:
def load_data():
    # find out which training worker is running this code
    train_ctx = ray.train.get_context()
    worker_rank = train_ctx.get_world_rank()
    print(f"Loading data for worker {worker_rank}...")

    # build path based on training worker rank: rank 0 -> January 2021, rank 1 -> February 2021, ...
    month = worker_rank % 12 + 1
    year = 2021 + worker_rank // 12
    path = f"s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_{year}-{month:02}.parquet"

    # same as before
    df = pd.read_parquet(path, columns=features + [label_column])
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest
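Factoring the rank-to-file mapping into a small helper makes it easy to check on its own. The hypothetical `month_path_for_rank` below uses `month = rank % 12 + 1`, so every rank maps to a valid month `01` to `12` and rank 12 rolls over to January of the next year. It is only a sketch: it builds paths and does not check whether each month's file actually exists in the bucket.

```python
BASE_URL = "s3://anyscale-public-materials/nyc-taxi-cab"


def month_path_for_rank(worker_rank: int, start_year: int = 2021) -> str:
    """Map a training worker's rank to one monthly trip file (hypothetical helper)."""
    if worker_rank < 0:
        raise ValueError("worker_rank must be non-negative")
    month = worker_rank % 12 + 1           # ranks 0..11 -> months 1..12
    year = start_year + worker_rank // 12  # rank 12 rolls over to January of the next year
    return f"{BASE_URL}/yellow_tripdata_{year}-{month:02}.parquet"


for rank in (0, 1, 11, 12):
    print(rank, month_path_for_rank(rank))
```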

In [None]:
trainer = RayTrainXGBoostTrainer(  # Create a trainer
    my_xgboost_func,  # Pass it the training function
    scaling_config=ray.train.ScalingConfig(
        num_workers=2, use_gpu=False
    ),  # Define how many training workers
    train_loop_config=params,  # Pass it the hyperparameters
)

trainer.fit()  # Run the training job

[36m(TrainController pid=3930)[0m Attempting to start training worker group of size 2 with the following resources: [{'CPU': 1}] * 2
[36m(TrainController pid=3930)[0m Started training worker group of size 2: 
[36m(TrainController pid=3930)[0m - (ip=172.28.0.12, pid=4061) world_rank=0, local_rank=0, node_rank=0
[36m(TrainController pid=3930)[0m - (ip=172.28.0.12, pid=4127) world_rank=1, local_rank=1, node_rank=0
[36m(RayTrainWorker pid=4061)[0m [04:22:50] Task [xgboost.ray-rank=00000000]:22cfc77f4eaa0709f2931fb901000000 got rank 0
[36m(RayTrainWorker pid=4061)[0m Loading data for worker 0...
[36m(TrainController pid=3930)[0m [04:22:53] [0]	eval-rmse:2.28346
[36m(TrainController pid=3930)[0m [04:22:53] [1]	eval-rmse:2.25069
[36m(TrainController pid=3930)[0m [04:22:53] [2]	eval-rmse:2.22460
[36m(TrainController pid=3930)[0m [04:22:54] [3]	eval-rmse:2.20430
[36m(TrainController pid=3930)[0m [04:22:54] [4]	eval-rmse:2.18836
[36m(TrainController pid=3930)[0m [04:22:54

Result(metrics=None, checkpoint=None, error=None, path='/root/ray_results/ray_train_run-2025-11-01_04-22-19', metrics_dataframe=None, best_checkpoints=[], _storage_filesystem=<pyarrow._fs.LocalFileSystem object at 0x7ba936c04070>)

The diagram below shows what Train does: a train controller creates the training workers and executes the training function on each worker.

Ray Train delegates the distributed training itself to the underlying XGBoost framework.

<img src="https://docs.ray.io/en/latest/_images/overview.png" width="700px" loading="lazy">
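A rough intuition for what delegating to XGBoost means with `tree_method="hist"`: each worker accumulates gradient and hessian statistics per feature bin over its own shard, and the workers sum those histograms with a collective allreduce, so every worker sees the same totals and picks the same splits a single machine would. The single-process simulation below assumes squared-error loss (gradient `pred - label`, hessian 1), one feature, and made-up bin edges and data; `local_histogram` and `allreduce_sum` are hypothetical names. Real XGBoost also derives the bin edges from a distributed quantile sketch and runs the allreduce over the network.

```python
from bisect import bisect_right

# Hypothetical bin edges for trip_distance (miles): 4 bins
BIN_EDGES = [1.0, 3.0, 10.0]


def local_histogram(shard, base_pred):
    """Per-worker step: sum squared-error gradients/hessians per feature bin."""
    grad = [0.0] * (len(BIN_EDGES) + 1)
    hess = [0.0] * (len(BIN_EDGES) + 1)
    for distance, label in shard:
        b = bisect_right(BIN_EDGES, distance)  # which bin this value falls into
        grad[b] += base_pred - label           # d/dpred of 0.5 * (pred - label) ** 2
        hess[b] += 1.0                         # second derivative is constant
    return grad, hess


def allreduce_sum(histograms):
    """Stand-in for the collective allreduce: element-wise sum over workers."""
    grads, hesses = zip(*histograms)
    return [sum(col) for col in zip(*grads)], [sum(col) for col in zip(*hesses)]


# Hypothetical (trip_distance, tip_amount) rows split across two workers
data = [(0.5, 1.0), (2.0, 2.0), (4.0, 3.5), (12.0, 8.0), (2.5, 1.5), (0.8, 0.0)]
shards = [data[:3], data[3:]]
combined = allreduce_sum([local_histogram(s, base_pred=2.0) for s in shards])
print(combined == local_histogram(data, base_pred=2.0))  # True: same totals as one machine
```

Only the small per-bin sums cross the network, not the rows themselves, which is what lets each worker keep its own month of data locally.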

### Serving an ensemble model with Ray Serve

Ray Serve allows for distributed serving of models and complex inference pipelines.

Here is a diagram showing how to deploy an ensemble model with Ray Serve:

<img src="https://images.ctfassets.net/xjan103pcp94/3DJ7vVRxYIvcFO7JmIUMCx/77a45caa275ffa46f5135f4d6726dd4f/Figure_2_-_Fanout_and_ensemble.png" width="700px" loading="lazy">

Here is what the resulting code looks like:

In [None]:
import fastapi
from pydantic import BaseModel
import ray
from ray import serve
from ray.serve.handle import DeploymentHandle
import xgboost
import pandas as pd
import asyncio

@ray.serve.deployment
class Ensemble:
    def __init__(self, model1: DeploymentHandle, model2: DeploymentHandle):
        self.model1 = model1
        self.model2 = model2

        # Define Payload model inside __init__ to avoid serialization issues
        class Payload(BaseModel):
            passenger_count: int
            trip_distance: float
            fare_amount: float
            tolls_amount: float

        # Create FastAPI app inside the deployment
        self.app = fastapi.FastAPI()

        # Create endpoint function
        async def predict_endpoint(data: Payload):
            model1_prediction, model2_prediction = await asyncio.gather(
                self.model1.predict.remote([data.model_dump()]),
                self.model2.predict.remote([data.model_dump()]),
            )
            # Each handle returns a NumPy array with one prediction; average the two scalars
            out = {"prediction": float((model1_prediction[0] + model2_prediction[0]) / 2)}
            return out

        # Register the endpoint
        self.app.post("/predict")(predict_endpoint)

    async def __call__(self, request: fastapi.Request):
        return await self.app(request.scope, request.receive, request._send)

@ray.serve.deployment
class Model:
    def __init__(self, path: str):
        self._model = xgboost.Booster()
        self._model.load_model(path)

    def predict(self, data: list[dict]):
        # Score a list of feature dicts; returns a NumPy array with one
        # prediction per input row
        dmatrix = xgboost.DMatrix(pd.DataFrame(data))
        model_prediction = self._model.predict(dmatrix)
        return model_prediction

# Make sure Ray is initialized
if not ray.is_initialized():
    ray.init()


# Run the deployment
handle = ray.serve.run(
    Ensemble.bind(
        model1=Model.bind(model_path),
        model2=Model.bind(model_path),
    ),
    route_prefix="/ensemble"
)

(ProxyActor pid=4421) INFO 2025-11-01 04:23:18,950 proxy 172.28.0.12 -- Proxy starting on node 12ec6d85f37f25cd8dba2fedc57633e120fbda3acd3b776babde4bf7 (HTTP port: 8000).
INFO 2025-11-01 04:23:19,219 serve 803 -- Started Serve in namespace "serve".
(ProxyActor pid=4421) INFO 2025-11-01 04:23:19,212 proxy 172.28.0.12 -- Got updated endpoints: {}.
(ServeController pid=4366) INFO 2025-11-01 04:23:19,296 controller 4366 -- Deploying new version of Deployment(name='Model', app='default') (initial target replicas: 1).
(ServeController pid=4366) INFO 2025-11-01 04:23:19,298 controller 4366 -- Deploying new version of Deployment(name='Model_1', app='default') (initial target replicas: 1).
(ServeController pid=4366) INFO 2025-11-01 04:23:19,299 controller 4366 -- Deploying new version of Deployment(name='Ensemble', app='default') (initial target replicas: 1).
(ProxyActor pid=4421) INFO 2025-11-01 04:23:19,303 proxy 172.28.0.12 -- Got updated

In [None]:
requests.post(
    "http://localhost:8000/ensemble/predict",
    json={  # Send the features as a JSON request body
        "passenger_count": 1,
        "trip_distance": 2.5,
        "fare_amount": 10.0,
        "tolls_amount": 0.5,
    },
).json()

(ServeReplica:default:Ensemble pid=4532) INFO 2025-11-01 04:23:57,141 default_Ensemble ie49zc0b a96e4f1e-fff7-46f7-a170-c8d1cdfd2770 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x79c6f8249400>.


{'prediction': 2.0076115131378174}
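`Ensemble` uses a fan-out/fan-in pattern: `asyncio.gather` sends the request to both `Model` deployments concurrently, then averages the two answers. The Ray-free sketch below isolates that pattern with two hypothetical async functions, `model_a` and `model_b`, whose toy linear formulas stand in for the deployment handles; in Ray Serve the two calls go to separate replicas, possibly on other nodes.

```python
import asyncio


async def model_a(features: dict) -> float:
    # Hypothetical stand-in for self.model1.predict.remote(...)
    await asyncio.sleep(0.01)  # simulate inference latency
    return 0.1 * features["fare_amount"] + 1.0


async def model_b(features: dict) -> float:
    # Hypothetical stand-in for self.model2.predict.remote(...)
    await asyncio.sleep(0.01)
    return 0.15 * features["fare_amount"] + 0.5


async def ensemble_predict(features: dict) -> dict:
    # Fan out: both calls are in flight at the same time
    pred_a, pred_b = await asyncio.gather(model_a(features), model_b(features))
    # Fan in: average the two predictions, as Ensemble does
    return {"prediction": (pred_a + pred_b) / 2}
```

In a notebook, where an event loop is already running, call it with `await ensemble_predict({"fare_amount": 10.0})`; in a plain script, use `asyncio.run(...)`. Because the calls overlap, the total latency is roughly that of the slower model rather than the sum of both.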

### Batch inference with Ray Data

Ray Data allows for distributed data processing through streaming execution across a heterogeneous cluster of CPUs and GPUs.

This makes Ray Data ideal for workloads like compute-intensive data processing, data ingestion, and batch inference.
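Streaming execution can be pictured as a chain of lazy stages: rows are read, grouped into batches, and passed through a stateful predictor that is constructed once and reused, so downstream work starts before the whole dataset is loaded. The plain-Python generator sketch below illustrates that idea with a hypothetical in-memory source (`read_rows`) and a toy linear `StatefulPredictor`; Ray Data additionally runs each stage in parallel across tasks and an actor pool and applies backpressure between stages.

```python
from itertools import islice


def read_rows(n_rows):
    """Hypothetical lazy source: yields one trip record at a time."""
    for i in range(n_rows):
        yield {"trip_distance": float(i), "fare_amount": 5.0 + 2.5 * i}


def batched(rows, batch_size):
    """Group an iterator into lists of up to batch_size rows, lazily."""
    it = iter(rows)
    while batch := list(islice(it, batch_size)):
        yield batch


class StatefulPredictor:
    """Expensive setup happens once per instance, like OfflinePredictor.__init__."""

    instances = 0  # counts constructions, to show setup is not repeated per batch

    def __init__(self):
        StatefulPredictor.instances += 1
        self.coef = 0.15  # hypothetical "model" standing in for a loaded Booster

    def __call__(self, batch):
        return [dict(row, predictions=self.coef * row["fare_amount"]) for row in batch]


def run_pipeline(n_rows, batch_size):
    predictor = StatefulPredictor()  # built once, reused for every batch
    for batch in batched(read_rows(n_rows), batch_size):
        yield from predictor(batch)  # results stream out batch by batch
```

Pulling the first result from `run_pipeline(10**12, 4)` returns almost immediately, since only the first batch is ever materialized.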

In [None]:
class OfflinePredictor:
    def __init__(self):
        # Load expensive state
        self._model = xgboost.Booster()
        self._model.load_model(model_path)

    def predict(self, data: dict):
        # `data` is one batch: a dict mapping column names to NumPy arrays
        # (Ray Data's default batch format); returns one prediction per row
        dmatrix = xgboost.DMatrix(pd.DataFrame(data))
        model_prediction = self._model.predict(dmatrix)
        return model_prediction

    def __call__(self, batch: dict) -> dict:
        batch["predictions"] = self.predict(batch)
        return batch


# Apply the predictor to the validation dataset
prediction_pipeline = (
    ray.data.read_parquet(
        "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet"
    )
    .select_columns(features)
    .map_batches(OfflinePredictor, concurrency=(2, 10))
)


2025-11-01 04:24:19,347	INFO parquet_datasource.py:725 -- Estimated parquet encoding ratio is 9.752.
2025-11-01 04:24:19,351	INFO parquet_datasource.py:785 -- Estimated parquet reader batch size at 883012 rows


In [None]:
prediction_pipeline.write_parquet("/content/xgboost_predictions")

2025-11-01 04:24:30,960	INFO streaming_executor.py:85 -- A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True`.
2025-11-01 04:24:30,962	INFO logging.py:397 -- Registered dataset logger for dataset dataset_4_0
2025-11-01 04:24:31,005	INFO streaming_executor.py:170 -- Starting execution of Dataset dataset_4_0. Full logs are in /tmp/ray/session_2025-11-01_04-19-25_916625_803/logs/ray-data
2025-11-01 04:24:31,006	INFO streaming_executor.py:171 -- Execution plan of Dataset dataset_4_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> ActorPoolMapOperator[MapBatches(OfflinePredictor)] -> TaskPoolMapOperator[Write]









2025-11-01 04:25:04,602	INFO streaming_executor.py:298 -- ✔️  Dataset dataset_4_0 execution finished in 33.59 seconds
2025-11-01 04:25:04,770	INFO dataset.py:5106 -- Data sink Parquet finished. 1925152 rows and 66.1MB data written.


In [None]:
!ls /content/xgboost_predictions/

2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000000_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000001_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000002_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000003_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000004_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000005_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000006_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000007_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000008_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000009_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000010_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000011_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000012_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000013_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000014_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000015_000000-0.parquet
2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000016_000000-0.parqu

In [None]:
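# Export a Chrome-tracing timeline of the Ray tasks and actors run so far
# (load the JSON file in chrome://tracing or https://ui.perfetto.dev)
ray.timeline(filename="ray_end_to_end.json")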

In [None]:
import pandas as pd

# Read the Parquet file into a DataFrame
file_example = pd.read_parquet('/content/xgboost_predictions/2_e12ef53ea9ef45ea9c9f5a62ad2a5f66_000003_000000-0.parquet')
# Display the first few rows
print(file_example.head(10))

# View the schema (column names, non-null counts, and dtypes);
# info() prints its summary directly and returns None
file_example.info()

   passenger_count  trip_distance  fare_amount  tolls_amount  predictions
0              1.0           1.40          7.0          0.00     1.705917
1              1.0           1.18          5.5          0.00     1.558317
2              2.0          14.01         52.0          6.12     6.075655
3              1.0           2.15         12.5          0.00     2.071767
4              1.0           1.82          7.5          0.00     1.749015
5              1.0           1.40          8.0          0.00     1.771149
6              1.0           1.60          8.0          0.00     1.787580
7              1.0           2.66         13.5          0.00     2.131023
8              1.0           1.24          7.0          0.00     1.659633
9              1.0           4.00         13.0          0.00     2.177301
<class 'pandas.core.frame.DataFrame'>
RangeIndex: 19252 entries, 0 to 19251
Data columns (total 5 columns):
 #   Column           Non-Null Count  Dtype  
---  ------           ----------