# Ray Core
## [Getting Started](https://docs.ray.io/en/latest/ray-core/walkthrough.html#)

In [1]:
import ray
print(f'Ray version {ray.__version__}')
ray.init()


Ray version 2.0.0


2022-09-12 14:59:13,746	INFO worker.py:1518 -- Started a local Ray instance.


Python version: 3.10.5
Ray version: 2.0.0


**Running a Task:**
- Decorate your function with `@ray.remote` to declare that you want to run this function remotely.
- Call that function with `.remote()`. The remote call immediately returns a future, a so-called Ray object reference, whose value you can then fetch with `ray.get`.

In [3]:
# Define the square task.
@ray.remote
def square(x):
    return x * x


# Launch four parallel square tasks.
futures = [square.remote(i) for i in range(4)]

# Retrieve results.
print(ray.get(futures))


[0, 1, 4, 9]


**Calling an actor:**
- When you instantiate a class that is a Ray actor, Ray starts a remote instance of that class in the cluster.
- The actor can then execute remote method calls and maintain its own internal state.

In [4]:
# Define the Counter actor.
@ray.remote
class Counter:
    def __init__(self):
        self.i = 0

    def get(self):
        return self.i

    def incr(self, value):
        self.i += value


# Create a Counter actor.
c = Counter.remote()

# Submit calls to the actor. These calls run asynchronously but in
# submission order on the remote actor process.
for _ in range(10):
    c.incr.remote(1)

# Retrieve final actor state.
print(ray.get(c.get.remote()))


10


**Passing an Object:**
- Ray stores task and actor call results in its distributed object store and returns object references that can be retrieved later.
- Object references can also be created explicitly with `ray.put`, and they can be passed to tasks in place of argument values.

In [5]:
import numpy as np


@ray.remote
def sum_matrix(matrix):
    # Define a task that sums the values in a matrix.
    return np.sum(matrix)


# Call the task with a literal argument value.
print(ray.get(sum_matrix.remote(np.ones((100, 100)))))
# -> 10000.0

# Put a large array into the object store.
matrix_ref = ray.put(np.ones((1000, 1000)))

# Call the task with the object reference as an argument.
print(ray.get(sum_matrix.remote(matrix_ref)))


[2m[36m(square pid=155530)[0m E0912 14:26:05.956039441  156011 chttp2_transport.cc:1103]   Received a GOAWAY with error code ENHANCE_YOUR_CALM and debug data equal to "too_many_pings"


10000.0
1000000.0


## [Tasks](https://docs.ray.io/en/latest/ray-core/tasks.html)

Ray enables arbitrary functions to be executed asynchronously on separate Python workers. These asynchronous Ray functions are called “remote functions”.

In [12]:
import time


def normal_function():
    # A regular Python function.
    return 1


# By adding the `@ray.remote` decorator, a regular Python function
# becomes a Ray remote function.
@ray.remote
def my_function():
    print("my_function")
    return 1


# To invoke this remote function, use the `remote` method.
# This will immediately return an object ref (a future) and then create
# a task that will be executed on a worker process.
obj_ref = my_function.remote()

# The result can be retrieved with ``ray.get``.
assert ray.get(obj_ref) == 1


@ray.remote
def slow_function():
    time.sleep(10)
    print("slow_function")
    return 1


# Invocations of Ray remote functions happen in parallel.
# All computation is performed in the background, driven by Ray's internal event loop.
for _ in range(4):
    # This doesn't block.
    slow_function.remote()


[2m[36m(my_function pid=155516)[0m my_function
[2m[36m(slow_function pid=155530)[0m slow_function
[2m[36m(slow_function pid=155520)[0m slow_function
[2m[36m(slow_function pid=155521)[0m slow_function
[2m[36m(slow_function pid=155529)[0m slow_function
[2m[36m(slow_function pid=155527)[0m slow_function
[2m[36m(slow_function pid=155517)[0m slow_function
[2m[36m(slow_function pid=155516)[0m slow_function
[2m[36m(slow_function pid=155515)[0m slow_function


Object refs can also be passed into remote functions. When the function actually executes, the argument is passed as the underlying Python value.

- The second task will not be executed until the first task has finished executing because the second task depends on the output of the first task.
- If the two tasks are scheduled on different machines, the output of the first task (the value corresponding to `obj_ref1`) will be sent over the network to the machine where the second task is scheduled.


In [13]:
@ray.remote
def function_with_an_argument(value):
    return value + 1


obj_ref1 = my_function.remote()
assert ray.get(obj_ref1) == 1

# You can pass an object ref as an argument to another Ray remote function.
obj_ref2 = function_with_an_argument.remote(obj_ref1)
assert ray.get(obj_ref2) == 2


[2m[36m(my_function pid=155517)[0m my_function


After launching a number of tasks, you can use `ray.wait` to find out which ones have finished executing without blocking on all of them:

        `ready_refs, remaining_refs = ray.wait(object_refs, num_returns=1, timeout=None)`


Python remote functions can return multiple object refs by setting `num_returns`.
- For tasks that return multiple objects, Ray also supports remote generators, which let a task return one object at a time to reduce memory usage at the worker.


In [14]:
@ray.remote(num_returns=3)
def return_multiple():
    return 0, 1, 2


a, b, c = return_multiple.remote()


In [16]:
@ray.remote(num_returns=3)
def return_multiple_as_generator():
    for i in range(3):
        yield i


# NOTE: Similar to normal functions, these objects will not be available
# until the full task is complete and all returns have been generated.
a, b, c = return_multiple_as_generator.remote()


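Remote functions can be canceled by calling `ray.cancel` on the object ref returned by the call. A subsequent `ray.get` on that ref raises `TaskCancelledError`: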

In [17]:
from ray.exceptions import TaskCancelledError


@ray.remote
def blocking_operation():
    time.sleep(10e6)


obj_ref = blocking_operation.remote()
ray.cancel(obj_ref)


try:
    ray.get(obj_ref)
except TaskCancelledError:
    print("Object reference was cancelled.")


Object reference was cancelled.


## [Actors](https://docs.ray.io/en/latest/ray-core/actors.html)

When a new actor is instantiated, a new worker is created, and methods of the actor are scheduled on that specific worker and can access and mutate the state of that worker.


The `ray.remote` decorator indicates that instances of the `Counter` class will be actors. Each actor runs in its own Python process.

In [24]:
@ray.remote
class Counter(object):
    def __init__(self):
        self.value = 0

    def increment(self):
        self.value += 1
        return self.value

    def get_counter(self):
        return self.value


# Create an actor from this class.
counter = Counter.remote()


You can specify an actor's resource requirements as arguments to `@ray.remote`:

In [19]:
# Specify required resources for an actor.
@ray.remote(num_cpus=2, num_gpus=0.5)
class Actor(object):
    pass


We can interact with the actor by calling its methods with `.remote()`.

- Calling `ray.get` on the returned object ref retrieves the actual value.
- Methods called on different actors can execute in parallel
- Methods called on the same actor are executed serially in the order that they are called.
- Methods on the same actor will share state with one another, as shown below.


In [25]:
obj_ref = counter.increment.remote()
assert ray.get(obj_ref) == 1
obj_ref_2 = counter.increment.remote()
print(ray.get(obj_ref_2))

2


In [26]:
# Create ten Counter actors.
counters = [Counter.remote() for _ in range(10)]

# Increment each Counter once and get the results. These tasks all happen in
# parallel.
results = ray.get([c.increment.remote() for c in counters])
print(results)  # prints [1, 1, 1, 1, 1, 1, 1, 1, 1, 1]

# Increment the first Counter five times. These tasks are executed serially
# and share state.
results = ray.get([counters[0].increment.remote() for _ in range(5)])
print(results)  # prints [2, 3, 4, 5, 6]


[1, 1, 1, 1, 1, 1, 1, 1, 1, 1]
[2, 3, 4, 5, 6]


Actor handles can be passed into other tasks. We can define remote functions (or actor methods) that use actor handles.

In [28]:
import time


@ray.remote
def f(counter):
    for _ in range(1000):
        time.sleep(0.1)
        counter.increment.remote()


counter = Counter.remote()

# Start some tasks that use the actor.
[f.remote(counter) for _ in range(3)]

# Print the counter value.
for _ in range(10):
    time.sleep(1)
    print(ray.get(counter.get_counter.remote()))


27
57
87
117
147
177
207
237
267
297


## [Objects](https://docs.ray.io/en/latest/ray-core/objects.html)

- Tasks and actors create and compute on remote objects that can be stored anywhere in a Ray cluster.
- Remote objects are referred to by object refs
    - An object ref is essentially a pointer or unique ID that refers to a remote object without exposing its value
    - Object refs are returned by remote function calls and by `ray.put`
- Remote objects are cached in Ray’s distributed shared-memory object store (one object store per node in the cluster)

In [2]:
# Put an object in Ray's object store.
y = 1
object_ref = ray.put(y)

You can use `ray.get` to fetch the value of a remote object from its object ref.

In [5]:
# Get the value of one object ref.
from ray.exceptions import GetTimeoutError
import time

obj_ref = ray.put(1)
assert ray.get(obj_ref) == 1

# Get the values of multiple object refs in parallel.
assert ray.get([ray.put(i) for i in range(3)]) == [0, 1, 2]

# You can also set a timeout to return early from a ``get``
# that's blocking for too long.
# ``GetTimeoutError`` is a subclass of ``TimeoutError``.


@ray.remote
def long_running_function():
    time.sleep(8)


obj_ref = long_running_function.remote()
try:
    ray.get(obj_ref, timeout=4)
except GetTimeoutError:  # You can capture the standard "TimeoutError" instead
    print("`get` timed out.")


`get` timed out.


Ray object references can be freely passed around a Ray application:
- Can be passed as arguments to tasks, actor methods, and even stored in other objects
- Tracked via distributed reference counting
- Data is automatically freed once all references to the object are deleted

Depending on the way an object is passed, Ray will decide whether to de-reference the object prior to task execution:

- Passing an object as a top-level argument: when an object is passed directly as a top-level argument to a task, Ray de-references it, so the task sees the underlying value.
- Passing an object as a nested argument: when an object is passed inside a nested object (for example, a Python list), Ray does not de-reference it, so the task receives the object ref itself.

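        # Assumes `Actor` is an actor class whose constructor and `method`
        # each take one argument, and `obj` is an ObjectRef from `ray.put`.
        # Examples of passing objects to actor constructors.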
        actor_handle = Actor.remote(obj)  # by-value
        actor_handle = Actor.remote([obj])  # by-reference

        # Examples of passing objects to actor method calls.
        actor_handle.method.remote(obj)  # by-value
        actor_handle.method.remote([obj])  # by-reference


In [6]:
@ray.remote
def echo(a: int, b: int, c: int):
    """This function prints its input values to stdout."""
    print(a, b, c)


# Passing the literal values (1, 2, 3) to `echo`.
echo.remote(1, 2, 3)
# -> prints "1 2 3"

# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)

# Passing an object as a top-level argument to `echo`. Ray will de-reference top-level
# arguments, so `echo` will see the literal values (1, 2, 3) in this case as well.
echo.remote(a, b, c)


ObjectRef(e0dc174c83599034ffffffffffffffffffffffff0100000001000000)

[2m[36m(echo pid=174563)[0m 1 2 3
[2m[36m(echo pid=174564)[0m 1 2 3


In [7]:
@ray.remote
def echo_and_get(x_list):  # List[ObjectRef]
    """This function prints its input values to stdout."""
    print("args:", x_list)
    print("values:", ray.get(x_list))


# Put the values (1, 2, 3) into Ray's object store.
a, b, c = ray.put(1), ray.put(2), ray.put(3)

# Passing an object as a nested argument to `echo_and_get`. Ray does not
# de-reference nested args, so `echo_and_get` sees the references.
echo_and_get.remote([a, b, c])

ObjectRef(f4402ec78d3a2607ffffffffffffffffffffffff0100000001000000)

[2m[36m(echo_and_get pid=174564)[0m args: [ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000011000000), ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000012000000), ObjectRef(00ffffffffffffffffffffffffffffffffffffff0100000013000000)]
[2m[36m(echo_and_get pid=174564)[0m values: [1, 2, 3]
