<img src="logo-ray.png" width="80px">


# Introduction to Ray Core

# Lifecycle of a task 

We start by detailing the full lifecycle of a **ray task**, from when it is **created** and submitted until it is **completed** and the **resulting objects are returned** to the user.

## 10,000 feet view

We have a Python function conveniently named `expensive_computation` that executes an expensive computation. To keep it simple, all it does is perform a naive matrix multiplication and return the number of elements in the resulting matrix.


It is called sequentially `n_runs` times.

In [1]:
%%writefile utils.py
from itertools import product

def perform_naive_matrix_multiplication(n):
    matrix1 = matrix2 = [[1 for _ in range(n)] for _ in range(n)]

    result = [[0 for _ in range(n)] for _ in range(n)]
    for i, j, k in product(range(n), range(n), range(n)):
        result[i][j] += matrix1[i][k] * matrix2[k][j]

    return result

Overwriting utils.py


In [2]:
from utils import perform_naive_matrix_multiplication

n_runs = 10
n = 300

def expensive_computation(n):
    result = perform_naive_matrix_multiplication(n)
    n_rows, n_cols =  len(result), len(result[0])
    num_elements_in_matrix = n_rows * n_cols
    return num_elements_in_matrix

results = [expensive_computation(n) for _ in range(n_runs)]
assert sum(results) == n_runs * n * n

Below is the execution visualized

<img src="sequential_simple_.jpeg" height=300>

We want to:
- Run the same function but in a distributed fashion - i.e. in parallel on a cluster of machines

We do this by following these steps:
- Convert the `expensive_computation` function to a ray task by decorating it with `ray.remote`
- Submit a task for execution by calling `future = expensive_computation.remote(n)`
- Use the returned `future` object reference to fetch the result of the function by calling `ray.get(future)` 

In [3]:
import ray


@ray.remote  # decorator to convert python function to ray task
def expensive_computation(n):
    result = perform_naive_matrix_multiplication(n)
    n_rows, n_cols =  len(result), len(result[0])
    num_elements_in_matrix = n_rows * n_cols
    return num_elements_in_matrix

# submit n_runs ray tasks to a ray cluster
# and keep a reference to the task futures
futures = [expensive_computation.remote(n) for _ in range(n_runs)]

# wait for all tasks to complete and get the resulting objects
# results are returned in the same order as submitted
results = ray.get(futures)

# confirm that we got the right result
assert sum(results) == n_runs * n * n

2023-11-21 13:06:39,937	INFO worker.py:1458 -- Connecting to existing Ray cluster at address: 127.0.0.1:6379...
2023-11-21 13:06:39,946	INFO worker.py:1633 -- Connected to Ray cluster. View the dashboard at 127.0.0.1:8265
(raylet) [2023-11-21 13:06:40,224 I 17097 12251738] logging.cc:230: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1


Here is what is happening under the hood:

<img src="parallel_simple.jpeg" height="300">

## 1,000 feet view

Let's detail the parallel execution of the function a bit more.

More specifically:
- **ray tasks** are executed on a **ray cluster** as part of a **ray job**
- **ray workers** are the processes that execute the tasks
- **futures** in ray are called `ObjectRef`s, short for **object references**
- results are stored as **objects** in an "**object store**"
- `ray.get()` is used to wait and fetch the **object value** given the **object reference** from the "**object store**"
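To make the vocabulary above concrete, here is a minimal single-process sketch in plain Python (not Ray). A thread pool plays the role of the workers, a dictionary plays the role of the object store, and `ToyRef`, `ToyCluster`, `submit` and `get` are hypothetical names invented for this illustration only.

```python
import itertools
from concurrent.futures import ThreadPoolExecutor


class ToyRef:
    """Stand-in for an object reference: an opaque id, not the value itself."""

    _ids = itertools.count()

    def __init__(self):
        self.id = next(self._ids)

    def __repr__(self):
        return f"ToyRef({self.id})"


class ToyCluster:
    """Toy cluster: threads act as workers, a dict acts as the object store."""

    def __init__(self, num_workers=2):
        self._workers = ThreadPoolExecutor(max_workers=num_workers)
        self._object_store = {}  # ref id -> future holding the object value

    def submit(self, fn, *args):
        ref = ToyRef()
        self._object_store[ref.id] = self._workers.submit(fn, *args)
        return ref  # handed back immediately, before the task has finished

    def get(self, ref):
        # Wait until the value exists, then fetch it from the store.
        return self._object_store[ref.id].result()


cluster = ToyCluster()
refs = [cluster.submit(pow, 2, i) for i in range(4)]
values = [cluster.get(ref) for ref in refs]
print(refs)
print(values)  # [1, 2, 4, 8]
```

The point of the sketch is the split between the reference, which is returned right away, and the value, which only exists once a worker has finished the task.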

Here is a more detailed view of the parallel execution


<img src="parallel_1000_feet.png" height="300">

Let's use the ray state client to verify the above.

We declare a stand-in for `expensive_computation` called `my_task`, give it a unique name so we can easily track its state, and make it sleep for a while so we can watch the state evolve more clearly

In [4]:
from uuid import uuid4
import ray

task_sleep_time = 20


@ray.remote
def my_task():
    import time

    time.sleep(task_sleep_time)
    return 1


id_ = str(uuid4())[:8]
name = f"expensive_computation_{id_}"
ray_task = my_task.options(name=name)

We submit the task and inspect the future object reference - we see that it is a ray.ObjectRef with a given id

In [5]:
future_object_ref = ray_task.remote()
future_object_ref

ObjectRef(35260dd7c82a1dceffffffffffffffffffffffff0a00000001000000)

We now request the cluster state to see our task running

In [6]:
from ray.util.state import get_task
import time

start_time = time.time()

while time.time() - start_time < (task_sleep_time + 10):
    time.sleep(5)
    task = get_task(id=future_object_ref.task_id().hex())
    print(
        f"task {task.name} is in state={task.state} running on worker {task.worker_id[:8]} as part of Job ID {task.job_id}"
    )

task expensive_computation_ce2fa6ca is in state=RUNNING running on worker 2550baa4 as part of Job ID 0a000000
task expensive_computation_ce2fa6ca is in state=RUNNING running on worker 2550baa4 as part of Job ID 0a000000
task expensive_computation_ce2fa6ca is in state=RUNNING running on worker 2550baa4 as part of Job ID 0a000000
task expensive_computation_ce2fa6ca is in state=RUNNING running on worker 2550baa4 as part of Job ID 0a000000
task expensive_computation_ce2fa6ca is in state=FINISHED running on worker 2550baa4 as part of Job ID 0a000000
task expensive_computation_ce2fa6ca is in state=FINISHED running on worker 2550baa4 as part of Job ID 0a000000


We use `ray.get` to fetch the resulting object value now that the task is completed

In [7]:
object_value = ray.get(future_object_ref)
object_value

1

## 100 feet view

Let's further detail the lifecycle of a ray task.

More specifically here is what a cluster looks like:


<img src="ray_cluster.png" height="500">

Things to keep in mind:

- The **head node** is a special node that runs the **global control service**, **cluster level services** and usually the **driver**
  - The **global control service** keeps track of the **cluster state** that is not supposed to change often
  - Cluster level services are services that are shared across the cluster such as autoscaling, job submission, etc. 
  - The **driver** can submit tasks but does not execute them 
- Each **worker process** will keep track of all the **tasks** it owns/submits in its **ownership table**
- Small **objects** (< 100KB) are stored in the **in-process object store** of a **worker**
- Large **objects** are stored in the **plasma object store** which is **shared across worker processes** on the same node
  - The **plasma object store** by default is in-memory and takes up **30% of the memory of the node**
  - If the **plasma object store** is full, objects are **spilled to disk**
  - The **plasma object store** is also referred to as the **shared memory object store**
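The small/large split described above can be sketched as a simple size check. This is an illustration only: `choose_store` and `INLINE_LIMIT_BYTES` are hypothetical names, and `pickle` is used as a rough proxy for the serialized size, which is an assumption rather than how Ray measures objects.

```python
import pickle

INLINE_LIMIT_BYTES = 100 * 1024  # the 100KB threshold quoted above


def choose_store(obj):
    """Return the store a value of this (approximate) serialized size would use."""
    size = len(pickle.dumps(obj))
    return "in-process" if size < INLINE_LIMIT_BYTES else "plasma"


small = 42
large = bytes(1024 * 1024)  # 1 MiB of zeros

print(choose_store(small))  # in-process
print(choose_store(large))  # plasma
```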

With the cluster architecture in mind, let's look at the lifecycle of a task in more detail.

#### Submitting a task
<img src="submit_task.png">

#### Data locality in ray

- The owner will select the **raylet** where **most of the objects the task depends on** are located
  - This can be a **raylet** running on a **different node**!
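A rough sketch of this locality rule, under the assumption that "most of the objects" means a simple count of task arguments per node. `pick_node_by_locality` and the node names are hypothetical, and the tie-breaking rule (prefer the local node) is an assumption made for the example.

```python
from collections import Counter


def pick_node_by_locality(arg_locations, local_node):
    """arg_locations maps each task argument to the node ids that hold a copy."""
    counts = Counter()
    for nodes in arg_locations.values():
        counts.update(set(nodes))
    if not counts:
        return local_node  # no object dependencies: stay local
    best = max(counts.values())
    candidates = [node for node, count in counts.items() if count == best]
    # Assumed tie-break: prefer the local node, else pick deterministically.
    return local_node if local_node in candidates else sorted(candidates)[0]


deps = {
    "model": ["node-B"],
    "batch": ["node-A", "node-B"],
    "config": ["node-B"],
}
print(pick_node_by_locality(deps, local_node="node-A"))  # node-B
```

Here the task is submitted from `node-A`, but all three arguments have a copy on `node-B`, so the lease request goes to the raylet on `node-B`.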

#### Scheduling a Task

<img src="scheduling_task.png">

## Scheduling policies deep-dive

How does a raylet's scheduler choose a worker node to lease work from?

### Classifying nodes as feasible/infeasible and available/unavailable

<img src="resource_state_definition.png" height="500">

Note that every 100ms, the **GCS pulls resource availability** from each **raylet**, then aggregates it and **rebroadcasts it back to each raylet**.
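The classification can be sketched in a few lines, assuming the usual reading of the two terms: a node is feasible when its total resources could ever satisfy the task, and available when its currently free resources can satisfy it right now. `classify_node` and the node dictionaries are hypothetical and only serve the illustration.

```python
def classify_node(total, free, demand):
    """Classify one node against one task's resource demand."""
    if any(total.get(res, 0) < qty for res, qty in demand.items()):
        return "infeasible"
    if any(free.get(res, 0) < qty for res, qty in demand.items()):
        return "feasible, unavailable"
    return "feasible, available"


nodes = {
    "n1": {"total": {"CPU": 4}, "free": {"CPU": 4}},
    "n2": {"total": {"CPU": 4, "GPU": 1}, "free": {"CPU": 1, "GPU": 0}},
    "n3": {"total": {"CPU": 8, "GPU": 2}, "free": {"CPU": 8, "GPU": 2}},
}
demand = {"CPU": 2, "GPU": 1}

states = {
    name: classify_node(node["total"], node["free"], demand)
    for name, node in nodes.items()
}
print(states)
```

In this example `n1` has no GPU at all, `n2` has one but it is busy, and `n3` can run the task immediately.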

### Scheduling Policies

#### Default Hybrid policy


This is the default policy used by ray. It is a hybrid policy that combines the following two modes:
- Bin packing mode
- Load balancing mode
  

The diagram below shows the two modes in action when scheduling two tasks Task1 and Task2

<img src="default_hybrid_policy_.png">

**Note** you can set the following environment variables to configure the default hybrid policy:

- `RAY_scheduler_spread_threshold` - default is 0.5 or 50% utilization of the node
- `RAY_scheduler_top_k_fraction` - default is 0.2 or 20% of the nodes
  - You can also set `RAY_scheduler_top_k_absolute` to set an absolute number of nodes to use
  - Note that it is the max of `RAY_scheduler_top_k_fraction` and `RAY_scheduler_top_k_absolute` that is used
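The interplay of the threshold and the top-k settings can be sketched as follows. This is a simplified reading, not Ray's scheduler code: it assumes nodes below the spread threshold all score 0 (so they are packed in order, local node first), nodes above it are ranked by utilization, and one node is drawn at random from the best `k`. `top_k` and `hybrid_pick` are hypothetical names, and the default of 1 for the absolute setting is an assumption.

```python
import random


def top_k(num_nodes, fraction=0.2, absolute=1):
    """k is the max of the fractional and the absolute setting."""
    return max(absolute, int(num_nodes * fraction))


def hybrid_pick(utilization, spread_threshold=0.5, fraction=0.2, absolute=1, rng=random):
    """utilization: dict of node id -> utilization in [0, 1], local node first."""
    scored = []
    for order, (node, used) in enumerate(utilization.items()):
        # Bin packing mode: under the threshold every node looks equally good,
        # so the original order (local node first) decides.
        score = used if used >= spread_threshold else 0.0
        scored.append((score, order, node))
    scored.sort()
    k = top_k(len(scored), fraction, absolute)
    # Load balancing mode: pick randomly among the k best-scored nodes.
    return rng.choice(scored[:k])[2]


print(top_k(10))  # 2
print(hybrid_pick({"local": 0.3, "n1": 0.1, "n2": 0.9}))  # local
print(hybrid_pick({"local": 0.8, "n1": 0.6, "n2": 0.9}))  # n1
```

With three nodes `k` is 1, so the choice is deterministic: the first call packs onto the lightly loaded local node, while the second balances load by moving to the least utilized node.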

In [8]:
import ray

@ray.remote(scheduling_strategy="DEFAULT") # this is the default so we don't need to specify it
def default_schedule_func():
    return 2

ray.get(default_schedule_func.remote())

2

#### Node Affinity Policy 

Assigns tasks to a given node in either a strict or soft manner.

<img src="node_affinity_policy.png" width="700px">

In [9]:
import ray
from ray.util.scheduling_strategies import NodeAffinitySchedulingStrategy


@ray.remote(
    scheduling_strategy=NodeAffinitySchedulingStrategy(
        node_id=ray.get_runtime_context().get_node_id(),
        soft=False,
    )
)
def node_affinity_schedule():
    return 2


ray.get(node_affinity_schedule.remote())

2
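The difference between the strict and the soft mode can be sketched as a small decision function. `node_affinity_pick` is a hypothetical helper, and the fallback used in soft mode (first remaining node) merely stands in for "let the scheduler choose another node".

```python
def node_affinity_pick(target_node, alive_nodes, soft):
    """Return the node a task would run on under a node affinity strategy."""
    if target_node in alive_nodes:
        return target_node
    if soft and alive_nodes:
        # Soft: the target is only a preference, so fall back to another node.
        return sorted(alive_nodes)[0]
    # Strict: the task cannot be scheduled anywhere else.
    raise RuntimeError(f"node {target_node!r} is not available")


alive = {"node-A", "node-B"}
print(node_affinity_pick("node-B", alive, soft=False))  # node-B
print(node_affinity_pick("node-C", alive, soft=True))   # node-A

try:
    node_affinity_pick("node-C", alive, soft=False)
except RuntimeError as err:
    print("strict mode failed:", err)
```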

#### SPREAD Policy 

As the name suggests, the SPREAD policy spreads the tasks across the nodes.

Note that it spreads across all the available nodes first and then the feasible nodes.

<img src="spread_scheduling_policy.png" width="500px">

In [10]:
import ray

@ray.remote(scheduling_strategy="SPREAD")
def spread_default_func():
    return 2

ray.get(spread_default_func.remote())

2
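One way to read "available nodes first, then feasible nodes" is a round-robin over the available nodes that falls back to the feasible ones only when nothing is available. The sketch below follows that reading; `spread_assign` is a hypothetical helper, not a Ray API.

```python
import itertools


def spread_assign(tasks, available_nodes, feasible_nodes):
    """Round-robin tasks over available nodes, else over feasible nodes."""
    pool = available_nodes or feasible_nodes
    if not pool:
        raise RuntimeError("no feasible node for these tasks")
    nodes = itertools.cycle(pool)
    return {task: next(nodes) for task in tasks}


assignment = spread_assign(
    ["t1", "t2", "t3"],
    available_nodes=["node-A", "node-B"],
    feasible_nodes=["node-A", "node-B", "node-C"],
)
print(assignment)  # {'t1': 'node-A', 't2': 'node-B', 't3': 'node-A'}
```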

#### Placement Group Policy

In cases when we want to treat a set of resources as a single unit, we can use placement groups.


<img src="placement_group_policy.png" width="300px">

**Things to keep in mind**:

- A **placement group** is formed from a set of **resource bundles**
  - A **resource bundle** is a list of resource requirements that fit in a single node
- A **placement group** can specify a **placement strategy** that determines how the **resource bundles** are placed
  - The **placement strategy** can be one of the following:
    - **PACK**: pack the **resource bundles** into as few nodes as possible
    - **SPREAD**: spread the **resource bundles** across as many nodes as possible
    - **STRICT_PACK**: pack all the **resource bundles** into a single node and fail if not possible
    - **STRICT_SPREAD**: place each **resource bundle** on a distinct node and fail if not possible
- **Placement Groups** are **atomic** 
  -  i.e. either all the **resource bundles** are placed or none are placed
  -  GCS uses a two-phase commit protocol to ensure atomicity
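The strict strategies and the all-or-nothing behaviour can be sketched with a toy placer. It assumes STRICT_PACK means "all bundles on one node" and STRICT_SPREAD means "every bundle on a different node"; `place`, `fits` and `reserve` are hypothetical helpers, the spread search is greedy (it can miss valid placements), and the two-phase commit itself is not modelled.

```python
def fits(free, bundle):
    return all(free.get(res, 0) >= qty for res, qty in bundle.items())


def reserve(free, bundle):
    for res, qty in bundle.items():
        free[res] -= qty


def place(bundles, nodes, strategy):
    """Return {bundle index: node id}, or None. Never a partial placement."""
    if strategy == "STRICT_PACK":
        for node_id, resources in nodes.items():
            trial = dict(resources)  # work on a copy of the node's resources
            if all(fits(trial, b) and reserve(trial, b) is None for b in bundles):
                return {i: node_id for i in range(len(bundles))}
        return None
    if strategy == "STRICT_SPREAD":
        placement, used = {}, set()
        for i, bundle in enumerate(bundles):
            node_id = next(
                (n for n, res in nodes.items() if n not in used and fits(res, bundle)),
                None,
            )
            if node_id is None:
                return None  # atomic: drop everything placed so far
            used.add(node_id)
            placement[i] = node_id
        return placement
    raise ValueError(f"unsupported strategy: {strategy}")


nodes = {"n1": {"CPU": 2}, "n2": {"CPU": 1}}
two = [{"CPU": 1}, {"CPU": 1}]
three = [{"CPU": 1}] * 3

print(place(two, nodes, "STRICT_PACK"))      # {0: 'n1', 1: 'n1'}
print(place(two, nodes, "STRICT_SPREAD"))    # {0: 'n1', 1: 'n2'}
print(place(three, nodes, "STRICT_PACK"))    # None
print(place(three, nodes, "STRICT_SPREAD"))  # None
```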



In [11]:
import ray
from ray.util.scheduling_strategies import PlacementGroupSchedulingStrategy
# Import placement group related functions
from ray.util.placement_group import (
    placement_group,
    placement_group_table,
    remove_placement_group,
)

# Reserve a placement group of 1 bundle that reserves 0.1 CPU
pg = placement_group([{"CPU": 0.1}], strategy="PACK", name="my_pg")

# Wait until placement group is created.
ray.get(pg.ready(), timeout=10)

# look at placement group states using the table
print(placement_group_table(pg))


@ray.remote(
    scheduling_strategy=PlacementGroupSchedulingStrategy(
        placement_group=pg,
    ),
    # task requirement must not exceed the placement group capacity
    num_cpus=0.1,
)
def placement_group_schedule():
    return 2


out = ray.get(placement_group_schedule.remote())
print(out)

# Remove placement group.
remove_placement_group(pg)

{'placement_group_id': 'b54ea3ae05ba27635687307740520a000000', 'name': 'my_pg', 'bundles': {0: {'CPU': 0.1}}, 'bundles_to_node_id': {0: '437a139387ca944e4b08ec1c3bb45382e2dc2d6274c0243a8121d760'}, 'strategy': 'PACK', 'state': 'CREATED', 'stats': {'end_to_end_creation_latency_ms': 5.396, 'scheduling_latency_ms': 4.704, 'scheduling_attempt': 1, 'highest_retry_delay_ms': 0.0, 'scheduling_state': 'FINISHED'}}
2


#### Fetching task results

<img src="fetch_result.png">

Note: If the owner is fetching the result from a different node than the one where the task was executed, the result is first copied to the local object store of the owner node and then returned to the owner.

### Object management and dependency resolution

Let's drill down on how a task's dependencies are resolved - using the following example of simple batch inference:

- we load a model
- we use the model to make predictions on an input

In [12]:
import ray
import numpy as np


def load_model(size_mb):
    weights = np.ones((1024, 1024, size_mb), dtype=np.uint8)
    assert weights.nbytes / 1024**2 == size_mb
    return weights


@ray.remote
def predict(model, input):
    return model * input

We start with this simple implementation

In [13]:
# load 1 GB model in memory
model = load_model(1_000) 

# submit 3 tasks to the cluster and wait for their results
results = ray.get([predict.remote(model, i) for i in range(3)])

There are 3 `predict` tasks that will be submitted.

- The owner of the tasks (here, the driver that submitted them) needs to go over each task's arguments and:
    - check that all the arguments are available
    - store each argument that is passed by value in the plasma/shared object store or the in-process object store
- In the case of our 1 GB "model", the owner will make use of the shared object store given it exceeds the 100KB limit of the in-process object store
- For each task, the owner will store a new copy of the model and produce an object reference to use as the argument for the task
- Each task is then executed by a worker process

The outcome is that we have made 3 copies of the model in the shared object store.

Instead, to save memory, we should use the `ray.put` API to store the model in the shared object store once and pass the resulting reference as an argument to each task.

Here is the optimized implementation:


In [14]:
# put the model in the object store and get a reference to it
model_ref = ray.put(model)

# submit 3 tasks to the cluster using the same model reference and wait for their results
results = ray.get([predict.remote(model_ref, i) for i in range(3)])
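The effect on the object store can be mimicked with a toy store that counts how many copies end up in it. This is plain Python, not Ray: `ToyObjectStore`, `ToyRef` and `submit_task` are hypothetical names, and the rule "a value passed directly is stored again on every call" is the behaviour described above, reduced to one line.

```python
class ToyRef:
    """Stand-in for an object reference."""

    def __init__(self, key):
        self.key = key


class ToyObjectStore:
    def __init__(self):
        self.objects = {}

    def put(self, value):
        ref = ToyRef(len(self.objects))
        self.objects[ref.key] = value
        return ref


def submit_task(store, arg):
    """Resolve one task argument at submission time."""
    if isinstance(arg, ToyRef):
        return arg  # already in the store: reuse the existing object
    return store.put(arg)  # passed by value: a new copy for every call


model = bytearray(1024)  # small stand-in for the 1 GB model

by_value = ToyObjectStore()
for _ in range(3):
    submit_task(by_value, model)

by_reference = ToyObjectStore()
model_ref = by_reference.put(model)
for _ in range(3):
    submit_task(by_reference, model_ref)

print(len(by_value.objects), len(by_reference.objects))  # 3 1
```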

## 10 feet view of ray

### Inspecting debug logs

Given the below code, we can inspect the debug logs to see what is happening under the hood

In [15]:
import ray
import numpy as np


def load_model(size_mb):
    weights = np.ones((1024, 1024, size_mb), dtype=np.uint8)
    assert weights.nbytes / 1024**2 == size_mb
    return weights


@ray.remote
def predict(model, input):
    return model * input


model = load_model(size_mb=1000)
obj_ref = predict.remote(model, 1)
result = ray.get(obj_ref)  # c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000

Below are the debug logs, color-categorized and annotated

<img src="debug_logs_annotated_cropped.png" height=500>

### Fault Tolerance of Ray Tasks and Objects

- If a task raises an application-level exception, the task will fail and the exception will be propagated to the caller.
- If instead there is a system-level failure, i.e. the worker process executing the task crashes, then:
    - Ray will rerun the task until either the task succeeds or the maximum number of retries is exceeded. 
        - The default number of retries is 3 and can be overridden by specifying `max_retries` in the `@ray.remote` decorator.
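The two rules can be sketched as a plain retry loop. This is a toy model rather than Ray's implementation: `WorkerCrashed` is a hypothetical exception standing in for a system-level failure, and `run_with_retries` is a hypothetical helper in which `-1` means "retry forever", mirroring the `max_retries` convention.

```python
class WorkerCrashed(Exception):
    """Hypothetical marker for a system-level failure (worker process died)."""


def run_with_retries(task, max_retries=3):
    retries = 0
    while True:
        try:
            return task()
        except WorkerCrashed:
            # System-level failure: rerun unless the retry budget is spent.
            if max_retries != -1 and retries >= max_retries:
                raise
            retries += 1
        # Any other exception is application-level and propagates immediately.


calls = {"flaky": 0, "buggy": 0}


def flaky_task():
    calls["flaky"] += 1
    if calls["flaky"] < 3:
        raise WorkerCrashed()
    return "ok"


def buggy_task():
    calls["buggy"] += 1
    raise ValueError("application error")


print(run_with_retries(flaky_task), calls["flaky"])  # ok 3

try:
    run_with_retries(buggy_task, max_retries=-1)
except ValueError:
    print("not retried, calls:", calls["buggy"])  # not retried, calls: 1
```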

In [16]:
# application-level failure flakiness but with infinite retries
import sys
import ray
import pickle

with open("x.pkl", "wb") as f:
    pickle.dump({"x": 0}, f)    

@ray.remote(max_retries=-1) # infinite retries
def flaky_app_task():
    with open("x.pkl", "rb") as f:
        data = pickle.load(f)
    x = data["x"]
    if x % 2 == 0:
        x += 1
        with open("x.pkl", "wb") as f:
            pickle.dump({"x": x}, f)
        raise ValueError("x is even - that's odd!")
    return 1

try:
    out = ray.get(flaky_app_task.remote())
except ray.exceptions.RayTaskError:
    print("application-level exceptions shortcircuit retries")

application-level exceptions shortcircuit retries


In [17]:
# system-level failure flakiness but with infinite retries
import sys
import ray
import pickle

with open("y.pkl", "wb") as f:
    pickle.dump({"y": 0}, f)    

@ray.remote(max_retries=-1)
def flaky_sys_task():
    with open("y.pkl", "rb") as f:
        data = pickle.load(f)
    y = data["y"]
    if y % 2 == 0:
        y += 1
        with open("y.pkl", "wb") as f:
            pickle.dump({"y": y}, f)
        sys.exit(1)  # kill the worker process to simulate a system-level failure
    return 1

# never raises an error given retries eventually succeed
out = ray.get(flaky_sys_task.remote())
print("returned", out, "after retrying worker failure")

[2m[36m(flaky_sys_task pid=17097)[0m Worker exits with an exit code 1.
[2m[36m(flaky_sys_task pid=17097)[0m Traceback (most recent call last):
[2m[36m(flaky_sys_task pid=17097)[0m   File "python/ray/_raylet.pyx", line 1999, in ray._raylet.task_execution_handler
[2m[36m(flaky_sys_task pid=17097)[0m   File "python/ray/_raylet.pyx", line 1894, in ray._raylet.execute_task_with_cancellation_handler
[2m[36m(flaky_sys_task pid=17097)[0m   File "python/ray/_raylet.pyx", line 1558, in ray._raylet.execute_task
[2m[36m(flaky_sys_task pid=17097)[0m   File "python/ray/_raylet.pyx", line 1559, in ray._raylet.execute_task
[2m[36m(flaky_sys_task pid=17097)[0m   File "python/ray/_raylet.pyx", line 1610, in ray._raylet.execute_task
[2m[36m(flaky_sys_task pid=17097)[0m   File "python/ray/_raylet.pyx", line 1616, in ray._raylet.execute_task
[2m[36m(flaky_sys_task pid=17097)[0m   File "/var/folders/5s/b_0j_yts17zc8wdkwf3nxlmr0000gn/T/ipykernel_15786/2138810642.py", line 18, in fl

returned 1 after retrying worker failure


The below diagram shows the fault tolerance of ray objects - taken from https://www.usenix.org/system/files/nsdi21-wang.pdf

<img src="object_fault_tolerance.png">

- When an object value is lost from the object store, for example during a node failure, Ray will try to recover it.
- Ray will first automatically attempt to recover the value by looking for copies of the same object on other nodes.
  - If none are found, Ray will use lineage reconstruction, i.e. it will automatically recover the value by re-executing the task that previously created it.
    - Arguments to the task are recursively reconstructed through the same mechanism.
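The recursive part of lineage reconstruction can be sketched with a toy store that remembers, for every object, which function and which argument objects produced it. `LineageStore` and its methods are hypothetical, and the first recovery step (looking for copies on other nodes) is left out of this single-process illustration.

```python
class LineageStore:
    def __init__(self):
        self.values = {}   # object id -> value (can be lost)
        self.lineage = {}  # object id -> (function, ids of argument objects)
        self.executions = 0

    def run(self, obj_id, fn, *arg_ids):
        """Record how obj_id is produced, then compute it."""
        self.lineage[obj_id] = (fn, arg_ids)
        return self.get(obj_id)

    def get(self, obj_id):
        if obj_id not in self.values:
            # Value is missing: re-execute the task that created it,
            # reconstructing its arguments first through the same path.
            fn, arg_ids = self.lineage[obj_id]
            args = [self.get(arg_id) for arg_id in arg_ids]
            self.values[obj_id] = fn(*args)
            self.executions += 1
        return self.values[obj_id]

    def lose(self, *obj_ids):
        """Simulate a node failure that wipes these objects."""
        for obj_id in obj_ids:
            self.values.pop(obj_id, None)


store = LineageStore()
store.run("a", lambda: 2)
store.run("b", lambda a: a * 10, "a")

store.lose("a", "b")
print(store.get("b"))    # 20
print(store.executions)  # 4
```

Fetching `b` after both objects are lost re-runs the task for `a` and then the task for `b`, which is why the execution counter goes from 2 to 4.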



# Lifecycle of an Actor

An actor is a stateful object that can be used to encapsulate state and methods that operate on that state.


### Why use an actor?

- We can't naively share a global variable across tasks
  - Global variables are not shared across worker processes - i.e. across tasks

In [18]:
import ray

global_var = 3

@ray.remote
def increment_global_var():
    global global_var
    global_var += 1
    return global_var

@ray.remote
def decrement_global_var():
    global global_var
    global_var -= 1
    return global_var

step1 = ray.get(increment_global_var.remote())
step2 = ray.get(decrement_global_var.remote())

# we expect 4, 3 but we get 4, 2
# given the two tasks have separate copies of the global variable
print(step1, step2)

4 2


- Storing state in a database is slow
  - Actors are in-memory
  - Actors are distributed across nodes

In [19]:
import time
import ray
import json
from pathlib import Path

def read_from_db(key):
    # only for demo purposes: mimic reading from a database
    time.sleep(1)
    return json.loads(Path("table.json").read_text())[key]


def write_to_db(key, val):
    data = {key: val}
    # only for demo purposes: mimic writing to a database
    time.sleep(1)
    Path("table.json").write_text(json.dumps(data))


@ray.remote
def increment_global_var():
    global_var = read_from_db("global_var")
    global_var += 1
    write_to_db("global_var", global_var)
    return global_var


@ray.remote
def decrement_global_var():
    global_var = read_from_db("global_var")
    global_var -= 1
    write_to_db("global_var", global_var)
    return global_var


write_to_db("global_var", 3)
step1 = ray.get(increment_global_var.remote())
step2 = ray.get(decrement_global_var.remote())
print(step1, step2)

[2m[33m(raylet)[0m [2023-11-21 13:07:54,354 I 17280 12252812] logging.cc:230: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1


4 3


## 10,000 feet view

Let's take an example of a simple counter actor. We create an actor handle by calling `MyCounter.remote()`. 

In [20]:
import time

import ray


@ray.remote
class MyCounter:
    def __init__(self) -> None:
        self.counter = 0

    def increment(self):
        time.sleep(3)
        self.counter += 1

    def get_counter(self):
        return self.counter


my_counter_handle = MyCounter.remote()

We can then call methods on the actor handle to increment the counter and get the current value of the counter. The methods will be executed sequentially against the actor process.

In [21]:
# this will take 3 seconds * 2 = 6 seconds at least
ray.get([my_counter_handle.increment.remote() for _ in range(2)])

[2m[33m(raylet)[0m [2023-11-21 13:07:59,750 I 17292 12252921] logging.cc:230: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1


[None, None]

In [22]:
ray.get(my_counter_handle.get_counter.remote())

2

Here is a diagram showing the lifecycle of our actor (note that our actor is referred to as a "synchronous" actor)


<img src="actor_simple_.jpeg" height="300">

- A special "create actor" task is executed on the cluster to create the actor process
- The actor process can be thought of as a special worker process
- The actor tasks are executed sequentially on the actor process using a FIFO queue
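The three points above can be mimicked in plain Python with one thread and one queue. This is a toy model, not Ray: `ToyActor`, `ToyCounter` and `call` are hypothetical names, a thread stands in for the actor process, and creating the instance inside that thread stands in for the "create actor" task.

```python
import queue
import threading
from concurrent.futures import Future


class ToyActor:
    """One dedicated thread owns the instance and drains a FIFO mailbox."""

    def __init__(self, cls, *args):
        self._mailbox = queue.Queue()
        worker = threading.Thread(target=self._loop, args=(cls, args), daemon=True)
        worker.start()

    def _loop(self, cls, args):
        instance = cls(*args)  # the "create actor" step
        while True:
            method, call_args, future = self._mailbox.get()
            try:
                future.set_result(getattr(instance, method)(*call_args))
            except Exception as exc:
                future.set_exception(exc)

    def call(self, method, *args):
        future = Future()
        self._mailbox.put((method, args, future))  # enqueue, do not execute
        return future


class ToyCounter:
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value


actor = ToyActor(ToyCounter)
futures = [actor.call("increment") for _ in range(5)]
results = [f.result() for f in futures]
print(results)  # [1, 2, 3, 4, 5]
```

Because a single thread processes the mailbox in order, the increments never interleave and the state needs no lock.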

## 1,000 feet view of ray actors

In this section we look at the lifecycle of an actor more closely.

- Actors are always owned by the GCS (global control service), unlike tasks which are owned by the worker process that submitted them
- The GCS maintains an actor table that keeps track of all the actors in the cluster
- Actors hold the resources they need to execute their tasks until they are killed
- Actors can be launched in a detached mode, in which case they do not fate share with a ray driver/job - instead they need to be killed manually

See the below diagram for more details


<img src="actor_centralized.jpeg" height="300">


### Asynchronous Actors

Our actors can be asynchronous - this is especially useful for actors whose methods are IO bound and whose state can be easily shared and locked if needed

In [23]:
import ray
from asyncio import sleep


@ray.remote
class MyAsyncService:
    def __init__(self) -> None:
        self.fixed_state = 1

    async def run(self):
        await sleep(15)
        return self.fixed_state


my_async_actor_handle = MyAsyncService.remote()

Given the service run is mostly IO bound (sleeping), we can run it asynchronously using an asynchronous actor implementation

In [24]:
%%time

ray.get([my_async_actor_handle.run.remote() for _ in range(2)])

[2m[33m(raylet)[0m [2023-11-21 13:08:06,106 I 17309 12253081] logging.cc:230: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1


CPU times: user 26.8 ms, sys: 23.7 ms, total: 50.6 ms
Wall time: 15.3 s


[1, 1]

Here is a diagram visualizing task execution against an asynchronous actor.

<img src="actor_async.jpeg" height="300">
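The same overlap can be reproduced with plain `asyncio`, without Ray. In this sketch `ToyAsyncService` and `main` are hypothetical names and the delay is shortened to 0.3 seconds; the two calls wait concurrently on one event loop, so the total time stays close to a single delay rather than the sum of both.

```python
import asyncio
import time


class ToyAsyncService:
    def __init__(self):
        self.fixed_state = 1

    async def run(self, delay):
        await asyncio.sleep(delay)  # IO-bound wait: yields to other calls
        return self.fixed_state


async def main(delay=0.3, calls=2):
    service = ToyAsyncService()
    start = time.perf_counter()
    # Both coroutines are scheduled on the same event loop and overlap.
    results = await asyncio.gather(*(service.run(delay) for _ in range(calls)))
    return results, time.perf_counter() - start


results, elapsed = asyncio.run(main())
print(results, f"{elapsed:.2f}s")
```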

## Fault tolerance of Ray Actors

- Ray can automatically restart actors that crash unexpectedly. 
  - This behavior is controlled using `max_restarts`, which sets the maximum number of times that an actor will be restarted.   
- When an actor is restarted, its state will be recreated by rerunning its constructor. After the specified number of restarts, subsequent actor methods will raise a RayActorError.
- The onus is on the user to implement actor checkpointing manually

In [25]:
import ray
import os
import json
import tempfile
import sys
import shutil


@ray.remote(max_restarts=-1, max_task_retries=-1)
class ImmortalActor:
    def __init__(self, checkpoint_file):
        self.checkpoint_file = checkpoint_file

        if os.path.exists(self.checkpoint_file):
            # Restore from a checkpoint
            with open(self.checkpoint_file, "r") as f:
                self.state = json.load(f)
        else:
            self.state = {}

    def update(self, key, value):
        import random

        if random.randrange(10) < 5:
            sys.exit(1)

        self.state[key] = value

        # Checkpoint the latest state
        with open(self.checkpoint_file, "w") as f:
            json.dump(self.state, f)

    def get(self, key):
        return self.state[key]


checkpoint_dir = tempfile.mkdtemp()
actor = ImmortalActor.remote(os.path.join(checkpoint_dir, "checkpoint.json"))
ray.get(actor.update.remote("1", 1))
ray.get(actor.update.remote("2", 2))
assert ray.get(actor.get.remote("1")) == 1
shutil.rmtree(checkpoint_dir)

[2m[33m(raylet)[0m [2023-11-21 13:08:21,435 I 18583 12257661] logging.cc:230: Set ray log level from environment variable RAY_BACKEND_LOG_LEVEL to -1
[2m[36m(ImmortalActor pid=18583)[0m Worker exits with an exit code 1.
[2m[36m(ImmortalActor pid=18583)[0m Traceback (most recent call last):
[2m[36m(ImmortalActor pid=18583)[0m   File "python/ray/_raylet.pyx", line 1999, in ray._raylet.task_execution_handler
[2m[36m(ImmortalActor pid=18583)[0m   File "python/ray/_raylet.pyx", line 1894, in ray._raylet.execute_task_with_cancellation_handler
[2m[36m(ImmortalActor pid=18583)[0m   File "python/ray/_raylet.pyx", line 1558, in ray._raylet.execute_task
[2m[36m(ImmortalActor pid=18583)[0m   File "python/ray/_raylet.pyx", line 1559, in ray._raylet.execute_task
[2m[36m(ImmortalActor pid=18583)[0m   File "python/ray/_raylet.pyx", line 1610, in ray._raylet.execute_task
[2m[36m(ImmortalActor pid=18583)[0m   File "python/ray/_raylet.pyx", line 1616, in ray._raylet.execute_tas