# Introduction to the Ray AI Libraries: An example of using Ray Data, Ray Train, Ray Tune, and Ray Serve to implement an XGBoost regression model

© 2025, Anyscale. All Rights Reserved

💻 **Launch Locally**: You can run this notebook locally, but performance will be reduced.

🚀 **Launch on Cloud**: A Ray Cluster with 4 GPUs (Click [here](http://console.anyscale.com/register) to easily start a Ray cluster on Anyscale) is recommended to run this notebook.

Let's start with a quick end-to-end example to get a sense of what the Ray AI Libraries can do.
<div class="alert alert-block alert-info">
<b> Here is the roadmap for this notebook:</b>
<ul>
    <li>Overview of the Ray AI Libraries</li>
    <li>Quick end-to-end example</li>
    <ul>
      <li>Vanilla XGBoost code</li>
      <li>Hyperparameter tuning with Ray Tune</li>
      <li>Distributed training with Ray Train</li>
      <li>Serving an ensemble model with Ray Serve</li>
      <li>Batch inference with Ray Data</li>
    </ul>
</ul>
</div>

**Imports**

In [1]:
# (Optional): If you get an XGBoostError at import, you might have to `brew install libomp` before importing xgboost again
# !brew install libomp

In [2]:
import asyncio
import fastapi
import pandas as pd
import requests
# macos: If you get an XGBoostError at import, you might have to `brew install libomp` before importing xgboost again
import xgboost
from pydantic import BaseModel
from sklearn.model_selection import train_test_split

import ray
import ray.tune
import ray.train
from ray.train.xgboost import XGBoostTrainer as RayTrainXGBoostTrainer
from ray.train import RunConfig
import ray.data
import ray.serve

## 1. Overview of the Ray AI Libraries

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_AI_Libraries/Ray+AI+Libraries.png" width="700px" loading="lazy">

Built on top of Ray Core, the Ray AI Libraries inherit all the performance and scalability benefits offered by Core while providing a convenient abstraction layer for machine learning. These Python-first native libraries allow ML practitioners to distribute individual workloads and end-to-end applications, and to build custom use cases, in a unified framework.

The Ray AI Libraries bring together an ever-growing ecosystem of integrations with popular machine learning frameworks to create a common interface for development.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Introduction_to_Ray_AIR/e2e_air.png" width="100%" loading="lazy">|
|:-:|
|The Ray AI Libraries enable end-to-end ML development and provide multiple options for integrating with other tools and libraries from the MLOps ecosystem.|



## 2. Quick end-to-end example

For this regression task, you will apply a simple [XGBoost](https://xgboost.readthedocs.io/en/stable/) (a gradient boosted trees framework) model to the March 2021 [New York City Taxi & Limousine Commission's Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page).

The full dataset contains millions of samples of yellow cab rides, and the goal is to predict the tip amount.

**Dataset features**
* **`passenger_count`**
    * Float (whole number) representing number of passengers.
* **`trip_distance`** 
    * Float representing trip distance in miles.
* **`fare_amount`**
    * Float representing the time-and-distance fare calculated by the meter.
* **`tolls_amount`**
    * Float representing the total paid on tolls if any.

**Target**
* **`tip_amount`**
    * Float representing the total paid as tips.

### 2.1 Vanilla XGBoost code

Let's start with vanilla XGBoost code that predicts the tip amount from the NYC taxi cab data.

In [3]:
features = [
    "passenger_count", 
    "trip_distance",
    "fare_amount",
    "tolls_amount",
]

label_column = "tip_amount"

Define a function to load the data and split it into training and test sets.

In [4]:
def load_data():
    path = "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet"
    df = pd.read_parquet(path, columns=features + [label_column])
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest

Set up a folder for the saved model, then define a function that runs `xgboost.train` given a hyperparameter dictionary `params`.

In [5]:
from pathlib import Path

# storage_folder = "/mnt/cluster_storage/" # Modify this path to your local folder if it runs on your local environment
storage_folder = Path.cwd() / "models"
storage_folder.mkdir(parents=True, exist_ok=True)

In [6]:
from pathlib import Path
model_path = Path(storage_folder) / "model.ubj"

def my_xgboost_func(params):    
    evals_result = {}
    dtrain, dtest = load_data()
    bst = xgboost.train(
        params, 
        dtrain, 
        num_boost_round=10, 
        evals=[(dtest, "eval")], 
        evals_result=evals_result,
    )
    # Save the trained booster; save_model accepts a path-like object
    bst.save_model(model_path)
    # Print the per-round evaluation history
    print(f"{evals_result['eval']}")
    return {"eval-rmse": evals_result["eval"]["rmse"][-1]}

params = {
    "objective": "reg:squarederror",
    "eval_metric": "rmse",
    "tree_method": "hist",
    "max_depth": 6,
    "eta": 0.1,
}
my_xgboost_func(params)

[0]	eval-rmse:2.18114
[1]	eval-rmse:2.13805
[2]	eval-rmse:2.10221
[3]	eval-rmse:2.07294
[4]	eval-rmse:2.04855
[5]	eval-rmse:2.02852
[6]	eval-rmse:2.01225
[7]	eval-rmse:1.99868
[8]	eval-rmse:1.98771
[9]	eval-rmse:1.97872
OrderedDict([('rmse', [2.18113709207776, 2.138052274494217, 2.1022143627953036, 2.072936825276888, 2.0485457212693987, 2.028522863406997, 2.0122461934067273, 1.99868078532301, 1.9877117047436585, 1.9787180742813582])])


{'eval-rmse': 1.9787180742813582}
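
The `eval-rmse` values printed above are root mean squared errors, so they are expressed in the same unit as the target (dollars of tip). As a minimal sketch of what that metric computes, here is a plain-Python version; the `rmse` helper and the sample values are hypothetical and are not taken from the dataset or from XGBoost's internals.

```python
import math


def rmse(y_true, y_pred):
    """Root mean squared error between two equal-length sequences."""
    if len(y_true) != len(y_pred):
        raise ValueError("y_true and y_pred must have the same length")
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return math.sqrt(sum(squared_errors) / len(squared_errors))


# Hypothetical tips in dollars, made up for illustration only.
actual_tips = [2.0, 0.0, 3.5, 1.0]
predicted_tips = [1.5, 0.5, 3.0, 2.0]

print(rmse(actual_tips, predicted_tips))
```

Because the errors are squared before averaging, a single prediction that is off by a dollar weighs more than two predictions that are each off by fifty cents.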

### 2.2 Hyperparameter tuning with Ray Tune

Let's use Ray Tune to run distributed hyperparameter tuning for the XGBoost model.

In [7]:
tuner = ray.tune.Tuner(  # Create a tuner
    my_xgboost_func,  # Pass it the training function which Ray Tune calls Trainable.
    param_space={  # Pass it the parameter space to search over
        "objective": "reg:squarederror",
        "eval_metric": "rmse",
        "tree_method": "hist",
        "max_depth": 6,
        "eta": ray.tune.uniform(0.01, 0.3),
    },
    run_config=RunConfig(storage_path=storage_folder),
    tune_config=ray.tune.TuneConfig(  # Tell it which metric to tune
        metric="eval-rmse",
        mode="min",
        num_samples=10,
    ),
)

results = tuner.fit()  # Run the tuning job
print(results.get_best_result().config)  # Get back the best hyperparameters
# About the output table below:
# - It is dynamic: one row per trial, with the status updating as trials run (for example PENDING, RUNNING, TERMINATED, ERROR)
# - loc is the IP address of the node and the process ID (pid) of the trial, in the form ip:pid
# - eta is the value sampled from the search space for that trial
# - iter is the number of results the trial has reported
# - total time (s) is the total time the trial has taken
# - eval-rmse is the metric reported by the trial


* Current time: 2025-11-24 23:43:25
* Running for: 00:00:29.31
* Memory: 19.1/24.0 GiB

|Trial name|status|loc|eta|iter|total time (s)|eval-rmse|
|:-|:-|:-|-:|-:|-:|-:|
|my_xgboost_func_71de2_00000|TERMINATED|127.0.0.1:36564|0.0287829|1|14.5091|2.10979|
|my_xgboost_func_71de2_00001|TERMINATED|127.0.0.1:36561|0.115652|1|17.5412|1.96708|
|my_xgboost_func_71de2_00002|TERMINATED|127.0.0.1:36571|0.19951|1|15.2727|1.9408|
|my_xgboost_func_71de2_00003|TERMINATED|127.0.0.1:36569|0.200837|1|15.6643|1.94069|
|my_xgboost_func_71de2_00004|TERMINATED|127.0.0.1:36570|0.0176332|1|21.2013|2.15055|
|my_xgboost_func_71de2_00005|TERMINATED|127.0.0.1:36573|0.0252701|1|12.9083|2.1218|
|my_xgboost_func_71de2_00006|TERMINATED|127.0.0.1:36577|0.132889|1|14.2887|1.95799|
|my_xgboost_func_71de2_00007|TERMINATED|127.0.0.1:36566|0.0371789|1|13.3904|2.08404|
|my_xgboost_func_71de2_00008|TERMINATED|127.0.0.1:36562|0.0652335|1|12.3552|2.02175|
|my_xgboost_func_71de2_00009|TERMINATED|127.0.0.1:36563|0.280744|1|11.1995|1.93489|


(my_xgboost_func pid=36563) [0]	eval-rmse:2.09867
(my_xgboost_func pid=36563) [1]	eval-rmse:2.02469
(my_xgboost_func pid=36563) [2]	eval-rmse:1.98430
(my_xgboost_func pid=36563) [3]	eval-rmse:1.96273
(my_xgboost_func pid=36563) [4]	eval-rmse:1.95101
(my_xgboost_func pid=36563) [5]	eval-rmse:1.94453
(my_xgboost_func pid=36563) [6]	eval-rmse:1.93984
(my_xgboost_func pid=36563) [7]	eval-rmse:1.93714
(my_xgboost_func pid=36563) [8]	eval-rmse:1.93584
(my_xgboost_func pid=36563) [9]	eval-rmse:1.93489
(my_xgboost_func pid=36563) OrderedDict([('rmse', [2.098670914531913, 2.0246899103885965, 1.9843037115534818, 1.962731811611558, 1.9510121628264807, 1.9445273273346526, 1.9398359707897646, 1.937144672172907, 1.9358370394237943, 1.9348927970478986])])
(my_xgboost_func pid=36562) [0]	eval-rmse:2.19870
(my_xgboost_func pid=36562) [1]	eval-rmse:2.16820
(my_xgboost_func pid=36562)

2025-11-24 23:43:25,284	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/rbrosa/Documents/github_personal/intro-to-ray/intro/models/my_xgboost_func_2025-11-24_23-42-48' in 0.0110s.


(my_xgboost_func pid=36570) [7]	eval-rmse:2.16505
(my_xgboost_func pid=36570) [8]	eval-rmse:2.15768


2025-11-24 23:43:25,288	INFO tune.py:1041 -- Total run time: 30.24 seconds (29.29 seconds for the tuning loop).


{'objective': 'reg:squarederror', 'eval_metric': 'rmse', 'tree_method': 'hist', 'max_depth': 6, 'eta': 0.2807441797237359}


The diagram below shows what Tune does: it schedules many trials and returns the best-performing one.

Note: the default trial scheduler is FIFO (first in, first out), but more sophisticated schedulers are available, for example ones that stop underperforming trials early.

Note 2: This notebook does not explain how data is shared across nodes. In the code above, every trial calls `load_data()` itself, so each trial loads its own copy of the data; the diagram likewise suggests that the whole trainable is replicated to each trial. A later lesson is said to cover this in more depth.

<img src="https://bair.berkeley.edu/static/blog/tune/tune-arch-simple.png" width="700px" loading="lazy">
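
To make the "many trials, keep the best" idea concrete, here is a conceptual sketch in plain Python. It is not how Ray Tune is implemented: the trials run one after another instead of being scheduled across a cluster, and `toy_rmse` and `random_search` are hypothetical stand-ins for the training function and the tuner.

```python
import random


def toy_rmse(eta):
    """Hypothetical stand-in for a training run; lower is better."""
    return 1.9 + (eta - 0.3) ** 2


def random_search(objective, low, high, num_samples, seed=0):
    """Sample `num_samples` values uniformly from [low, high] and keep the best."""
    rng = random.Random(seed)
    trials = []
    for _ in range(num_samples):
        eta = rng.uniform(low, high)  # plays the role of ray.tune.uniform(low, high)
        trials.append({"eta": eta, "eval-rmse": objective(eta)})
    # Plays the role of metric="eval-rmse", mode="min" in the tuner above.
    best = min(trials, key=lambda trial: trial["eval-rmse"])
    return best, trials


best, trials = random_search(toy_rmse, low=0.01, high=0.3, num_samples=10)
print(best)
```

Each trial here is independent of the others, which is why this kind of search parallelizes so naturally.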

### 2.3 Distributed training with Ray Train

If your training data is very large, training might take a long time to complete.

To speed it up, shard the dataset across training workers and perform distributed XGBoost training.

Let's redefine `load_data` to now load a different slice of the data given the worker index/rank.

In [8]:
def load_data():
    # find out which training worker is running this code (it will return a different context for each worker)
    train_ctx = ray.train.get_context()
    worker_rank = train_ctx.get_world_rank()
    print(f"Loading data for worker {worker_rank}...")

    # build path based on training worker rank (rank 0 -> 2021-01, rank 11 -> 2021-12, rank 12 -> 2022-01)
    month = worker_rank % 12 + 1
    year = 2021 + worker_rank // 12
    path = f"s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_{year}-{month:02}.parquet"

    # same as before
    df = pd.read_parquet(path, columns=features + [label_column])
    X_train, X_test, y_train, y_test = train_test_split(
        df[features], df[label_column], test_size=0.2, random_state=42
    )
    dtrain = xgboost.DMatrix(X_train, label=y_train)
    dtest = xgboost.DMatrix(X_test, label=y_test)
    return dtrain, dtest
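
The rank-to-file mapping is easier to check when it is isolated in a small pure function that needs neither Ray nor S3 access. The sketch below uses a hypothetical helper named `shard_path`; it computes the month as `worker_rank % 12 + 1`, which selects the same files as the function above for ranks 0 to 10 and keeps the month between 1 and 12 for higher ranks. Whether a file actually exists in the bucket for every month is an assumption you would need to verify.

```python
def shard_path(worker_rank, start_year=2021):
    """Map a zero-based worker rank to one monthly Parquet file."""
    month = worker_rank % 12 + 1           # always in 1..12
    year = start_year + worker_rank // 12  # roll over after December
    return (
        "s3://anyscale-public-materials/nyc-taxi-cab/"
        f"yellow_tripdata_{year}-{month:02}.parquet"
    )


# Print the shard for a few ranks, including the year boundary.
for rank in (0, 1, 11, 12):
    print(rank, shard_path(rank))
```

With `num_workers=2`, only ranks 0 and 1 are used, so the two workers read the January and February 2021 files.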

Now we can run distributed XGBoost training using Ray Train's `XGBoostTrainer`. Similar trainers exist for other popular ML frameworks.

In [9]:
trainer = RayTrainXGBoostTrainer(  # Create a trainer
    my_xgboost_func,  # Pass it the training function
    scaling_config=ray.train.ScalingConfig(
        num_workers=2, use_gpu=False
    ),  # Define how many training workers
    train_loop_config=params,  # Pass it the hyperparameters
)

trainer.fit()  # Run the training job
# About the output below:
# - It starts by reporting the CPUs/GPUs available in the cluster
# - It then prints periodic status updates with the current resource usage
# - Finally, it shows the trainer logs per pid (note how each worker prints its rank)

2025-11-24 23:43:25,341	INFO tune.py:616 -- [output] This uses the legacy output and progress reporter, as Jupyter notebooks are not supported by the new engine, yet. For more information, please see https://github.com/ray-project/ray/issues/36949


(my_xgboost_func pid=36570) [9]	eval-rmse:2.15055
(my_xgboost_func pid=36570) OrderedDict([('rmse', [2.223591551472562, 2.214411887253386, 2.205532735129033, 2.196918552330912, 2.188576980134632, 2.1804829498832183, 2.1726352475098256, 2.165048942296986, 2.1576837930141624, 2.150549162838675])])
== Status ==
Current time: 2025-11-24 23:43:25 (running for 00:00:00.13)
Using FIFO scheduling algorithm.
Logical resource usage: 0/10 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-11-24_23-42-48_935530_35114/artifacts/2025-11-24_23-43-25/XGBoostTrainer_2025-11-24_23-43-25/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2025-11-24 23:43:30 (running for 00:00:05.21)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/10 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-11-24_23-42-48_935530_35114/artifacts/2025-11-24_23-43-25/XGBoostTrainer_2025-11-24_23-43-25/driver_artifacts
Number of trials: 1/1 (1 PENDING)


(XGBoostTrainer pid=50388) Started distributed worker processes: 
(XGBoostTrainer pid=50388) - (node_id=19a03c4045d3f6add6eae2f8895430ab6b9819b97d746aa3363a095e, ip=127.0.0.1, pid=53418) world_rank=0, local_rank=0, node_rank=0
(XGBoostTrainer pid=50388) - (node_id=19a03c4045d3f6add6eae2f8895430ab6b9819b97d746aa3363a095e, ip=127.0.0.1, pid=53414) world_rank=1, local_rank=1, node_rank=0


== Status ==
Current time: 2025-11-24 23:43:35 (running for 00:00:10.25)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/10 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-11-24_23-42-48_935530_35114/artifacts/2025-11-24_23-43-25/XGBoostTrainer_2025-11-24_23-43-25/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


(RayTrainWorker pid=53418) Loading data for worker 0...


(RayTrainWorker pid=53418) [23:43:35] Task [xgboost.ray-rank=00000000]:791668978711e8736a1fd8c401000000 got rank 0


== Status ==
Current time: 2025-11-24 23:43:40 (running for 00:00:15.31)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/10 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-11-24_23-42-48_935530_35114/artifacts/2025-11-24_23-43-25/XGBoostTrainer_2025-11-24_23-43-25/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


== Status ==
Current time: 2025-11-24 23:43:45 (running for 00:00:20.38)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/10 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-11-24_23-42-48_935530_35114/artifacts/2025-11-24_23-43-25/XGBoostTrainer_2025-11-24_23-43-25/driver_artifacts
Number of trials: 1/1 (1 RUNNING)


(XGBoostTrainer pid=50388) [23:43:47] [0]	eval-rmse:2.28346
(XGBoostTrainer pid=50388) [23:43:47] [1]	eval-rmse:2.25069
(XGBoostTrainer pid=50388) [23:43:47] [2]	eval-rmse:2.22460
(RayTrainWorker pid=53414) [23:43:35] Task [xgboost.ray-rank=00000001]:8e03f2ab335227788a0a0bf501000000 got rank 1
(XGBoostTrainer pid=50388) [23:43:47] [3]	eval-rmse:2.20430
(XGBoostTrainer pid=50388) [23:43:47] [4]	eval-rmse:2.18836
(XGBoostTrainer pid=50388) [23:43:47] [5]	eval-rmse:2.17259
(XGBoostTrainer pid=50388) [23:43:47] [6]	eval-rmse:2.15961
(XGBoostTrainer pid=50388) [23:43:47] [7]	eval-rmse:2.14910
(XGBoostTrainer pid=50388) [23:43:47] [8]	eval-rmse:2.14038
(XGBoostTrainer pid=50388) [23:43:47] [9]	eval-rmse:2.13346


(RayTrainWorker pid=53418) OrderedDict([('rmse', [np.float64(2.283455977036048), np.float64(2.2506874095991365), np.float64(2.2245956490092267), np.float64(2.204302370137811), np.float64(2.1883569504768654), np.float64(2.1725859682897077), np.float64(2.1596106275006597), np.float64(2.149095348540642), np.float64(2.140382007941743), np.float64(2.133460611893429)])])
(RayTrainWorker pid=53414) Loading data for worker 1...


2025-11-24 23:43:48,962	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/rbrosa/ray_results/XGBoostTrainer_2025-11-24_23-43-25' in 0.0043s.
2025-11-24 23:43:48,964	INFO tune.py:1041 -- Total run time: 23.62 seconds (23.61 seconds for the tuning loop).


Trial XGBoostTrainer_83ebc_00000 completed. Last result: 
== Status ==
Current time: 2025-11-24 23:43:48 (running for 00:00:23.61)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/10 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2025-11-24_23-42-48_935530_35114/artifacts/2025-11-24_23-43-25/XGBoostTrainer_2025-11-24_23-43-25/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)




Result(
  metrics={},
  path='/Users/rbrosa/ray_results/XGBoostTrainer_2025-11-24_23-43-25/XGBoostTrainer_83ebc_00000_0_2025-11-24_23-43-25',
  filesystem='local',
  checkpoint=None
)

The diagram below shows what Train does:

A train controller creates the training workers and executes the training function on each of them.

Ray Train delegates the distributed training itself to the underlying XGBoost framework. As before, each worker loads and owns its own shard of the data.

<img src="https://docs.ray.io/en/latest/_images/overview.png" width="700px" loading="lazy">

### 2.4 Serving an ensemble model with Ray Serve

Ray Serve allows for distributed serving of models and complex inference pipelines.

Here is a diagram showing how to deploy an ensemble model with Ray Serve:

<img src="https://images.ctfassets.net/xjan103pcp94/3DJ7vVRxYIvcFO7JmIUMCx/77a45caa275ffa46f5135f4d6726dd4f/Figure_2_-_Fanout_and_ensemble.png" width="700px" loading="lazy">

Here is what the resulting code looks like:

In [None]:
app = fastapi.FastAPI()

class Payload(BaseModel):
    passenger_count: int
    trip_distance: float
    fare_amount: float
    tolls_amount: float


# @ray.serve.deployment turns the class into a Ray Serve deployment
@ray.serve.deployment
# @ray.serve.ingress(app) routes HTTP requests through the FastAPI app
@ray.serve.ingress(app)
class Ensemble:
    def __init__(self, model1, model2):
        self.model1 = model1
        self.model2 = model2

    @app.post("/predict")
    async def predict(self, data: Payload) -> dict:
        # Query both models concurrently and wait for both predictions
        model1_prediction, model2_prediction = await asyncio.gather(
            self.model1.predict.remote([data.model_dump()]),
            self.model2.predict.remote([data.model_dump()]),
        )
        # Each model returns one prediction for the single input row; average them
        out = {"prediction": float(model1_prediction[0] + model2_prediction[0]) / 2}
        return out


@ray.serve.deployment
class Model:
    def __init__(self, path: str):
        self._model = xgboost.Booster()
        self._model.load_model(path)

    def predict(self, data: list[dict]) -> list[float]:
        # Build a DMatrix from the input rows and predict one value per row
        dmatrix = xgboost.DMatrix(pd.DataFrame(data))
        model_prediction = self._model.predict(dmatrix)
        # Convert the NumPy array to a plain list to match the return annotation
        return model_prediction.tolist()


# Run the deployment
handle = ray.serve.run(
    Ensemble.bind(
        model1=Model.bind(model_path),
        model2=Model.bind(model_path),
    ),
    route_prefix="/ensemble"
)

(ProxyActor pid=62784) INFO 2025-11-24 23:43:50,690 proxy 127.0.0.1 -- Proxy starting on node 19a03c4045d3f6add6eae2f8895430ab6b9819b97d746aa3363a095e (HTTP port: 8000).
(ProxyActor pid=62784) INFO 2025-11-24 23:43:50,753 proxy 127.0.0.1 -- Got updated endpoints: {}.
INFO 2025-11-24 23:43:50,801 serve 35114 -- Started Serve in namespace "serve".
(ServeController pid=49514) INFO 2025-11-24 23:43:50,870 controller 49514 -- Deploying new version of Deployment(name='Model', app='default') (initial target replicas: 1).
(ServeController pid=49514) INFO 2025-11-24 23:43:50,871 controller 49514 -- Deploying new version of Deployment(name='Model_1', app='default') (initial target replicas: 1).
(ServeController pid=49514) INFO 2025-11-24 23:43:50,872 controller 49514 -- Deploying new version of Deployment(name='Ensemble', app='default') (initial target replicas: 1).
(ProxyActor pid=62784) INFO 2025-11-24 23:43:50,876 proxy 127.0.0.1 -- Got up

Let's make an HTTP request to the Ray Serve instance.

In [11]:
requests.post(
    "http://localhost:8000/ensemble/predict",
    json={  # Send the payload as a JSON request body
        "passenger_count": 1,
        "trip_distance": 2.5,
        "fare_amount": 10.0,
        "tolls_amount": 0.5,
    },
).json()

{'prediction': 2.0076115131378174}
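
The fan-out-and-average pattern used by the ensemble can be tried without a running Serve instance. The sketch below replaces the deployed models with a hypothetical `StubModel` whose `predict` coroutine returns a made-up tip estimate. It only illustrates how `asyncio.gather` keeps both calls in flight at once; it does not reproduce Ray Serve's deployment handles.

```python
import asyncio


class StubModel:
    """Hypothetical stand-in for a deployed model replica."""

    def __init__(self, bias):
        self.bias = bias

    async def predict(self, rows):
        await asyncio.sleep(0.01)  # simulate inference latency
        # Made-up rule: 20% of the fare plus a per-model bias.
        return [0.2 * row["fare_amount"] + self.bias for row in rows]


async def ensemble_predict(model1, model2, row):
    # Fan out: both calls are awaited together, so their latencies overlap.
    pred1, pred2 = await asyncio.gather(
        model1.predict([row]),
        model2.predict([row]),
    )
    # Each model returns a one-element list for a single input row.
    return {"prediction": (pred1[0] + pred2[0]) / 2}


row = {
    "passenger_count": 1,
    "trip_distance": 2.5,
    "fare_amount": 10.0,
    "tolls_amount": 0.5,
}
result = asyncio.run(ensemble_predict(StubModel(0.0), StubModel(1.0), row))
print(result)  # {'prediction': 2.5}
```

Because the two calls overlap, the total latency is close to that of the slower model instead of the sum of both.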

### 2.5 Batch inference with Ray Data

Ray Data allows for distributed data processing through streaming execution across a heterogeneous cluster of CPUs and GPUs.

This makes Ray Data ideal for workloads like compute-intensive data processing, data ingestion, and batch inference.

In [12]:
class OfflinePredictor:
    def __init__(self):
        # Load expensive state once per worker process
        self._model = xgboost.Booster()
        self._model.load_model(model_path)

    def predict(self, data: dict):
        # Predict for a whole batch (a dict mapping column names to arrays)
        dmatrix = xgboost.DMatrix(pd.DataFrame(data))
        model_prediction = self._model.predict(dmatrix)
        return model_prediction

    def __call__(self, batch: dict) -> dict:
        # Add the predictions as a new column and return the batch
        batch["predictions"] = self.predict(batch)
        return batch


# Apply the predictor to the March 2021 trips
prediction_pipeline = (
    ray.data.read_parquet(
        "s3://anyscale-public-materials/nyc-taxi-cab/yellow_tripdata_2021-03.parquet"
    )
    .select_columns(features)
    .map_batches(OfflinePredictor, concurrency=(2, 10))
)

(ServeReplica:default:Model pid=63277) INFO 2025-11-24 23:43:54,010 default_Model h65kr9da f2712793-2982-4e97-83d2-4eda2e924df0 -- CALL /ensemble/predict OK 5.6ms
(ServeReplica:default:Model_1 pid=63284) INFO 2025-11-24 23:43:54,013 default_Model_1 e8yj62gq f2712793-2982-4e97-83d2-4eda2e924df0 -- CALL /ensemble/predict OK 5.4ms
(ServeReplica:default:Ensemble pid=63291) INFO 2025-11-24 23:43:53,993 default_Ensemble 16tlnz25 f2712793-2982-4e97-83d2-4eda2e924df0 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x12e3517d0>.
(ServeReplica:default:Ensemble pid=63291) INFO 2025-11-24 23:43:54,015 default_Ensemble 16tlnz25 f2712793-2982-4e97-83d2-4eda2e924df0 -- POST /ensemble/predict 200 34.8ms



After defining the pipeline, we can execute it in a distributed manner by writing the output to a sink.

In [13]:
prediction_pipeline.write_parquet("./xgboost_predictions")  # Change this path if you want the output written elsewhere

2025-11-24 23:44:10,964	INFO logging.py:290 -- Registered dataset logger for dataset dataset_4_0
2025-11-24 23:44:10,977	INFO streaming_executor.py:117 -- Starting execution of Dataset dataset_4_0. Full logs are in /tmp/ray/session_2025-11-24_23-42-48_935530_35114/logs/ray-data
2025-11-24 23:44:10,977	INFO streaming_executor.py:118 -- Execution plan of Dataset dataset_4_0: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadParquet] -> ActorPoolMapOperator[Project->MapBatches(OfflinePredictor)] -> TaskPoolMapOperator[Write]



2025-11-24 23:45:01,158	INFO streaming_executor.py:220 -- ✔️  Dataset dataset_4_0 execution finished in 50.18 seconds
2025-11-24 23:45:01,225	INFO dataset.py:4537 -- Data sink Parquet finished. 1925152 rows and 66.1MB data written.
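
The logs above show that the pipeline only starts executing once `write_parquet` is called. That lazy behaviour, together with the idea of a callable class that sets up its state once and then processes many batches, can be mimicked with plain Python generators. This is a conceptual sketch only: `AddPredictions`, `read_batches`, and `map_batches` are hypothetical helpers, and Ray Data's streaming executor is distributed and far more sophisticated.

```python
class AddPredictions:
    """Callable whose state is set up once and reused for every batch."""

    def __init__(self):
        self.tip_rate = 0.2    # hypothetical stand-in for loading a model
        self.batches_seen = 0  # counts how many batches were processed

    def __call__(self, batch):
        self.batches_seen += 1
        batch["predictions"] = [self.tip_rate * fare for fare in batch["fare_amount"]]
        return batch


def read_batches(fares, batch_size):
    """Lazily yield column-oriented batches (dicts of lists)."""
    for start in range(0, len(fares), batch_size):
        yield {"fare_amount": fares[start:start + batch_size]}


def map_batches(batches, fn):
    """Lazily apply `fn` to each batch."""
    for batch in batches:
        yield fn(batch)


predictor = AddPredictions()
pipeline = map_batches(
    read_batches([10.0, 20.0, 30.0, 40.0, 50.0], batch_size=2), predictor
)
print(predictor.batches_seen)  # 0: defining the pipeline runs nothing

written = list(pipeline)       # the "sink": consuming the generator does the work
print(predictor.batches_seen)  # 3: five rows in batches of two
```

Batches flow through one at a time, so the full dataset never has to be held in memory at once, which is the essence of streaming execution.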


Let's inspect the produced predictions.

In [14]:
!ls ./xgboost_predictions/


### 2.6 Clean up

In [15]:
# Run this cell for file cleanup 
!rm -rf ./xgboost_predictions/
!rm {model_path}

(ServeReplica:default:Ensemble pid=63291) INFO 2025-11-25 00:21:22,538 default_Ensemble 16tlnz25 69b644e2-55ea-4285-9e86-396a6175ec0d -- GET /ensemble 405 2.8ms
