# Scalable online XGBoost inference with Ray Serve

<div align="left">
<a target="_blank" href="https://console.anyscale.com/"><img src="https://img.shields.io/badge/🚀 Run_on-Anyscale-9hf"></a>&nbsp;
<a href="https://github.com/anyscale/e2e-xgboost" role="button"><img src="https://img.shields.io/static/v1?label=&amp;message=View%20On%20GitHub&amp;color=586069&amp;logo=github&amp;labelColor=2f363d"></a>&nbsp;
</div>

In this tutorial, we'll launch an online service that will:
- deploy our trained XGBoost model artifacts to generate predictions
- autoscale based on real-time incoming traffic

We'll also cover observability and debugging around our service.

Note that `notebook/01-Distributed_Training.ipynb` must be run before this notebook so that we can fetch the pre-trained model artifacts.

<div class="alert alert-block alert">

[Ray Serve](https://docs.ray.io/en/latest/serve/index.html) is a highly scalable and flexible model serving library for building online inference APIs. With it, we can:
- wrap our models and business logic as separate [serve deployments](https://docs.ray.io/en/latest/serve/key-concepts.html#deployment) and [connect](https://docs.ray.io/en/latest/serve/model_composition.html) them together (pipeline, ensemble, etc.)
- avoid one large service that is network- and compute-bound (an inefficient use of resources)
- utilize fractional, heterogeneous [resources](https://docs.ray.io/en/latest/serve/resource-allocation.html) (**not possible** with SageMaker, Vertex, KServe, etc.) and horizontally scale (`num_replicas`)
- [autoscale](https://docs.ray.io/en/latest/serve/autoscaling-guide.html) up/down based on traffic
- integrate with [FastAPI and HTTP](https://docs.ray.io/en/latest/serve/http-guide.html)
- set up a [gRPC service](https://docs.ray.io/en/latest/serve/advanced-guides/grpc-guide.html#set-up-a-grpc-service) to build distributed systems and microservices
- enable [dynamic batching](https://docs.ray.io/en/latest/serve/advanced-guides/dyn-req-batch.html) (based on batch size, time, etc.)
- use a suite of [utilities for serving LLMs](https://docs.ray.io/en/latest/serve/llm/serving-llms.html) (multi-LoRA, inference-engine agnostic, etc.)

<img src="https://github.com/anyscale/e2e-xgboost/blob/main/images/ray_serve.png?raw=true" width=600>

In [None]:
%load_ext autoreload
%autoreload all

In [None]:
# enable loading of the dist_xgboost module
import os
import sys

sys.path.append(os.path.abspath(".."))

In [None]:
# Enable Ray Train v2
os.environ["RAY_TRAIN_V2_ENABLED"] = "1"
# now it's safe to import from ray.train

## Loading the Model

Next, we load the pre-trained preprocessor and XGBoost model from the MLflow registry as we demonstrated in the validation notebook.

## Creating a Ray Serve Deployment

We'll now define our Ray Serve endpoint. We'll use a reusable class to avoid reloading the model and preprocessor for each request. Our deployment will support both Pythonic and HTTP requests.

In [None]:
import pandas as pd
import xgboost
from ray import serve
from starlette.requests import Request

from dist_xgboost.data import load_model_and_preprocessor


@serve.deployment(num_replicas=2, max_ongoing_requests=25, ray_actor_options={"num_cpus": 2})
class XGBoostModel:
    def __init__(self):
        self.preprocessor, self.model = load_model_and_preprocessor()

    @serve.batch(max_batch_size=16, batch_wait_timeout_s=0.1)
    async def predict_batch(self, input_data: list[dict]) -> list[float]:
        print(f"Batch size: {len(input_data)}")
        # Convert list of dictionaries to DataFrame
        input_df = pd.DataFrame(input_data)
        # Preprocess the input
        preprocessed_batch = self.preprocessor.transform_batch(input_df)
        # Create DMatrix for prediction
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        # Get predictions
        predictions = self.model.predict(dmatrix)
        return predictions.tolist()

    async def __call__(self, request: Request):
        # Parse the request body as JSON
        input_data = await request.json()
        return await self.predict_batch(input_data)
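To build intuition for what `max_batch_size` and `batch_wait_timeout_s` control, here is a toy micro-batcher written with plain `asyncio`. It is a simplified sketch of the general idea, not Ray Serve's implementation; `MicroBatcher`, `submit`, and `demo` are hypothetical names introduced only for this illustration.

```python
import asyncio


class MicroBatcher:
    """Toy illustration of size- and time-based batching (not Ray Serve's code)."""

    def __init__(self, handler, max_batch_size, batch_wait_timeout_s):
        self.handler = handler  # sync function: list of inputs -> list of outputs
        self.max_batch_size = max_batch_size
        self.batch_wait_timeout_s = batch_wait_timeout_s
        self.queue = asyncio.Queue()
        self.worker = None

    async def submit(self, item):
        # Callers pass ONE item and get ONE result back.
        if self.worker is None:
            self.worker = asyncio.create_task(self._run())
        future = asyncio.get_running_loop().create_future()
        await self.queue.put((item, future))
        return await future

    async def _run(self):
        loop = asyncio.get_running_loop()
        while True:
            batch = [await self.queue.get()]  # block until the first item arrives
            deadline = loop.time() + self.batch_wait_timeout_s
            while len(batch) < self.max_batch_size:
                remaining = deadline - loop.time()
                if remaining <= 0:
                    break
                try:
                    batch.append(await asyncio.wait_for(self.queue.get(), remaining))
                except asyncio.TimeoutError:
                    break  # timeout reached: flush a partial batch
            outputs = self.handler([item for item, _ in batch])
            for (_, future), output in zip(batch, outputs):
                future.set_result(output)


async def demo():
    sizes = []

    def double_all(items):
        sizes.append(len(items))
        return [2 * x for x in items]

    batcher = MicroBatcher(double_all, max_batch_size=16, batch_wait_timeout_s=0.02)

    # One request at a time: every batch contains a single item.
    for i in range(3):
        await batcher.submit(i)
    sequential_sizes = list(sizes)
    sizes.clear()

    # Many requests in flight at once: batches fill up to the maximum size.
    results = await asyncio.gather(*(batcher.submit(i) for i in range(40)))
    batcher.worker.cancel()
    return sequential_sizes, list(sizes), results


# In a notebook, use `await demo()` instead of `asyncio.run(demo())`.
sequential_sizes, concurrent_sizes, results = asyncio.run(demo())
print(sequential_sizes, concurrent_sizes)
```

The takeaway from the sketch is that batching only helps when several requests are waiting at the same time; requests sent strictly one after another are each processed as a batch of one.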

<div class="alert alert-block alert"> <b>🧱 Model composition</b>

Ray Serve makes it extremely easy to do [model composition](https://docs.ray.io/en/latest/serve/model_composition.html) where we can compose multiple deployments containing ML models or business logic into a single application. And we can independently scale (even fractional resources) and configure each of our deployments.

<img src="https://raw.githubusercontent.com/anyscale/foundational-ray-app/refs/heads/main/images/serve_composition.png" width=800>

Let's ensure that we don't have any existing deployments first using [`serve.shutdown()`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.shutdown.html#ray.serve.shutdown):

In [None]:
if "default" in serve.status().applications and serve.status().applications["default"].status == "RUNNING":
    print("Shutting down existing serve application")
    serve.shutdown()

2025-04-16 21:35:03,819	INFO worker.py:1660 -- Connecting to existing Ray cluster at address: 10.0.23.200:6379...
2025-04-16 21:35:03,828	INFO worker.py:1843 -- Connected to Ray cluster. View the dashboard at https://session-1kebpylz8tcjd34p4sv2h1f9tg.i.anyscaleuserdata.com
2025-04-16 21:35:03,833	INFO packaging.py:367 -- Pushing file package 'gcs://_ray_pkg_dbf2a602028d604b4b1f9474b353f0574c4a48ce.zip' (0.08MiB) to Ray cluster...
2025-04-16 21:35:03,834	INFO packaging.py:380 -- Successfully pushed file package 'gcs://_ray_pkg_dbf2a602028d604b4b1f9474b353f0574c4a48ce.zip'.


Now that we've defined the deployment, we can create our `ray.serve.Application` using the [`.bind()`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.Deployment.html#ray.serve.Deployment) method:

In [None]:
# Define the app
xgboost_model = XGBoostModel.bind()

## Preparing Test Data

Let's prepare some example data to test our deployment. We'll use a sample from our hold-out set:

In [None]:
sample_input = {
    "mean radius": 14.9,
    "mean texture": 22.53,
    "mean perimeter": 102.1,
    "mean area": 685.0,
    "mean smoothness": 0.09947,
    "mean compactness": 0.2225,
    "mean concavity": 0.2733,
    "mean concave points": 0.09711,
    "mean symmetry": 0.2041,
    "mean fractal dimension": 0.06898,
    "radius error": 0.253,
    "texture error": 0.8749,
    "perimeter error": 3.466,
    "area error": 24.19,
    "smoothness error": 0.006965,
    "compactness error": 0.06213,
    "concavity error": 0.07926,
    "concave points error": 0.02234,
    "symmetry error": 0.01499,
    "fractal dimension error": 0.005784,
    "worst radius": 16.35,
    "worst texture": 27.57,
    "worst perimeter": 125.4,
    "worst area": 832.7,
    "worst smoothness": 0.1419,
    "worst compactness": 0.709,
    "worst concavity": 0.9019,
    "worst concave points": 0.2475,
    "worst symmetry": 0.2866,
    "worst fractal dimension": 0.1155,
}
sample_target = 0  # Ground truth label

## Running the Service

There are two ways to run a Ray Serve service:

1) **Serve CLI**: use the [`serve run`](https://docs.ray.io/en/latest/serve/getting_started.html#running-a-ray-serve-application) CLI command, e.g. `serve run tutorial:xgboost_model`
2) **Pythonic API**: use `ray.serve`'s [`serve.run` command](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.run.html#ray.serve.run), e.g. `serve.run(xgboost_model)`.

For this example, we'll use the Pythonic API:

In [None]:
from ray.serve.handle import DeploymentHandle

handle: DeploymentHandle = serve.run(xgboost_model, name="xgboost-breast-cancer-classifier")

INFO 2025-04-16 21:35:08,246 serve 30790 -- Started Serve in namespace "serve".
INFO 2025-04-16 21:35:13,363 serve 30790 -- Application 'xgboost-breast-cancer-classifier' is ready at http://127.0.0.1:8000/.


(ProxyActor pid=31032) INFO 2025-04-16 21:35:08,167 proxy 10.0.23.200 -- Proxy starting on node dc30e171b93f61245644ba4d0147f8b27f64e9e1eaf34d1bb63c9c99 (HTTP port: 8000).
(ProxyActor pid=31032) INFO 2025-04-16 21:35:08,226 proxy 10.0.23.200 -- Got updated endpoints: {}.
(ServeController pid=30973) INFO 2025-04-16 21:35:08,307 controller 30973 -- Deploying new version of Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier') (initial target replicas: 2).
(ProxyActor pid=31032) INFO 2025-04-16 21:35:08,310 proxy 10.0.23.200 -- Got updated endpoints: {Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier'): EndpointInfo(route='/', app_is_cross_language=False)}.
(ProxyActor pid=31032) INFO 2025-04-16 21:35:08,323 proxy 10.0.23.200 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x77864005ee70>.
(ServeController pid=30973) INFO 2025-04-16 21:35:08,411 controller 30973 -- Adding

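We should see logs like the following, indicating that the service is running locally. The application name reflects the `name` passed to `serve.run` (`default` when omitted):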

```bash
INFO 2025-04-09 14:06:55,760 serve 31684 -- Started Serve in namespace "serve".
INFO 2025-04-09 14:06:57,875 serve 31684 -- Application 'default' is ready at http://127.0.0.1:8000/.
```

We can also check whether it is running using [`serve.status()`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.status.html#ray.serve.status):

In [None]:
serve.status().applications["xgboost-breast-cancer-classifier"].status == "RUNNING"

True
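A single status check can return `False` if the application is still starting. One way to handle that is to poll until the check passes or a timeout expires. The helper below is a generic sketch: `wait_until` is a hypothetical name, and the demo uses an iterator of status strings as a stand-in for real `serve.status()` calls.

```python
import time
from typing import Callable


def wait_until(predicate: Callable[[], bool], timeout_s: float = 30.0, interval_s: float = 0.5) -> bool:
    """Poll `predicate` until it returns True or `timeout_s` elapses."""
    deadline = time.monotonic() + timeout_s
    while True:
        if predicate():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval_s)


# Stand-in for a status check that reports "RUNNING" on the third poll.
statuses = iter(["DEPLOYING", "DEPLOYING", "RUNNING"])
became_ready = wait_until(lambda: next(statuses) == "RUNNING", timeout_s=5.0, interval_s=0.01)

# A check that never passes gives up once the timeout is reached.
never_ready = wait_until(lambda: False, timeout_s=0.05, interval_s=0.01)
print(became_ready, never_ready)
```

Against the running service, the predicate could be `lambda: serve.status().applications["xgboost-breast-cancer-classifier"].status == "RUNNING"`.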

## Querying the Service

### Using HTTP
The most common way to query services is via an HTTP request. This invokes the `__call__` method we defined earlier:

In [None]:
import requests

url = "http://127.0.0.1:8000/"

prediction = requests.post(url, json=sample_input).json()

print(f"Prediction: {prediction:.4f}")
print(f"Ground truth: {sample_target}")

Prediction: 0.0503
Ground truth: 0


This works for an individual query, but not for many queries. Because `requests.post` is a blocking call, running it in a for loop sends each request only after the previous one has completed, so you never benefit from Ray Serve's dynamic batching.

Instead, you want to fire many requests concurrently using asynchronous requests and let Ray Serve buffer and batch process them. You can accomplish this with `aiohttp`:

In [None]:
import asyncio

import aiohttp


async def fetch(session, url, data):
    async with session.post(url, json=data) as response:
        return await response.json()


# `payloads` avoids shadowing the `requests` module imported earlier
async def fetch_all(payloads: list):
    async with aiohttp.ClientSession() as session:
        tasks = [fetch(session, url, payload) for payload in payloads]
        responses = await asyncio.gather(*tasks)
        return responses

(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129) Batch size: 1


(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129) INFO 2025-04-16 21:35:13,834 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 0ddcd27d-d671-4365-b7e3-6e4cae856d9b -- POST / 200 117.8ms
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129) INFO 2025-04-16 21:35:14,352 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 aeb83339-359a-41e2-99c4-4ab06252d0b9 -- POST / 200 94.7ms
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129) INFO 2025-04-16 21:35:14,353 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 8c80adfd-2033-41d3-a718-aecbd5bcb996 -- POST / 200 93.9ms
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129) INFO 2025-04-16 21:35:14,354 xgboost-breast-cancer-classifier_XGBoostModel cxd4bxd1 7ed45f79-c665-4a17-94f7-6d02c56ab504 -- POST / 200 93.8ms
(ServeReplica:xgboost-breast-cancer-classi

In [None]:
sample_input_list = [sample_input] * 100

# notebook is already running an asyncio event loop in background, so use `await`
# in other cases, you would use `asyncio.run(fetch_all(sample_input_list))`
responses = await fetch_all(sample_input_list)
print(f"Finished processing {len(responses)} queries. Example result: {responses[0]}")

(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129) Batch size: 16


Finished processing 100 queries. Example result: 0.05025313049554825


(ProxyActor pid=5012, ip=10.0.240.129) INFO 2025-04-16 21:35:14,555 proxy 10.0.240.129 -- Got updated endpoints: {Deployment(name='XGBoostModel', app='xgboost-breast-cancer-classifier'): EndpointInfo(route='/', app_is_cross_language=False)}.
(ProxyActor pid=5012, ip=10.0.240.129) INFO 2025-04-16 21:35:14,576 proxy 10.0.240.129 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x7835f2b9acc0>.
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129) INFO 2025-04-16 21:35:14,619 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 24933cc1-07b4-4680-bb84-adcd54ff2de3 -- POST / 200 139.5ms
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129) INFO 2025-04-16 21:35:14,620 xgboost-breast-cancer-classifier_XGBoostModel ep2o1d1x 15167894-ceac-4464-bbb6-0556c8299d8a -- POST / 200 138.3ms
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.
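Sending 100 requests at once is fine here. For much larger request lists, one option is to cap how many are in flight at a time with an `asyncio.Semaphore`. The sketch below uses a fake `fake_post` coroutine in place of a real HTTP call, and `gather_bounded` is a hypothetical helper. The limit of 50 is an assumption chosen to match 2 replicas times `max_ongoing_requests=25`, not a recommendation from Ray Serve.

```python
import asyncio


async def gather_bounded(coroutine_factories, limit: int):
    """Run the coroutines concurrently, with at most `limit` in flight at once."""
    semaphore = asyncio.Semaphore(limit)

    async def run(factory):
        async with semaphore:
            return await factory()

    # gather returns results in the same order as the inputs
    return await asyncio.gather(*(run(factory) for factory in coroutine_factories))


in_flight = 0
peak_in_flight = 0


async def fake_post(payload: int) -> int:
    """Stand-in for an HTTP request; tracks how many calls overlap."""
    global in_flight, peak_in_flight
    in_flight += 1
    peak_in_flight = max(peak_in_flight, in_flight)
    await asyncio.sleep(0.01)  # simulated network and inference latency
    in_flight -= 1
    return payload * 2


async def main():
    factories = [lambda i=i: fake_post(i) for i in range(100)]
    return await gather_bounded(factories, limit=50)


# In a notebook, use `await main()` instead of `asyncio.run(main())`.
bounded_results = asyncio.run(main())
print(len(bounded_results), peak_in_flight)
```

The helper takes factories (zero-argument callables) rather than coroutine objects so that each request is only created once a slot is available.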

### Using Python

For a more direct Pythonic way to query the model, you can use the deployment handle:

In [None]:
response = await handle.predict_batch.remote(sample_input)
print(response)

INFO 2025-04-16 21:35:14,803 serve 30790 -- Started <ray.serve._private.router.SharedRouterLongPollClient object at 0x7156ffcf6d80>.


(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4874, ip=10.0.240.129) Batch size: 11
(ServeReplica:xgboost-breast-cancer-classifier:XGBoostModel pid=4875, ip=10.0.240.129) Batch size: 1


0.05025313049554825


This approach is useful if you need to interact with the service from a different process in the same Ray Cluster. If you need to regenerate the serve handle, you can use [`serve.get_deployment_handle`](https://docs.ray.io/en/latest/serve/api/doc/ray.serve.get_deployment_handle.html):

`handle = serve.get_deployment_handle("XGBoostModel", "xgboost-breast-cancer-classifier")`

<div class="alert alert-block alert"> <b>🔎 Observability for Services</b>

Observability data for Ray Serve applications is automatically captured in the Ray dashboard, specifically in the [Serve view](https://docs.ray.io/en/latest/ray-observability/getting-started.html#serve-view). Here we can view our service's [deployments and their replicas](https://docs.ray.io/en/latest/serve/key-concepts.html#serve-key-concepts-deployment) along with time-series metrics that reflect our service's health.

<img src="https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/serve_dashboard.png" width=800>

In [None]:
# Shutdown service
serve.shutdown()

<div class="alert alert-block alert"> <b>Anyscale Services</b>

[Anyscale Services](https://docs.anyscale.com/platform/services/) ([API ref](https://docs.anyscale.com/reference/service-api/)) offers an extremely fault tolerant, scalable and optimized way to serve our Ray Serve applications.
- we can [rollout and update](https://docs.anyscale.com/platform/services/update-a-service) our services with canary deployment (zero-downtime upgrades)
- [monitor](https://docs.anyscale.com/platform/services/monitoring) our Services through a dedicated Service page, a unified log viewer, tracing, alerts, etc.
- scale a service (`num_replicas=auto`) and utilize replica compaction to consolidate nodes that are fractionally utilized
- [head node fault tolerance](https://docs.anyscale.com/platform/services/production-best-practices#head-node-ft) (OSS Ray recovers from failed workers and replicas but not head node crashes)
- serve [multiple applications](https://docs.anyscale.com/platform/services/multi-app) in a single Service

<img src="https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/canary.png" width=1000>

[RayTurbo Serve](https://docs.anyscale.com/rayturbo/rayturbo-serve) on Anyscale has even more functionality on top of Ray Serve:
- **fast autoscaling and model loading** to get our services up and running even faster ([5x improvements](https://www.anyscale.com/blog/autoscale-large-ai-models-faster) even for LLMs)
- 54% **higher QPS** and up to 3x **streaming tokens per second** for high-traffic serving use cases (no proxy bottlenecks)
- **replica compaction** into fewer nodes where possible to reduce resource fragmentation and improve hardware utilization
- **zero-downtime** [incremental rollouts](https://docs.anyscale.com/platform/services/update-a-service/#resource-constrained-updates) so your service is never interrupted
- [**different environments**](https://docs.anyscale.com/platform/services/multi-app/#multiple-applications-in-different-containers) for each service in a multi-serve application
- **multi availability-zone** aware scheduling of Ray Serve replicas to provide higher redundancy to availability zone failures



**Note**: 
- we're using a `containerfile` to define our dependencies, but we could easily use a pre-built image as well.
- we can specify the compute as a [compute config](https://docs.anyscale.com/configuration/compute-configuration/) or inline in a [Service config](https://docs.anyscale.com/reference/service-api/) file.
- when we don't specify compute and when launching from a workspace, this defaults to the compute configuration of the Workspace.

In [None]:
from dist_xgboost.constants import root_dir

os.environ["WORKING_DIR"] = root_dir

In [None]:
%%bash
# Production online service
anyscale service deploy dist_xgboost.serve:xgboost_model --name=xgboost-breast_cancer_all_features \
  --containerfile="${WORKING_DIR}/containerfile" \
  --working-dir="${WORKING_DIR}" \
  --exclude=""

(anyscale +1.4s) Starting new service 'xgboost-breast_cancer_all_features'.
(anyscale +2.4s) Building image. View it in the UI: https://console.anyscale.com/v2/container-images/apt_gdm4p6u38va8itd2rvpxclm9ms/versions/bld_q2a3b4eb3s4cns7qpu4bnr8eun
(anyscale +33m43.2s) Waiting for image build to complete. Elapsed time: 1938 seconds.
(anyscale +33m43.2s) Image build succeeded.
(anyscale +33m44.4s) Uploading local dir '/home/ray/default/e2e-xgboost' to cloud storage.
(anyscale +33m45.4s) Including workspace-managed pip dependencies.
(anyscale +33m45.8s) Service 'xgboost-breast_cancer_all_features' deployed (version ID: b8vzznu8).
(anyscale +33m45.8s) View the service in the UI: 'https://console.anyscale.com/services/service2_i7ku1lh6ahp49vj6aztaa4w1hp'
(anyscale +33m45.8s) Query the service once it's running using the following curl command (add the path you want to query):
(anyscale +33m45.8s) curl -H "Authorization: Bearer tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0" https://xgboost-bre

Your service is now in production! In the process, Anyscale created and saved a container image so that this service can start faster in the future.

The link to your endpoint and your bearer token should be in the logs. Now that the service is running remotely, we will need to use the bearer token to query it. Here's how you would modify the `requests` code above to use this token:

```python
# Service specific config (replace with your own values from the above logs)
base_url = "https://xgboost-breast-cancer-all-features-jgz99.cld-kvedzwag2qa8i5bj.s.anyscaleuserdata.com"
token = "tXhmYYY7qMbrb1ToO9_J3n5_kD7ym7Nirs8djtip7P0"

# Requests config
path = "/"
full_url = f"{base_url}{path}"
headers = {"Authorization": f"Bearer {token}"}

prediction = requests.post(full_url, json=sample_input, headers=headers).json()
```
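Hard-coding a bearer token in a notebook makes it easy to leak. One alternative is to read the endpoint and token from environment variables. The sketch below assumes hypothetical variable names (`SERVICE_BASE_URL`, `SERVICE_TOKEN`) and a hypothetical helper `build_post_args`; it only builds the arguments and does not send a request.

```python
import os


def build_post_args(base_url: str, path: str, token: str) -> dict:
    """Return the `url` and `headers` keyword arguments for `requests.post`."""
    return {
        "url": f"{base_url.rstrip('/')}/{path.lstrip('/')}",
        "headers": {"Authorization": f"Bearer {token}"},
    }


# Hypothetical variable names; export them in your shell before running.
# The fallback values are placeholders, not a real endpoint or token.
base_url = os.environ.get("SERVICE_BASE_URL", "https://example.invalid")
token = os.environ.get("SERVICE_TOKEN", "replace-me")

post_args = build_post_args(base_url, "/", token)
print(post_args["url"])
```

With `requests` installed, the query then becomes `requests.post(**post_args, json=sample_input).json()`.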

In [None]:
%%bash
# Terminate service
anyscale service terminate --name xgboost-breast_cancer_all_features

(anyscale +1.8s) Service service2_9ucj98xf7yq9uvleyatqrbu2l1 terminate initiated.
(anyscale +1.8s) View the service in the UI at https://console.anyscale.com/services/service2_9ucj98xf7yq9uvleyatqrbu2l1


<div class="alert alert-block alert"> <b>CI/CD</b>

While Anyscale [Jobs](https://docs.anyscale.com/platform/jobs/) and [Services](https://docs.anyscale.com/platform/services/) are useful atomic concepts that help us productionize our workloads, they also work well as nodes in a larger ML DAG or [CI/CD workflow](https://docs.anyscale.com/ci-cd/). You can chain Jobs together, store results, and then serve your application with those artifacts. From there, you can trigger updates to your service (and retrigger the Jobs) based on events, time, etc. And while we can simply use the Anyscale CLI to integrate with any orchestration platform, Anyscale also supports some purpose-built integrations ([Airflow](https://docs.anyscale.com/ci-cd/apache-airflow/), [Prefect](https://github.com/anyscale/prefect-anyscale)).

<img src="https://raw.githubusercontent.com/anyscale/e2e-xgboost/refs/heads/main/images/cicd.png" width=700>

