# Overview of Ray

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Generic/ray_logo.png" width="20%" loading="lazy">

## About this notebook

### Is it right for you?

This is an introductory notebook that offers a broad overview of the Ray project. The ideal learner will find themselves in the following scenarios:

* You are new to Ray and are looking for a project primer.
* You are interested in how Ray, a Python-first distributed computing framework, can scale your Python applications and machine learning workloads.

### Prerequisites

For this notebook, you should satisfy the following requirements:

* Practical Python and machine learning experience
* No prior experience with Ray or distributed computing

### Learning objectives

* Understand what Ray is.
* Recognize key characteristics of Ray.
* Tour the three layers of Ray.
    * Ray Core
    * Native libraries
    * Ecosystem of integrations
* Explore the most common Ray use cases.
* Implement a regression task.
    * Sequentially in generic Python
    * In parallel with Ray
* Identify where to go next with Ray.

### What will you do?

In *Part 1* of this notebook, you will learn about the Ray project. Then, in *Part 2*, you will run an illustrative code example that will give you a better feel for Ray.

## Part 1: Ray project

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/ray_project.png" width="70%" loading="lazy">|
|:--|
|Ray is one of the leading open source ML projects. (date accessed: Nov 2, 2022)|

### Introduction

#### What is Ray?

<div class="alert alert-info">
  <strong><a href="https://www.ray.io/" target="_blank">Ray</a></strong> is an open-source unified compute framework that makes it easy to scale AI and Python workloads.
</div>

Ray provides the compute layer to scale applications without becoming a distributed systems expert. These are some key processes that Ray automatically handles:

* **Orchestration.** Managing the various components of a distributed system.
* **Scheduling.** Coordinating when and where tasks are executed.
* **Fault tolerance.** Ensuring tasks complete regardless of inevitable points of failure.
* **Auto-scaling.** Adjusting the amount of allocated resources to match dynamic demand.

To lower the effort needed to scale compute-intensive workloads, Ray takes a Python-first approach and integrates with many common data science tools. This allows ML practitioners to parallelize Python applications from a laptop to a cluster with minimal code changes.

#### Distributed computing: a bit of context and project history

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/ai_compute_annotated.png" width="70%" loading="lazy">|
|:--|
|Amount of compute used in the largest AI training runs far outpaces the processing power of individual CPUs, GPUs, and TPUs. Original diagram from [OpenAI](https://openai.com/blog/ai-and-compute/) with overlaid annotations.|

Distributed computing is becoming increasingly relevant for modern machine learning systems. OpenAI's analysis [AI and Compute](https://openai.com/blog/ai-and-compute/) found that the amount of compute used in the largest AI training runs has doubled roughly every 3.4 months since 2012.

However, distributed systems are hard to program. Scaling a Python application to a cluster introduces challenges in communication, scheduling, security, failure handling, heterogeneity, transparency, and much more.

This context drove the development of Ray: a solution that enables developers to run Python code on clusters without having to think about how to orchestrate and utilize individual machines. The same researchers who created Ray at the University of California, Berkeley's [RISELab](https://rise.cs.berkeley.edu/) (the successor to the [AMPLab](https://amplab.cs.berkeley.edu/about/) that created [Apache Spark](https://spark.apache.org/) and [Mesos](https://mesos.apache.org/)) founded [Anyscale](https://www.anyscale.com/), which offers a managed Ray platform with hosted solutions for Ray applications.

### Key Ray characteristics

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/python_first.jpeg" width="70%" loading="lazy">|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/simple_and_flexible_api.jpeg" width="70%" loading="lazy">|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/scalability.jpeg" width="70%" loading="lazy">|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/heterogeneous_hardware.jpeg" width="70%" loading="lazy">|
|:-:|:-:|:-:|:-:|
|Python-first approach|Simple and flexible API|Scalability|Support for heterogeneous hardware|

#### Python-first approach

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/python_first.jpeg" width="100px" loading="lazy">

Ray allows you to flexibly compose distributed applications with easy-to-use primitives in native Python code. This way, you can scale your existing workloads with minimal code changes. Getting started with Ray Core involves just a few key abstractions:

1. [**Tasks**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks). Remote, stateless Python functions
1. [**Actors**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). Remote, stateful Python classes
1. [**Objects**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#objects). Tasks and actors create and compute on objects that can be stored and accessed anywhere in the cluster; cached in Ray's distributed [shared-memory](https://en.wikipedia.org/wiki/Shared_memory) object store

You will learn more about these abstractions in the [Ray Core tutorials](https://github.com/ray-project/ray-educational-materials/tree/main/Ray_Core).

#### Simple and flexible API

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/simple_and_flexible_api.jpeg" width="100px" loading="lazy">

##### Ray Core

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-core/walkthrough.html" target="_blank">Ray Core</a></strong> is an open-source, Python, general purpose, distributed computing library that enables ML engineers and Python developers to scale Python applications and accelerate machine learning workloads.
</div>

Acting as the foundational library for the whole ecosystem, Ray Core provides a minimalist API that enables distributed computing. With just a few methods, you can start building distributed apps:

* **`ray.init()`**  
Start Ray runtime and connect to the Ray cluster.
* **`@ray.remote`**  
Decorator that specifies a Python function or class to be executed as a task (remote function) or actor (remote class) in a different process.
* **`.remote`**  
Postfix to the remote functions and classes; remote operations are *asynchronous*.
* **`ray.put()`**  
Put an object in the in-memory object store; returns an object reference used to pass the object to any remote function or method call.
* **`ray.get()`**  
Get a remote object(s) from the object store by specifying the object reference(s).

*(In the second part of this notebook, you will see illustrative examples of some of these methods.)*

##### Ray AI Runtime (AIR)

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-air/getting-started.html" target="_blank">Ray AI Runtime (AIR)</a></strong> is an open-source, Python, domain-specific set of libraries that equips ML engineers, data scientists, and researchers with a scalable and unified toolkit for ML applications.
</div>

Ray AI Runtime (AIR) (sometimes referred to as native libraries and ecosystem of integrations) provides higher level APIs that cater to more domain-specific use cases. Ray AIR enables data scientists and ML engineers to scale individual workloads, end-to-end workflows, and popular ecosystem frameworks, all in Python.

#### Scalability

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/scalability.jpeg" width="100px" loading="lazy">

Ray allows users to utilize large compute clusters in an easy, productive, and resource-efficient way.

Fundamentally, Ray treats the entire cluster as a single, unified pool of resources and takes care of optimally mapping compute workloads to the pool. By doing so, Ray largely eliminates non-scalable factors in the system. 

Some examples of successful user stories include the following:
* [Instacart](https://www.youtube.com/watch?v=3t26ucTy0Rs) uses Ray to power their large-scale fulfillment ML pipeline.
* [OpenAI](https://twitter.com/anyscalecompute/status/1562136159135973380) uses Ray to train their largest models (including ChatGPT).
* Companies like [HuggingFace and Cohere](https://www.youtube.com/watch?v=For8yLkZP5w) use Ray Train for scaling model training.

A notable strength is Ray's [autoscaler](https://docs.ray.io/en/latest/cluster/key-concepts.html#autoscaling), which automatically scales Ray clusters based on the resource demands of an application. The autoscaler adds worker nodes when the Ray workload exceeds the cluster's capacity and removes worker nodes that sit idle.

#### Support for heterogeneous hardware

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/heterogeneous_hardware.jpeg" width="100px" loading="lazy">

Heterogeneous systems present new challenges to distribution because each compute unit has its own programming model. Ray natively supports heterogeneous hardware to achieve load balancing, coherency, and consistency under the hood. All you need to do is specify the hardware requirements when defining a task or actor. For example, a developer can specify in the same application that one task needs 128 CPUs, another task requires only 0.5 GPUs, and an actor requires 36 CPUs and 8 GPUs.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/heterogeneous_hardware_code.png" width="60%" loading="lazy">|
|:--|
|Easily specify amount of resources needed, by using `num_cpus` and `num_gpus`|

An illustrative example is the production [deep learning pipeline at Uber](https://www.anyscale.com/ray-summit-2022/agenda/sessions/215). A heterogeneous setup of 8 GPU nodes and 9 CPU nodes improves the pipeline throughput by 50%, while substantially saving capital cost, compared with the legacy setup of 16 GPU nodes.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/uber.png" width="70%" loading="lazy">|
|:--|
|Deep learning pipeline at Uber using a heterogeneous hardware setup with Ray.|

### Ray libraries

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/map.png" width="70%" loading="lazy">|
|:--|
|Stack of Ray libraries - unified toolkit for ML workloads.|

There are four layers to Ray's unified compute framework:

1. Ray cluster
1. Ray Core
1. Ray AI Runtime (native libraries)
1. Integrations and ecosystem

#### Ray cluster

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/cluster/getting-started.html" target="_blank">Ray cluster</a></strong> is a set of worker nodes connected to a common Ray head node. Ray clusters can be fixed-size, or they can autoscale up and down according to the resources requested by applications running on the cluster.
</div>

Starting at the bottom with a [cluster](https://docs.ray.io/en/latest/cluster/getting-started.html), Ray sets up and manages clusters of computers so that you can run distributed applications on them. You can deploy a Ray cluster on AWS, GCP or on Kubernetes via the officially supported [KubeRay](https://docs.ray.io/en/latest/cluster/kubernetes/index.html) project. 

Note: [Anyscale](https://www.anyscale.com/), the company behind Ray, builds an enterprise-ready AI compute platform for running and managing Ray applications.

#### Ray Core

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-core/walkthrough.html" target="_blank">Ray Core</a></strong> is an open-source, Python, general purpose, distributed computing library that enables ML engineers and Python developers to scale Python applications and accelerate machine learning workloads.
</div>

Ray Core is the foundation that Ray's ML libraries (Ray AIR) and third-party integrations (Ray ecosystem) are built on. This library enables Python developers to easily build scalable, distributed systems that can run on a laptop, cluster, cloud or Kubernetes.

Expanding on the key abstractions mentioned before:

1. [**Tasks**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks). Remote, stateless Python functions.  
Ray tasks are arbitrary Python functions that are executed asynchronously on separate Python workers on Ray cluster nodes. Users can specify their resource requirements in terms of CPUs, GPUs, and custom resources, which the cluster scheduler uses to distribute tasks for parallelized execution.

2. [**Actors**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). Remote stateful Python classes.  
What tasks are to functions, actors are to classes. An actor is a stateful worker, and the methods of an actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.

3. [**Objects**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#objects). In-memory, immutable objects or values that can be accessed anywhere in the computing cluster.  
In Ray, tasks and actors create and compute on objects. These remote objects can be stored anywhere in a Ray cluster. Object References are used to refer to them, and they are cached in Ray's distributed shared memory object store.

#### Ray AI Runtime

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-air/getting-started.html" target="_blank">Ray AI Runtime (AIR)</a></strong> is an open-source, Python, domain-specific set of libraries that equip ML engineers, data scientists, and researchers with a scalable and unified toolkit for ML applications.
</div>

Ray AIR is built on top of Ray Core and focuses on distributing both individual and end-to-end machine learning workflows.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Introduction_to_Ray_AIR/e2e_air.png" width="70%" loading="lazy">|
|:--|
|Ray AIR enables end-to-end ML development and provides multiple options to integrate with other tools and libraries from the MLOps ecosystem.|

Each of the five native libraries that Ray AIR wraps distributes a specific ML task:

1. [**Ray Data**](https://docs.ray.io/en/latest/data/dataset.html)  
Scalable, framework-agnostic data loading and transformation across training, tuning, and prediction.
    
2. [**Ray Train**](https://docs.ray.io/en/latest/train/train.html)  
Distributed multi-node and multi-core model training with fault tolerance that integrates with popular training libraries.

3. [**Ray Tune**](https://docs.ray.io/en/latest/tune/index.html)  
Scales hyperparameter tuning to optimize model performance.

4. [**Ray Serve**](https://docs.ray.io/en/latest/serve/index.html)  
Deploys models for online inference, with optional microbatching to improve performance.

5. [**Ray RLlib**](https://docs.ray.io/en/latest/rllib/index.html)  
Distributed reinforcement learning workloads that integrate with the other Ray AIR libraries.

#### Integrations and ecosystem libraries

Ray integrates with a [growing ecosystem](https://docs.ray.io/en/latest/ray-overview/ray-libraries.html) of the most popular Python and machine learning libraries and frameworks. Instead of trying to impose new standards, Ray allows you to scale existing workloads by unifying tools in a common interface. This interface enables you to run ML tasks in a distributed way, a property most of the respective backends don't have, or not to the same extent.

Here are a handful of integrations to highlight:

* **Ray Datasets**
  * [Dask](https://docs.ray.io/en/latest/ray-more-libs/dask-on-ray.html)
  * [Pandas/Modin](https://docs.ray.io/en/latest/data/modin/index.html)
* **Ray Train and RLlib** 
  * [TensorFlow](https://docs.ray.io/en/latest/train/api.html#ray.train.tensorflow.TensorflowTrainer)
  * [PyTorch](https://docs.ray.io/en/latest/train/api.html#ray.train.torch.TorchTrainer)
* **Ray Tune**
  * [HyperOpt](https://docs.ray.io/en/latest/tune/examples/hyperopt_example.html)
  * [Optuna](https://docs.ray.io/en/latest/tune/examples/optuna_example.html)
* **Ray Serve**
  * [FastAPI](https://www.anyscale.com/blog/ray-serve-fastapi-the-best-of-both-worlds)
  * [gradio](https://docs.ray.io/en/latest/serve/tutorials/gradio-integration.html)

### Ray use cases

Now that you have a sense of what Ray is in theory, it's important to discuss what makes Ray so useful in practice. In this section, you will encounter the ways that individuals, organizations, and companies leverage Ray to build their AI applications.

First, you will explore how Ray scales common ML workloads. Then, you will learn about some advanced implementations.

#### Scaling common ML workloads

##### Parallel training of many models
When any given model you want to train can fit on a single GPU, then Ray can assign each training run to a separate Ray Task. In this way, all available workers are utilized to run independent remote training rather than one worker running jobs sequentially.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/training_small_models.png" width="70%" loading="lazy">|
|:--|
|Data parallelism pattern for distributed training on large datasets.|

##### Distributed training of large models
In contrast to training many models, model parallelism partitions a large model across many machines for training. Ray Train has built-in abstractions for distributing shards of models and running training in parallel.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/model_parallelism.png" width="70%" loading="lazy">|
|:--|
|Model parallelism pattern for distributed large model training.|

##### Managing parallel hyperparameter tuning experiments
Running multiple hyperparameter tuning experiments is a pattern well suited to distributed computing because the experiments are independent of one another. Ray Tune handles the hard part of distributing hyperparameter optimization and provides key features such as checkpointing the best result, optimizing scheduling, specifying search patterns, and more.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/tuning_use_case.png" width="70%" loading="lazy">|
|:--|
|Distributed tuning with distributed training per trial.|

##### Reinforcement learning
Ray RLlib offers support for production-level, distributed reinforcement learning workloads while maintaining unified and simple APIs for a large variety of industry applications.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/rllib_use_case.png" width="70%" loading="lazy">|
|:--|
|Decentralized distributed proximal policy optimization (DD-PPO) architecture, supported by Ray RLlib, where sampling and training are done on worker GPUs.|

##### Batch inference on CPUs and GPUs
Performing inference on incoming batches of data can be parallelized by exporting the architecture and weights of a trained model to the shared object store. Then, using these model replicas, Ray scales predictions on batches across workers.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/batch_inference.png" width="70%" loading="lazy">|
|:--|
|Using Ray AIR's `BatchPredictor` for batch inference.|

##### Multi-model composition for model serving

[Ray Serve](https://docs.ray.io/en/latest/serve/index.html) supports complex [model deployment patterns](https://www.youtube.com/watch?v=mM4hJLelzSw) requiring the orchestration of multiple Ray actors, where different actors provide inference for different models. Serve handles both batch and online inference and can scale to thousands of models in production.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/multi_model_serve.png" width="70%" loading="lazy">|
|:--|
|Deployment patterns with Ray Serve.|

##### ML platform

[Merlin](https://shopify.engineering/merlin-shopify-machine-learning-platform) is Shopify's ML platform built on Ray. It enables fast-iteration and [scaling of distributed applications](https://www.youtube.com/watch?v=kbvzvdKH7bc) such as product categorization and recommendations.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/shopify.png" width="70%" loading="lazy">|
|:--|
|Merlin architecture built on Ray.|

Spotify [uses Ray for advanced applications](https://www.anyscale.com/ray-summit-2022/agenda/sessions/180) that include personalizing content recommendations for home podcasts and personalizing Spotify Radio track sequencing.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/spotify.png" width="70%" loading="lazy">|
|:--|
|How Ray ecosystem empowers ML scientists and engineers at Spotify.|

#### Implementing advanced ML workloads

##### Alpa - training very large models with Ray

[Alpa](https://ai.googleblog.com/2022/05/alpa-automated-model-parallel-deep.html) is a Ray-native library that automatically partitions, schedules, and executes the training and serving computation of [very large deep learning models](https://www.anyscale.com/ray-summit-2022/agenda/sessions/170) on hundreds of GPUs.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/alpa.png" width="70%" loading="lazy">|
|:--|
|Parallelization plans for a computational graph from Alpa. A, B, C, and D are operators. Each color represents a different device (i.e. GPU) executing a partition or the full operator leveraging Ray actors.|

##### Exoshuffle - large scale data shuffling

In Ray 2.0, [Exoshuffle](https://cs.paperswithcode.com/paper/exoshuffle-large-scale-shuffle-at-the) is integrated with Ray Data to provide an application-level shuffle system that [outperforms Spark](https://www.anyscale.com/ray-summit-2022/agenda/sessions/220) and achieves 82% of theoretical performance on a 100TB sort on 100 nodes.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/exoshuffle.png" width="70%" loading="lazy">|
|:--|
|Shuffle on Ray architecture diagram.|

##### Hamilton - feature engineering

[Hamilton](https://github.com/stitchfix/hamilton) is an open source dataflow micro-framework that manages feature engineering for time series forecasting. Developed at [StitchFix](https://www.anyscale.com/ray-summit-2022/agenda/sessions/115), this library provides scalable parallelism, where each Hamilton function is distributed and data is limited by machine memory.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/stitchfix.png" width="70%" loading="lazy">|
|:--|
|Hamilton architecture on Ray clusters.|

##### Riot Games - reinforcement learning

Riot Games uses [Ray to build bots](https://www.anyscale.com/ray-summit-2022/agenda/sessions/148) that play new battles at various skill levels. These reinforcement learning agents provide additional signals to their designers to deliver the best experiences for players.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/riot.png" width="70%" loading="lazy">|
|:--|
|Riot reinforcement learning workflow on Ray including data transformation and training.|

### Summary of Part 1

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/map_layers_only.png" width="70%" loading="lazy">|
|:--|
|Stack of Ray libraries - unified toolkit for ML workloads.|

To review, this section has covered the following topics:

* Introduction to the Ray project
* Key characteristics of Ray
* The three layers of Ray
    * Ray Core
    * Native libraries
    * Ecosystem of integrations
* Common Ray use cases

#### Concepts

Key concepts introduced in Part 1:

<div class="alert alert-info">
  <strong><a href="https://www.ray.io/" target="_blank">Ray</a></strong> is an open-source unified compute framework that makes it easy to scale AI and Python workloads.
</div>

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/cluster/getting-started.html" target="_blank">Ray clusters</a></strong> are a set of worker nodes connected to a common Ray head node. Ray clusters can be fixed-size, or they can autoscale up and down according to the resources requested by applications running on the cluster.
</div>

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-core/walkthrough.html" target="_blank">Ray Core</a></strong> is an open-source, Python, general purpose, distributed computing library that enables ML engineers and Python developers to accelerate machine learning workloads and scale Python applications.
</div>

<div class="alert alert-info">
  <strong><a href="https://docs.ray.io/en/latest/ray-air/getting-started.html" target="_blank">Ray AI Runtime (AIR)</a></strong> is an open-source, Python, domain specific library that equips ML engineers, data scientists, and researchers with a scalable and unified toolkit for ML applications.
</div>

#### APIs and technical abstractions

Three Ray Core abstractions that enable parallel computation:

1. [**Tasks**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks). Remote, stateless Python functions
1. [**Actors**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). Remote, stateful Python classes
1. [**Objects**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#objects). Tasks and actors create and compute on objects that can be stored and accessed anywhere in the cluster; cached in Ray's distributed [shared-memory](https://en.wikipedia.org/wiki/Shared_memory) object store

#### What's next?

Apply Ray concepts by running an illustrative code example that will demonstrate how to distribute a simple application.

## Part 2: Hands-on code example - scaling regression with Ray Core

### Introduction

To gain a better feel for Ray, this section will scale a bare bones version of a common ML task: regression on structured data.

#### Data

You will be performing regression on the [California Housing](https://scikit-learn.org/stable/datasets/real_world.html#california-housing-dataset) dataset made available by scikit-learn.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/California_dataset.png" width="80%" loading="lazy">|
|:--|
|`n_samples = 20640`, target is numeric and corresponds to the median house value in units of $100,000.|

#### Model and task

You will train and score [random forest](https://scikit-learn.org/stable/modules/generated/sklearn.ensemble.RandomForestRegressor.html) models using [mean squared error](https://scikit-learn.org/stable/modules/generated/sklearn.metrics.mean_squared_error.html) as the metric.

In a lightweight version of hyperparameter tuning, you will be training many models with varying values of `n_estimators`. First, you will encounter a sequential version of model training where each experiment executes in series one after another. Then, you will distribute these training runs with Ray Core to achieve better performance and faster model training.
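
For reference, mean squared error is the average of the squared differences between true and predicted values. The sketch below computes it by hand on made-up numbers; the helper name `mean_squared_error_by_hand` is hypothetical, and the notebook itself uses scikit-learn's `mean_squared_error`.

```python
def mean_squared_error_by_hand(y_true: list, y_pred: list) -> float:
    # Average of squared residuals: (1 / n) * sum((y_i - y_hat_i) ** 2)
    squared_errors = [(t - p) ** 2 for t, p in zip(y_true, y_pred)]
    return sum(squared_errors) / len(squared_errors)


# Made-up values: three perfect predictions and one that is off by 2.
y_true = [1.0, 2.0, 3.0, 4.0]
y_pred = [1.0, 2.0, 3.0, 6.0]

mse = mean_squared_error_by_hand(y_true, y_pred)
print(mse)  # 1.0
```

A lower value means predictions are closer to the targets, which is why the best model is later selected with `min`.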

### Sequential implementation

Starting with a familiar implementation, an assortment of random forest models are trained one by one sequentially as depicted in the diagram below.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/sequential_timeline.png" width="70%" loading="lazy">|
|:--|
|Timeline of sequential tasks all on one worker. Each "task" in this case is training a random forest model.|

#### Preliminaries

In [1]:
import time
from operator import itemgetter

import pandas as pd
from sklearn.datasets import fetch_california_housing
from sklearn.ensemble import RandomForestRegressor
from sklearn.metrics import mean_squared_error
from sklearn.model_selection import train_test_split

#### Prepare dataset

In [2]:
X_raw, y_raw = fetch_california_housing(return_X_y=True, as_frame=True)

In [3]:
X_raw.head(n=5)

| |MedInc|HouseAge|AveRooms|AveBedrms|Population|AveOccup|Latitude|Longitude|
|--:|--:|--:|--:|--:|--:|--:|--:|--:|
|0|8.3252|41.0|6.984127|1.02381|322.0|2.555556|37.88|-122.23|
|1|8.3014|21.0|6.238137|0.97188|2401.0|2.109842|37.86|-122.22|
|2|7.2574|52.0|8.288136|1.073446|496.0|2.80226|37.85|-122.24|
|3|5.6431|52.0|5.817352|1.073059|558.0|2.547945|37.85|-122.25|
|4|3.8462|52.0|6.281853|1.081081|565.0|2.181467|37.85|-122.25|


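In [12]:
# Enlarge the dataset to 51 copies of the original so that each training run takes longer.
# Caveat: duplicated rows land in both the train and test splits, so the reported
# MSE is close to zero and is not a meaningful measure of generalization.
X = pd.concat([X_raw] * 51)
y = pd.concat([y_raw] * 51)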

In [5]:
X_train, X_test, y_train, y_test = train_test_split(
    X, y, test_size=0.2, random_state=201
)

#### Set number of models to train

In [6]:
# You will use NUM_MODELS as a benchmark to compare performance
# across sequential and parallel implementations.

NUM_MODELS = 20

#### Implement function to train and score model

In [7]:
def train_and_score_model(
    train_set: pd.DataFrame,
    test_set: pd.DataFrame,
    train_labels: pd.Series,
    test_labels: pd.Series,
    n_estimators: int,
):
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set, train_labels)
    y_pred = model.predict(test_set)
    score = mean_squared_error(test_labels, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score

This function takes data, creates a `RandomForestRegressor` model, trains it and scores the model on the test set.

`train_and_score_model` returns a tuple:
```
(n_estimators, mse_score)
```

For example:

```
(8, 0.2983)
```

#### Implement function that runs **sequential** model training

In [8]:
def run_sequential(n_models: int):
    return [
        train_and_score_model(
            train_set=X_train,
            test_set=X_test,
            train_labels=y_train,
            test_labels=y_test,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]

This function trains `n_models` sequentially for an increasing number of `n_estimators` (increasing by 4 for each model, e.g. 8, 12, 16, 20, ...). 

`run_sequential` returns a list of tuples:
```
[(n_estimators, mse_score), (n_estimators, mse_score), ...]
```

For example:

```
[(8, 0.2983), (12, 0.2826), (16, 0.2761), (20, 0.2694)]
```
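
The sequence of `n_estimators` values follows directly from the expression `8 + 4 * j` used in `run_sequential`. The helper below is a hypothetical illustration of that arithmetic and is not part of the notebook's code.

```python
def n_estimators_schedule(n_models: int, start: int = 8, step: int = 4) -> list:
    # Mirrors `n_estimators=8 + 4 * j` for j in range(n_models).
    return [start + step * j for j in range(n_models)]


first_four = n_estimators_schedule(4)
largest_of_twenty = n_estimators_schedule(20)[-1]

print(first_four)         # [8, 12, 16, 20]
print(largest_of_twenty)  # 84
```

With `NUM_MODELS = 20`, the largest forest therefore has 84 trees, so later models take noticeably longer to train than earlier ones.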

#### Run sequential model training 

In [9]:
%%time

mse_scores = run_sequential(n_models=NUM_MODELS)

n_estimators=8, mse=0.0000, took: 5.40 seconds
n_estimators=12, mse=0.0000, took: 8.11 seconds
n_estimators=16, mse=0.0000, took: 10.81 seconds
n_estimators=20, mse=0.0000, took: 13.52 seconds
n_estimators=24, mse=0.0000, took: 16.22 seconds
n_estimators=28, mse=0.0000, took: 18.94 seconds


KeyboardInterrupt: 

Note: wall time on an M1 MacBook Pro: 1min (60s).

#### Analyze results

In [10]:
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

NameError: name 'mse_scores' is not defined

Looking at the results of training, note how long it took to train `NUM_MODELS` models sequentially. Continue to the next section to learn how to improve runtime by distributing this task.

### Parallel implementation

In contrast to the previous approach, you will now utilize all available resources to train these models in parallel. Ray will automatically detect the number of cores on your computer or the amount of resources in a cluster to distribute each defined task.

The diagram below offers an intuition for how tasks are assigned and executed in a parallel approach. You will notice that this introduces a scheduler which is responsible for managing incoming requests, assigning nodes, and detecting available resources.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/distributed_timeline.png" width="80%" loading="lazy">|
|:--|
|A generic timeline with ten tasks running across 4 workers in parallel, with minor overhead from the scheduler.|
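To build some intuition for the timeline above, here is a rough sketch that estimates total wall time when tasks are handed to whichever worker becomes free first. It is a simplification under stated assumptions: every task takes one hypothetical time unit, scheduler overhead is ignored, and the greedy rule is an illustration rather than a description of Ray's actual scheduler.

```python
import heapq


def simulate_makespan(durations: list[float], n_workers: int) -> float:
    """Estimate wall time when each task goes to the next free worker."""
    # Each heap entry is the time at which a worker becomes free.
    free_at = [0.0] * n_workers
    heapq.heapify(free_at)
    for duration in durations:
        earliest = heapq.heappop(free_at)          # worker that frees up first
        heapq.heappush(free_at, earliest + duration)
    return max(free_at)


durations = [1.0] * 10                 # ten tasks, one hypothetical time unit each
sequential_time = sum(durations)       # one worker runs everything back to back
parallel_time = simulate_makespan(durations, n_workers=4)
print(sequential_time, parallel_time)  # 10.0 3.0
```

With ten equal tasks and four workers, the last round only keeps two workers busy, which is why the estimate is three time units rather than 2.5.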

#### Initialize Ray runtime

In [None]:
import ray

if ray.is_initialized():
    ray.shutdown()

ray.init()

Begin by running `ray.init()` to start a fresh Ray cluster and take a look at some useful information:

* Python version
* Ray version
* Link to Ray Dashboard: an observability tool that provides insight into what Ray is doing via helpful metrics and charts

#### Put data in the object store

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/object_store.png" width="70%" loading="lazy">|
|:--|
|Workers use `ray.put()` to place objects and use `ray.get()` to retrieve them from each node's object store. These object stores form the shared distributed memory that makes objects available across a Ray cluster.|

In a distributed system, object references are pointers to objects in memory. Object references can be used to access objects that are stored on different machines, which lets tasks and actors running on those machines share data.

In [None]:
X_train_ref = ray.put(X_train)
X_test_ref = ray.put(X_test)
y_train_ref = ray.put(y_train)
y_test_ref = ray.put(y_test)

By placing the training and testing data into Ray's object store, these objects are now available to all remote tasks and actors in the cluster.

**Coding Exercise**

To practice working with object references, use the cell below to:

1. Print what `X_train_ref` looks like.
2. Retrieve `X_train` by using `ray.get()` on the object reference.

An example Object Reference looks like this:

`ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002000000)`

In [None]:
### YOUR CODE HERE ###

**Solution**

In [None]:
### SAMPLE IMPLEMENTATION ###

# print the object reference
print(X_train_ref)

# inspect the in-memory object
ray.get(X_train_ref)

#### Implement function to train and score model

In [None]:
@ray.remote
def train_and_score_model(
    train_set_ref: pd.DataFrame,
    test_set_ref: pd.DataFrame,
    train_labels_ref: pd.Series,
    test_labels_ref: pd.Series,
    n_estimators: int,
):
    start_time = time.time()  # measure wall time for single model training

    model = RandomForestRegressor(n_estimators=n_estimators, random_state=201)
    model.fit(train_set_ref, train_labels_ref)
    y_pred = model.predict(test_set_ref)
    score = mean_squared_error(test_labels_ref, y_pred)

    time_delta = time.time() - start_time
    print(
        f"n_estimators={n_estimators}, mse={score:.4f}, took: {time_delta:.2f} seconds"
    )

    return n_estimators, score

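Notice that `train_and_score_model` is *the same function* as in the sequential example, except that you add the `@ray.remote` decorator to specify that it will be executed as a remote task. The function body is unchanged because Ray resolves object references passed as top-level arguments before the task runs, so the function receives the actual data rather than the references.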

#### Implement function that runs **parallel** model training

In [None]:
def run_parallel(n_models: int):
    results_ref = [
        train_and_score_model.remote(
            train_set_ref=X_train_ref,
            test_set_ref=X_test_ref,
            train_labels_ref=y_train_ref,
            test_labels_ref=y_test_ref,
            n_estimators=8 + 4 * j,
        )
        for j in range(n_models)
    ]
    return ray.get(results_ref)

Before, you defined `run_sequential()` to train and score `NUM_MODELS`. Working from the inside-out, modifying this into `run_parallel()` involves three steps:

1. Append a `.remote()` suffix to the `train_and_score_model` call.  
    * Remember that you specified this function as a remote task in the previous cell. In Ray, you append this suffix to every remote call.
2. Capture the resulting list of object references in `results_ref`.
    * Rather than waiting for the results, you immediately receive a list of references to results that are expected to be available in the future. This asynchronous (non-blocking) call allows the program to continue executing other operations while the potentially time-consuming computation runs in the background.
3. Access results with `ray.get()`.
    * Once all tasks have been submitted, call `ray.get()` on the list of object references `results_ref` to retrieve the completed results. This is a synchronous (blocking) operation because it waits until all of the computation completes.

For example,

```
ray.get([ObjectRef, ObjectRef, ObjectRef, ...])
```

returns a list of `(n_estimators, score)` tuples.

#### Run parallel model training 

In [None]:
%%time

mse_scores = run_parallel(n_models=NUM_MODELS)

Notice the **6x performance gain**:

* Parallel: 10s
* Sequential: 1min (60s)


*(experiment on the M1 MacBook Pro)*

#### Analyze results

In [None]:
best = min(mse_scores, key=itemgetter(1))
print(f"Best model: mse={best[1]:.4f}, n_estimators={best[0]}")

Training has completed with a **6x performance gain** due to parallel execution.
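The 6x figure is simply the ratio of the two wall times quoted above. A short sketch of that arithmetic follows; the `speedup` helper is hypothetical, and your own numbers will vary with hardware and `NUM_MODELS`.

```python
def speedup(sequential_seconds: float, parallel_seconds: float) -> float:
    """Ratio of sequential to parallel wall time."""
    return sequential_seconds / parallel_seconds


# Wall times quoted above for the M1 MacBook Pro run.
sequential_seconds = 60.0
parallel_seconds = 10.0

gain = speedup(sequential_seconds, parallel_seconds)
saved = sequential_seconds - parallel_seconds
print(f"{gain:.0f}x faster, {saved:.0f}s saved")  # 6x faster, 50s saved
```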

#### Shutdown Ray runtime

In [None]:
ray.shutdown()

Disconnect the worker and terminate processes started by `ray.init()`.

### Summary of Part 2: code example

You achieved a significant performance gain by introducing parallel model training. You adapted a sequential model training job to run in parallel by using the Ray Core API.

With Ray, you parallelized training without having to implement the orchestration, fault tolerance, or autoscaling components that require specialized knowledge of distributed systems.

#### Key concepts

1. [**Tasks**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#tasks). Remote, stateless Python functions
1. [**Actors**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#actors). Remote, stateful Python classes
1. [**Objects**](https://docs.ray.io/en/latest/ray-core/key-concepts.html#objects). Tasks and actors create and compute on objects that can be stored and accessed anywhere in the cluster; cached in Ray's distributed [shared-memory](https://en.wikipedia.org/wiki/Shared_memory) object store

#### Key API elements

* **`ray.init()`**  
Start Ray runtime and connect to the Ray cluster.
* **`@ray.remote`**  
Decorator that specifies a Python function or class to be executed as a task (remote function) or actor (remote class) in a different process.
* **`.remote`**  
Suffix appended to remote function and class calls; remote operations are *asynchronous*.
* **`ray.put()`**  
Put an object in the in-memory object store; returns an object reference used to pass the object to any remote function or method call.
* **`ray.get()`**  
Get remote object(s) from the object store by specifying the object reference(s).

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/side_by_side.png" width="100%" loading="lazy">|
|:--|
|Comparison of sample workflow with minimal code changes needed to distribute tasks on Ray.|

# Connect with the Ray community

You can learn and get more involved with the Ray community of developers and researchers:

* [**Ray documentation**](https://docs.ray.io/en/latest)

* [**Official Ray site**](https://www.ray.io/)  
Browse the ecosystem and use this site as a hub to get the information that you need to get going and building with Ray.

* [**Join the community on Slack**](https://forms.gle/9TSdDYUgxYs8SA9e8)  
Discuss what you have learned with other users in the Ray Slack workspace.

* [**Use the discussion board**](https://discuss.ray.io/)  
Ask questions, follow topics, and view announcements on this community forum.

* [**Join a meetup group**](https://www.meetup.com/Bay-Area-Ray-Meetup/)  
Tune in to meetups to listen to compelling talks, get to know other users, and meet the team behind Ray.

* [**Open an issue**](https://github.com/ray-project/ray/issues/new/choose)  
Ray is constantly evolving to improve the developer experience. Submit feature requests and bug reports, and get help via GitHub issues.

* [**Become a Ray contributor**](https://docs.ray.io/en/latest/ray-contribute/getting-involved.html)  
We welcome community contributions to improve our documentation and the Ray framework.

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Generic/ray_logo.png" width="20%" loading="lazy">