# Ray Core:
## Remote Functions as Tasks, Remote Stateful Class Instances as Actors

Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “tasks.” You can specify task's resource requirements in terms of CPUs, GPUs, and custom resources. These resource requests are used by the cluster scheduler to distribute tasks across the cluster for parallelized execution.  

<img src="images/python_to_ray_concept_map.png" />

In [19]:
import os
import time
import logging

import numpy as np
from numpy import loadtxt
import ray

## Tasks Parallel Pattern

Ray converts decorated functions with `@ray.remote` into stateless tasks, scheduled anywhere on a Ray node's worker in the cluster. 

Where they will be executed (and by whom), you don't have to worry about its details. All that is taken care for you. Nor do 
you have to reason about it — all that burden is Ray's job. You simply take your existing Python functions and covert them into 
distributed stateless *Ray Tasks*: **as simple as that!**

### Example 1: Serial vs Parallelism

Let's look at simple tasks running serially and then in parallel. For illustration, we'll use a simple task, but this could be a compute-intensive task as part of your workload.


There are a few key differences between the original function and the decorated one:

**Invocation**: The regular version is called with `regular_function()`, whereas the remote version is called with `remote_function.remote()`. Keep this pattern in mind for all Ray remote execution methods.

**Mode of execution and return values**: `regular_function` executes synchronously and returns the result of the function as the value `1` (in our case), whereas `remote_function` immediately returns an `ObjectID` (a future) and then executes the task in the background on a remote worker process. The result of the future is obtained by calling `ray.get` on the `ObjectID`. This is a blocking function.

In [2]:
# A regular Python function.
def regular_function():
    time.sleep(1)
    return 1

In [3]:
# A Ray remote function.
@ray.remote
def remote_function():
    time.sleep(1)
    return 1

In [4]:
# Let's invoke the regular function
assert regular_function() == 1

In [5]:
# Let's invoke the remote function.
obj_ref = remote_function.remote()
obj_ref

