# A Guided Tour of Ray Core: Remote Objects

© 2019-2022, Anyscale. All Rights Reserved

📖 [Back to Table of Contents](./ex_00_tutorial_overview.ipynb)<br>
➡ [Next notebook](./ex_03_remote_classes.ipynb) <br>
⬅️ [Previous notebook](./ex_01_remote_funcs.ipynb) <br>

### Overview


In Ray, tasks and actors create and compute on objects. We refer to these objects as remote objects because they can be stored anywhere in a Ray cluster, and we use object refs to refer to them. Remote objects are cached in Ray’s distributed shared-memory object store, and there is one object store per node in the cluster. In the cluster setting, a remote object can live on one or many nodes, independent of who holds the object ref(s). Collectively, these individual object stores make up a shared object store across the Ray cluster, as shown in the diagram below.

[Remote Objects](https://docs.ray.io/en/latest/walkthrough.html#objects-in-ray)
reside in a distributed [shared-memory object store](https://en.wikipedia.org/wiki/Shared_memory).

|<img src="images/ray_arch.png" width="70%" height="30%" loading="lazy">|
|:--|
|Ray architecture with Ray nodes, each with its own object store. Collectively, they form a shared object store across the cluster.|

Objects are immutable and can be accessed from anywhere on the cluster, as they are stored in the cluster shared memory. An object ref is essentially a pointer or a unique ID that can be used to refer to a remote object without seeing its value. If you’re familiar with futures in Python, Java or Scala, Ray object refs are conceptually similar.
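
For readers who know Python's standard-library futures, the analogy can be made concrete. The sketch below uses only `concurrent.futures`, not Ray, and the helper `slow_square` is a hypothetical stand-in for a remote task. It is an analogy under those assumptions, not a description of how Ray works internally.

```python
import time
from concurrent.futures import ThreadPoolExecutor


def slow_square(x: int) -> int:
    """Hypothetical stand-in for a remote task that takes a while."""
    time.sleep(0.1)
    return x * x


with ThreadPoolExecutor(max_workers=2) as pool:
    # submit() returns immediately with a Future, much like calling .remote()
    # returns an object ref before the result exists.
    future = pool.submit(slow_square, 7)

    # The Future is only a handle. result() blocks until the value is ready,
    # which is conceptually what ray.get(obj_ref) does for an object ref.
    value = future.result()

print(value)
```

The handle can be passed around and resolved later, which is the property that object refs share with futures.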


In general, small objects are stored in their owner’s **in-process store** (**<=100KB**), while large objects are stored in the **distributed object store**. This decision is meant to reduce the memory footprint and resolution time for each object. Note that in the latter case, a placeholder object is stored in the in-process store to indicate the object has been promoted to shared memory.

If the shared memory runs out of space, objects are spilled to disk. The main point here is that
shared memory allows _zero-copy_ access for processes on the same worker node.

<img src="images/shared_memory_plasma_store.png"  height="40%" width="65%">

### Learning objectives

In this tutorial, you learn about:
 * Ray Futures as one of the patterns
 * Ray's distributed Plasma object store
 * How objects are stored and fetched from the distributed shared object store
     * Use `ray.get` and `ray.put` examples
 * How to use Ray tasks and object store to do inference batching at scale

### Object references as futures pattern

First, let's start Ray…

In [6]:
import logging
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from typing import Tuple
import random
import torch
import ray

In [7]:
if ray.is_initialized():
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

Python version: 3.8.13
Ray version: 2.3.0
Dashboard: http://127.0.0.1:8265


### Example 1: Remote Objects

To start, we'll create some Python objects and put them in shared memory using the [Ray Core APIs](https://docs.ray.io/en/latest/ray-core/package-ref.html):

* `ray.put()` - put an object in the in-memory object store and return its `ObjectRef`. Use this `ObjectRef` to pass the object to any remote task or actor method call
* `ray.get()` - get the value of a remote object, or the values of a list of remote objects, from the object store


|<img src="images/object_store.png" width="70%" loading="lazy">|
|:--|
|Diagram of workers in worker nodes using `ray.put()` to place values and using `ray.get()` to retrieve them from each node's object store. If the worker's node does not have the value for the `ObjectRef`, it is fetched or copied from the node of the worker that created it.|


Create a function that returns a random tensor of a given shape. We store this
tensor in our object store and retrieve it later for processing.

In [8]:
def create_rand_tensor(size: Tuple[int, int]) -> torch.Tensor:
    return torch.randn(size=size, dtype=torch.float)

In [9]:
@ray.remote
def transform_rand_tensor(tensor: torch.Tensor) -> torch.Tensor:
    return torch.mul(tensor, random.randint(2, 10))

#### Create random tensors and store them in object store
1. create a random tensor
2. put it in the object store
3. the final list returned from the comprehension is a list of `ObjectRef`s

In [10]:
torch.manual_seed(42)
#
# Create 100 random tensors of shape (X, 50)
#
tensor_list_obj_ref = [ray.put(create_rand_tensor(((i+1)*25, 50))) for i in range(0, 100)]
tensor_list_obj_ref[:2], len(tensor_list_obj_ref)

([ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000001000000),
  ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000002000000)],
 100)

You can view the object store in the [Ray Dashboard](https://docs.ray.io/en/latest/ray-core/ray-dashboard.html)

#### Fetch the random tensors from the object store

Retrieve the value of this object reference. 

Small objects are resolved by copying them directly from the _owner’s_ **in-process store**. For example, if the owner calls `ray.get`, the system looks up and deserializes the value from the local **in-process store**. Objects larger than 100KB are stored in the distributed object store.
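
To get a feel for the 100KB figure, the back-of-the-envelope sketch below estimates the raw payload of each tensor created earlier, with shapes `((i+1)*25, 50)` and `torch.float` elements. It assumes 4 bytes per element and reads "100KB" as `100 * 1024` bytes; both are assumptions for illustration. It ignores serialization overhead, and which store Ray actually picks for a given object depends on Ray's internals, so treat the counts as rough guidance only.

```python
# Assumptions for illustration: float32 elements and a 100 * 1024 byte threshold.
BYTES_PER_FLOAT32 = 4
THRESHOLD_BYTES = 100 * 1024

# Same shapes as the 100 tensors created in the notebook: ((i + 1) * 25, 50).
shapes = [((i + 1) * 25, 50) for i in range(100)]

# Raw payload size of each tensor, ignoring serialization overhead.
payload_bytes = [rows * cols * BYTES_PER_FLOAT32 for rows, cols in shapes]

# Payloads that fit under the assumed threshold.
small_payloads = [n for n in payload_bytes if n <= THRESHOLD_BYTES]

print(f"smallest payload: {payload_bytes[0]} bytes")
print(f"largest payload:  {payload_bytes[-1]} bytes")
print(f"payloads at or below the threshold: {len(small_payloads)} of {len(payload_bytes)}")
```

Under these assumptions the first tensor is about 5KB and the last about 500KB, so the list spans both sides of the threshold.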

In [11]:
# Since we got back a list of ObjectRefs, fetch the tensor behind the first
# ObjectRef in the list
val = ray.get(tensor_list_obj_ref[0])
val.size(), val

(torch.Size([25, 50]),
 tensor([[ 1.9269,  1.4873,  0.9007,  ..., -0.4879, -0.9138, -0.6581],
         [ 0.0780,  0.5258, -0.4880,  ...,  0.4880,  0.7846,  0.0286],
         [ 0.6408,  0.5832,  1.0669,  ...,  1.4506,  0.2695, -0.2104],
         ...,
         [ 0.3258, -1.4584,  1.8989,  ...,  0.1473, -1.3136, -0.6061],
         [ 0.6450, -0.2477, -1.4078,  ...,  1.7561,  0.2113,  1.4860],
         [ 0.5585,  0.3491,  0.8484,  ...,  2.2683, -0.0661, -1.0740]]))

Alternatively, you can fetch all the values of multiple object references.

In [12]:
results = ray.get(tensor_list_obj_ref)
results[:1], results[:1][0].size()

([tensor([[ 1.9269,  1.4873,  0.9007,  ..., -0.4879, -0.9138, -0.6581],
          [ 0.0780,  0.5258, -0.4880,  ...,  0.4880,  0.7846,  0.0286],
          [ 0.6408,  0.5832,  1.0669,  ...,  1.4506,  0.2695, -0.2104],
          ...,
          [ 0.3258, -1.4584,  1.8989,  ...,  0.1473, -1.3136, -0.6061],
          [ 0.6450, -0.2477, -1.4078,  ...,  1.7561,  0.2113,  1.4860],
          [ 0.5585,  0.3491,  0.8484,  ...,  2.2683, -0.0661, -1.0740]])],
 torch.Size([25, 50]))

#### Transform tensors stored in the object store

Let's transform our tensors stored in the object store, store the transformed tensors in the object store (the Ray remote task will implicitly store its return value there), and then fetch the values.

Transform each tensor in the object store with a remote task

In [22]:
transformed_object_list = [transform_rand_tensor.remote(t_obj_ref) for t_obj_ref in tensor_list_obj_ref]
transformed_object_list[:2]

[ObjectRef(8b2e7fa63270c5f3ffffffffffffffffffffffff0100000001000000),
 ObjectRef(10ef70752e9c9a2bffffffffffffffffffffffff0100000001000000)]

In [23]:
transformed_tensor_values = ray.get(transformed_object_list)
transformed_tensor_values[:2]

[tensor([[ 3.8538,  2.9746,  1.8014,  ..., -0.9758, -1.8276, -1.3163],
         [ 0.1560,  1.0516, -0.9760,  ...,  0.9760,  1.5692,  0.0573],
         [ 1.2815,  1.1665,  2.1339,  ...,  2.9012,  0.5390, -0.4208],
         ...,
         [ 0.6516, -2.9169,  3.7978,  ...,  0.2946, -2.6272, -1.2122],
         [ 1.2900, -0.4954, -2.8156,  ...,  3.5123,  0.4226,  2.9720],
         [ 1.1170,  0.6983,  1.6967,  ...,  4.5367, -0.1322, -2.1481]]),
 tensor([[  4.4440,   7.1669,   6.7023,  ...,   7.9999, -16.3133,  -5.4662],
         [  5.7750,   8.5447,  -0.4408,  ...,   9.7114,  -5.7739,   2.4815],
         [ -1.9652,  -1.7028,  15.7128,  ...,   3.7381,  -6.2063,   5.5746],
         ...,
         [ -3.7279,  -0.2907,  -2.9323,  ...,   3.8987,   4.6501,  -1.3954],
         [ -6.8392,  -4.0922, -10.8494,  ...,  -1.6914,  14.8104,   1.8503],
         [ -7.8164,   7.9808,  -8.0498,  ...,  10.8340,   4.4420, -10.3995]])]

### Recap
Ray's object store is a shared-memory store spanning a Ray cluster. Each Ray node has its own object store, and workers on that node can use simple Ray APIs, `ray.put()` and `ray.get()`, to insert values and fetch the values of Ray objects created by Ray tasks or actor methods. Collectively, these per-node object stores comprise a shared, distributed object store.

In the above exercise, we created random tensors, inserted them into our object store, and transformed them by iterating over each `ObjectRef`, sending it to a Ray task, and then fetching the transformed tensor returned by each Ray remote task.

### Exercise
See if you can write your own transformation to modify the tensor. Consider any of the following
tensor transformations:
 1. [torch.transpose](https://pytorch.org/docs/stable/generated/torch.transpose.html)
 2. [torch.dot](https://pytorch.org/docs/stable/generated/torch.dot.html)
 3. [torch.reshape](https://pytorch.org/docs/stable/generated/torch.reshape.html)

#### Solution

In [19]:
@ray.remote
def transform_rand_tensor_2(tensor: torch.Tensor) -> torch.Tensor:
    return torch.transpose(tensor, 0, 1)

### Passing Objects by Reference

Ray object references can be freely passed around a Ray application. This means that they can be passed as arguments to tasks, actor methods, and even stored in other objects. Objects are tracked via distributed reference counting, and their data is automatically freed once all references to the object are deleted.

In [10]:
# Define a Task
@ray.remote
def echo(x):
    print(f"current value of argument x: {x}")

In [11]:
# Define some variables
x = list(range(10))
obj_ref_x = ray.put(x)
y = 25

### Pass-by-value

Send the object to a task as a top-level argument.
The object will be *de-referenced* automatically, so the task only sees its value.

In [12]:
# send y as value argument
echo.remote(y)

ObjectRef(298e3e66d66deed9ffffffffffffffffffffffff0100000001000000)

In [13]:
# send an object reference
# note that the echo function dereferences it
echo.remote(obj_ref_x)

ObjectRef(664a780010703836ffffffffffffffffffffffff0100000001000000)

### Pass-by-reference

When a parameter is passed inside a Python list or any other data structure,
the *object ref is preserved*, meaning it's not *de-referenced*. When passed by reference, the object data is not transferred to the worker until `ray.get()` is called on the reference.

You can pass by reference in two ways:
 1. as a dictionary `.remote({"obj": obj_ref_x})`
 2. as a list of object refs `.remote([obj_ref_x])`

In [14]:
x = list(range(20))
obj_ref_x = ray.put(x)
# Echo will not automatically de-reference it
echo.remote({"obj": obj_ref_x})

ObjectRef(5b39a414803e3f8effffffffffffffffffffffff0100000001000000)

In [15]:
echo.remote([obj_ref_x])

(echo pid=21111) current value of argument x: 25


ObjectRef(66736a23c9cfb453ffffffffffffffffffffffff0100000001000000)

(echo pid=21111) current value of argument x: [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
(echo pid=21111) current value of argument x: {'obj': ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000066000000)}
(echo pid=21111) current value of argument x: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000066000000)]


## What about long running tasks?

Sometimes a task runs longer than expected, for example because it is blocked on accessing a variable in the object store. How do you stop waiting for it? Use a timeout! Note that the timeout only bounds how long the caller waits in `ray.get()`; it does not terminate the task itself.

Now let's set a timeout to return early from an attempted access of a remote object that is blocking for too long...

In [16]:
import time

@ray.remote
def long_running_function():
    time.sleep(10)
    return 42

You can control how long you want to wait for the task to finish

In [17]:
%%time

from ray.exceptions import GetTimeoutError

obj_ref = long_running_function.remote()

try:
    ray.get(obj_ref, timeout=6)
except GetTimeoutError:
    print("`get` timed out")

`get` timed out
CPU times: user 36.3 ms, sys: 21.7 ms, total: 58 ms
Wall time: 6.03 s


### Example 2:  How to use Tasks and object store for distributed batch inference 

Batch inference is a common distributed application workload in machine learning. It's a process of using a trained model to generate predictions for a collection of observations. 
Primarily, it has the following elements:

**Input dataset**: This is a large collection of observations to generate predictions for. The data is usually stored in an external storage system like S3, HDFS, or a database, across
many files.

**ML model**: This is a trained ML model that is usually also stored in an external storage system or in a model store.

**Predictions**: These are the outputs when applying the ML model on observations. Predictions are usually written back to the storage system.

For the purpose of this exercise, we make the following provisions:
 * create a dummy model that returns some fake predictions
 * use real-world NYC taxi data to provide a large data set for batch inference
 * return the predictions instead of writing them back to disk

This is an example of a scaling pattern called **Different Data Same Function** (DDSF), also known as **Distributed Data Parallel** (DDP). Our function in this diagram is the
pretrained **model**, and the data is split and distributed as **shards**.

|<img src="images/batch-inference.png" width="65%" height="35%">|
|:--|
|Distributed batch inference: Different Data Same Function (DDSF).|



Define a Python closure to load our pretrained model. This model is just a fake model that predicts whether a
tip is warranted, contingent on the number of passengers (2 or more) on collective rides.

**Note**: This prediction is fake. A real model would invoke `model.predict(input_data)`. Yet
it suffices for this example.

In [18]:
def load_trained_model():
    # A fake model that predicts whether tips were given based on number of passengers in the taxi cab.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        
        # Some model weights and payload so Ray copies the model in the 
        # shared plasma store to tasks scheduled across nodes.
        model.payload = np.arange(100, 100_000_000, dtype=float)
        model.cls = "regression"
        
        # give a tip if 2 or more passengers
        predict = batch["passenger_count"] >= 2 
        return pd.DataFrame({"score": predict})
    
    return model    

Let's define a Ray task that will handle each shard of the NYC taxi data.

In [19]:
@ray.remote
def make_model_batch_predictions(model, shard_path, verbose=False):
    if verbose:
        print(f"Batch inference for shard file: {shard_path}")
    df = pq.read_table(shard_path).to_pandas()
    result = model(df)

    # Return our prediction data frame
    return result

Get the 12 files consisting of NYC data per month

In [20]:
# 12 files, one for each remote task.
input_files = [
    f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet" for i in range(12)]

### Insert model into the object store

`ray.put()` the model just once into the local object store, and then pass the reference to the remote tasks.

It would be highly inefficient to pass the model itself, as in `make_model_batch_predictions.remote(model, file)`,
which implicitly does a `ray.put(model)` for each task in order to pass the model to the remote node, potentially overwhelming
the local object store and causing an out-of-memory or out-of-disk error.

Instead, we just pass a reference, and the node where the task is scheduled dereferences it.

`ray.put()` is the [Ray Core API](https://docs.ray.io/en/latest/ray-core/package-ref.html) call for putting objects into the Ray Plasma store.

In [21]:
# Get the model 
model = load_trained_model()

# Put the model object into the shared object store.
model_ref = ray.put(model)
model_ref

ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000067000000)

In [22]:
# List for holding all object references returned from the model's predictions
result_refs = []

# Launch all prediction tasks. For each file create a Ray remote task to do a batch inference
for file in input_files:
    
    # Launch a prediction task by passing model reference and shard file to it.
    result_refs.append(make_model_batch_predictions.remote(model_ref, file))

Fetch the results

In [23]:
results = ray.get(result_refs)

In [24]:
# Let's check predictions and output size.
for r in results:
    print(f"Predictions dataframe size: {len(r)} | Total score for tips: {r['score'].sum()}")

Predictions dataframe size: 141062 | Total score for tips: 46360
Predictions dataframe size: 133932 | Total score for tips: 42175
Predictions dataframe size: 144014 | Total score for tips: 45175
Predictions dataframe size: 143087 | Total score for tips: 45510
Predictions dataframe size: 148108 | Total score for tips: 47713
Predictions dataframe size: 141981 | Total score for tips: 45188
Predictions dataframe size: 136394 | Total score for tips: 43234
Predictions dataframe size: 136999 | Total score for tips: 45142
Predictions dataframe size: 139985 | Total score for tips: 44138
Predictions dataframe size: 156198 | Total score for tips: 49909
Predictions dataframe size: 142893 | Total score for tips: 46112
Predictions dataframe size: 145976 | Total score for tips: 48036


In [25]:
ray.shutdown()

### Homework

Read the following references for deep dives and more about Ray objects:

1. [Serialization](https://docs.ray.io/en/latest/ray-core/objects/serialization.html)
2. [Memory Management](https://docs.ray.io/en/latest/ray-core/objects/memory-management.html)
3. [Object Spilling](https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html)
4. [Fault Tolerance](https://docs.ray.io/en/latest/ray-core/objects/fault-tolerance.html)

### Recap

We covered how to
 * use Ray `tasks`, `ray.get()`, and `ray.put()`
 * understand the distributed remote object store
 * access objects from the object store for transformation

Let's move on to the [Ray Actors lesson](ex_03_remote_classes.ipynb).

### References

 * [Ray Architecture Reference](https://docs.google.com/document/d/1tBw9A4j62ruI5omIJbMxly-la5w4q_TjyJgJL_jN2fI/preview)
 * [Ray Internals: A peek at ray.get](https://www.youtube.com/watch?v=a1kNnQu6vGw)
 * [Ray Internals: Object management with Ownership Model](https://www.youtube.com/watch?v=1oSBxTayfJc)
 * [Deep Dive into Ray scheduling Policies](https://www.youtube.com/watch?v=EJUYKXWGzfI)
 * [Redis in Ray: Past and future](https://www.anyscale.com/blog/redis-in-ray-past-and-future)
 * [StackOverFlow: How Ray Shares Data](https://stackoverflow.com/questions/58082023/how-exactly-does-ray-share-data-to-workers/71500979#71500979)
 
