### Ray AI Runtime: An end-to-end example for image classification

[Ray AI Runtime (AIR)](https://docs.ray.io/en/latest/ray-air/getting-started.html) is a scalable and unified toolkit for machine learning (ML) applications. AIR enables simple scaling of individual workloads and end-to-end workflows using popular ML frameworks, all in just Python.

AIR builds on Ray’s best-in-class libraries for [Preprocessing](https://docs.ray.io/en/latest/data/dataset.html#datasets), [Training](https://docs.ray.io/en/latest/train/train.html#train-docs), [Tuning](https://docs.ray.io/en/latest/tune/index.html#tune-main), [Scoring](https://docs.ray.io/en/latest/ray-air/predictors.html#air-predictors), [Serving](https://docs.ray.io/en/latest/serve/index.html#rayserve), and [Reinforcement Learning](https://docs.ray.io/en/latest/rllib/index.html#rllib-index) to bring together an ecosystem of integrations. 

Out of the box, Ray AIR's components let you build a common machine learning pipeline.

<img src="images/air_ml_workflow.png" width="60%" height="30%">

This notebook walks through six distinct steps, each using one of the Ray AIR [components and its APIs](https://docs.ray.io/en/latest/ray-air/package-ref.html#ray-air-api), to build a common ML pipeline for image classification.


### The CIFAR-10 dataset for classification

The CIFAR-10 dataset consists of 60,000 32x32 colour images in 10 classes, with 6000 images per class. There are 50000 training images and 10000 test images.

The dataset is divided into five training batches and one test batch, each with 10000 images. The test batch contains exactly 1000 randomly-selected images from each class. The training batches contain the remaining images in random order, but some training batches may contain more images from one class than another. Between them, the training batches contain exactly 5000 images from each class.
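
The counts quoted above are mutually consistent, which a few lines of arithmetic can confirm. The sketch below uses only the numbers stated in this section; the variable names are illustrative.

```python
# Sanity-check the CIFAR-10 split sizes quoted in the text.
num_classes = 10
images_per_class = 6_000
total_images = num_classes * images_per_class          # 60,000 images overall

train_batches, images_per_batch = 5, 10_000
train_images = train_batches * images_per_batch        # 50,000 training images
test_images = 1 * images_per_batch                     # one test batch of 10,000

test_per_class = test_images // num_classes            # 1,000 per class in the test batch
train_per_class = images_per_class - test_per_class    # 5,000 per class left for training

# The splits must add up to the full dataset.
assert train_images + test_images == total_images
assert train_per_class * num_classes == train_images
print(total_images, train_images, test_images, train_per_class)
```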

Here are the classes in the dataset, as well as 10 random images from each:

<img src="images/cifar-10.png" width="60%" height="30%">

[Cifar-10 source](https://www.cs.toronto.edu/~kriz/cifar.html)

### Learning objectives

  * Use AIR's PyTorch Trainers, Tuners, Checkpoints, and batch and online inference predictors
  * Understand how Ray Data integrates with Ray AIR for data ingestion and prediction
  * Use out-of-the-box Preprocessors
  * Load a model from the best checkpoint and use it for batch inference
  * Deploy the best checkpoint model and use it for online inference
  * Use all of Ray's discrete components to write an end-to-end ML application in a single Python script

In [1]:
import ray

from ray import serve
from ray.serve import PredictorDeployment
 
import cifar_utils

from ray.train.torch import TorchTrainer
from ray.tune import Tuner
from ray.tune import TuneConfig
from ray.air.config import RunConfig
from ray.air.config import ScalingConfig
from ray.air.config import CheckpointConfig

from ray.train.torch import TorchPredictor
from ray.train.batch_predictor import BatchPredictor

import torch
import numpy as np
import pandas as pd
import requests

In [2]:
if ray.is_initialized():
    ray.shutdown()
ray.init()

2023-01-02 13:41:51,018	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at http://127.0.0.1:8266


| | |
|---|---|
| Python version: | 3.8.13 |
| Ray version: | 2.2.0 |
| Dashboard: | http://127.0.0.1:8266 |


In [3]:
ray.cluster_resources()

{'memory': 48791280026.0,
 'CPU': 10.0,
 'object_store_memory': 2147483648.0,
 'node:127.0.0.1': 1.0}

### Step 1. Fetch Data

Use Ray Data to fetch data from a data source. Here we use a simple built-in `SimpleTorchDatasource`. For preprocessing needs, such as scaling or normalizing, 
Ray AIR provides a host of preprocessors; these [preprocessors](https://docs.ray.io/en/latest/ray-air/preprocessors.html) can be supplied downstream to `Trainers` and `BatchPredictors` to preprocess your 
input before training, tuning, or scoring, relieving you of the onus of doing it yourself. 
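
As a concrete illustration of normalizing, the torchvision transform printed later in this notebook applies `Normalize` with a mean of 0.5 and a standard deviation of 0.5 per channel. Below is a minimal pure-Python sketch of that formula, assuming pixel values have already been scaled to the range [0, 1] (which is what `ToTensor()` does for 8-bit images). The `normalize` helper is illustrative and is not a Ray or torchvision function.

```python
def normalize(value: float, mean: float = 0.5, std: float = 0.5) -> float:
    """Apply (value - mean) / std, the per-channel formula used by Normalize."""
    return (value - mean) / std

# Pixel intensities in [0, 1] are mapped onto [-1, 1].
for pixel in (0.0, 0.25, 0.5, 1.0):
    print(pixel, "->", normalize(pixel))
```

Centering the inputs around zero in this way is a common choice for training neural networks; other means and standard deviations work the same way.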

<img src="images/data_prep.png" width="60%" height="30%">

In [4]:
train_torchvision_dataset = cifar_utils.train_dataset_factory()
test_torchvision_dataset = cifar_utils.test_dataset_factory()

Files already downloaded and verified
Files already downloaded and verified


In [5]:
train_torchvision_dataset, type(train_torchvision_dataset) 

(Dataset CIFAR10
     Number of datapoints: 50000
     Root location: /Users/jules/data
     Split: Train
     StandardTransform
 Transform: Compose(
                ToTensor()
                Normalize(mean=(0.5, 0.5, 0.5), std=(0.5, 0.5, 0.5))
            ),
 torchvision.datasets.cifar.CIFAR10)

**Note**: The above are PyTorch torchvision data types. We want to convert
them into Ray Dataset types.

### Step 2: Create a preprocessor to transform data
* Convert torchvision data into a Ray Dataset
* Transform the data into a Pandas DataFrame, because Ray Data uses Pandas by default as its internal representation
* Use the dataset's `map_batches()` to apply the conversion to each batch

In [6]:
train_dataset: ray.data.Dataset = ray.data.from_torch(train_torchvision_dataset)
test_dataset: ray.data.Dataset = ray.data.from_torch(test_torchvision_dataset)
train_dataset, type(train_dataset)

(Dataset(num_blocks=200, num_rows=50000, schema=<class 'tuple'>),
 ray.data.dataset.Dataset)

In [7]:
train_dataset = train_dataset.map_batches(cifar_utils.convert_batch_to_numpy)
test_dataset = test_dataset.map_batches(cifar_utils.convert_batch_to_numpy)
train_dataset, type(train_dataset)

Map_Batches: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:01<00:00, 114.18it/s]
Map_Batches: 100%|██████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 200/200 [00:00<00:00, 658.71it/s]


(Dataset(num_blocks=200, num_rows=50000, schema={image: ArrowTensorType(shape=(3, 32, 32), dtype=float), label: int64}),
 ray.data.dataset.Dataset)
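
The schema change above, from tuples to named `image` and `label` columns, is the job of `cifar_utils.convert_batch_to_numpy`, whose source is not shown in this notebook. Here is a rough sketch of the idea, assuming each incoming batch is a list of `(image, label)` tuples. It uses plain lists so that it runs without PyTorch or NumPy, whereas the real helper presumably returns NumPy arrays. The name `convert_batch_to_columns` is hypothetical.

```python
from typing import Any, Dict, List, Tuple

def convert_batch_to_columns(batch: List[Tuple[Any, int]]) -> Dict[str, list]:
    """Turn a list of (image, label) rows into a dict of named columns."""
    images = [image for image, _ in batch]
    labels = [label for _, label in batch]
    return {"image": images, "label": labels}

# Two tiny stand-in "images" (real CIFAR-10 images have shape 3 x 32 x 32).
rows = [([[0.1, 0.2]], 3), ([[0.3, 0.4]], 7)]
columns = convert_batch_to_columns(rows)
print(columns["label"])
```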

### Step 3: Create a `TorchTrainer` for PyTorch DDP training
This trainer is then fed into a Tuner for hyperparameter optimization. Ray AIR supports popular [Trainers](https://docs.ray.io/en/latest/ray-air/trainer.html)
for the common ML libraries: PyTorch, TensorFlow, XGBoost, and scikit-learn. These can be used alongside the [Tuner](https://docs.ray.io/en/latest/ray-air/tuner.html).

<img src="images/trainer.png" width="60%" height="30%">

In [8]:
trainer = TorchTrainer(train_loop_per_worker=cifar_utils.train_loop_per_worker,
                        datasets={"train": train_dataset},
                        train_loop_config={"epochs": 40, "batch_size": 32},
                        scaling_config=ScalingConfig(num_workers=4, use_gpu=False), # change to True if on GPU
)
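
With `num_workers=4`, the training data is divided among the workers, so each one sees only part of the data per epoch. The arithmetic below is a back-of-the-envelope sketch. It assumes an even split of the 50,000 training rows and that `batch_size` applies per worker; the actual behavior depends on `cifar_utils.train_loop_per_worker`, which is not shown here.

```python
import math

num_rows = 50_000      # training rows reported by the dataset above
num_workers = 4        # from ScalingConfig(num_workers=4)
batch_size = 32        # from train_loop_config
epochs = 40            # from train_loop_config

rows_per_worker = num_rows // num_workers                    # even split assumed
steps_per_epoch = math.ceil(rows_per_worker / batch_size)    # last batch may be partial
total_steps = steps_per_epoch * epochs

print(rows_per_worker, steps_per_epoch, total_steps)
```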

### Step 4: Create the Tuner 

Tuning a model over a hyperparameter space is a common workload and is often key to better accuracy. Here we grid-search over the learning rate `lr`; other entries in the training configuration, such as `batch_size` and `epochs`, can be tuned the same way.
You can also use state-of-the-art [search algorithms](https://docs.ray.io/en/latest/tune/api_docs/suggestion.html) and [schedulers](https://docs.ray.io/en/latest/tune/api_docs/schedulers.html) alongside [Tuner](https://docs.ray.io/en/latest/tune/api_docs/execution.html#ray.tune.Tuner) to control your trials, depending on your model requirements and desired results.
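
To make the idea of a search space concrete, the sketch below expands a grid into individual trial configurations, the way a grid search enumerates every combination. It is a plain-Python illustration rather than Ray Tune's implementation. `expand_grid` is a hypothetical helper, and the `batch_size` values are example choices that this notebook does not itself tune.

```python
from itertools import product
from typing import Any, Dict, List

def expand_grid(grid: Dict[str, List[Any]]) -> List[Dict[str, Any]]:
    """Return one config dict per combination of the listed values."""
    names = list(grid)
    return [dict(zip(names, combo)) for combo in product(*(grid[n] for n in names))]

# The notebook searches only over lr; batch_size is added here for illustration.
trials = expand_grid({"lr": [0.001, 0.01], "batch_size": [32, 64]})
for config in trials:
    print(config)
```

The number of trials is the product of the list lengths, so a grid grows quickly as more hyperparameters are added.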


<img src="images/tuner.png" width="60%" height="30%">

In [10]:
tuner = Tuner(trainer,
        param_space={
            "train_loop_config": {
                "lr": ray.tune.grid_search([0.001, 0.01]),
            }
        },
        # specific tune metrics to collect and checkpoint
        # during trials
        tune_config=TuneConfig(metric="train_loss", mode="min"),
        run_config=RunConfig(checkpoint_config=CheckpointConfig(num_to_keep=1, 
                        checkpoint_score_attribute="train_loss", 
                        checkpoint_score_order="min")
        )
    )
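
The `CheckpointConfig` above asks for a single checkpoint to be kept, ranked by the lowest `train_loss`. The sketch below mimics that retention policy in plain Python to show the intended effect. It is not Ray's implementation, and both `keep_best` and the loss values are made up for illustration.

```python
from typing import List, Tuple

def keep_best(checkpoints: List[Tuple[str, float]], num_to_keep: int = 1,
              order: str = "min") -> List[Tuple[str, float]]:
    """Keep the num_to_keep checkpoints with the best score."""
    ranked = sorted(checkpoints, key=lambda item: item[1], reverse=(order == "max"))
    return ranked[:num_to_keep]

# (checkpoint name, train_loss) pairs; the values are illustrative only.
history = [("epoch_1", 1.90), ("epoch_2", 1.42), ("epoch_3", 1.47)]
print(keep_best(history))
```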

Once we have set up our Trainer and Tuner, all we need to do to launch distributed training and tuning trials is call the 
familiar `.fit()` API.

In [None]:
# Run the tuner, which calls the trainer for each trial with that trial's
# parameter configuration as part of its HPO. Each worker trains in parallel
# on its own shard of the data: distributed training and tuning in parallel.

results = tuner.fit()

| | |
|---|---|
| Current time: | 2023-01-03 07:09:15 |
| Running for: | 17:22:15.19 |
| Memory: | 34.0/64.0 GiB |

| Trial name | status | loc | train_loop_config/lr | iter | total time (s) | train_loss | _timestamp | _time_this_iter_s |
|---|---|---|---|---|---|---|---|---|
| TorchTrainer_fc79e_00000 | RUNNING | 127.0.0.1:32555 | 0.001 | 17 | 62146.2 | 1.26575 | 1672758168 | 3445.67 |
| TorchTrainer_fc79e_00001 | RUNNING | 127.0.0.1:32562 | 0.01 | 17 | 62131.8 | 0.434617 | 1672758156 | 3444.93 |


(RayTrainWorker pid=32563) 2023-01-02 13:47:04,260	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=4]
(RayTrainWorker pid=32563) 2023-01-02 13:47:06,061	INFO train_loop_utils.py:270 -- Moving model to device: cpu
(RayTrainWorker pid=32563) 2023-01-02 13:47:06,062	INFO train_loop_utils.py:330 -- Wrapping provided model in DistributedDataParallel.
(RayTrainWorker pid=32581) 2023-01-02 13:47:07,010	INFO config.py:86 -- Setting up process group for: env:// [rank=0, world_size=4]
(RayTrainWorker pid=32581) 2023-01-02 13:47:07,643	INFO train_loop_utils.py:270 -- Moving model to device: cpu
(RayTrainWorker pid=32581) 2023-01-02 13:47:07,643	INFO train_loop_utils.py:330 -- Wrapping provided model in DistributedDataParallel.


| Trial name | _time_this_iter_s | _timestamp | _training_iteration | date | done | episodes_total | experiment_id | hostname | iterations_since_restore | node_ip | pid | should_checkpoint | time_since_restore | time_this_iter_s | time_total_s | timestamp | timesteps_since_restore | timesteps_total | train_loss | training_iteration | trial_id | warmup_time |
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
| TorchTrainer_fc79e_00000 | 3445.67 | 1672758168 | 17 | 2023-01-03_07-02-48 | False | | 2703a3aeebc241af871d20e41999bab9 | Juless-MacBook-Pro-16 | 17 | 127.0.0.1 | 32555 | True | 62146.2 | 3445.73 | 62146.2 | 1672758168 | 0 | | 1.26575 | 17 | fc79e_00000 | 0.311514 |
| TorchTrainer_fc79e_00001 | 3444.93 | 1672758156 | 17 | 2023-01-03_07-02-36 | False | | 329889f7d4b44059862c5edb36a3d610 | Juless-MacBook-Pro-16 | 17 | 127.0.0.1 | 32562 | True | 62131.8 | 3444.96 | 62131.8 | 1672758156 | 0 | | 0.434617 | 17 | fc79e_00001 | 0.30891 |


In [None]:
# Get the best checkpoint result
best_checkpoint = results.get_best_result(metric="train_loss", mode="min").checkpoint
best_checkpoint

In [None]:
best_checkpoint.to_directory("./best_checkpoint")
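
Selecting the best result amounts to picking the trial whose metric is smallest when `mode="min"`. As a sketch, applying that rule to the two in-progress `train_loss` values shown in the status output above (iteration 17, not final results) would favor the `lr=0.01` trial. The `pick_best` helper is hypothetical and only mirrors the idea behind `get_best_result`.

```python
from typing import Dict, List

def pick_best(trials: List[Dict[str, float]], metric: str, mode: str = "min") -> Dict[str, float]:
    """Return the trial with the lowest (mode='min') or highest (mode='max') metric."""
    chooser = min if mode == "min" else max
    return chooser(trials, key=lambda trial: trial[metric])

# Values copied from the in-progress status output in this notebook.
trials = [
    {"lr": 0.001, "train_loss": 1.26575},
    {"lr": 0.01, "train_loss": 0.434617},
]
best = pick_best(trials, metric="train_loss", mode="min")
print(best["lr"])
```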

### Step 5: Use `TorchPredictor` with the `Checkpoint` object
This allows batch inference at scale:
* Test our model with [`TorchPredictor`](https://docs.ray.io/en/latest/_modules/ray/train/torch/torch_predictor.html?highlight=TorchPredictor) using the [`Checkpoint`](https://docs.ray.io/en/latest/ray-air/key-concepts.html#checkpoints) object.
* Fetch the best checkpoint from the tuning results.
* Use the batch predictor to score the entire test dataset in one go


<img src="images/batch_predictor.png" width="60%" height="30%">

In [None]:
# drop the label column
predict_dataset = test_dataset.drop_columns(cols=["label"])

# Create our BatchPredictor from the best checkpoint obtained above after all the trials are finished
batch_predictor = BatchPredictor.from_checkpoint(
    checkpoint=best_checkpoint,
    predictor_cls=TorchPredictor,
    model=cifar_utils.ResNet18(),
    use_gpu=True  # set to False on a CPU-only machine
)

# Do prediction at scale over the entire batch
output: ray.data.Dataset = batch_predictor.predict(
    data=test_dataset, dtype=torch.float, 
    feature_columns=["image"], 
    keep_columns=["label"],
    num_gpus_per_worker=1,  # requires GPUs; remove on a CPU-only machine
)

In [None]:
# Get the predictions from the output and convert them into classes
predictions = output.map_batches(cifar_utils.convert_logits_to_classes, batch_format="pandas")

# Score each prediction against its label for the whole test_dataset
scores = predictions.map_batches(cifar_utils.calculate_prediction_scores)

# Compute the total prediction accuracy: the fraction of predictions
# that match the ground truth.
total_acc = scores.sum(on="correct") / scores.count()

print(f"Prediction accuracy from the test data of 10,000 images: {total_acc * 100:.2f}%")
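
The two helpers used above, `convert_logits_to_classes` and `calculate_prediction_scores`, live in the utility module and are not shown here. Conceptually, the first takes the index of the largest logit as the predicted class, and the second marks each prediction as correct or not. Here is a minimal pure-Python sketch under that assumption, with made-up logits for three classes and hypothetical function names:

```python
from typing import List

def logits_to_class(logits: List[float]) -> int:
    """Return the index of the largest logit (argmax)."""
    return max(range(len(logits)), key=lambda i: logits[i])

def accuracy(predicted: List[int], labels: List[int]) -> float:
    """Fraction of predictions that equal the ground-truth label."""
    correct = sum(int(p == y) for p, y in zip(predicted, labels))
    return correct / len(labels)

# Illustrative logits for four images and three classes.
batch_logits = [[0.1, 2.0, -1.0], [1.5, 0.2, 0.3], [0.0, 0.1, 3.0], [0.9, 0.8, 0.7]]
labels = [1, 0, 2, 2]
predicted = [logits_to_class(row) for row in batch_logits]
total_acc = accuracy(predicted, labels)
print(f"accuracy: {total_acc * 100:.2f}%")
```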

### Step 6: Online prediction
 * Deploy our model to the network for online prediction
 * Use `TorchPredictor` and [`PredictorDeployment`](https://docs.ray.io/en/latest/ray-air/package-ref.html?highlight=PredictorDeployment#ray.serve.air_integrations.PredictorDeployment) APIs
 * Start Ray Serve in detached mode, with two replicas
 
 This will automatically deploy the model from the best checkpoint. You can adjust the number of replicas in the options.
 
 <img src="images/online_predictor.png" width="60%" height="25%">
 

In [None]:
# Start Ray Serve in detached mode so that if this application terminates, Serve can still process the incoming
# requests. 
serve.start(detached=True)

# create the PredictorDeployment and give it a name and number of replicas. Depending on your
# request loads, you can scale up the number of replicas
deployment = PredictorDeployment.options(name="cifar-deployment", num_replicas=2)
deployment.deploy(
    TorchPredictor,
    best_checkpoint,
    batching_params=False,
    model=cifar_utils.ResNet18(),
    http_adapter=cifar_utils.json_to_numpy,
)

### Test online prediction from the test data

In [None]:
# Test online deployment
batch = test_dataset.take(10)
for i in range(10):
    array = np.expand_dims(np.array(batch[i]["image"]), axis=0)
    label = np.array(batch[i]["label"])
    # send request and fetch prediction
    payload = {"array": array.tolist()}
    response = requests.post(deployment.url, json=payload)
    result = response.json()[0]
    idx, cls = cifar_utils.to_prediction_cls(result)
    matched = idx == label
    cifar_utils.img_show(batch[i]["image"])
    print(f"prediction: {idx}; class: {cls}; matched: {matched}")
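
Each request above wraps a single image in a batch dimension and serializes it as JSON under the `"array"` key. The sketch below reproduces that payload shape with nested lists and the standard library only, assuming a 3 x 32 x 32 image filled with zeros. `shape_of` is an illustrative helper, not part of Ray or NumPy.

```python
import json
from typing import List

def shape_of(nested) -> List[int]:
    """Return the dimensions of a regularly nested, non-empty list."""
    dims = []
    while isinstance(nested, list):
        dims.append(len(nested))
        nested = nested[0]
    return dims

# A dummy image with 3 channels of 32 x 32 zeros.
image = [[[0.0] * 32 for _ in range(32)] for _ in range(3)]
batched = [image]                       # analogous to np.expand_dims(image, axis=0)
payload = json.dumps({"array": batched})

# The server side would decode the JSON back into a batch of one image.
decoded = json.loads(payload)["array"]
print(shape_of(decoded))
```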

In [None]:
# Shutdown our server
serve.shutdown()

### Summary

In this end-to-end ML application, you walked through, step by step, how to use the Ray AIR library and its components at each stage of an ML workflow: from data preprocessing and ingestion, through training and tuning, to batch inference, and finally deployment for online inference. For this image classification use case, we used PyTorch and its relevant AIR integrations, such
as TorchTrainer, TorchCheckpoint, and TorchPredictor. AIR supports other common frameworks such as TensorFlow, XGBoost, scikit-learn, and Hugging Face.

While we used a fixed set of hyperparameters, try training and tuning with different parameters in the exercises below.

**Note**: On a single machine, an M1 Mac laptop with 10 cores, this took more than 15 hours to train. On an Anyscale Ray cluster with 4 nodes and 32 GPUs, it was a matter of minutes!


### Exercises

 * Try scaling your Trainer and Tuner
   * increase the `num_workers` to 3 or 4
   * extend the tuning space 
     * `epochs=[75, 100, 125]`
     * `batch_size=[64, 128]`

### Homework 
 * Read the resources below


### Resources

Here is some extra reading and resources for you:
 * Watch the Ray Summit 2022 talk [Introduction to Ray AIR](https://www.anyscale.com/ray-summit-2022/agenda/sessions/226)
 * Ray AIR [documentation](https://docs.ray.io/en/latest/ray-air/getting-started.html)
 * Understand its [Components and APIs](https://docs.ray.io/en/latest/ray-air/package-ref.html)
 * Ray AIR [user guides](https://docs.ray.io/en/latest/ray-air/user-guides.html) and [examples](https://docs.ray.io/en/latest/ray-air/examples/index.html) 