In [79]:
!pip install xgboost transformers "ray[data,train]" --quiet

In [80]:
params = {
  "colsample_bynode": 0.8,
  "learning_rate": 1,
  "max_depth": 5,
  "num_parallel_tree": 100,
  "objective": "binary:logistic",
  "subsample": 0.8,
  "tree_method": "hist",
  "device": "cuda",
}

In [81]:
from typing import Tuple

import ray
from ray.data import Dataset, Preprocessor
from ray.data.preprocessors import StandardScaler
from ray.train.xgboost import XGBoostTrainer
from ray.train import Result, ScalingConfig
import xgboost
import pandas as pd
from ray.train import Checkpoint


In [82]:
def prepare_data() -> Tuple[Dataset, Dataset, Dataset]:
    dataset = ray.data.read_csv("s3://anonymous@air-example-data/breast_cancer.csv")
    train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
    test_dataset = valid_dataset.drop_columns(["target"])
    return train_dataset, valid_dataset, test_dataset

In [83]:
def train_xgboost(num_workers: int, use_gpu: bool = False) -> Result:
    train_dataset, valid_dataset, _ = prepare_data()

    # Scale some random columns
    columns_to_scale = ["mean radius", "mean texture"]
    preprocessor = StandardScaler(columns=columns_to_scale)
    train_dataset = preprocessor.fit_transform(train_dataset)
    valid_dataset = preprocessor.transform(valid_dataset)

    # XGBoost specific params
    params = {
        "tree_method": "approx",
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
    }

    trainer = XGBoostTrainer(
        scaling_config=ScalingConfig(num_workers=num_workers, use_gpu=use_gpu),
        label_column="target",
        params=params,
        datasets={"train": train_dataset, "valid": valid_dataset},
        num_boost_round=100,
        metadata = {"preprocessor_pkl": preprocessor.serialize()}
    )
    result = trainer.fit()
    print(result.metrics)

    return result

In [84]:
class Predict:

    def __init__(self, checkpoint: Checkpoint):
        self.model = XGBoostTrainer.get_model(checkpoint)
        self.preprocessor = Preprocessor.deserialize(checkpoint.get_metadata()["preprocessor_pkl"])

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        preprocessed_batch = self.preprocessor.transform_batch(batch)
        dmatrix = xgboost.DMatrix(preprocessed_batch)
        return {"predictions": self.model.predict(dmatrix)}


def predict_xgboost(result: Result):
    _, _, test_dataset = prepare_data()

    scores = test_dataset.map_batches(
        Predict, 
        fn_constructor_args=[result.checkpoint], 
        concurrency=1, 
        batch_format="pandas"
    )
    
    predicted_labels = scores.map_batches(lambda df: (df > 0.5).astype(int), batch_format="pandas")
    predicted_labels.show()

In [85]:
result = train_xgboost(num_workers=2, use_gpu=False)

2024-11-05 23:03:32,291	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-05_21-43-13_393763_11030/logs/ray-data
2024-11-05 23:03:32,292	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
                                                                                                 
✔️  Dataset execution finished in 0.76 seconds: 100%|██████████| 569/569 [00:00<00:00, 752 row/s]

- ReadCSV->SplitBlocks(16): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 68.1KB object store: : 569 row [00:00, 753 row/s]
2024-11-05 23:03:33,055	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-05_21-43-13_393763_11030/logs/ray-data
2024-11-05 23:03:33,056	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
                                                         

== Status ==
Current time: 2024-11-05 23:03:34 (running for 00:00:00.12)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/8 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-11-05_21-43-13_393763_11030/artifacts/2024-11-05_23-03-33/XGBoostTrainer_2024-11-05_23-03-33/driver_artifacts
Number of trials: 1/1 (1 PENDING)


== Status ==
Current time: 2024-11-05 23:03:39 (running for 00:00:05.18)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/8 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-11-05_21-43-13_393763_11030/artifacts/2024-11-05_23-03-33/XGBoostTrainer_2024-11-05_23-03-33/driver_artifacts
Number of trials: 1/1 (1 RUNNING)




2024-11-05 23:03:39,896	INFO tune.py:1009 -- Wrote the latest version of all result files and experiment state to '/Users/leoncenshuti/ray_results/XGBoostTrainer_2024-11-05_23-03-33' in 0.0070s.
2024-11-05 23:03:39,897	INFO tune.py:1041 -- Total run time: 5.91 seconds (5.89 seconds for the tuning loop).


== Status ==
Current time: 2024-11-05 23:03:39 (running for 00:00:05.90)
Using FIFO scheduling algorithm.
Logical resource usage: 3.0/8 CPUs, 0/0 GPUs
Result logdir: /tmp/ray/session_2024-11-05_21-43-13_393763_11030/artifacts/2024-11-05_23-03-33/XGBoostTrainer_2024-11-05_23-03-33/driver_artifacts
Number of trials: 1/1 (1 TERMINATED)


