# A Guided Tour of Ray Core: Remote Objects

© 2019-2023, Anyscale. All Rights Reserved

### Overview


In Ray, tasks and actors create and compute on objects. We refer to these objects as remote objects because they can be stored anywhere in a Ray cluster, and we use object refs to refer to them. Remote objects are cached in Ray’s distributed shared-memory object store, and there is one object store per node in the cluster. In the cluster setting, a remote object can live on one or many nodes, independent of who holds the object ref(s). Collectively, these individual object stores make up a shared object store across the Ray cluster, as shown in the diagram below.

[Remote Objects](https://docs.ray.io/en/latest/walkthrough.html#objects-in-ray)
reside in a distributed [shared-memory object store](https://en.wikipedia.org/wiki/Shared_memory).


|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_Core/ray_system_architecture.png" width="70%" height="30%" loading="lazy">|
|:--|
|Ray architecture with Ray nodes, each with its own object store. Collectively, they form a shared object store across the cluster.|


Objects are immutable and can be accessed from anywhere on the cluster, as they are stored in the cluster shared memory. An object ref is essentially a pointer or a unique ID that can be used to refer to a remote object without seeing its value. If you’re familiar with futures in Python, Java or Scala, Ray object refs are conceptually similar.
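
For readers less familiar with futures, the analogy can be sketched with Python's standard `concurrent.futures` module. This is not Ray code: `slow_square` is a hypothetical function, and `executor.submit()` and `future.result()` only play roles similar to launching a remote task and calling `ray.get()` on the returned object ref.

```python
from concurrent.futures import ThreadPoolExecutor
import time


def slow_square(x: int) -> int:
    """Hypothetical stand-in for an expensive computation."""
    time.sleep(0.1)
    return x * x


with ThreadPoolExecutor(max_workers=4) as executor:
    # submit() returns immediately with a future: a handle to a value
    # that may not exist yet, much like an object ref.
    futures = [executor.submit(slow_square, i) for i in range(4)]

    # result() blocks until the value is available, similar in spirit
    # to calling ray.get() on an object ref.
    squares = [f.result() for f in futures]

print(squares)
```

The key idea carried over to Ray is that the handle can be passed around and stored before the value it refers to has been computed.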


In general, small objects are stored in their owner’s **in-process store** (**<=100KB**), while large objects are stored in the **distributed object store**. This decision is meant to reduce the memory footprint and resolution time for each object. Note that in the latter case, a placeholder object is stored in the in-process store to indicate the object has been promoted to shared memory.

If there is no space left in shared memory, objects are spilled to disk. The main point, however, is that
shared memory allows _zero-copy_ access for processes on the same worker node.

<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_Core/shared_memory_plasma_store.png" height="60%" width="65%">
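
To make the zero-copy idea concrete, here is a minimal sketch using Python's standard `multiprocessing.shared_memory` module. It is only an illustration of the general shared-memory mechanism, not Ray's Plasma store implementation. The `writer` and `reader` names are hypothetical, and both handles live in one process here for brevity; in practice the reader would be a separate process on the same node.

```python
from multiprocessing import shared_memory

# Create a named shared-memory block (the "writer" side).
writer = shared_memory.SharedMemory(create=True, size=16)
try:
    writer.buf[:5] = b"hello"

    # Attach a second handle to the same block by name (the "reader" side).
    # Attaching copies no data; both handles map the same memory.
    reader = shared_memory.SharedMemory(name=writer.name)
    try:
        seen_by_reader = bytes(reader.buf[:5])

        # A write through one handle is visible through the other.
        writer.buf[0] = ord("J")
        seen_after_write = bytes(reader.buf[:5])
    finally:
        reader.close()
finally:
    writer.close()
    writer.unlink()  # free the block once no one needs it

print(seen_by_reader, seen_after_write)
```

Because both handles map the same bytes, attaching a reader costs the same whether the block holds a few bytes or many gigabytes.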


### Learning objectives

In this tutorial, you learn about:
 * Ray Futures as one of the patterns
 * Ray's distributed Plasma object store
 * How objects are stored and fetched from the distributed shared object store
     * Use `ray.get` and `ray.put` examples
 * How to use Ray tasks and object store to do inference batching at scale

### Object references as futures pattern

First, let's start Ray…

In [None]:
import logging
import numpy as np
import pandas as pd
import pyarrow.parquet as pq
from typing import Tuple
import random
import torch
import ray

In [None]:
if ray.is_initialized():
    ray.shutdown()
ray.init(logging_level=logging.ERROR)

### Example 1: Remote Objects

To start, we'll create some Python objects and put them in shared memory using the [Ray Core APIs](https://docs.ray.io/en/latest/ray-core/package-ref.html):

* `ray.put()` - put an object in the in-memory object store and return its `ObjectRef`. Use this `ObjectRef` to pass the object to any remote task or actor method call
* `ray.get()` - get the values from a remote object or a list of remote objects from the object store

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Overview_of_Ray/object_store.png" width="70%" loading="lazy">|
|:--|
|Diagram of workers on worker nodes using `ray.put()` to place values and `ray.get()` to retrieve them from each node's object store. If a worker node does not have the value for an `ObjectRef`, the value is fetched or copied from the node of the worker that created it.|


Batch inference is a common distributed application workload in machine learning. It's a process of using a trained model to generate predictions for a collection of observations. 
Primarily, it has the following elements:

**Input dataset**: This is a large collection of observations to generate predictions for. The data is usually stored across
many files in an external storage system such as S3, HDFS, or a database.

**ML model**: This is a trained ML model that is usually also stored in an external storage system or in a model store.

**Predictions**: These are the outputs of applying the ML model to the observations. Predictions are usually written back to the storage system.

For the purposes of this exercise, we make the following provisions:
 * create a dummy model that returns fake predictions
 * use real-world NYC taxi data to provide a large data set for batch inference
 * return the predictions instead of writing them back to disk

This is an example of a scaling pattern called **Different Data Same Function** (DDSF), also known as **Distributed Data Parallel** (DDP). In the diagram below, the function is the
pretrained **model**, and the data is split and distributed as **shards**.

|<img src="https://technical-training-assets.s3.us-west-2.amazonaws.com/Ray_Core/different_data_same_function.png" width="65%" height="35%">|
|:--|
|Distributed batch inference: Different Data Same Function (DDSF).|
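
The pattern itself is independent of Ray and can be sketched in plain Python: one function, applied concurrently to several shards. In this hedged illustration, `fake_model` and the `shards` values are hypothetical stand-ins; the Ray version later in this lesson replaces the thread pool with remote tasks and the lists with Parquet files.

```python
from concurrent.futures import ThreadPoolExecutor
from typing import List


def fake_model(shard: List[int]) -> List[bool]:
    """Hypothetical model: predicts a tip when there are 2 or more passengers."""
    return [passenger_count >= 2 for passenger_count in shard]


# Hypothetical shards: each holds passenger counts for a slice of the data.
shards = [[1, 2, 3], [1, 1], [4, 2, 1, 5]]

# Same function, different data: one call per shard, run concurrently.
with ThreadPoolExecutor() as executor:
    predictions = list(executor.map(fake_model, shards))

# Count the predicted tips in each shard.
tips_per_shard = [sum(p) for p in predictions]
print(tips_per_shard)
```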



Define a Python closure to load our pretrained model. This is just a fake model that predicts whether a
tip is warranted, contingent on the number of passengers (2 or more) on a shared ride.

**Note**: This prediction is fake. A real model would call `model.predict(input_data)`, but this stand-in
suffices for the example.

In [None]:
def load_trained_model():
    # A fake model that predicts whether tips were given based on number of passengers in the taxi cab.
    def model(batch: pd.DataFrame) -> pd.DataFrame:
        # Give a tip if there are 2 or more passengers.
        predict = batch["passenger_count"] >= 2
        return pd.DataFrame({"score": predict})

    # Attach fake weights and metadata once, at load time, so the payload
    # travels with the model when it is placed in the shared Plasma store
    # (instead of being re-allocated on every call).
    model.payload = np.arange(100, 100_000_000, dtype=float)
    model.cls = "regression"

    return model

Let's define a Ray task that will handle each shard of the NYC taxi data.

In [None]:
@ray.remote
def make_model_batch_predictions(model, shard_path, verbose=False):
    if verbose:
        print(f"Batch inference for shard file: {shard_path}")
    df = pq.read_table(shard_path).to_pandas()
    result = model(df)

    # Return our prediction data frame
    return result

Get the 12 files of NYC taxi data, one per month.

In [None]:
# 12 files, one for each remote task.
input_files = [
    f"s3://anonymous@air-example-data/ursa-labs-taxi-data/downsampled_2009_full_year_data.parquet"
    f"/fe41422b01c04169af2a65a83b753e0f_{i:06d}.parquet" for i in range(12)]

### Insert model into the object store

Call `ray.put()` on the model just once to store it in the local object store, then pass the reference to the remote tasks.

It would be highly inefficient to pass the model itself, as in `make_model_batch_predictions.remote(model, file)`.
To ship the model to a remote node, Ray would implicitly call `ray.put(model)` for each task, potentially overwhelming
the local object store and causing out-of-memory or out-of-disk errors.

Instead, we pass only a reference, and the node where the task is scheduled dereferences it.

`ray.put()` is the [Ray Core API](https://docs.ray.io/en/latest/ray-core/package-ref.html) for putting objects into the Ray Plasma store.

In [None]:
# Get the model 
model = load_trained_model()

# Put the model object into the shared object store.
model_ref = ray.put(model)
model_ref

In [None]:
# List for holding all object references returned from the model's predictions
result_refs = []

# Launch all prediction tasks. For each file create a Ray remote task to do a batch inference
for file in input_files:
    
    # Launch a prediction task by passing model reference and shard file to it.
    result_refs.append(make_model_batch_predictions.remote(model_ref, file))

Fetch the results

In [None]:
results = ray.get(result_refs)

In [None]:
# Let's check predictions and output size.
for r in results:
    print(f"Predictions dataframe size: {len(r)} | Total score for tips: {r['score'].sum()}")

In [None]:
ray.shutdown()

### Homework

Read the following references for deeper dives into Ray objects:

1. [Serialization](https://docs.ray.io/en/latest/ray-core/objects/serialization.html)
2. [Memory Management](https://docs.ray.io/en/latest/ray-core/objects/memory-management.html)
3. [Object Spilling](https://docs.ray.io/en/latest/ray-core/objects/object-spilling.html)
4. [Fault Tolerance](https://docs.ray.io/en/latest/ray-core/objects/fault-tolerance.html)

### Recap

We covered how to
 * use Ray tasks, `ray.put()`, and `ray.get()`
 * understand the distributed remote object store
 * access objects from the object store for transformation

Let's move on to the [Ray Actors lesson](ex_03_remote_classes.ipynb).

### References

 * [Ray Architecture Reference](https://docs.google.com/document/d/1tBw9A4j62ruI5omIJbMxly-la5w4q_TjyJgJL_jN2fI/preview)
 * [Ray Internals: A peek at ray.get](https://www.youtube.com/watch?v=a1kNnQu6vGw)
 * [Ray Internals: Object management with Ownership Model](https://www.youtube.com/watch?v=1oSBxTayfJc)
 * [Deep Dive into Ray scheduling Policies](https://www.youtube.com/watch?v=EJUYKXWGzfI)
 * [Redis in Ray: Past and future](https://www.anyscale.com/blog/redis-in-ray-past-and-future)
 * [StackOverFlow: How Ray Shares Data](https://stackoverflow.com/questions/58082023/how-exactly-does-ray-share-data-to-workers/71500979#71500979)
 