# An Introduction to the Ray AI Runtime


You can run this notebook directly in
[Colab TODO](https://colab.research.google.com/github/XXX).
<a target="_blank" href="https://colab.research.google.com/github/XXX">
<img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

TODO: Make this 2.5 or 2.6 later.
The book has been written for Ray 2.4.0, which you can install using `pip install ray==2.4.0`.

To run the examples for this chapter, you will also need to install the following dependencies:

In [None]:
# TODO pin all versions here
! pip install "ray[air]>=2.4.0" "accelerate>=0.16.0" "transformers>=4.26.0"
! pip install "numpy<1.24" "torch>=1.12.0" "datasets" "evaluate" "deepspeed"

## Overview

In this chapter we’ll introduce you to the core concepts of the Ray AI Runtime (AIR) and how you can
use it to build and deploy common ML workflows. To introduce its components, we’ll build
an AIR application that fine-tunes an open-source language model, deploys it for online
inference, and uses the model for offline batch inference.
We will also tell you when and why to use AIR and give you a brief overview of its ecosystem.
We close with an in-depth discussion of the relationship of AIR with other systems.

## Why and When to Use AIR?

Ray’s support for ML workloads has evolved steadily over the last couple
of years. Ray RLlib and Tune were the first libraries built on top of Ray Core.
Components like Ray Train, Serve, and more recently Datasets followed shortly
after. The addition of Ray AIR as an umbrella for all other Ray ML libraries is the
result of active discussions with and feedback from the ML community. Ray, as a
Python-native tool with good GPU support and stateful primitives (Ray actors) for
complex ML workloads, is a natural candidate for building a runtime like AIR.

Ray AIR is a unified toolkit for your ML workloads that offers many third-party
integrations for model training or accessing custom data sources. In the spirit of the
other ML libraries built on top of Ray Core, AIR hides lower-level abstractions and
provides an intuitive API that was inspired by common patterns from tools such as
scikit-learn.

At its core, Ray AIR was built for data scientists and ML engineers alike. As
a data scientist, you can use it to build and scale your end-to-end experiments or
individual subtasks such as preprocessing, training, tuning, scoring, or serving of ML
models. As an ML engineer, you can go so far as to build a custom ML platform on
top of AIR or simply leverage its unified API to integrate it with other libraries from
your ecosystem. And Ray always gives you the flexibility to drop down and delve into
the lower-level Ray Core API.

As part of the Ray ecosystem, AIR can leverage all its benefits, which include a
seamless transition from experimentation on a laptop to production workflows on a
cluster. You often see data science teams “hand over” their ML code to teams responsible
for production systems. In practice this can be expensive and time-consuming,
as this process often involves modifying or even rewriting parts of the code. As we
will see, Ray AIR helps you with this transition because AIR takes care of concerns
such as scalability, reliability, and robustness for you.

Ray AIR already has a respectable number of integrations today, but it’s also fully
extensible. And as we will show you in the next section, its unified API provides a
smooth workflow that lets you swap out many of its components. For
instance, you can use the same interface to define an XGBoost or PyTorch Trainer
with AIR, which makes experimentation with various ML models convenient.

At the same time, by choosing AIR you can avoid the problem of working with
several (distributed) systems and writing glue code for them that’s difficult to deal
with. Teams working with many moving parts often experience rapid deprecation
of integrations and a high maintenance burden. These issues can lead to migration
fatigue, a reluctance to adopt new ideas due to the anticipated complexity of system
changes.

### Workloads to run with AIR

Let’s zoom out a little and discuss in principle which kinds of workloads you can run
with AIR. We’ve tackled all of these workloads throughout the book already, but it’s
good to recap them systematically. As the name suggests, AIR is built to capture common
tasks in AI projects. These tasks can be roughly classified in the following way:

- Stateless computation: Tasks like preprocessing data or computing model predictions on a batch of data
    are stateless. Stateless workloads can be computed independently in parallel.
    If you recall our treatment of Ray tasks from Chapter 2, stateless computation
    is exactly what they were built for. AIR primarily uses Ray tasks for stateless
    workloads. Many big data processing tools fall into this category.
- Stateful computation: In contrast, model training and hyperparameter tuning are stateful operations, as
    they update the model state during their respective training procedure. Updating
    stateful workers in such distributed training is a difficult topic that Ray handles
    for you. AIR uses Ray actors for stateful computations.
- Composite workloads: Combining stateless and stateful computation, for instance by first processing
    features and then training a model, is quite common in AI workloads. In fact,
    it’s rare for end-to-end projects to exclusively use one or the other. Running such
    advanced composite workloads in a distributed fashion can be described as big
    data training, and AIR is built to handle both the stateless and stateful parts efficiently.
- Online serving: Lastly, AIR is built to handle scalable online serving of (multiple) models. The
    transition from the previous three workloads to serving is frictionless by design,
    as you still operate within the same AIR ecosystem.

You can use these types of workloads in different scenarios, too. For instance, you can
use AIR to replace and scale out a single component of an existing pipeline. Or you
can create your own end-to-end machine learning apps with AIR.
You can even use AIR to build your own AI platform, as we will see later.

![AIR Workloads](./images/AIR_workloads.png)

## The Key Components of Ray AIR

AIR’s design philosophy is to provide you with the ability to tackle your ML workloads
in a single script, run by a single system.


### Datasets and Preprocessors

The standard way to load data in Ray AIR is with Ray Datasets. AIR Preprocessors are
used to transform input data into features for ML experiments.
Since these preprocessors operate on Datasets and leverage the Ray ecosystem, they
allow you to scale your preprocessing steps efficiently. During training, an AIR Preprocessor
is fitted to the specified training data and can later be used for both
training and serving. AIR comes packaged with common preprocessors that
cover many use cases. If you don’t find the one you need, you can easily define a
custom preprocessor on your own.
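
To make the fit-once, reuse-everywhere contract concrete, here is a plain-Python sketch of a min-max scaler. The class `MinMaxScalerSketch` is hypothetical: it only mimics the idea and is neither Ray's `ray.data.preprocessors.MinMaxScaler` nor does it operate on Ray Datasets. The point it illustrates is that statistics are computed once on the training data and then reused unchanged at serving time.

```python
from typing import Dict, List, Optional


class MinMaxScalerSketch:
    """Hypothetical stand-in for a fittable preprocessor (not the Ray AIR API)."""

    def __init__(self, column: str):
        self.column = column
        self.stats: Optional[Dict[str, float]] = None

    def fit(self, rows: List[dict]) -> "MinMaxScalerSketch":
        # Compute statistics once, on the training data only.
        values = [row[self.column] for row in rows]
        self.stats = {"min": min(values), "max": max(values)}
        return self

    def transform(self, rows: List[dict]) -> List[dict]:
        # Reuse the fitted statistics for any later data (training or serving).
        if self.stats is None:
            raise RuntimeError("call fit() before transform()")
        lo, hi = self.stats["min"], self.stats["max"]
        span = (hi - lo) or 1.0  # avoid division by zero for constant columns
        return [{**row, self.column: (row[self.column] - lo) / span} for row in rows]


train = [{"x": 0.0}, {"x": 5.0}, {"x": 10.0}]
scaler = MinMaxScalerSketch("x").fit(train)
print(scaler.transform(train))         # training-time features
print(scaler.transform([{"x": 7.5}]))  # serving-time request, same statistics
```

AIR preprocessors follow the same fit-then-transform idea described above, but they run on Ray Datasets and can therefore scale out.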

![AIR Data](./images/preprocessor_table.png)

### Trainers

Once you have your training and test datasets ready and your preprocessors defined,
you can move on to specifying a Trainer that runs an ML algorithm on your data.
Trainers provide a consistent wrapper for training frameworks such as TensorFlow, PyTorch, or
HuggingFace. In this example we’ll focus on the latter, but it’s important to note that
all other framework integrations work exactly the same way in terms of the Ray AIR
API.

Trainers provide scalable ML training that operates on Ray Datasets and AIR preprocessors.
On top of that, they’re built to integrate well with Ray Tune for hyperparameter
optimization (HPO), as we’ll see next.
To summarize this section, the following figure shows how AIR Trainers fit ML models on
Ray Datasets given preprocessors and a scaling configuration.

![AIR Trainers](./images/AIR_trainer.png)

### Tuners and Checkpoints

Tuners, introduced with Ray 2.0 as part of AIR, offer scalable hyperparameter tuning
through Ray Tune. Tuners work seamlessly with AIR Trainers, but also support arbitrary
training functions. In our example, instead of calling fit() on your trainer
instance from the previous section, you can pass your trainer into a Tuner. To do
so, you instantiate a Tuner with a parameter space to search over (`param_space`),
an optional `TuneConfig` that holds all Tune-specific settings, such as the metric you
want to optimize, and an optional `RunConfig` that lets you configure runtime-specific
aspects such as the log verbosity of your Tune run.

Whenever you run AIR Trainers or Tuners, they generate framework-specific Checkpoints.
You can use these checkpoints to load models for usage across several AIR
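libraries, such as Tune, Train, or Serve. You can get a Checkpoint from the
result of a .fit() call: a Trainer returns a single Result with a checkpoint, while
a Tuner returns a grid of results from which you can pick, for instance, the best one.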

Having checkpoints is great because they’re AIR’s native model exchange format.
You can also use them to pick up trained models at a later stage, without having
to worry about custom ways to store and load the models in question. Figure 10-3
schematically shows how AIR Tuners work with AIR Trainers.

![AIR Tuners](./images/AIR_tuner.png)

### Running batch prediction

TODO: this needs to be adapted for new "map_batches" paradigm

![AIR Batch Inference](./images/AIR_predictor.png)

### Online Serving Deployments

Instead of using batch prediction and interacting with the model in question
directly, you can leverage Ray Serve to deploy an inference service that you can
query over HTTP. You do that by using the PredictorDeployment class and deploying
it with our checkpoint.

![AIR Deployments](./images/AIR_deployment.png)

Here's an overview of all components at once:

![AIR Overview](./images/air_plan.png)


It’s important to stress again that we’ve been using a single Python script for this
example and a single distributed system in Ray AIR to do all the heavy lifting. In
fact, you can use this example script and scale it out to a large cluster that uses CPUs
for preprocessing and GPUs for training and separately configure the deployment
simply by modifying the parameters of the scaling configuration and similar options
in that script. This isn’t as easy or common as it may seem, and it is not unusual
for data scientists to have to use multiple frameworks (e.g., one for data loading and
processing, one for training, and one for serving).

Note: You can also use Ray AIR with RLlib, but the integration is still in
its early stages. For instance, to integrate RLlib with AIR Trainers,
you’d use the RLTrainer that allows you to pass in all arguments
that you’d pass to a standard RLlib algorithm. After training, you
can store the resulting RL model in an AIR Checkpoint, just as
with any other AIR Trainer. To deploy your trained RL model,
you can use Serve’s PredictorDeployment class by passing your
checkpoint along with the RLPredictor class.
This API might be subject to change, but you can see an example of
how this works in the AIR documentation.

TODO: maybe 10.9 on distributed model training can be interesting for examples?

## An Example of Training and Deploying Large Language Models with AIR


In this example, we will show how to use Ray AIR to fine-tune GPT-J.
GPT-J is a GPT-2-like causal language model trained on the Pile dataset.
This particular model has 6 billion parameters.
For more information on GPT-J, see the [🤗 Transformers documentation](https://huggingface.co/docs/transformers/model_doc/gptj).

We will use Ray AIR (with the 🤗 Transformers integration) and a pretrained model from Hugging Face hub. Note that you can easily adapt this example to use other similar models.

This example focuses more on the performance and distributed computing aspects of Ray AIR. If you are looking for a more beginner-friendly introduction to the Ray AIR 🤗 Transformers integration, see /ray-air/examples/huggingface_text_classification.

It is highly recommended to read [Ray AIR Key Concepts](TODO) and [Ray Data Key Concepts](TODO) before starting this example.

### Setting up Ray

We will use `ray.init()` to initialize a local cluster.
By default, this cluster will consist of only the machine you are running this notebook on.
You can also run this notebook on an Anyscale cluster.

We define a {ref}`runtime environment <runtime-environments>` to ensure that the Ray workers have access to all the necessary packages. You can omit the `runtime_env` argument if you have all the packages already installed on each node in your cluster.


In [8]:
import ray

ray.init(
    runtime_env={
        "pip": [
            "datasets",
            "evaluate",
            "accelerate>=0.16.0",
            "transformers>=4.26.0",
            "torch>=1.12.0",
            "deepspeed",
        ]
    }
)

Usage stats collection is enabled by default for nightly wheels. To disable this, run the following command: `ray disable-usage-stats` before starting Ray. See https://docs.ray.io/en/master/cluster/usage-stats.html for more details.


2023-05-17 15:52:44,494	INFO worker.py:1554 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8265


Python version: 3.7.16
Ray version: 3.0.0.dev0
Dashboard: http://127.0.0.1:8265


### Data Loading and Preprocessing

We will be fine-tuning the model on the [`tiny_shakespeare` dataset](https://huggingface.co/datasets/tiny_shakespeare), which consists of 40,000 lines from a variety of Shakespeare's plays. The aim is to make the GPT-J model better at generating text in the style of Shakespeare.

In [None]:
from datasets import load_dataset

print("Loading tiny_shakespeare dataset")
current_dataset = load_dataset("tiny_shakespeare")
current_dataset

We will use [Ray Data](data) for distributed preprocessing and data ingestion. We can easily convert the dataset obtained from Hugging Face Hub to Ray Data by using {meth}`ray.data.from_huggingface`.

In [None]:
import ray.data

ray_datasets = ray.data.from_huggingface(current_dataset)
ray_datasets

Because the dataset is represented by a single large string, we will need to do some preprocessing. For that, we will define two [Ray AIR Preprocessors](air-preprocessors) using the {class}`~ray.data.preprocessors.BatchMapper` API, allowing us to define functions that will be applied on batches of data.

The `split_text` function will take the single string and split it into separate lines, removing empty lines and character names ending with ':' (e.g., 'ROMEO:'). The `tokenize` function will take the lines and tokenize them using the 🤗 Tokenizer associated with the model, ensuring each entry has the same length (`block_size`) by padding and truncating. This is necessary for training, because all examples in a batch must have the same length to be stacked into a single tensor.

```{note}
This preprocessing can be done in other ways. A common pattern is to tokenize first, and then split the obtained tokens into equally-sized blocks.
```
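
The alternative mentioned in the note, tokenizing first and then cutting the token stream into equal blocks, avoids padding altogether. Here is a minimal pure-Python sketch of the idea, modeled on the `group_texts` step in Hugging Face's causal language modeling examples. The function name `chunk_tokens` and the toy token IDs are hypothetical, and this sketch simply drops the tail that doesn't fill a complete block.

```python
from itertools import chain
from typing import List


def chunk_tokens(tokenized_lines: List[List[int]], block_size: int) -> List[List[int]]:
    """Concatenate token IDs from all lines and split them into equal-sized blocks."""
    stream = list(chain.from_iterable(tokenized_lines))
    usable = (len(stream) // block_size) * block_size  # drop the incomplete tail
    return [stream[i : i + block_size] for i in range(0, usable, block_size)]


toy_lines = [[1, 2, 3], [4, 5], [6, 7, 8, 9], [10]]  # pretend token IDs per line
print(chunk_tokens(toy_lines, block_size=4))
```

Every block is full, so no padding tokens are wasted, at the cost of blocks that may start or end mid-sentence.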

We will use the `splitter` and `tokenizer` Preprocessors below.

In [None]:
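import pandas as pd
from transformers import AutoTokenizer

from ray.data.preprocessors import BatchMapper

# Hugging Face Hub ID of the model we fine-tune; used by `tokenize` and the trainer.
model_name = "EleutherAI/gpt-j-6B"
# Every tokenized example is padded or truncated to this many tokens.
block_size = 512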

def split_text(batch: pd.DataFrame) -> pd.DataFrame:
    text = list(batch["text"])
    flat_text = "".join(text)
    split_text = [
        x.strip()
        for x in flat_text.split("\n")
        if x.strip() and not x.strip()[-1] == ":"
    ]
    return pd.DataFrame(split_text, columns=["text"])


def tokenize(batch: pd.DataFrame) -> dict:
    tokenizer = AutoTokenizer.from_pretrained(model_name, use_fast=False)
    tokenizer.pad_token = tokenizer.eos_token
    ret = tokenizer(
        list(batch["text"]),
        truncation=True,
        max_length=block_size,
        padding="max_length",
        return_tensors="np",
    )
    ret["labels"] = ret["input_ids"].copy()
    return dict(ret)


splitter = BatchMapper(split_text, batch_format="pandas")
tokenizer = BatchMapper(tokenize, batch_format="pandas")
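
The `tokenize` preprocessor relies on the 🤗 tokenizer to pad and truncate every example to exactly `block_size` tokens, and then copies `input_ids` into `labels`. The pure-Python sketch below reproduces that bookkeeping on made-up token IDs so the resulting shapes are easy to see. `pad_or_truncate` is a hypothetical helper, the token IDs are invented, and the pad ID of 0 is an arbitrary choice for illustration (the real code pads with the tokenizer's EOS token).

```python
from typing import Dict, List


def pad_or_truncate(token_ids: List[int], block_size: int, pad_id: int) -> Dict[str, List[int]]:
    """Mimic truncation=True, padding="max_length" for one example (right padding)."""
    ids = token_ids[:block_size]                     # truncate long examples
    num_pad = block_size - len(ids)
    input_ids = ids + [pad_id] * num_pad             # pad short examples
    attention_mask = [1] * len(ids) + [0] * num_pad  # 0 marks padding positions
    return {
        "input_ids": input_ids,
        "attention_mask": attention_mask,
        "labels": list(input_ids),                   # as in `tokenize`: labels copy input_ids
    }


toy_block_size = 4  # tiny value for illustration; the notebook uses block_size = 512
short = pad_or_truncate([11, 12], toy_block_size, pad_id=0)
long = pad_or_truncate([21, 22, 23, 24, 25, 26], toy_block_size, pad_id=0)
print(short)
print(long)
```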

### Fine-Tuning and Hyperparameter Optimization

We can now configure Ray AIR's {class}`~ray.train.hf_transformers.TransformersTrainer` to perform distributed fine-tuning of the model. In order to do that, we specify a `trainer_init_per_worker` function, which creates a 🤗 Transformers `Trainer` that will be distributed by Ray using Distributed Data Parallelism (DDP), with the PyTorch Distributed backend used internally. This means that each worker has its own copy of the model but operates on different data. At the end of each step, all the workers synchronize their gradients.
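
Why does synchronizing gradients across workers give the same update as training on the whole batch? For a loss that is a mean over examples, and equally sized shards, the average of the per-shard gradients equals the full-batch gradient. The NumPy sketch below checks this for a linear model with squared error. It simulates the workers in a single process and only stands in for what DDP's all-reduce does across real processes; the model, data, and names are made up for illustration.

```python
import numpy as np

rng = np.random.default_rng(0)
X = rng.normal(size=(8, 3))  # 8 examples, 3 features
y = rng.normal(size=8)
w = rng.normal(size=3)       # every simulated worker holds the same model copy


def mse_grad(X: np.ndarray, y: np.ndarray, w: np.ndarray) -> np.ndarray:
    # Gradient of mean((X @ w - y) ** 2) with respect to w.
    return 2 * X.T @ (X @ w - y) / len(y)


full_batch_grad = mse_grad(X, y, w)

num_sim_workers = 4
shards = zip(np.array_split(X, num_sim_workers), np.array_split(y, num_sim_workers))
worker_grads = [mse_grad(X_s, y_s, w) for X_s, y_s in shards]  # computed in parallel under DDP
averaged_grad = np.mean(worker_grads, axis=0)                 # what the all-reduce step yields

print(np.allclose(full_batch_grad, averaged_grad))
```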

Because GPT-J is a relatively large model, it may not be possible to fit it on smaller GPU types (<=16 GB of GPU memory, or GRAM). To deal with that issue, we can use [DeepSpeed](https://github.com/microsoft/DeepSpeed), a library that optimizes the training process and allows us to (among other things) offload and partition optimizer and parameter states, reducing GRAM usage. Furthermore, DeepSpeed ZeRO Stage 3 allows us to load large models without running out of memory.
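
A back-of-the-envelope calculation shows why. The sketch below uses the memory accounting from the DeepSpeed ZeRO paper for mixed-precision Adam training: 2 bytes per parameter for fp16 weights, 2 for fp16 gradients, and 12 for the fp32 optimizer states (master weights, momentum, and variance), i.e., 16 bytes per parameter. It rounds GPT-J to 6 billion parameters, ignores activations and temporary buffers, and assumes ZeRO Stage 3 partitions these states evenly across GPUs, so treat the numbers as rough estimates.

```python
def training_state_gb(num_params: float, num_gpus: int = 1) -> float:
    """Approximate per-GPU memory (GB) for weights, gradients, and Adam states.

    Assumes mixed-precision Adam (16 bytes per parameter) and, for num_gpus > 1,
    an even ZeRO Stage 3 partitioning. Activations are ignored.
    """
    fp16_weights = 2 * num_params
    fp16_grads = 2 * num_params
    fp32_adam_states = 12 * num_params  # fp32 master weights, momentum, variance
    total_bytes = fp16_weights + fp16_grads + fp32_adam_states
    return total_bytes / num_gpus / 1e9


gptj_params = 6e9  # rounded; GPT-J has roughly 6 billion parameters
print(f"Single GPU:          {training_state_gb(gptj_params):.0f} GB")
print(f"ZeRO-3 over 16 GPUs: {training_state_gb(gptj_params, 16):.0f} GB per GPU")
```

Even when partitioned across 16 GPUs, these states take a sizable share of a 16 GB GPU before activations are counted, which is one reason the configuration below also offloads optimizer and parameter states to CPU memory.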

🤗 Transformers and Ray AIR's integration ({class}`~ray.train.hf_transformers.TransformersTrainer`) allow you to easily configure and use DDP and DeepSpeed. All you need to do is specify the DeepSpeed configuration in the [`TrainingArguments`](https://huggingface.co/docs/transformers/en/main_classes/trainer#transformers.TrainingArguments) object.

```{tip}
There are many DeepSpeed settings that allow you to trade off speed for memory usage. The settings used below are tailored to the cluster setup used (16 g4dn.4xlarge nodes) and a per device batch size of 16. Some things to keep in mind:
- If your GPUs support bfloat16, use that instead of float16 mixed precision to get better performance and prevent overflows. Replace `fp16=True` with `bf16=True` in `TrainingArguments`.
- If you are running out of GRAM, try reducing the batch size (defined in the cell below the next one) or set `"overlap_comm": False` in the DeepSpeed config.
- If you are running out of RAM, add more nodes to your cluster, use nodes with more RAM, set `"pin_memory": False` in the DeepSpeed config, reduce the batch size, and remove `"offload_param"` from the DeepSpeed config.

For more information on DeepSpeed configuration, refer to [Hugging Face documentation](https://huggingface.co/docs/transformers/main_classes/deepspeed) and [DeepSpeed documentation](https://www.deepspeed.ai/docs/config-json/).

Additionally, if you prefer a lower-level API, the logic below can be expressed as an [Accelerate training loop](https://github.com/huggingface/accelerate/blob/main/examples/by_feature/deepspeed_with_config_support.py) distributed by a Ray AIR {class}`~ray.train.torch.torch_trainer.TorchTrainer`.
```
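
The advice to prefer bfloat16 comes down to dynamic range. The NumPy sketch below (NumPy has no bfloat16 type, so only float16 is shown) illustrates both failure modes of float16: values above its largest finite number, 65504, overflow to infinity, and very small gradients underflow to zero. Loss scaling, which the DeepSpeed config below controls via `"initial_scale_power"` (an initial scale of 2**8 = 256), multiplies the loss, and hence the gradients, so that small gradients stay representable. bfloat16 has the same exponent range as float32 (largest finite value about 3.4e38), so overflow is much less of a concern. The tiny gradient value is made up for illustration.

```python
import numpy as np

fp16 = np.finfo(np.float16)
print("largest finite float16:", fp16.max)

# Overflow: values beyond 65504 become inf in float16.
with np.errstate(over="ignore"):
    overflowed = np.float16(70000.0)
print("70000 as float16:", overflowed)

# Underflow: a tiny gradient rounds to zero in float16 ...
tiny_grad = 1e-8
print("unscaled gradient:", np.float16(tiny_grad))

# ... but survives if the loss (and therefore the gradient) is scaled first.
loss_scale = 2 ** 8  # matches "initial_scale_power": 8 in the DeepSpeed config
scaled = np.float16(tiny_grad * loss_scale)
print("scaled gradient:", scaled, "-> unscaled in fp32:", np.float32(scaled) / loss_scale)
```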

#### Training speed

As we are using data parallelism, each worker operates on its own shard of the data. The batch size set in `TrainingArguments` is the **per device batch size** (per worker batch size). By changing the number of workers, we can change the **effective batch size** and thus the time needed for training to complete. The effective batch size is then calculated as `per device batch size * number of workers * number of gradient accumulation steps`. As we add more workers, the effective batch size rises and thus we need less time to complete a full epoch. While the speedup is not exactly linear due to extra communication overheads, in many cases it can be close to linear.

We have set the per device batch size to 16. With 16 workers, each worker's shard of the preprocessed dataset holds 1348 examples (roughly 21,500 examples in total).

* With 16 g4dn.4xlarge nodes, the effective batch size was 256, which amounts to 85 steps per epoch. One epoch took **~2440 seconds** (including initialization time).

* With 32 g4dn.4xlarge nodes, the effective batch size was 512, which amounts to 43 steps per epoch. One epoch took **~1280 seconds** (including initialization time).
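
The arithmetic behind these numbers is short enough to spell out. The sketch below assumes no gradient accumulation (one accumulation step) and a dataset of 16 × 1348 = 21,568 examples, i.e., it treats 1348 as the per-worker shard size with 16 workers; both are assumptions for illustration. It also computes how close the measured epoch times come to linear scaling.

```python
import math


def effective_batch_size(per_device: int, num_workers: int, grad_accum_steps: int = 1) -> int:
    # per device batch size * number of workers * number of gradient accumulation steps
    return per_device * num_workers * grad_accum_steps


def steps_per_epoch(num_examples: int, effective_batch: int) -> int:
    # The last, partially filled batch still counts as a step.
    return math.ceil(num_examples / effective_batch)


num_examples = 16 * 1348  # assumption: 1348 examples per worker across 16 workers
for workers in (16, 32):
    ebs = effective_batch_size(per_device=16, num_workers=workers)
    print(f"{workers} workers: effective batch {ebs}, {steps_per_epoch(num_examples, ebs)} steps/epoch")

# Scaling efficiency: measured speedup divided by the ideal (linear) speedup of 2x.
speedup = 2440 / 1280
print(f"speedup {speedup:.2f}x, efficiency {speedup / 2:.0%}")
```

Both measured timings include initialization, so the efficiency figure is only a rough indication that doubling the workers came close to halving the epoch time.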

In [None]:
import os

import evaluate
import numpy as np
import torch
from transformers import (
    AutoTokenizer,
    GPTJForCausalLM,
    Trainer,
    TrainingArguments,
    default_data_collator,
)
from transformers.utils.logging import disable_progress_bar, enable_progress_bar

from ray.air import session


def trainer_init_per_worker(train_dataset, eval_dataset=None, **config):
    # Use the actual number of CPUs assigned by Ray
    os.environ["OMP_NUM_THREADS"] = str(
        session.get_trial_resources().bundles[-1].get("CPU", 1)
    )
    # Enable tf32 for better performance
    torch.backends.cuda.matmul.allow_tf32 = True

    batch_size = config.get("batch_size", 4)
    epochs = config.get("epochs", 2)
    warmup_steps = config.get("warmup_steps", 0)
    learning_rate = config.get("learning_rate", 0.00002)
    weight_decay = config.get("weight_decay", 0.01)

    deepspeed = {
        "fp16": {
            "enabled": "auto",
            "initial_scale_power": 8,
        },
        "bf16": {"enabled": "auto"},
        "optimizer": {
            "type": "AdamW",
            "params": {
                "lr": "auto",
                "betas": "auto",
                "eps": "auto",
            },
        },
        "zero_optimization": {
            "stage": 3,
            "offload_optimizer": {
                "device": "cpu",
                "pin_memory": True,
            },
            "offload_param": {
                "device": "cpu",
                "pin_memory": True,
            },
            "overlap_comm": True,
            "contiguous_gradients": True,
            "reduce_bucket_size": "auto",
            "stage3_prefetch_bucket_size": "auto",
            "stage3_param_persistence_threshold": "auto",
            "gather_16bit_weights_on_model_save": True,
            "round_robin_gradients": True,
        },
        "gradient_accumulation_steps": "auto",
        "gradient_clipping": "auto",
        "steps_per_print": 10,
        "train_batch_size": "auto",
        "train_micro_batch_size_per_gpu": "auto",
        "wall_clock_breakdown": False,
    }

    print("Preparing training arguments")
    training_args = TrainingArguments(
        "output",
        per_device_train_batch_size=batch_size,
        logging_steps=1,
        save_strategy="no",
        per_device_eval_batch_size=batch_size,
        learning_rate=learning_rate,
        weight_decay=weight_decay,
        warmup_steps=warmup_steps,
        label_names=["input_ids", "attention_mask"],
        num_train_epochs=epochs,
        push_to_hub=False,
        disable_tqdm=True,  # declutter the output a little
        fp16=True,
        gradient_checkpointing=True,
        deepspeed=deepspeed,
    )
    disable_progress_bar()

    tokenizer = AutoTokenizer.from_pretrained(model_name)
    tokenizer.pad_token = tokenizer.eos_token

    print("Loading model")

    model = GPTJForCausalLM.from_pretrained(model_name, use_cache=False)
    model.resize_token_embeddings(len(tokenizer))

    print("Model loaded")

    enable_progress_bar()

    metric = evaluate.load("accuracy")

    def compute_metrics(eval_pred):
        logits, labels = eval_pred
        predictions = np.argmax(logits, axis=-1)
        return metric.compute(predictions=predictions, references=labels)

    trainer = Trainer(
        model=model,
        args=training_args,
        train_dataset=train_dataset,
        eval_dataset=eval_dataset,
        compute_metrics=compute_metrics,
        tokenizer=tokenizer,
        data_collator=default_data_collator,
    )
    return trainer
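
As written, `compute_metrics` compares predicted and reference token IDs position by position. For a causal language model, the logits at position i predict the token at position i + 1, so a common pattern (used, for instance, in Hugging Face's causal language modeling example scripts) is to shift the two arrays by one and flatten them before computing accuracy. The NumPy sketch below shows that idea on toy arrays; the helper name `shifted_token_accuracy` and the toy values are hypothetical.

```python
import numpy as np


def shifted_token_accuracy(logits: np.ndarray, labels: np.ndarray) -> float:
    """Next-token accuracy for a causal LM (hypothetical helper).

    logits: (batch, seq_len, vocab_size); labels: (batch, seq_len).
    The prediction at position i is compared with the label at position i + 1.
    """
    predictions = np.argmax(logits, axis=-1)       # (batch, seq_len)
    predictions = predictions[:, :-1].reshape(-1)  # drop the last position
    references = labels[:, 1:].reshape(-1)         # drop the first token
    return float(np.mean(predictions == references))


# Toy example: batch of 1, sequence of 4 tokens, vocabulary of 5.
labels = np.array([[1, 2, 3, 4]])
logits = np.zeros((1, 4, 5))
for pos, predicted_token in enumerate([2, 3, 0, 1]):  # the last prediction is ignored
    logits[0, pos, predicted_token] = 1.0
print(shifted_token_accuracy(logits, labels))
```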

With our `trainer_init_per_worker` complete, we can now instantiate the {class}`~ray.train.hf_transformers.TransformersTrainer`. Aside from the function, we set the `scaling_config`, controlling the number of workers and the resources they use, and the `datasets` we will use for training and evaluation.

We pass the preprocessors we have defined earlier as an argument, wrapped in a {class}`~ray.data.preprocessors.chain.Chain`. The preprocessor will be included with the returned {class}`~ray.air.checkpoint.Checkpoint`, meaning it will also be applied during inference.

```{note}
If you want to upload checkpoints to cloud storage (e.g., S3), set {class}`air.RunConfig(storage_path) <ray.air.RunConfig>`. See {ref}`train-run-config` for an example. Using cloud storage is highly recommended, especially for production.
```

In [None]:
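from ray.air.config import ScalingConfig
from ray.data.preprocessors import Chain
from ray.train.hf_transformers import TransformersTrainer

# Cluster settings for this example: one training worker per g4dn.4xlarge node,
# each of which has 1 GPU and 16 vCPUs. Adjust these values to your own cluster.
num_workers = 16
use_gpu = True
cpus_per_worker = 8

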
trainer = TransformersTrainer(
    trainer_init_per_worker=trainer_init_per_worker,
    trainer_init_config={
        "batch_size": 16,  # per device
        "epochs": 1,
    },
    scaling_config=ScalingConfig(
        num_workers=num_workers,
        use_gpu=use_gpu,
        resources_per_worker={"GPU": 1, "CPU": cpus_per_worker},
    ),
    datasets={"train": ray_datasets["train"], "evaluation": ray_datasets["validation"]},
    preprocessor=Chain(splitter, tokenizer),
)

Finally, we call the {meth}`~ray.train.hf_transformers.TransformersTrainer.fit` method to start training with Ray AIR. We will save the {class}`~ray.air.Result` object to a variable so we can access metrics and checkpoints.

In [None]:
results = trainer.fit()

You can use the returned {class}`~ray.air.Result` object to access metrics and the Ray AIR {class}`~ray.air.checkpoint.Checkpoint` associated with the last iteration.

In [None]:
checkpoint = results.checkpoint
checkpoint

#### Generate text from prompt

We can use the {class}`~ray.train.hf_transformers.huggingface_predictor.TransformersPredictor` to generate predictions from our fine-tuned model.

```{tip}
For large scale batch inference, consider configuring cloud checkpointing and then passing the cloud-backed {class}`~ray.air.checkpoint.Checkpoint` to {class}`~ray.train.batch_predictor.BatchPredictor`. More information [here](air-predictors).
```

Because the {class}`~ray.train.hf_transformers.huggingface_predictor.TransformersPredictor` uses a 🤗 Transformers [`pipeline`](https://huggingface.co/docs/transformers/en/main_classes/pipelines) under the hood, we remove the AIR preprocessors we used for training (the `splitter` and `tokenizer` chain) from the checkpoint and let the `pipeline` tokenize the data itself.

In [None]:
checkpoint.set_preprocessor(None)

We also set `device_map="auto"` so that the model is automatically placed on the right device and set the `task` to `"text-generation"`. The `predict` method passes the arguments to a 🤗 Transformers `pipeline` call.

In [None]:
from ray.train.hf_transformers import TransformersPredictor
import pandas as pd

prompts = pd.DataFrame(["Romeo and Juliet", "Romeo", "Juliet"], columns=["text"])

# Predict on the head node.
predictor = TransformersPredictor.from_checkpoint(
    checkpoint=checkpoint,
    task="text-generation",
    torch_dtype=torch.float16 if use_gpu else None,
    device_map="auto",
    use_gpu=use_gpu,
)
prediction = predictor.predict(
    prompts,
    do_sample=True,
    temperature=0.9,
    min_length=32,
    max_length=128,
)

In [None]:
prediction

### Running Batch Inference

TODO intro

In [None]:
model_id = "EleutherAI/gpt-j-6B"
revision = "float16"  # use float16 weights to fit in 16GB GPUs
prompt = (
    "In a shocking finding, scientists discovered a herd of unicorns living in a remote, "
    "previously unexplored valley, in the Andes Mountains. Even more surprising to the "
    "researchers was the fact that the unicorns spoke perfect English."
)

For the purposes of this example, we will use a very small toy dataset composed of multiple copies of our prompt. Ray Data can handle much bigger datasets with ease.

In [None]:
import ray.data
import pandas as pd

ds = ray.data.from_pandas(pd.DataFrame([prompt] * 10, columns=["prompt"]))

Since we will be using a pretrained model from Hugging Face hub, the simplest way is to use {meth}`map_batches <ray.data.Dataset.map_batches>` with a [callable class UDF](transforming_data_actors). This will allow us to save time by initializing a model just once and then feed it multiple batches of data.
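
The benefit of a callable class is that its constructor runs once per actor, while `__call__` runs once per batch. The plain-Python sketch below imitates that life cycle without Ray: `ExpensiveModel` is a hypothetical stand-in for loading GPT-J, and the loop plays the role of Ray Data feeding batches to a single actor.

```python
from typing import List

import pandas as pd


class ExpensiveModel:
    """Hypothetical stand-in for a large model whose loading is slow."""

    loads = 0  # counts how often a model was "loaded"

    def __init__(self):
        ExpensiveModel.loads += 1

    def generate(self, prompts: List[str]) -> List[str]:
        return [p + " ..." for p in prompts]


class PredictCallableSketch:
    def __init__(self):
        # Runs once per actor: load the model here, not per batch.
        self.model = ExpensiveModel()

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        # Runs once per batch, reusing the already loaded model.
        return pd.DataFrame({"responses": self.model.generate(list(batch["prompt"]))})


callable_instance = PredictCallableSketch()  # what an actor does once at start-up
data = pd.DataFrame({"prompt": [f"prompt {i}" for i in range(10)]})
toy_batch_size = 4
outputs = [
    callable_instance(data.iloc[i : i + toy_batch_size])
    for i in range(0, len(data), toy_batch_size)
]
print(len(outputs), "batches,", ExpensiveModel.loads, "model load(s)")
```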

In [None]:
class PredictCallable:
    def __init__(self, model_id: str, revision: str = None):
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            revision=revision,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            device_map="auto",  # automatically makes use of all GPUs available to the Actor
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

    def __call__(self, batch: pd.DataFrame) -> pd.DataFrame:
        tokenized = self.tokenizer(
            list(batch["prompt"]), return_tensors="pt"
        )
        input_ids = tokenized.input_ids.to(self.model.device)
        attention_mask = tokenized.attention_mask.to(self.model.device)

        gen_tokens = self.model.generate(
            input_ids=input_ids,
            attention_mask=attention_mask,
            do_sample=True,
            temperature=0.9,
            max_length=100,
            pad_token_id=self.tokenizer.eos_token_id,
        )
        return pd.DataFrame(
            self.tokenizer.batch_decode(gen_tokens), columns=["responses"]
        )

All that is left is to run the `map_batches` method on the dataset. We specify that we want to use one GPU for each Ray Actor that will be running our callable class.

Also notice that we repartition the dataset into 100 partitions before mapping batches. This is to make sure there will be enough parallel tasks to take advantage of all the GPUs. 100 is an arbitrary number; you can pick any other number as long as it is larger than the number of available GPUs in the cluster.

```{tip}
If you have access to large GPUs, you may want to increase the batch size to better saturate them.

If you want to split the model across several GPUs on the same node (intra-node model parallelism), you can also increase `num_gpus`. Because we created the model with `device_map="auto"`, it will automatically be placed on the correct devices. Note that this requires nodes with multiple GPUs.
```

In [None]:
preds = (
    ds
    .repartition(100)
    .map_batches(
        PredictCallable,
        batch_size=4,
        fn_constructor_kwargs=dict(model_id=model_id, revision=revision),
        batch_format="pandas",
        compute=ray.data.ActorPoolStrategy(),
        num_gpus=1,
    )
)

After `map_batches` is done, we can view our generated text.

In [None]:
preds.take_all()

You may notice that we are not using an AIR {class}`Predictor <ray.train.predictor.Predictor>` here. This is because Predictors are mainly intended to be used with AIR {class}`Checkpoints <ray.air.checkpoint.Checkpoint>`, which we don't use in this example. See {ref}`air-predictors` for more information and usage examples.

### Running Online Model Serving

Setting up basic serving with Ray Serve is very similar to {doc}`batch inference with Ray Data </ray-air/examples/gptj_batch_prediction>`. First, we define a callable class that will serve as the [Serve deployment](serve-key-concepts-deployment). At runtime, a deployment consists of a number of *replicas*, which are individual copies of the class or function that are started in separate Ray Actors (processes). The number of replicas can be scaled up or down (or even autoscaled) to match the incoming request load.

We make sure to set the deployment to use 1 GPU by setting `"num_gpus"` in `ray_actor_options`. We load the model in `__init__`, which will allow us to save time by initializing a model just once and then use it to handle multiple requests.

```{tip}
If you want to split the model across several GPUs on the same node (intra-node model parallelism), you can also increase `num_gpus`. Because we created the model with `device_map="auto"`, it will automatically be placed on the correct devices. Note that this requires nodes with multiple GPUs.
```

In [None]:
from typing import Dict, List, Optional, Union

import pandas as pd

from ray import serve
from starlette.requests import Request


@serve.deployment(ray_actor_options={"num_gpus": 1})
class PredictDeployment:
    def __init__(self, model_id: str, revision: Optional[str] = None):
        from transformers import AutoModelForCausalLM, AutoTokenizer
        import torch

        # Load the model once per replica so that every request can reuse it.
        self.model = AutoModelForCausalLM.from_pretrained(
            model_id,
            revision=revision,
            torch_dtype=torch.float16,
            low_cpu_mem_usage=True,
            device_map="auto",  # automatically makes use of all GPUs available to the Actor
        )
        self.tokenizer = AutoTokenizer.from_pretrained(model_id)

    def generate(self, text: Union[str, List[str]]) -> pd.DataFrame:
        # Tokenize the prompt(s) and move the token IDs to the model's device.
        # Note: several prompts of different lengths would require padding.
        input_ids = self.tokenizer(text, return_tensors="pt").input_ids.to(
            self.model.device
        )

        gen_tokens = self.model.generate(
            input_ids,
            do_sample=True,
            temperature=0.9,
            max_length=100,
        )
        return pd.DataFrame(
            self.tokenizer.batch_decode(gen_tokens), columns=["responses"]
        )

    async def __call__(self, http_request: Request) -> List[Dict[str, str]]:
        # The request body is a JSON list of {"text": ...} objects, where
        # "text" is either a single prompt or a list of prompts.
        json_request: List[Dict] = await http_request.json()
        prompts = []
        for prompt in json_request:
            text = prompt["text"]
            if isinstance(text, list):
                prompts.extend(text)
            else:
                prompts.append(text)
        # Convert the DataFrame to plain records so the HTTP response is valid JSON.
        return self.generate(prompts).to_dict(orient="records")
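
Because the request parsing in `__call__` is plain Python, one option is to factor it into a standalone function and test it without Ray, a GPU, or the model. The sketch below uses a hypothetical helper, `flatten_prompts`, that mirrors that loop; it is not part of Ray Serve.

```python
from typing import Any, Dict, List


def flatten_prompts(json_request: List[Dict[str, Any]]) -> List[str]:
    """Collect prompts from a list of {"text": str | list[str]} objects.

    Hypothetical helper mirroring the loop in PredictDeployment.__call__.
    """
    prompts: List[str] = []
    for prompt in json_request:
        text = prompt["text"]
        if isinstance(text, list):
            # A list of prompts is spliced in element by element.
            prompts.extend(text)
        else:
            # A single prompt string is appended as-is.
            prompts.append(text)
    return prompts


# One single-prompt object and one multi-prompt object.
print(flatten_prompts([{"text": "Hello"}, {"text": ["Hi", "Hey"]}]))
# → ['Hello', 'Hi', 'Hey']
```

Keeping this logic in a separate function would let `__call__` simply await the JSON body and pass `flatten_prompts(json_request)` to `generate`.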

We can now `bind` the deployment with our arguments, and use {meth}`~ray.serve.run` to start it.

```{note}
If you run this code as a script outside of a Jupyter notebook, we recommend using the [`serve run` CLI command](serve-cli) instead. In that case, remove the `serve.run(deployment)` line and start the deployment by calling `serve run FILENAME:deployment`.

For more information, see [Serve Development Workflow](serve-dev-workflow).
```

In [None]:
deployment = PredictDeployment.bind(model_id=model_id, revision=revision)
serve.run(deployment)

Let's try submitting a request to our deployment. We will use the same prompt as before, and send a POST request. The deployment will generate a response and return it.

In [None]:
import requests

prompt = (
    "In a shocking finding, scientists discovered a herd of unicorns living in a remote, "
    "previously unexplored valley, in the Andes Mountains. Even more surprising to the "
    "researchers was the fact that the unicorns spoke perfect English."
)

sample_input = {"text": prompt}

output = requests.post("http://localhost:8000/", json=[sample_input]).json()
print(output)

This wraps up our extended LLM examples.
Let's next take a look at Ray AIR's ecosystem.

## An Overview of Ray AIR Integrations

Next, we'll show you the full extent of integrations currently available for Ray.
We look at this ecosystem from the perspective of Ray AIR, so that we can discuss
each integration in the context of a representative AIR workflow.
We can’t give you examples for every library in Ray’s ecosystem, but where
appropriate, we’ll point you to more advanced resources to deepen your understanding.

![AIR Data Table](./images/data_eco_table.png)

![AIR Train Table](./images/training_eco_table.png)

![AIR Tune Table](./images/tune_eco_table.png)

![AIR Serve Table](./images/serve_eco_table.png)

### An Overview of Ray’s Integrations

Let’s summarize all the integrations mentioned in this chapter (and throughout the
book) in one concise diagram. In the following figure we list all integrations currently available:

![AIR Eco](./images/Ray_extended_eco.png)

## How AIR Compares to Related Systems

Now that you know much more about Ray and its libraries, this chapter is also the
right place to compare what Ray offers to similar systems. As you’ve seen, Ray’s ecosystem
is quite complex, can be seen from different angles, and is used for different
purposes. That means many aspects of Ray can be compared to other tools in the
market. We’ll also comment on how to integrate Ray into more complex workflows in existing
ML platforms.

We’ve not made any direct comparisons with other systems up to this point, for the
simple reason that it makes little sense to compare Ray to something if you don’t
have a good grasp of what Ray is yet. As Ray is quite flexible and comes with a lot
of components, it can be compared to different types of tools in the broader ML
ecosystem.
Let’s start with a comparison of the more obvious candidates, namely, Python-based
frameworks for cluster computing.

### Distributed Python Frameworks

If you consider frameworks for distributed computing that offer full Python support
and don’t lock you into any cloud offering, the current “big three” are Dask, Spark,
and Ray. While there are certain technical and context-dependent performance differences
between these frameworks, it’s best to compare them in terms of the workloads
you want to run on them. Table XXX compares the most common workload
types:

![AIR Dask Spark Table](./images/dask_spark_ray_table.png)

### Ray AIR and the Broader ML Ecosystem

Ray AIR focuses primarily on AI compute, for instance by providing distributed
training via Ray Train, but it’s not built to cover every aspect of an AI
workload. Rather than providing native solutions for tracking and monitoring ML
experiments or for data storage, AIR chooses to integrate with existing tools for
these tasks.

On the other side of the spectrum, you can find categories of tools for which Ray AIR
can be considered an alternative. For instance, there are many framework-specific
toolkits such as TorchX or TFX that tie in tightly with their respective frameworks. In
contrast, AIR is framework-agnostic, thereby preventing vendor lock-in, and offers
similar tooling.

It’s also interesting to briefly touch on how Ray AIR compares to specific cloud offerings.
Some major cloud services offer comprehensive toolkits to tackle ML workloads
in Python. To name just one, AWS SageMaker is a great all-in-one package that
connects well with the rest of the AWS ecosystem. AIR does not aim to replace
tools like SageMaker. Instead, it aims to provide alternatives for compute-intensive
components like training, evaluation, and serving.

AIR also represents a valid alternative to ML workflow frameworks such as Kubeflow
or Flyte. In contrast to many container-based solutions, AIR offers an intuitive,
high-level Python API and native support for distributed data.

Sometimes the situation is not as clear-cut, and Ray AIR can be seen or used as either
an alternative or a complementary component in the ML ecosystem.
For instance, because Ray, and AIR in particular, are open source, you can use them within
hosted ML platforms such as SageMaker, but you can also build your own ML
platforms with them. Also, as mentioned, AIR can’t always compete with dedicated big
data processing systems like Spark or Dask, but Ray Datasets are often enough to
suit your processing needs.

As we mentioned earlier, it is central to AIR’s design philosophy to have
the ability to express your ML workloads in a single script and execute them on Ray
as a single distributed system. Since Ray handles all the task placement and execution
on your cluster for you under the hood, there’s usually no need to explicitly
orchestrate your workloads (or stitch together many complex distributed systems).
Of course, this philosophy should not be taken too literally: sometimes you need
multiple systems or need to split up tasks into several stages. In those cases, dedicated
workflow orchestration tools like Argo or Airflow can be very useful when used in
a complementary fashion. For instance, you might want to run Ray as a step in the
Lightning MLOps framework.


### How to Integrate AIR into Your ML Platform

Now that you have a deeper understanding of the relationship of Ray, and AIR in
particular, to other ecosystem components, let’s summarize what it takes to build your
own ML platform and integrate Ray with other ecosystem components.

The core of your ML system built with AIR consists of a set of Ray Clusters, each
responsible for different jobs. For instance, one cluster might run preprocessing, train
a PyTorch model, and run inference; another one might simply pick up previously
trained models for batch inference and model serving, and so on. You can leverage
the Ray Autoscaler to fulfill your scaling needs and could deploy the whole system
on Kubernetes with KubeRay. You can then augment this core system with other
components as you see fit, for example:

- You might want to add other compute steps to your setup, such as running
    data-intensive preprocessing tasks with Spark.
- You can use a workflow orchestrator such as Airflow, Oozie, or SageMaker
    Pipelines to schedule and create your Ray Clusters and run Ray AIR apps and
    services. Each AIR app can be part of a larger orchestrated workflow, for instance
    by tying into a Spark ETL job from the first bullet point.
- You can also create your Ray AIR clusters for interactive use with Jupyter notebooks,
    for instance hosted by Google Colab or Databricks Notebooks.
- If you need access to a feature store such as Feast or Tecton, Ray Train, Datasets,
    and Serve have integrations for such tools.
- For experiment tracking or metric stores, Ray Train and Tune provide integrations
    with tools such as MLflow and Weights & Biases.
- You can also retrieve and store your data and models from external storage
    solutions like S3, as shown.

![AIR ML Platform Table](./images/AIR_ML_platform.png)

## Summary

In this chapter you’ve seen how all the Ray libraries we’ve introduced come together
to form the Ray AI Runtime. You’ve learned about all the key concepts that allow
you to build scalable ML projects, from experimentation to production. In particular,
you’ve seen how Ray Datasets are used for stateless computations such as feature
preprocessing, and how Ray Train and Tune are used for stateful computations
such as model training. Seamlessly combining these types of computations in
complex AI workloads and scaling them out to large clusters is a key strength of AIR.
Deploying your AIR projects comes essentially for free, as AIR fully integrates with
Ray Serve as well.

You also learned about Ray AIR’s ecosystem.
You should now be able to go out there and run your own AIR experiments,
together with all the tools you’re already using or intend to use in the future. We’ve
also discussed Ray’s limits, how it compares to various related systems, and how you
can use Ray with other tools to augment or build out your own ML platforms.