2022-12-27 09:58:42,798	INFO worker.py:1529 -- Started a local Ray instance. View the dashboard at [1m[32m127.0.0.1:8268 [39m[22m


ObjectRef(c8ef45ccd0112571ffffffffffffffffffffffff0100000001000000)

In [6]:
assert ray.get(obj_ref) == 1

#### Serial execution in Python with no parallelism
Invocations of `regular_function` in a comprehension loop happens `serially`:

In [8]:
%%time

# These are executed one at a time, back-to-back, in a list comprehension

results = [regular_function() for _ in range(5)]
assert sum(results) == 5

CPU times: user 17.5 ms, sys: 18.5 ms, total: 36 ms
Wall time: 5.02 s


#### Parallel execution in Python with Ray

Invocations of `remote_function` in a loop happen `asynchronously` and in parallel:

In [10]:
%%time

# Executing these functions, in comprehension list, happens at the same time in the background,
# and we get the results using ray.get.

results = [remote_function.remote() for _ in range(5)]
assert sum(ray.get(results)) == 5

CPU times: user 6.17 ms, sys: 4.23 ms, total: 10.4 ms
Wall time: 1.01 s


### Example 2: Adding two np arrays

<img src="images/remote_task_api_add_array.png" width="80%"/>

Define a function as a Ray task to read an array

In [11]:
@ray.remote
def read_array(fn: str) -> np.array:
    arr = loadtxt(fn, comments="#", delimiter=",", unpack=False)
    return arr.astype("int")

Define a function as a Ray task to add two np arrays return the sum

In [12]:
@ray.remote
def add_array(arr1: np.array, arr2: np.array) -> np.array:
    return np.add(arr1, arr1)

Define a function as a Ray task to sum the contents of an np array

In [13]:
@ray.remote
def sum_array(arr1: np.array) -> int:
    return np.sum(arr1)

Now let's execute our tasks. For now we will run on a single node (this could be your laptop), with potential access to utilize all the available cores when necessary.

Ray executes immediately and returns an object reference `ObjectRef` as a future. This enables Ray to parallelize tasks and execute them asynchronously.

#### Read both arrays

Use the `func_name.remote(args)` extention to invoke a remote Ray Task

In [14]:
obj_ref_arr1 = read_array.remote("data/file_1.txt")
print(f"array 1: {obj_ref_arr1}")

array 1: ObjectRef(1e8ff6d236132784ffffffffffffffffffffffff0100000001000000)


In [15]:
obj_ref_arr2 = read_array.remote("data/file_2.txt")
print(f"array 2: {obj_ref_arr2}")

array 2: ObjectRef(85748392bcd969ccffffffffffffffffffffffff0100000001000000)


#### Add both arrays

Let's add our two arrays by calling the remote method. *Note*: We are sending Ray `ObjectRef` references as arguments. Those arguments will be resolved inline and fetched from owner's object store. That is, the cluster node that creates the `ObjectRef` owns the meta data associated and stores it in its object store. 

Ray scheduler is aware of where these object references reside or who owns them, so it will schedule this remote task on node on the worker process for data locality.

In [16]:
result_obj_ref = add_array.remote(obj_ref_arr1, obj_ref_arr2)
result_obj_ref

ObjectRef(d695f922effe6d99ffffffffffffffffffffffff0100000001000000)

#### Fetch the result

This will task if not finished will block during `.get(object_ref`)

In [17]:
result = ray.get(result_obj_ref)
print(f"Result: add arr1 + arr2: \n {result}")

Result: add arr1 + arr2: 
 [[  0  96 144 150 108 178 168 136  18  76]
 [  6  80 146 116  20  70 192  12 130  66]
 [110 134  24 194 104 146  14 152  78 100]
 [118  68  40  80 184 110  22  78 186  76]
 [178 178  74 104  96 172  98   6  38 100]
 [168  74 136  22  40  72  92 122 104 154]
 [140 180 112 110  98 152 188  56  64  46]
 [ 10  88 184  30 106 126 174 150 122  50]
 [102 116  58  60 186 188 104 144 160  54]
 [  2  56 164  70 178  72  20 168 170 130]]


Add the array elements within an `np.array` and get the sum. 
**Note** that we are sending `ObjRefs` as arguments to the function. Ray will resolve or fetch the value of these arrays. 

In [None]:
sum_1 = ray.get(sum_array.remote(obj_ref_arr1))
sum_2 = ray.get(sum_array.remote(obj_ref_arr2))

In [None]:
print(f"Sum of arr1: {sum_1}")
print(f"Sum of arr2: {sum_2}")

## Remote Stateful Class Instances as Actors

Actors extend the Ray API from functions (tasks) to classes. An actor is essentially a stateful worker (or a service). When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker. Like tasks, actors support CPU, GPU, and custom resource requirements.

## Remote class as a stateful actor pattern

[*Remote Classes*](https://docs.ray.io/en/latest/walkthrough.html#remote-classes-actors)  (just as remote tasks) use a `@ray.remote` decorator on a Python class. 

Ray Actor pattern is powerful. They allow you to take a Python class and instantiate it as a stateful microservice that can be queried from other actors and tasks and even other Python applications. Actors can be passed as arguments to other tasks and actors. 

<img src="images/ray_worker_actor_1.png" height="40%" width="70%">

When you instantiate a remote Actor, a separate worker process is attached to a worker process and becomes an Actor process on that worker node—all for the purpose of running methods called on the actor. Other Ray tasks and actors can invoke its methods on that process, mutating its internal state if desried. Actors can also be terminated manually if needed. The examples code below show all these cases.

<img src="images/ray_worker_actor_2.png" height="40%" width="70%">

To start, we'll define a class and use the decorator: `@ray.remote`

### Example: Method tracking 
**Problem**: We want to keep track of who invoked a particular method. This could be a use case for telemetry data we want to track: who's invoking what

Let's use this actor to track method invocation of an actor's methods. Each instance will track who invoked it and number of times.

In [31]:
CALLERS = ["A", "B", "C"]

@ray.remote
class MethodStateCounter:
    def __init__(self):
        self.invokers = {"A": 0, "B": 0, "C": 0}

    def invoke(self, name):
        # pretend to do some work here
        time.sleep(0.5)
        # update times invoked
        self.invokers[name] += 1
        # return the state of that invoker
        return self.invokers[name]

    def get_all_invoker_state(self):
        # reeturn the state of all invokers
        return self.invokers

In [32]:
# Create an instance of our Actor

worker_invoker = MethodStateCounter.remote()
worker_invoker

Actor(MethodStateCounter, b39a1ef42ec776f8f739ec5801000000)

Iterate and call the `invoke()` method by random callers and keep track of who called it.

In [33]:
import random

for _ in range(10):
    name = random.choice(CALLERS)
    worker_invoker.invoke.remote(name)
    
print(ray.get(worker_invoker.get_all_invoker_state.remote()))

{'A': 2, 'B': 3, 'C': 5}


Invoke a random caller and fetch the value or invocations of a random caller

In [34]:
for _ in range(5):
    random_name_invoker = random.choice(CALLERS)
    times_invoked = ray.get(worker_invoker.invoke.remote(random_name_invoker))
    print(f"Named caller: {random_name_invoker} called {times_invoked} times")

Named caller: B called 4 times
Named caller: B called 5 times
Named caller: A called 3 times
Named caller: C called 6 times
Named caller: A called 4 times


Note that we did not have to reason about where and how the actors are scheduled.

We did not worry about the socket connection or IP addresses where these actors reside. All that's abstracted away from us. 

All we did is write Python code, using Ray core APIs, convert our classes into distributed stateful services!