# Introduction to the Ray AI Libraries: An example of using Ray Data, Ray Train, Ray Tune, and Ray Serve to implement an XGBoost regression model

© 2025, Anyscale. All Rights Reserved

💻 **Launch Locally**: You can run this notebook locally, but performance will be reduced.

🚀 **Launch on Cloud**: A Ray Cluster with 4 GPUs (Click [here](http://console.anyscale.com/register) to easily start a Ray cluster on Anyscale) is recommended to run this notebook.

Let's start with a quick end-to-end example to get a sense of what the Ray AI Libraries can do.
<div class="alert alert-block alert-info">
<b> Here is the roadmap for this notebook:</b>
<ul>
    <li>Overview of the Ray AI Libraries</li>
    <li>Quick end-to-end example</li>
    <ul>
      <li>Vanilla XGBoost code</li>
      <li>Hyperparameter tuning with Ray Tune</li>
      <li>Distributed training with Ray Train</li>
      <li>Serving an ensemble model with Ray Serve</li>
      <li>Batch inference with Ray Data</li>
    </ul>
</ul>
</div>

**Imports**

In [1]:
# (Optional): If you get an XGBoostError at import, you might have to `brew install libomp` before importing xgboost again
!brew install libomp

In [1]:
import asyncio
import fastapi
import pandas as pd
import requests
# macOS: if you get an XGBoostError at import, you might have to `brew install libomp` before importing xgboost again
import xgboost
from pydantic import BaseModel
from sklearn.model_selection import train_test_split

import ray
import ray.tune
import ray.train
from ray.train.xgboost import XGBoostTrainer as RayTrainXGBoostTrainer
from ray.train import RunConfig
import ray.data
import ray.serve



## 1. Overview of the Ray AI Libraries

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_AI_Libraries/Ray+AI+Libraries.png" width="700px" loading="lazy">

Built on top of Ray Core, the Ray AI Libraries inherit all the performance and scalability benefits offered by Core while providing a convenient abstraction layer for machine learning. These Python-first native libraries allow ML practitioners to distribute individual workloads and end-to-end applications, and to build custom use cases, in a unified framework.

The Ray AI Libraries bring together an ever-growing ecosystem of integrations with popular machine learning frameworks to create a common interface for development.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Introduction_to_Ray_AIR/e2e_air.png" width="100%" loading="lazy">|
|:-:|
|Ray AI Libraries enable end-to-end ML development and provide multiple options for integrating with other tools and libraries from the MLOps ecosystem.|



## 2. Quick end-to-end example

For this regression task, you will apply a simple [XGBoost](https://xgboost.readthedocs.io/en/stable/) model (XGBoost is a gradient-boosted trees framework) to the June 2021 [New York City Taxi & Limousine Commission's Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

The full dataset contains millions of samples of yellow cab rides, and the goal is to predict the tip amount.

**Dataset features**
* **`passenger_count`**
    * Float (whole number) representing number of passengers.
* **`trip_distance`** 
    * Float representing trip distance in miles.
* **`fare_amount`**
    * Float representing the time-and-distance fare calculated by the meter (it does not include the tip).
* **`tolls_amount`**
    * Float representing the total paid on tolls if any.

**Target**
* **`tip_amount`**
    * Float representing the total paid as tips.

### 2.1 Vanilla XGBoost code

Let's start with the vanilla XGBoost code to predict the tip amount from the NYC taxi cab data.

In [2]:
features = [
    "passenger_count", 
    "trip_distance",
    "fare_amount",
    "tolls_amount",
]

label_column = "tip_amount"

Define a function to load the data and split it into train and test sets.

In [11]:
# import kagglehub

# # Download latest version
# path = kagglehub.dataset_download("elemento/nyc-yellow-taxi-trip-data")

# print("Path to dataset files:", path)

Downloading to /Users/gourbera/.cache/kagglehub/datasets/elemento/nyc-yellow-taxi-trip-data/2.archive...
100%|██████████| 1.78G/1.78G [04:28<00:00, 7.14MB/s]
Extracting files...
Path to dataset files: /Users/gourbera/.cache/kagglehub/datasets/elemento/nyc-yellow-taxi-trip-data/versions/2


In [3]:
def load_data():
    # path = "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet"
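    # pd.read_csv needs a single file rather than the dataset directory;
    # this is the same file that section 2.5 reads for batch inference.
    path = "/Users/gourbera/.cache/kagglehub/datasets/elemento/nyc-yellow-taxi-trip-data/versions/2/yellow_tripdata_2015-01.csv"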
    df = pd.read_csv(path, usecols=features + [label_column])
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest

Define a function that runs `xgboost.train` with a hyperparameter dictionary `params`.

In [4]:
storage_folder = "/Users/gourbera/ai_engineering/projects/ray_certification/storage_folder/"  # Change this to a folder on your own machine when running locally

In [7]:
from pathlib import Path
model_path = Path(storage_folder) / "model.ubj"

def my_xgboost_func(params):    
    evals_result = {}
    dtrain, dtest = load_data()
    bst = xgboost.train(
        params, 
        dtrain, 
        num_boost_round=10, 
        evals=[(dtest, "eval")], 
        evals_result=evals_result,
    )
    # Save the trained booster to model_path (a pathlib.Path) so later sections can load it
    bst.save_model(model_path)
    print(f"{evals_result['eval']}")
    return {"eval-rmse": evals_result["eval"]["rmse"][-1]}

params = {
    "objective": "reg:squarederror",
    "eval_metric": "rmse",
    "tree_method": "hist",
    "max_depth": 5,
    "eta": 0.1,
}
# my_xgboost_func(params)
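
The `eval-rmse` value returned by `my_xgboost_func` is the root mean squared error on the held-out split. As a minimal sketch of what that number measures, the helper below computes it in plain Python. The function name `rmse` and the sample values are illustrative assumptions and are not taken from the taxi dataset.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error between two equal-length sequences."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical tip amounts (in dollars) and model predictions.
actual_tips = [2.0, 0.0, 3.5, 1.0]
predicted_tips = [1.5, 0.5, 3.0, 2.0]
print(rmse(actual_tips, predicted_tips))
```

Because the errors are squared before averaging, a few large misses raise the RMSE more than many small ones, which is worth keeping in mind when comparing trials.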

### 2.2 Hyperparameter tuning with Ray Tune

Let's use Ray Tune to run distributed hyperparameter tuning for the XGBoost model.

In [9]:
# tuner = ray.tune.Tuner(  # Create a tuner
#     my_xgboost_func,  # Pass it the training function which Ray Tune calls Trainable.
#     param_space={  # Pass it the parameter space to search over
#         "objective": "reg:squarederror",
#         "eval_metric": "rmse",
#         "tree_method": "hist",
#         "max_depth": 6,
#         "eta": ray.tune.uniform(0.01, 0.3),
#     },
#     run_config=RunConfig(storage_path=storage_folder),
#     tune_config=ray.tune.TuneConfig(  # Tell it which metric to tune
#         metric="eval-rmse",
#         mode="min",
#         num_samples=10,
#     ),
# )

# results = tuner.fit()  # Run the tuning job
# print(results.get_best_result().config)  # Get back the best hyperparameters

Here is a diagram that shows what Tune does:

It is effectively scheduling many trials and returning the best performing one.

<img src="https://bair.berkeley.edu/static/blog/tune/tune-arch-simple.png" width="700px" loading="lazy">
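
Conceptually, the search configured in this section samples `eta` values, runs one trial per sample, and keeps the configuration with the lowest metric. The sketch below mimics that loop in plain Python with no Ray involved. `toy_trainable` is a made-up stand-in for `my_xgboost_func`, and its metric is a synthetic curve rather than a real RMSE; Ray Tune additionally schedules the trials in parallel across the cluster, which this sequential loop does not attempt.

```python
import random


def toy_trainable(config):
    """Hypothetical stand-in for a training function: returns a fake metric."""
    # Synthetic bowl-shaped curve with its minimum at eta = 0.1.
    return {"eval-rmse": (config["eta"] - 0.1) ** 2 + 1.0}


def random_search(trainable, num_samples, seed=0):
    """Sample eta uniformly, run one trial per sample, keep the best one."""
    rng = random.Random(seed)
    best_config, best_metric = None, float("inf")
    for _ in range(num_samples):
        config = {"max_depth": 6, "eta": rng.uniform(0.01, 0.3)}
        metric = trainable(config)["eval-rmse"]
        if metric < best_metric:  # corresponds to mode="min"
            best_config, best_metric = config, metric
    return best_config, best_metric


best_config, best_metric = random_search(toy_trainable, num_samples=10)
print(best_config, best_metric)
```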

### 2.3 Distributed training with Ray Train

If your training data is very large, training might take a long time to complete.

To speed it up, shard the dataset across training workers and perform distributed XGBoost training.

Let's redefine `load_data` to now load a different slice of the data given the worker index/rank.

In [15]:
def load_data():
    # find out which training worker is running this code
    train_ctx = ray.train.get_context()
    worker_rank = train_ctx.get_world_rank()
    print(f"Loading data for worker {worker_rank}...")

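    # build path based on training worker rank so each worker reads a different file:
    # rank 0 reads 2015-01 and rank 1 reads 2016-01 (assumes both files, which are
    # referenced elsewhere in this notebook, are present in the download)
    month = 1
    year = 2015 + worker_rank
    path = f"/Users/gourbera/.cache/kagglehub/datasets/elemento/nyc-yellow-taxi-trip-data/versions/2/yellow_tripdata_{year}-{month:02}.csv"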

    # same as before
    df = pd.read_csv(path, usecols=features + [label_column])
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest
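
When there are more files than workers, one common way to give each worker a distinct slice is round-robin assignment by rank. The sketch below shows only that idea: `shard_files` is a hypothetical helper, not part of Ray Train, and the file names are placeholders.

```python
def shard_files(files, world_rank, world_size):
    """Return the files assigned to one worker using round-robin sharding."""
    if not 0 <= world_rank < world_size:
        raise ValueError("world_rank must be in [0, world_size)")
    return files[world_rank::world_size]


# Placeholder file names, used only for illustration.
all_files = [f"part-{i:02d}.csv" for i in range(5)]
for rank in range(2):
    print(rank, shard_files(all_files, rank, world_size=2))
```

Inside a training function, the rank and the number of workers would come from `ray.train.get_context()` via `get_world_rank()` and `get_world_size()`.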

In [11]:
# path = f"/Users/gourbera/.cache/kagglehub/datasets/elemento/nyc-yellow-taxi-trip-data/versions/2/yellow_tripdata_2016-01.csv"

#     # same as before
# df = pd.read_csv(path, usecols=features + [label_column])
# # X_train, X_test, y_train, y_test = train_test_split(
# #         df[features], df[label_column], test_size=0.2, random_state=42
# #     )
# df.head()

Now we can run distributed XGBoost training using Ray Train's `XGBoostTrainer`. Similar trainers exist for other popular ML frameworks.

In [16]:
trainer = RayTrainXGBoostTrainer(  # Create a trainer
    my_xgboost_func,  # Pass it the training function
    scaling_config=ray.train.ScalingConfig(
        num_workers=2, use_gpu=False
    ),  # Define how many training workers
    train_loop_config=params,  # Pass it the hyperparameters
)

trainer.fit()  # Run the training job

(TrainController pid=28265) Attempting to start training worker group of size 2 with the following resources: [{'CPU': 1}] * 2
(TrainController pid=28265) Started training worker group of size 2: 
(TrainController pid=28265) - (ip=127.0.0.1, pid=28274) world_rank=0, local_rank=0, node_rank=0
(TrainController pid=28265) - (ip=127.0.0.1, pid=28275) world_rank=1, local_rank=1, node_rank=0
(RayTrainWorker pid=28274) [17:57:04] Task [xgboost.ray-rank=00000000]:4f170b056f1c58a45964d48701000000 got rank 0
(RayTrainWorker pid=28274) Loading data for worker 0...
(TrainController pid=28265) [17:57:13] [0]	eval-rmse:4.69454
(RayTrainWorker pid=28275) [17:57:04] Task [xgboost.ray-rank=00000001]:0d508ef904ac26ec3927855101000000 got rank 1
(RayTrainWorker pid=28275) Loading data for worker 1...
(TrainController pid=28265) [17:57:13] [1]	eval-rmse:7.99348
(TrainController pid=28265) [17:57:13] [2]	eval-

Result(metrics=None, checkpoint=None, error=None, path='/Users/gourbera/ray_results/ray_train_run-2026-01-17_17-57-01', metrics_dataframe=None, best_checkpoints=[], _storage_filesystem=<pyarrow._fs.LocalFileSystem object at 0x128f069f0>)

Here is a diagram that shows what Train does:

A train controller will create training workers and execute the training function on each worker.

Ray Train delegates the distributed training to the underlying XGBoost framework.

<img src="https://docs.ray.io/en/latest/_images/overview.png" width="700px" loading="lazy">
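
As a rough mental model of the controller and worker roles, the sketch below uses a thread pool to call one function once per rank and collect the results. It is only an analogy: Ray Train starts separate worker processes across the cluster and exposes the rank through `ray.train.get_context()`, whereas `run_on_workers` and `toy_train_func` are hypothetical names that receive the rank as a plain argument.

```python
from concurrent.futures import ThreadPoolExecutor


def run_on_workers(train_func, config, num_workers):
    """Toy controller: call train_func once per rank and collect the results."""
    with ThreadPoolExecutor(max_workers=num_workers) as pool:
        futures = [
            pool.submit(train_func, config, rank, num_workers)
            for rank in range(num_workers)
        ]
        # Results are returned in rank order, not completion order.
        return [future.result() for future in futures]


def toy_train_func(config, world_rank, world_size):
    """Hypothetical training function that only reports its rank and config."""
    return {"world_rank": world_rank, "world_size": world_size, "eta": config["eta"]}


print(run_on_workers(toy_train_func, {"eta": 0.1}, num_workers=2))
```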

### 2.4 Serving an ensemble model with Ray Serve

Ray Serve allows for distributed serving of models and complex inference pipelines.

Here is a diagram showing how to deploy an ensemble model with Ray Serve:

<img src="https://images.ctfassets.net/xjan103pcp94/3DJ7vVRxYIvcFO7JmIUMCx/77a45caa275ffa46f5135f4d6726dd4f/Figure_2_-_Fanout_and_ensemble.png" width="700px" loading="lazy">
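
The fan-out-and-ensemble pattern in the diagram can be tried without a running Serve instance. The sketch below queries two models concurrently with `asyncio.gather` and averages their outputs. `FakeModel` and `ensemble_predict` are hypothetical names, and the fixed return values stand in for real model predictions.

```python
import asyncio


class FakeModel:
    """Hypothetical stand-in for a deployed model; returns a fixed prediction."""

    def __init__(self, value):
        self._value = value

    async def predict(self, rows):
        await asyncio.sleep(0)  # yield control, as a remote call would
        return [self._value for _ in rows]


async def ensemble_predict(model1, model2, row):
    """Query both models concurrently and average their first predictions."""
    pred1, pred2 = await asyncio.gather(
        model1.predict([row]),
        model2.predict([row]),
    )
    return {"prediction": (float(pred1[0]) + float(pred2[0])) / 2}


row = {"passenger_count": 1, "trip_distance": 2.5, "fare_amount": 10.0, "tolls_amount": 0.5}
print(asyncio.run(ensemble_predict(FakeModel(1.0), FakeModel(2.0), row)))
```

The Serve deployment in this section uses the same `asyncio.gather` call, with `.remote()` on deployment handles in place of the direct method calls. Inside a notebook cell, where an event loop is already running, use `await ensemble_predict(...)` instead of `asyncio.run(...)`.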

Here is what the resulting code looks like:

In [23]:
app = fastapi.FastAPI()

class Payload(BaseModel):
    passenger_count: int
    trip_distance: float
    fare_amount: float
    tolls_amount: float


@ray.serve.deployment
@ray.serve.ingress(app)
class Ensemble:
    def __init__(self, model1, model2):
        self.model1 = model1
        self.model2 = model2

    @app.post("/predict")
    async def predict(self, data: Payload) -> dict:
        model1_prediction, model2_prediction = await asyncio.gather(
            self.model1.predict.remote([data.model_dump()]),
            self.model2.predict.remote([data.model_dump()]),
        )
        # Extract scalar values from numpy arrays
        pred1 = float(model1_prediction[0]) if hasattr(model1_prediction, '__len__') else float(model1_prediction)
        pred2 = float(model2_prediction[0]) if hasattr(model2_prediction, '__len__') else float(model2_prediction)
        out = {"prediction": (pred1 + pred2) / 2}
        return out


@ray.serve.deployment
class Model:
    def __init__(self, path: str):
        self._model = xgboost.Booster()
        self._model.load_model(path)

    def predict(self, data: list[dict]) -> list[float]:
        # Make prediction
        dmatrix = xgboost.DMatrix(pd.DataFrame(data))
        model_prediction = self._model.predict(dmatrix)
        return model_prediction


# Run the deployment
handle = ray.serve.run(
    Ensemble.bind(
        model1=Model.bind(model_path),
        model2=Model.bind(model_path),
    ),
    route_prefix="/ensemble"
)

INFO 2026-01-17 18:03:03,514 serve 26934 -- Connecting to existing Serve app in namespace "serve". New http options will not be applied.
(ServeController pid=27185) INFO 2026-01-17 18:03:03,616 controller 27185 -- Deploying new version of Deployment(name='Model', app='default') (initial target replicas: 1).
(ServeController pid=27185) INFO 2026-01-17 18:03:03,617 controller 27185 -- Deploying new version of Deployment(name='Model_1', app='default') (initial target replicas: 1).
(ServeController pid=27185) INFO 2026-01-17 18:03:03,618 controller 27185 -- Deploying new version of Deployment(name='Ensemble', app='default') (initial target replicas: 1).
(ServeController pid=27185) INFO 2026-01-17 18:03:03,723 controller 27185 -- Stopping 1 replicas of Deployment(name='Model', app='default') with outdated versions.
(ServeController pid=27185) INFO 2026-01-17 18:03:03,723 controller 27185 -- Adding 1 replica to Deployment(name='Model', app='defaul

Let's make an HTTP request to the Ray Serve instance.

In [24]:
try:
    response = requests.post(
        "http://localhost:8000/ensemble/predict",
        json={
            "passenger_count": 1,
            "trip_distance": 2.5,
            "fare_amount": 10.0,
            "tolls_amount": 0.5,
        },
        timeout=10
    )
    response.raise_for_status()
    print("Prediction:", response.json())
except requests.exceptions.ConnectionError:
    print("Error: Could not connect to Ray Serve. Make sure the server is running with ray.serve.run()")
except requests.exceptions.RequestException as e:
    print(f"Request error: {e}")


Prediction: {'prediction': 1.51448655128479}


(ServeReplica:default:Model_1 pid=29412) INFO 2026-01-17 18:03:14,546 default_Model_1 3325qrcd 779e6cc5-5c57-4f7d-b595-0209dc18ecd3 -- CALL predict OK 3.0ms
(ServeReplica:default:Model pid=29411) INFO 2026-01-17 18:03:14,546 default_Model yb75o62z 779e6cc5-5c57-4f7d-b595-0209dc18ecd3 -- CALL predict OK 4.0ms
(ServeReplica:default:Ensemble pid=29413) INFO 2026-01-17 18:03:14,522 default_Ensemble c24thshy 779e6cc5-5c57-4f7d-b595-0209dc18ecd3 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x11b722e40>.
(ServeReplica:default:Ensemble pid=29413) INFO 2026-01-17 18:03:14,547 default_Ensemble c24thshy 779e6cc5-5c57-4f7d-b595-0209dc18ecd3 -- POST /ensemble/predict 200 40.1ms
(ServeReplica:default:Ensemble pid=29413) INFO 2026-01-17 18:03:37,443 default_Ensemble c24thshy 052c5c75-c934-48da-85f0-65fab586a400 -- GET /ensemble 404 0.7ms
(ServeReplica:default:Ensemble pid=29413) INFO 2026-01-17 18:04:06,085 default_En

### 2.5 Batch inference with Ray Data

Ray Data allows for distributed data processing through streaming execution across a heterogeneous cluster of CPUs and GPUs.

This makes Ray Data ideal for workloads like compute-intensive data processing, data ingestion, and batch inference.

In [20]:
class OfflinePredictor:
    def __init__(self):
        # Load expensive state
        self._model = xgboost.Booster()
        self._model.load_model(model_path)

    def predict(self, data: list[dict]) -> list[float]:
        # Make prediction in batch
        dmatrix = xgboost.DMatrix(pd.DataFrame(data))
        model_prediction = self._model.predict(dmatrix)
        return model_prediction

    def __call__(self, batch: dict) -> dict:
        batch["predictions"] = self.predict(batch)
        return batch


# Apply the predictor to the validation dataset
prediction_pipeline = (
    ray.data.read_csv(
        "/Users/gourbera/.cache/kagglehub/datasets/elemento/nyc-yellow-taxi-trip-data/versions/2/yellow_tripdata_2015-01.csv"
    )
    .select_columns(features)
    .map_batches(OfflinePredictor, concurrency=(2, 10))
)
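
Passing a class to `map_batches` lets the expensive setup in `__init__` run once per worker, after which the same instance is called on every batch it receives. The plain-Python sketch below imitates that calling pattern on a single process. `ToyPredictor` and `iter_batches` are hypothetical, the "model" is just a fixed tip rate, and the batches here hold plain lists where Ray Data would by default pass NumPy arrays.

```python
class ToyPredictor:
    """Hypothetical stand-in for OfflinePredictor with a trivial 'model'."""

    def __init__(self):
        # Expensive setup would go here; it runs once per instance.
        self.tip_rate = 0.25

    def __call__(self, batch):
        # A batch is a dict mapping column names to equal-length columns.
        batch["predictions"] = [fare * self.tip_rate for fare in batch["fare_amount"]]
        return batch


def iter_batches(rows, batch_size):
    """Yield column-oriented batches from a list of row dicts."""
    for start in range(0, len(rows), batch_size):
        chunk = rows[start:start + batch_size]
        yield {key: [row[key] for row in chunk] for key in chunk[0]}


rows = [{"fare_amount": fare} for fare in (10.0, 20.0, 30.0)]
predictor = ToyPredictor()  # constructed once, reused for every batch
outputs = [predictor(batch) for batch in iter_batches(rows, batch_size=2)]
print(outputs)
```

In the pipeline in this section, `concurrency=(2, 10)` asks Ray Data for a pool of between 2 and 10 such instances, so the model is loaded at most once per instance rather than once per batch.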



After defining the pipeline, we can execute it in a distributed manner by writing the output to a sink.

In [21]:
# Write under storage_folder so that the inspection and clean-up cells can find the output
prediction_pipeline.write_parquet(str(Path(storage_folder) / "xgboost_predictions"))

2026-01-17 17:58:53,978	INFO logging.py:397 -- Registered dataset logger for dataset dataset_4_0
2026-01-17 17:58:54,014	INFO streaming_executor.py:178 -- Starting execution of Dataset dataset_4_0. Full logs are in /tmp/ray/session_2026-01-17_17-51-39_773213_26934/logs/ray-data
2026-01-17 17:58:54,015	INFO streaming_executor.py:179 -- Execution plan of Dataset dataset_4_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV] -> ActorPoolMapOperator[Project->MapBatches(OfflinePredictor)] -> TaskPoolMapOperator[Write]
2026-01-17 17:58:54,057	INFO streaming_executor.py:686 -- [dataset]: A new progress UI is available. To enable, set `ray.data.DataContext.get_current().enable_rich_progress_bars = True` and `ray.data.DataContext.get_current().use_ray_tqdm = False`.
2026-01-17 17:58:54,058	INFO progress_bar.py:155 -- Progress bar disabled because stdout is a non-interactive terminal.
2026-01-17 17:58:54,172	INFO progress_bar.py:213 -- === Ray Data Progress {ReadCSV->SplitBlocks(14)} ===
20

(ServeReplica:default:Ensemble pid=27194) INFO 2026-01-17 17:59:29,412 default_Ensemble ktnnax4b 18d294db-a5f8-4c38-97d7-fda7b97aadf7 -- GET /ensemble 404 0.6ms
(ServeReplica:default:Ensemble pid=27194) INFO 2026-01-17 17:59:35,189 default_Ensemble ktnnax4b a677aa52-9ab3-4ca8-9364-e87300c2fd74 -- GET /ensemble 404 2.2ms


Let's inspect the produced predictions.

In [None]:
!ls {storage_folder}/xgboost_predictions/
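
The `!ls` cell relies on a Unix-like shell. A portable alternative, sketched below with only the standard library, lists the Parquet files in a folder. `list_parquet_files` is a hypothetical helper, and the `.parquet` suffix on the output files is an assumption about how the files are named; the temporary folder is there only to make the example self-contained.

```python
import tempfile
from pathlib import Path


def list_parquet_files(folder):
    """Return the sorted names of the Parquet files directly inside folder."""
    return sorted(p.name for p in Path(folder).glob("*.parquet"))


# Demonstrate on a throwaway folder containing two empty files.
with tempfile.TemporaryDirectory() as tmp:
    (Path(tmp) / "a.parquet").touch()
    (Path(tmp) / "notes.txt").touch()
    print(list_parquet_files(tmp))
```

In this notebook, the same helper could be pointed at the `xgboost_predictions` folder under `storage_folder`.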

### 2.6 Clean up

In [None]:
# Run this cell for file cleanup 
!rm -rf {storage_folder}/xgboost_predictions/
!rm {model_path}
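
The `!rm` commands above also assume a Unix-like shell and fail if a target is already gone. As a hedged alternative, the sketch below does the same clean-up with the standard library and tolerates missing paths. `clean_up` is a hypothetical helper, and the temporary folder only makes the example self-contained.

```python
import shutil
import tempfile
from pathlib import Path


def clean_up(predictions_dir, model_file):
    """Delete the predictions folder and the model file if they exist."""
    shutil.rmtree(predictions_dir, ignore_errors=True)
    Path(model_file).unlink(missing_ok=True)


# Demonstrate on throwaway paths instead of real notebook outputs.
with tempfile.TemporaryDirectory() as tmp:
    predictions = Path(tmp) / "xgboost_predictions"
    predictions.mkdir()
    model = Path(tmp) / "model.ubj"
    model.touch()
    clean_up(predictions, model)
    print(predictions.exists(), model.exists())
```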