OrderedDict([('train-logloss', 0.00583838475616555), ('train-error', 0.0), ('valid-logloss', 0.07179918796305976), ('valid-error', 0.02941176470588235), ('timestamp', 1730869418), ('checkpoint_dir_name', 'checkpoint_000000'), ('should_checkpoint', True), ('done', True), ('training_iteration', 101), ('trial_id', '793d6_00000'), ('date', '2024-11-05_23-03-38'), ('time_this_iter_s', 0.002496957778930664), ('time_total_s', 3.2102787494659424), ('pid', 17361), ('hostname', 'Leonces-MacBook-Air.local'), ('node_ip', '127.0.0.1'), ('config', {}), ('time_since_restore', 3.2102787494659424), ('iterations_since_restore', 101), ('experiment_tag', '0')])


In [86]:
predict_xgboost(result)

2024-11-05 23:03:40,300	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-05_21-43-13_393763_11030/logs/ray-data
2024-11-05 23:03:40,300	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
                                                                                                 
✔️  Dataset execution finished in 0.83 seconds: 100%|██████████| 569/569 [00:00<00:00, 686 row/s]

- ReadCSV->SplitBlocks(16): Tasks: 0; Queued blocks: 0; Resources: 0.0 CPU, 94.2KB object store: : 569 row [00:00, 688 row/s]
2024-11-05 23:03:41,137	INFO streaming_executor.py:108 -- Starting execution of Dataset. Full logs are in /tmp/ray/session_2024-11-05_21-43-13_393763_11030/logs/ray-data
2024-11-05 23:03:41,137	INFO streaming_executor.py:109 -- Execution plan of Dataset: InputDataBuffer[Input] -> TaskPoolMapOperator[ReadCSV]
                                                         

{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}
{'predictions': 0}
{'predictions': 1}
{'predictions': 1}
{'predictions': 0}





In [87]:
# File name: model.py
from transformers import pipeline

class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation


In [88]:
from ray import serve

translator = Translator()

translation = translator.translate("Hello world!")
print(translation)

ModuleNotFoundError: No module named 'grpc'. You can run `pip install "ray[serve]"` to install all Ray Serve dependencies.

In [None]:
import ray
from fastapi import FastAPI

from transformers import pipeline

app = FastAPI()


In [None]:
@serve.deployment(num_replicas=2, ray_actor_options={"num_cpus": 0.2, "num_gpus": 0})
@serve.ingress(app)
class Translator:
    def __init__(self):
        # Load model
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    @app.post("/")
    def translate(self, text: str) -> str:
        # Run inference
        model_output = self.model(text)

        # Post-process output to return only the translation text
        translation = model_output[0]["translation_text"]

        return translation

translator_app = Translator.bind()

In [None]:
# File name: model_client.py
import requests
from ray import serve

response = requests.post("http://127.0.0.1:8000/", params={"text": "Hello world!"})
french_text = response.json()

print(french_text)

ModuleNotFoundError: No module named 'grpc'. You can run `pip install "ray[serve]"` to install all Ray Serve dependencies.

In [None]:
import os
import tempfile
import numpy as np
from starlette.requests import Request
from typing import Dict

In [None]:
import tensorflow as tf

In [None]:
TRAINED_MODEL_PATH = os.path.join(tempfile.gettempdir(), "mnist_model.h5")

def train_and_save_model():
    # Load mnist dataset
    mnist = tf.keras.datasets.mnist
    (x_train, y_train), (x_test, y_test) = mnist.load_data()
    x_train, x_test = x_train / 255.0, x_test / 255.0

    # Train a simple neural net model
    model = tf.keras.models.Sequential(
        [
            tf.keras.layers.Flatten(input_shape=(28, 28)),
            tf.keras.layers.Dense(128, activation="relu"),
            tf.keras.layers.Dropout(0.2),
            tf.keras.layers.Dense(10),
        ]
    )
    loss_fn = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True)
    model.compile(optimizer="adam", loss=loss_fn, metrics=["accuracy"])
    model.fit(x_train, y_train, epochs=1)

    model.evaluate(x_test, y_test, verbose=2)
    model.summary()

    # Save the model in h5 format in local file system
    model.save(TRAINED_MODEL_PATH)

In [None]:
if not os.path.exists(TRAINED_MODEL_PATH):
    train_and_save_model()

In [None]:
@serve.deployment
class TFMnistModel:
    def __init__(self, model_path: str):
        import tensorflow as tf

        self.model_path = model_path
        self.model = tf.keras.models.load_model(model_path)

    async def __call__(self, starlette_request: Request) -> Dict:
        # Step 1: transform HTTP request -> tensorflow input
        # Here we define the request schema to be a json array.
        input_array = np.array((await starlette_request.json())["array"])
        reshaped_array = input_array.reshape((1, 28, 28))

        # Step 2: tensorflow input -> tensorflow output
        prediction = self.model(reshaped_array)

        # Step 3: tensorflow output -> web output
        return {"prediction": prediction.numpy().tolist(), "file": self.model_path}

NameError: name 'serve' is not defined

In [None]:
import requests
import numpy as np

In [None]:
resp = requests.get(
    "http://localhost:8000/", json={"array": np.random.randn(28 * 28).tolist()}
)
print(resp.json())