In [None]:
!pip install "ray[air]" xgboost xgboost-ray scikit-learn

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting xgboost-ray
  Downloading xgboost_ray-0.1.16-py3-none-any.whl (139 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m139.3/139.3 kB[0m [31m3.6 MB/s[0m eta [36m0:00:00[0m
Installing collected packages: xgboost-ray
Successfully installed xgboost-ray-0.1.16


In [None]:
import ray
import pandas as pd
from sklearn.datasets import load_breast_cancer
from sklearn.model_selection import train_test_split

from ray.train.xgboost import XGBoostTrainer
from ray.air.config import ScalingConfig
from ray.data.preprocessors import StandardScaler

# Fix the split seed so the reported train/validation metrics are reproducible.
SEED = 42

data_raw = load_breast_cancer()
dataset_df = pd.DataFrame(data_raw["data"], columns=data_raw["feature_names"])
dataset_df["target"] = data_raw["target"]
train_df, test_df = train_test_split(dataset_df, test_size=0.3, random_state=SEED)
train_dataset = ray.data.from_pandas(train_df)
# The held-out split is used as the validation set during training ...
valid_dataset = ray.data.from_pandas(test_df)
# ... and, with the label column removed, as request input for serving later.
test_dataset = ray.data.from_pandas(test_df.drop(columns="target"))

# Define preprocessor: standardize only these two feature columns.
columns_to_scale = ["mean radius", "mean texture"]
preprocessor = StandardScaler(columns=columns_to_scale)

# Define trainer
trainer = XGBoostTrainer(
    scaling_config=ScalingConfig(num_workers=1),
    label_column="target",
    params={
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "max_depth": 2,
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
    num_boost_round=5,
)

result = trainer.fit()

2023-05-25 12:47:03,730	INFO worker.py:1616 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8265 [39m[22m
  tuner = Tuner(


0,1
Current time:,2023-05-25 12:47:32
Running for:,00:00:19.50
Memory:,1.8/12.7 GiB

Trial name,status,loc,iter,total time (s),train-logloss,train-error,valid-logloss
XGBoostTrainer_422f8_00000,TERMINATED,172.28.0.12:2536,6,16.2632,0.172102,0.0276382,0.257809


[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:16,335	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> AllToAllOperator[Aggregate]
[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:16,336	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)


(pid=2536) - Aggregate 1:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=2536) SortSample 2:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=2536) ShuffleMap 3:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=2536) ShuffleReduce 4:   0%|          | 0/1 [00:00<?, ?it/s]

(pid=2536) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:18,778	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[StandardScaler]
[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:18,855	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)


(pid=2536) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:20,773	INFO streaming_executor.py:83 -- Executing DAG InputDataBuffer[Input] -> TaskPoolMapOperator[StandardScaler]
[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:20,773	INFO streaming_executor.py:84 -- Execution config: ExecutionOptions(resource_limits=ExecutionResources(cpu=None, gpu=None, object_store_memory=None), locality_with_output=False, preserve_order=False, actor_locality_enabled=True, verbose_progress=False)


(pid=2536) Running 0:   0%|          | 0/1 [00:00<?, ?it/s]

[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:29,641	INFO tracker.py:218 -- start listen on 172.28.0.12:32955
[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:29,796	INFO tracker.py:382 -- @tracker All of 1 nodes getting started
[2m[36m(_RemoteRayXGBoostActor pid=2730)[0m [12:47:29] task [xgboost.ray]:140114790180480 got new rank 0
[2m[36m(XGBoostTrainer pid=2536)[0m 2023-05-25 12:47:31,358	INFO tracker.py:388 -- @tracker All nodes finishes job


Trial name,date,done,experiment_tag,hostname,iterations_since_restore,node_ip,pid,should_checkpoint,time_since_restore,time_this_iter_s,time_total_s,timestamp,train-error,train-logloss,training_iteration,trial_id,valid-error,valid-logloss
XGBoostTrainer_422f8_00000,2023-05-25_12-47-32,True,0,6c47e2c6e626,6,172.28.0.12,2536,True,16.2632,0.145307,16.2632,1685018852,0.0276382,0.172102,6,422f8_00000,0.0701754,0.257809


2023-05-25 12:47:32,674	INFO tune.py:945 -- Total run time: 24.93 seconds (19.50 seconds for the tuning loop).


The following block serves a Ray AIR model from a [checkpoint](air-checkpoint-ref), using the built-in [`XGBoostPredictor`](ray.train.xgboost.XGBoostPredictor).

In [None]:
from ray.train.xgboost import XGBoostPredictor
from ray import serve
from ray.serve import PredictorDeployment
from ray.serve.http_adapters import pandas_read_json

# Wrap the trained checkpoint in the built-in XGBoostPredictor and expose it over HTTP.
xgboost_deployment = PredictorDeployment.options(name="XGBoostService").bind(
    XGBoostPredictor,
    result.checkpoint,
    http_adapter=pandas_read_json,
)
serve.run(xgboost_deployment)

[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:31:52,825 controller 60981 checkpoint_path.py:17 - Using RayInternalKVStore for controller checkpoint and recovery.
[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:31:52,828 controller 60981 http_state.py:115 - Starting HTTP proxy with name 'SERVE_CONTROLLER_ACTOR:SERVE_PROXY_ACTOR-node:127.0.0.1-0' on node 'node:127.0.0.1-0' listening on '127.0.0.1:8000'
[2m[36m(HTTPProxyActor pid=60984)[0m INFO:     Started server process [60984]
[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:31:55,191 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'XGBoostService'.


Let's send a request through HTTP.

In [None]:
import requests

# Take a single feature-only row and send it as a JSON list containing one record.
sample_input = dict(test_dataset.take(1)[0])

resp = requests.post("http://localhost:8000/", json=[sample_input])
# Fail loudly on HTTP errors instead of decoding and printing an error body.
resp.raise_for_status()
output = resp.json()
print(output)

[{'predictions': 0.1142289936542511}]


[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:32:00,604 http_proxy 127.0.0.1 http_proxy.py:320 - POST /XGBoostService 307 5.4ms
[2m[36m(XGBoostService pid=60988)[0m INFO 2022-06-02 19:32:00,603 XGBoostService XGBoostService#LOYoUm replica.py:484 - HANDLE __call__ OK 0.3ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:32:00,658 http_proxy 127.0.0.1 http_proxy.py:320 - POST /XGBoostService 200 49.8ms
[2m[36m(XGBoostService pid=60988)[0m INFO 2022-06-02 19:32:00,656 XGBoostService XGBoostService#LOYoUm replica.py:484 - HANDLE __call__ OK 46.8ms


## 1. Predictor accepting NumPy array
We'll use a simple predictor implementation that adds an increment to an input array.

In [None]:
import numpy as np

from ray.train.predictor import Predictor
from ray.air.checkpoint import Checkpoint

class AdderPredictor(Predictor):
    """Toy predictor that adds a fixed increment to its input.

    Args:
        increment: Value added to the input array.
    """

    def __init__(self, increment: int):
        self.increment = increment

    @classmethod
    def from_checkpoint(cls, ckpt: Checkpoint):
        """Build a predictor from a dictionary checkpoint.

        Args:
          ckpt: AIR checkpoint holding a single dictionary whose ``increment``
              key maps to an integer.
        """
        checkpoint_dict = ckpt.to_dict()
        return cls(checkpoint_dict["increment"])

    def predict(self, inp: np.ndarray) -> np.ndarray:
        """Return ``inp`` shifted by ``self.increment``."""
        shifted = inp + self.increment
        return shifted

Let's first test it locally.

In [None]:
local_checkpoint = Checkpoint.from_dict({"increment": 2})
local_predictor = AdderPredictor.from_checkpoint(local_checkpoint)
# `assert a == b` on arrays only works for single-element arrays (it relies on
# the truth value of the element-wise comparison); compare arrays explicitly.
np.testing.assert_array_equal(local_predictor.predict(np.array([40])), np.array([42]))

In [None]:
from ray import serve
from ray.serve import PredictorDeployment

# Expose AdderPredictor, restored from `local_checkpoint`, behind an HTTP endpoint.
adder_deployment = PredictorDeployment.options(name="Adder").bind(
    predictor_cls=AdderPredictor,
    checkpoint=local_checkpoint,
)
serve.run(adder_deployment)

[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:32:07,559 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'Adder'.


After the model has been deployed, let's send an HTTP request.

In [None]:
import requests
payload = {"array": [40]}
resp = requests.post("http://localhost:8000/", json=payload)
resp.raise_for_status()  # surface HTTP errors before decoding the body
resp.json()

[42.0]

[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:32:18,864 http_proxy 127.0.0.1 http_proxy.py:320 - POST /Adder 200 18.0ms
[2m[36m(Adder pid=60999)[0m INFO 2022-06-02 19:32:18,863 Adder Adder#aqYgDS replica.py:484 - HANDLE __call__ OK 13.1ms


## 2. Predictor accepting Pandas DataFrame
Let's now take a look at a predictor accepting dataframe inputs. We'll perform some simple column-wise transformations on the input data.

In [None]:
import pandas as pd


class DataFramePredictor(Predictor):
    """Toy predictor that multiplies two columns and then adds an increment.

    For each row it computes ``base * multiplier + increment`` and returns the
    input columns together with a new ``prediction`` column.
    """

    def __init__(self, increment: int):
        self.increment = increment

    @classmethod
    def from_checkpoint(cls, ckpt: Checkpoint):
        """Build a predictor from a dictionary checkpoint with an ``increment`` key."""
        return cls(ckpt.to_dict()["increment"])

    def predict(self, inp: pd.DataFrame) -> pd.DataFrame:
        """Return a copy of ``inp`` with an added ``prediction`` column.

        Args:
            inp: DataFrame with numeric ``base`` and ``multiplier`` columns.
        """
        # `assign` returns a new frame, so the caller's DataFrame is not mutated.
        return inp.assign(
            prediction=inp["base"] * inp["multiplier"] + self.increment
        )


local_df_predictor = DataFramePredictor.from_checkpoint(local_checkpoint)

In [None]:
from ray.serve.http_adapters import pandas_read_json

df_deployment = PredictorDeployment.options(name="DataFramePredictor").bind(
    predictor_cls=DataFramePredictor,
    checkpoint=local_checkpoint,
    # Turn the JSON request body into the DataFrame that `predict` expects.
    http_adapter=pandas_read_json,
)
serve.run(df_deployment)

[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:32:24,396 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'DataFramePredictor'.


Let's send a request to our endpoint. 

In [None]:
rows = [{"base": 1, "multiplier": 2}, {"base": 3, "multiplier": 4}]
resp = requests.post(
    "http://localhost:8000/",
    json=rows,
    # The body is a list of row objects, i.e. pandas' "records" orientation.
    params={"orient": "records"},
)
resp.raise_for_status()
resp.text

'[{"base":1,"multiplier":2,"prediction":4},{"base":3,"multiplier":4,"prediction":14}]'

[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:32:28,751 http_proxy 127.0.0.1 http_proxy.py:320 - POST /DataFramePredictor 200 21.0ms
[2m[36m(DataFramePredictor pid=61006)[0m INFO 2022-06-02 19:32:28,750 DataFramePredictor DataFramePredictor#IJcHCI replica.py:484 - HANDLE __call__ OK 17.2ms


In [None]:
def our_own_http_adapter(base: int, multiplier: int) -> pd.DataFrame:
    """Build a one-row DataFrame from ``base`` and ``multiplier``.

    Used as a custom HTTP adapter for ``DataFramePredictor``; presumably Serve
    fills these arguments from the request's query parameters.
    """
    row = {"base": base, "multiplier": multiplier}
    return pd.DataFrame([row])

Let's deploy the `DataFramePredictor` again, this time using our custom adapter in place of `pandas_read_json`.

In [None]:
from ray.serve.http_adapters import pandas_read_json

custom_adapter_deployment = PredictorDeployment.options(name="DataFramePredictor").bind(
    predictor_cls=DataFramePredictor,
    checkpoint=local_checkpoint,
    # Build the input DataFrame with our own adapter instead of pandas_read_json.
    http_adapter=our_own_http_adapter,
)
# Reusing the name "DataFramePredictor" replaces the previously deployed version.
serve.run(custom_adapter_deployment)

[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:33:31,010 controller 60981 deployment_state.py:1180 - Stopping 1 replicas of deployment 'DataFramePredictor' with outdated versions.
[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:33:33,165 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'DataFramePredictor'.


Let's now send a request. Note that the redeployed predictor reads its input from the HTTP query parameters instead of a JSON body.

The equivalent curl request would be `curl -X POST "http://localhost:8000/?base=10&multiplier=4"` (quote the URL so the shell does not treat `&` as a background operator).

In [None]:
query = {"base": 10, "multiplier": 4}
# No JSON body is sent: the inputs travel as HTTP query parameters.
resp = requests.post("http://localhost:8000/", params=query)
resp.raise_for_status()
resp.text

'[{"base":10,"multiplier":4,"prediction":42}]'

[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:36,070 http_proxy 127.0.0.1 http_proxy.py:320 - POST /DataFramePredictor 200 21.6ms
[2m[36m(DataFramePredictor pid=61037)[0m INFO 2022-06-02 19:33:36,069 DataFramePredictor DataFramePredictor#QzQiec replica.py:484 - HANDLE __call__ OK 17.5ms


In [None]:
import time
class BatchSizePredictor(Predictor):
    """Toy predictor that reports how requests were grouped into a batch.

    For every element of the batch it returns ``(element, batch_size, batch)``,
    so a client can see which other requests were processed together with it.
    """

    @classmethod
    def from_checkpoint(cls, _: Checkpoint):
        # The checkpoint carries nothing this predictor needs.
        return cls()

    def predict(self, inp: np.ndarray):
        time.sleep(0.5)  # simulate model inference.
        batch_size = len(inp)
        return [(item, batch_size, inp) for item in inp]

In [None]:
batch_deployment = PredictorDeployment.options(name="BatchSizePredictor").bind(
    predictor_cls=BatchSizePredictor,
    # BatchSizePredictor ignores its checkpoint, so any checkpoint will do.
    checkpoint=local_checkpoint,
)
serve.run(batch_deployment)

[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:33:39,305 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'BatchSizePredictor'.


Let's use a threadpool executor to send ten requests at the same time to simulate multiple clients.

In [None]:
from concurrent.futures import ThreadPoolExecutor, wait

with ThreadPoolExecutor() as pool:
    # Fire all ten requests concurrently so Serve has a chance to batch them.
    futs = [
        pool.submit(requests.post, "http://localhost:8000/", json={"array": [i]})
        for i in range(10)
    ]
    wait(futs)

for fut in futs:
    resp = fut.result()
    # Surface HTTP errors explicitly instead of failing on the tuple unpack below.
    resp.raise_for_status()
    i, batch_size, batch_group = resp.json()
    print(f"Request id: {i} is part of batch group: {batch_group}, with batch size {batch_size}")

[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:43,141 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 525.9ms
[2m[36m(BatchSizePredictor pid=61041)[0m INFO 2022-06-02 19:33:43,139 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 519.1ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:43,647 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 1030.2ms
[2m[36m(BatchSizePredictor pid=61041)[0m INFO 2022-06-02 19:33:43,645 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1013.6ms
[2m[36m(BatchSizePredictor pid=61041)[0m INFO 2022-06-02 19:33:44,155 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1015.0ms
[2m[36m(BatchSizePredictor pid=61041)[0m INFO 2022-06-02 19:33:44,155 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 511.8ms
[2m[36m(BatchSizePredictor pid=61041)[0m INFO 2022-0

Request id: [0.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4
Request id: [1.0] is part of batch group: [[1.0]], with batch size 1
Request id: [2.0] is part of batch group: [[2.0]], with batch size 1
Request id: [3.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4
Request id: [4.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4
Request id: [5.0] is part of batch group: [[6.0], [5.0], [9.0]], with batch size 3
Request id: [6.0] is part of batch group: [[6.0], [5.0], [9.0]], with batch size 3
Request id: [7.0] is part of batch group: [[3.0], [0.0], [4.0], [7.0]], with batch size 4
Request id: [8.0] is part of batch group: [[8.0]], with batch size 1
Request id: [9.0] is part of batch group: [[6.0], [5.0], [9.0]], with batch size 3


[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:45,167 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 2539.1ms
[2m[36m(BatchSizePredictor pid=61041)[0m INFO 2022-06-02 19:33:45,165 BatchSizePredictor BatchSizePredictor#QQPBXh replica.py:484 - HANDLE __call__ OK 1516.7ms


As you can see, some of the requests are part of a bigger group that's run together.

You can also configure the exact details of batching parameters:
- `max_batch_size` (int): the maximum batch size that will be executed in one call to `predict`.
- `batch_wait_timeout_s` (float): the maximum time, in seconds, to wait for `max_batch_size` elements to arrive before running the `predict` call.

Let's set `max_batch_size` to 10 (and `batch_wait_timeout_s` to 5 seconds) so that all ten of our requests can be grouped into the same batch.

In [None]:
# Collect up to 10 requests per batch, waiting at most 5 seconds to fill it.
batching_params = {"max_batch_size": 10, "batch_wait_timeout_s": 5}
serve.run(
    PredictorDeployment.options(name="BatchSizePredictor").bind(
        predictor_cls=BatchSizePredictor,
        checkpoint=local_checkpoint,
        batching_params=batching_params,
    )
)

[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:33:47,081 controller 60981 deployment_state.py:1180 - Stopping 1 replicas of deployment 'BatchSizePredictor' with outdated versions.
[2m[36m(ServeController pid=60981)[0m INFO 2022-06-02 19:33:49,234 controller 60981 deployment_state.py:1221 - Adding 1 replicas to deployment 'BatchSizePredictor'.


Let's call them again! You should see all ten requests executed as part of the same group.

In [None]:
from concurrent.futures import ThreadPoolExecutor, wait

with ThreadPoolExecutor() as pool:
    # Fire all ten requests concurrently; with max_batch_size=10 they can share a batch.
    futs = [
        pool.submit(requests.post, "http://localhost:8000/", json={"array": [i]})
        for i in range(10)
    ]
    wait(futs)

for fut in futs:
    resp = fut.result()
    # Surface HTTP errors explicitly instead of failing on the tuple unpack below.
    resp.raise_for_status()
    i, batch_size, batch_group = resp.json()
    print(f"Request id: {i} is part of batch group: {batch_group}, with batch size {batch_size}")

Request id: [0.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [1.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [2.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [3.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [4.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [5.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [6.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0], [3.0], [4.0], [7.0], [6.0], [8.0], [9.0]], with batch size 10
Request id: [7.0] is part of batch group: [[0.0], [5.0], [1.0], [2.0]

[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,751 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 538.8ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,752 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 526.8ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,753 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 535.1ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,753 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 528.0ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 533.4ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /BatchSizePredictor 200 528.0ms
[2m[36m(HTTPProxyActor pid=60984)[0m INFO 2022-06-02 19:33:52,754 http_proxy 127.0.0.1 http_proxy.py:320 - POST /Ba