# Introduction to Ray AI Runtime (AIR) - Data and Train

<img src="../_static/assets/Generic/ray_logo.png" width="20%" loading="lazy">

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-air/getting-started.html" target="_blank">Ray AI Runtime (AIR)</a></strong> is an open-source, Python-based, domain-specific library that equips ML engineers, data scientists, and researchers with a scalable and unified toolkit for ML applications.
</div>

Built on top of Ray Core, Ray AIR inherits all the performance and scalability benefits offered by Core while providing a convenient abstraction layer for machine learning. Ray AIR's Python-first native libraries allow ML practitioners to distribute individual workloads, run end-to-end applications, and build custom use cases in a unified framework.

### Machine learning workflow with Ray AIR

|<img src="../_static/assets/Introduction_to_Ray_AIR/e2e_air.png" width="70%" loading="lazy">|
|:--|
|Ray AIR enables end-to-end ML development and provides multiple options for integrating with other tools and libraries from the MLOps ecosystem.|

### Data loading and model training with Ray AI Runtime

To illustrate Ray AIR's capabilities, you will implement the data loading, data transformation, and model training stages of an end-to-end machine learning application that predicts big tips using New York City taxi data. Each section introduces the relevant Ray AIR library or component and demonstrates its functionality through code examples.

|Ray AIR Component|NYC Taxi Use Case|
|:--|:--|
|Ray Data|Use `Dataset` and `Preprocessor` to load and transform input data.|
|Ray Train|Use `Trainer` to scale XGBoost model training.|

For this classification task, you will apply a simple [XGBoost](https://xgboost.readthedocs.io/en/stable/) (a gradient boosted trees framework) model to the June 2021 [New York City Taxi & Limousine Commission's Trip Record Data](https://www1.nyc.gov/site/tlc/about/tlc-trip-record-data.page). This dataset contains over 2 million samples of yellow cab rides, and the goal is to predict whether a trip will result in a tip greater than 20% or not.

## Part 1: Ray Data
---

|<img src="../_static/assets/Introduction_to_Ray_AIR/data_highlight.png" width="70%" loading="lazy">|
|:--|
|Ray AIR wraps Ray Data to provide distributed data ingestion and transformation during training, tuning, and inference.|

### Introduction to Ray Datasets

Backed by PyArrow, [Ray Datasets](https://docs.ray.io/en/latest/data/user-guide.html) parallelize the loading and transforming of data and provide a standard way to pass references to data across Ray libraries and applications. Datasets are not intended to replace more general data processing systems. Instead, they serve as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

#### Key features

- **Flexibility**

    Datasets are compatible with a variety of file formats, data sources, and distributed frameworks. They work seamlessly with library integrations like Dask on Ray and can be passed between Ray tasks and actors without copying data.

- **Performance for ML Workloads**

    Datasets offer important features like accelerator support, pipelining, and global random shuffles that accelerate ML training and inference workloads. They also support basic distributed data transformations such as map, filter, sort, groupby, and repartition.

- **Persistent Preprocessor**

    The `Preprocessor` primitive captures and stores the transformations applied to convert inputs into features. It is applied during training, tuning, batch prediction, and serving to keep the preprocessing consistent across the pipeline.
    
- **Built on Ray Core**

    Datasets inherit scalability to hundreds of nodes, efficient memory usage, object spilling, and failure handling from Ray Core. Because Datasets are just lists of object references, they can be passed between tasks and actors without needing to make a copy of the data, which is crucial for making data-intensive applications and libraries scalable.

|<img src="../_static/assets/Introduction_to_Ray_AIR/data_code.png" width="70%" loading="lazy">|
|:--|
|A general pattern for creating a `Dataset`, configuring a `Preprocessor`, and passing these into the `Trainer` for consistent data handling throughout the pipeline.|

### Start Ray runtime

In [None]:
import ray

In [None]:
ray.init()

Start a Ray cluster (see: installation [instructions](https://docs.ray.io/en/latest/ray-overview/installation.html)) so that Ray can utilize all the cores and GPUs available to you as workers.

### Create Ray Datasets

In [None]:
# Read Parquet file to Ray Dataset.
dataset = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)

In [None]:
# Split data into training and validation subsets.
train_dataset, valid_dataset = dataset.train_test_split(test_size=0.3)
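
As a rough mental model, a 70/30 split without shuffling amounts to cutting the rows at a proportional index. The sketch below uses plain Python lists rather than Ray. `split_rows` is a hypothetical helper, and it assumes the validation rows are taken from the end of the data, which may differ from what `train_test_split` does internally.

```python
def split_rows(rows, test_size):
    """Cut `rows` so that the last `test_size` fraction becomes the test subset."""
    if not 0 < test_size < 1:
        raise ValueError("test_size must be a fraction between 0 and 1")
    # Round the test count first so the cut index is always an integer.
    cut = len(rows) - round(len(rows) * test_size)
    return rows[:cut], rows[cut:]


rows = list(range(10))
train_rows, valid_rows = split_rows(rows, test_size=0.3)
print(len(train_rows), len(valid_rows))
```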

In [None]:
# Split datasets into blocks for parallel preprocessing.
# `num_blocks` should be lower than number of cores in the cluster.
train_dataset = train_dataset.repartition(num_blocks=5)
valid_dataset = valid_dataset.repartition(num_blocks=5)
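
To see why the number of blocks matters for parallelism, consider this conceptual sketch. It uses the standard library's `concurrent.futures` instead of Ray, and `to_blocks` and `block_sum` are hypothetical helpers: each block is an independent unit of work, so up to `num_blocks` workers can process the data at once.

```python
from concurrent.futures import ThreadPoolExecutor


def to_blocks(rows, num_blocks):
    """Split `rows` into `num_blocks` contiguous blocks of near-equal size."""
    base, extra = divmod(len(rows), num_blocks)
    blocks, start = [], 0
    for i in range(num_blocks):
        # The first `extra` blocks each take one additional row.
        size = base + (1 if i < extra else 0)
        blocks.append(rows[start:start + size])
        start += size
    return blocks


def block_sum(block):
    """Stand-in for a per-block transformation."""
    return sum(block)


rows = list(range(12))
blocks = to_blocks(rows, num_blocks=5)

# One worker per block; each block is processed independently.
with ThreadPoolExecutor(max_workers=5) as pool:
    partial_sums = list(pool.map(block_sum, blocks))

print([len(b) for b in blocks])
print(sum(partial_sums))
```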

### Common operations on datasets

There are many [`Dataset` API elements](https://docs.ray.io/en/latest/data/api/dataset.html#) available for common transformations and operations. Below are a few examples.

Inspect [the schema](https://docs.ray.io/en/latest/data/api/dataset.html#inspecting-metadata) of the underlying Parquet metadata.

In [None]:
print(f"Schema of training dataset: \n {train_dataset.schema()}")

[Count](https://docs.ray.io/en/latest/data/api/dataset.html#inspecting-metadata) the number of rows in the training and validation datasets.

In [None]:
print(f"Number of samples in training dataset: \n {train_dataset.count()}")
print(f"Number of samples in validation dataset: \n {valid_dataset.count()}")

[Show](https://docs.ray.io/en/latest/data/api/dataset.html#consuming-datasets) the first five samples from either dataset.

In [None]:
train_dataset.show(5)

Calculate the average `fare_amount` [grouped by](https://docs.ray.io/en/latest/data/api/dataset.html#grouped-and-global-aggregations) `passenger_count`.

In [None]:
train_dataset.groupby("passenger_count").mean("fare_amount").show()
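
If it helps to see what a grouped mean computes, here is a minimal plain-Python sketch. `grouped_mean` is a hypothetical helper and the rows are made up. Ray Data performs the equivalent aggregation in a distributed fashion across blocks.

```python
from collections import defaultdict


def grouped_mean(rows, key, value):
    """Return {group: mean of `value`} for rows grouped by `key`."""
    totals = defaultdict(float)
    counts = defaultdict(int)
    for row in rows:
        totals[row[key]] += row[value]
        counts[row[key]] += 1
    return {group: totals[group] / counts[group] for group in totals}


# Made-up rows for illustration only; not taken from the taxi dataset.
rows = [
    {"passenger_count": 1, "fare_amount": 10.0},
    {"passenger_count": 1, "fare_amount": 20.0},
    {"passenger_count": 2, "fare_amount": 8.0},
]
means = grouped_mean(rows, "passenger_count", "fare_amount")
print(means)
```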

[Split](https://docs.ray.io/en/latest/data/api/dataset.html#ray.data.Dataset.split) dataset into N datasets.

In [None]:
datasets_list = train_dataset.split(n=5)
datasets_list

### Preprocess the dataset
To transform the raw data into features, you will define a `Preprocessor`. [Ray AIR's `Preprocessor`](https://docs.ray.io/en/latest/ray-air/package-ref.html#preprocessor) captures the data transformations you apply and persists them across the pipeline:

- **During training**

    `Preprocessor` is passed into a `Trainer` to `fit` and `transform` input `Datasets`.
- **During tuning**

    Each `Trial` will create its own copy of the `Preprocessor`, and the fitting and transformation logic will occur once per `Trial`.
- **During checkpointing**

    The `Preprocessor` is saved in the `Checkpoint` if it was passed into the `Trainer`.
- **During predicting**

    If the `Checkpoint` contains a `Preprocessor`, then it will be used to call `transform_batch` on input batches prior to performing inference.
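
The reason this persistence matters can be sketched with a toy example: statistics are learned once from the training data and then reused unchanged on later batches. `ToyMeanCenterer` below is a hypothetical stand-in written in plain Python, not Ray's `Preprocessor` class.

```python
class ToyMeanCenterer:
    """Hypothetical stand-in for a fitted preprocessor; not Ray's class."""

    def __init__(self):
        self.mean = None

    def fit(self, values):
        # Learn the statistic once, from training data only.
        self.mean = sum(values) / len(values)
        return self

    def transform(self, values):
        if self.mean is None:
            raise RuntimeError("call fit() before transform()")
        # Reuse the stored statistic; nothing is recomputed here.
        return [v - self.mean for v in values]


train_values = [1.0, 2.0, 3.0]
centerer = ToyMeanCenterer().fit(train_values)

# At inference time, new batches are shifted by the training mean.
new_batch = [2.0, 5.0]
centered_batch = centerer.transform(new_batch)
print(centered_batch)
```

If the mean were recomputed on `new_batch` instead, the model would see differently scaled features at inference time than it saw during training.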

Ray AIR provides several [preprocessors out of the box](https://docs.ray.io/en/latest/ray-air/preprocessors.html#) and also supports the implementation of [custom preprocessors](https://docs.ray.io/en/latest/ray-air/preprocessors.html#implementing-custom-preprocessors). Later on, you can compare model performance between the given preprocessor and your custom configuration.

Select a [built-in](https://docs.ray.io/en/latest/ray-air/preprocessors.html#types-of-preprocessors) `Preprocessor` and use `fit_transform()` to [apply it](https://docs.ray.io/en/latest/ray-air/package-ref.html#preprocessor) to the dataset. Then visualize the results, for example by using the [integration with pandas](https://docs.ray.io/en/latest/data/api/input_output.html#ray.data.from_pandas) to generate a histogram.

Note: You may want to create a sample dataset to transform, as the original data and preprocessor will be passed to the `Trainer` in the next step for transformation.

In [None]:
from ray.data.preprocessors import MinMaxScaler

In [None]:
# Define a preprocessor to normalize the columns by their range.
preprocessor = MinMaxScaler(columns=["trip_distance", "trip_duration"])

In [None]:
from ray.data.preprocessors import PowerTransformer

sample_data = ray.data.read_parquet(
    "s3://anyscale-training-data/intro-to-ray-air/nyc_taxi_2021.parquet"
)

# create new preprocessor
sample_preprocessor = PowerTransformer(
    columns=["trip_distance", "trip_duration"], power=0.5
)

# apply the transformation
transformed_data = sample_preprocessor.fit_transform(sample_data)

For numerical features, you can choose an appropriate AIR `Preprocessor` depending on your data's properties:

- `PowerTransformer`  
Your data isn't normal, but you need it to be.
- `Normalizer`  
You need unit norm rows.
- `MinMaxScaler`  
You are unsure of the distribution of your data.
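
To make these choices more concrete, the following plain-Python sketch shows the arithmetic behind each transformation on tiny made-up inputs. The function names are hypothetical, the power transform assumes the Yeo-Johnson form restricted to non-negative inputs and a nonzero power (the Box-Cox variant differs), and edge cases such as a zero range or a zero norm are not handled.

```python
import math


def min_max_scale(column):
    """Rescale a column to [0, 1] using its observed range."""
    low, high = min(column), max(column)
    return [(x - low) / (high - low) for x in column]


def yeo_johnson(x, power):
    """Yeo-Johnson transform for a non-negative x and a nonzero power."""
    return ((x + 1) ** power - 1) / power


def l2_normalize(row):
    """Scale a row so that its Euclidean (L2) norm equals 1."""
    norm = math.sqrt(sum(x * x for x in row))
    return [x / norm for x in row]


scaled = min_max_scale([0.0, 5.0, 10.0])
powered = yeo_johnson(3.0, power=0.5)
unit_row = l2_normalize([3.0, 4.0])
print(scaled, powered, unit_row)
```

Note that min-max scaling and the power transform operate on one column at a time, while normalization operates on one row at a time.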

### Summary

#### Key concepts

**`Dataset`**

The standard way to load and exchange data in Ray AIR. In AIR, Datasets are used extensively for data loading and transformation. They are meant as a last-mile bridge from ETL pipeline outputs to distributed applications and libraries in Ray.

**`Preprocessor`**

Preprocessors are primitives that transform input data into features. They operate on Datasets, making them scalable and compatible with a variety of datasources and dataframe libraries.

Preprocessors persist through various stages of the pipeline:

- During training to fit and transform input data
- In each trial of hyperparameter tuning
- Within a checkpoint
- On input batches for inference

AIR comes with a collection of built-in preprocessors, and you can also define your own with simple templates (see the [user guide](https://docs.ray.io/en/latest/ray-air/preprocessors.html) for more information).

## Part 2: Ray Train
---

|<img src="../_static/assets/Introduction_to_Ray_AIR/train_highlight.png" width="70%" loading="lazy">|
|:--|
|Ray AIR wraps Ray Train to provide distributed model training.|

### Introduction to Ray Train

#### Common challenges with training

ML practitioners tend to run into a few common problems with training models that prompt them to consider distributed solutions:

1. Training time is [too long](https://www.anyscale.com/blog/how-anastasia-implements-ray-and-anyscale-to-speed-up-ml-processes-9x) to be practical.
2. The [data is too large](https://www.anyscale.com/blog/how-ray-and-anyscale-make-it-easy-to-do-massive-scale-machine-learning-on) to fit on one machine.
3. [Training many models](https://www.anyscale.com/blog/training-one-million-machine-learning-models-in-record-time-with-ray) sequentially doesn't utilize resources efficiently.
4. The [model itself is too large](https://www.uber.com/blog/horovod-ray/) to fit on a single machine.

[Ray Train](https://docs.ray.io/en/latest/ray-air/trainer.html) addresses these issues by improving performance through distributed multi-node training.

#### Integration with Ray ecosystem

Ray Train's `Trainers` integrate well with the rest of the Ray ecosystem:

* **Ray Data**  
    * Enables scalable data loading and preprocessing with Ray `Datasets` and `Preprocessors`.
* **Ray Tune**
    * Composes with `Tuners` for distributed hyperparameter tuning.
* **Ray AIR Predictor**
    * Produces a checkpointed trained model that can be applied during inference.
* **Popular ML training frameworks**
    * [PyTorch](https://docs.ray.io/en/latest/ray-air/package-ref.html#pytorch)
    * [TensorFlow](https://docs.ray.io/en/latest/ray-air/package-ref.html#tensorflow)
    * [Horovod](https://docs.ray.io/en/latest/ray-air/package-ref.html#horovod)
    * [XGBoost](https://docs.ray.io/en/latest/ray-air/package-ref.html#xgboost)
    * [HuggingFace Transformers](https://docs.ray.io/en/latest/ray-air/package-ref.html#huggingface)
    * [Scikit-Learn](https://docs.ray.io/en/latest/ray-air/package-ref.html#scikit-learn)
    * and more!

#### Useful features

* Callbacks for early stopping
* Checkpointing
* Integration with observability tools like TensorBoard, Weights & Biases, and MLflow
* Export mechanisms for models

|<img src="../_static/assets/Introduction_to_Ray_AIR/train_code.png" width="70%" loading="lazy">|
|:--|
|Define the `Trainer` object and then fit it to the training dataset. This snippet uses a `TorchTrainer`, but it may be swapped out for any [integration](https://docs.ray.io/en/latest/ray-air/package-ref.html#trainer-and-predictor-integrations) or custom-defined `Trainer`.|

In the next section, you will define an XGBoost `Trainer` and fit it to the NYC taxi data.

### Define AIR `Trainer`

Ray AIR provides a variety of built-in [`Trainers`](https://docs.ray.io/en/latest/ray-air/trainer.html) (PyTorch, TensorFlow, HuggingFace, etc.). In the example below, you will use a Ray `XGBoostTrainer`, which [offers support](https://docs.ray.io/en/latest/train/gbdt.html) for XGBoost models.

In [None]:
from ray.air.config import ScalingConfig
from ray.train.xgboost import XGBoostTrainer

In [None]:
trainer = XGBoostTrainer(
    label_column="is_big_tip",
    num_boost_round=50,
    scaling_config=ScalingConfig(
        num_workers=5,
        use_gpu=False,
    ),
    params={
        "objective": "binary:logistic",
        "eval_metric": ["logloss", "error"],
        "tree_method": "approx",
    },
    datasets={"train": train_dataset, "valid": valid_dataset},
    preprocessor=preprocessor,
)

To construct a `Trainer`, you provide three base components:

- A `ScalingConfig` which [specifies](https://docs.ray.io/en/releases-2.0.0rc0/ray-air/config-scaling.html) how many parallel training workers and what type of resources (CPUs/GPUs) to use per worker during training; supports seamless scaling across heterogeneous hardware.
- A dictionary of training and validation sets.
- The `Preprocessor` used to transform the `Datasets`.

Optionally, you can choose to add `resume_from_checkpoint` which allows you to continue training from a [saved checkpoint](https://docs.ray.io/en/latest/ray-air/package-ref.html#ray.air.checkpoint.Checkpoint) should the run be interrupted.

### Fit the Trainer

In [None]:
# Invoke training. 
# The resulting object grants access to metrics, checkpoints, and errors.
result = trainer.fit()

### Inspect training results

You can check out the training results from the `Result` object with the following calls:

```python
# returns last saved checkpoint
result.checkpoint

# returns the `n` best saved checkpoints as configured in `RunConfig.CheckpointConfig`
result.best_checkpoints

# returns the final metrics as reported
result.metrics

# returns the exception raised if training failed (otherwise `None`)
result.error
```

Inspect your training result below. What is the reported accuracy for the training and validation runs?

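Note: The `error` metric, reported as `train-error` and `valid-error` in `result.metrics`, is the binary classification error rate, calculated as `#(wrong cases)/#(all cases)`. It is distinct from `result.error`, which holds an exception if training failed.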
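
The error rate `#(wrong cases)/#(all cases)` and its relation to accuracy can be sketched in plain Python. `binary_error_rate` is a hypothetical helper and the predictions are made up. The sketch assumes a 0.5 decision threshold, which is the convention XGBoost documents for its `error` metric.

```python
def binary_error_rate(probabilities, labels, threshold=0.5):
    """Fraction of examples whose thresholded prediction disagrees with the label."""
    wrong = sum(
        int((p > threshold) != bool(y))
        for p, y in zip(probabilities, labels)
    )
    return wrong / len(labels)


# Made-up predictions and labels for illustration only.
probabilities = [0.9, 0.2, 0.6, 0.4]
labels = [1, 0, 0, 1]

error = binary_error_rate(probabilities, labels)
accuracy = 1 - error  # accuracy is the complement of the error rate
print(error, accuracy)
```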

In [None]:
print(f"Result metrics: \n {result.metrics} \n")

In [None]:
print(f"Training accuracy: {1 - result.metrics['train-error']:.4f}")
print(f"Validation accuracy: {1 - result.metrics['valid-error']:.4f}")

In [None]:
result.checkpoint

### Summary

#### Key concepts

**`Checkpoint`**

Checkpoints store the full state of the model periodically, so that partially trained models are available and can be used to resume training from an intermediate point instead of starting from scratch. They also allow the best model to be saved for batch inference later on.
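
The idea of resuming from saved state can be illustrated with a toy training loop. This is a plain-Python sketch using a JSON file, not Ray's `Checkpoint` API. The `train` function and its `round` and `score` fields are hypothetical.

```python
import json
import os
import tempfile


def train(total_rounds, checkpoint_path):
    """Toy loop that saves its state after every round and can resume."""
    state = {"round": 0, "score": 0.0}
    if os.path.exists(checkpoint_path):
        with open(checkpoint_path) as f:
            state = json.load(f)  # resume from the saved state
    while state["round"] < total_rounds:
        state["round"] += 1
        state["score"] += 0.5  # stand-in for one round of real training
        with open(checkpoint_path, "w") as f:
            json.dump(state, f)
    return state


with tempfile.TemporaryDirectory() as tmp:
    path = os.path.join(tmp, "checkpoint.json")
    # The first run stops after 3 rounds and leaves a checkpoint behind.
    first = train(total_rounds=3, checkpoint_path=path)
    # The second run picks up at round 3 instead of starting from scratch.
    resumed = train(total_rounds=5, checkpoint_path=path)

print(first["round"], resumed["round"], resumed["score"])
```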

**`Trainer`**

Trainers are wrapper classes around third-party training frameworks such as XGBoost, PyTorch, and TensorFlow. They are built to help integrate with Ray Actors (for distribution), Ray Datasets, and Ray Tune.

#### Shutdown Ray runtime

In [None]:
# Disconnect the worker, and terminate processes started by ray.init().
ray.shutdown()

## Part 3: Run data loading and model training as an Anyscale Job

<img src="../_static/assets/Generic/ray_logo.png" width="20%" loading="lazy">