# A Guided Tour of Ray Core: Remote Stateful Classes

![Anyscale Academy](../images/AnyscaleAcademyLogo.png)


[*Remote Classes*](https://docs.ray.io/en/latest/walkthrough.html#remote-classes-actors)
are created by applying the `@ray.remote` decorator to a class.

This implements the [*actor*](https://patterns.eecs.berkeley.edu/?page_id=258) pattern, which has two key properties: actors are *stateful*, and they follow *message-passing semantics*.

Actors are extremely powerful. They allow you to take a Python class and instantiate it as a stateful microservice that can be queried from other actors and tasks and even other Python applications.

When you instantiate a remote actor, Ray creates a separate worker process on a worker node. That process becomes the actor process and runs the methods called on the actor. Other Ray tasks and actors can invoke its methods on that process, mutating its internal state. Actors can also be terminated manually if needed. The example code below shows all of these cases.

<img src="../images/ray_worker_actor_1.png" height="30%" width="60%">
<img src="../images/ray_worker_actor_2.png" height="30%" width="60%">
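To make "stateful" and "message-passing" concrete without a cluster, here is a minimal plain-Python sketch of the actor pattern. It is an illustration under stated assumptions, not how Ray implements actors. The hypothetical `MiniActor` wraps one object, keeps its state private, and processes messages (method calls) one at a time on its own thread, using a single-thread `ThreadPoolExecutor` as the mailbox. `send()` returns a `Future`, loosely analogous to the `ObjectRef` returned by `actor.method.remote()`, and `Future.result()` plays the role of `ray.get()`. `Counter` is likewise a made-up example class.

```python
import threading
from concurrent.futures import Future, ThreadPoolExecutor


class MiniActor:
    """Toy actor: wraps one object and runs its methods one at a time on a private thread."""

    def __init__(self, cls, *args):
        self._instance = cls(*args)  # the actor's state lives only in this object
        # A single-thread executor acts as the mailbox: messages queue up and run in order.
        self._mailbox = ThreadPoolExecutor(max_workers=1)

    def send(self, method: str, *args) -> Future:
        """Send a message (a method call); returns a Future, loosely like a Ray ObjectRef."""
        return self._mailbox.submit(getattr(self._instance, method), *args)

    def stop(self) -> None:
        """Graceful stop: process the messages already queued, then shut down."""
        self._mailbox.shutdown(wait=True)


class Counter:
    def __init__(self):
        self.value = 0

    def increment(self) -> int:
        self.value += 1  # no lock needed: only the mailbox thread ever runs this
        return self.value


counter = MiniActor(Counter)
futures = []  # list.append is thread-safe in CPython


def caller(n: int) -> None:
    for _ in range(n):
        futures.append(counter.send("increment"))


# Four independent callers message the same actor concurrently.
threads = [threading.Thread(target=caller, args=(1000,)) for _ in range(4)]
for t in threads:
    t.start()
for t in threads:
    t.join()

results = sorted(f.result() for f in futures)  # like ray.get on each ObjectRef
counter.stop()
```

All 4,000 increments are applied and none is lost, even though `Counter.increment` takes no lock, because only the mailbox thread ever touches the state. `stop()` here drains queued messages first; Ray's `ray.kill()`, used later in this notebook, instead stops an actor forcefully.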

---

First, let's start Ray…

In [1]:
import logging
import time
import ray
import random
from random import randint
import numpy as np

In [2]:
ray.init(
    ignore_reinit_error=True,
    logging_level=logging.ERROR,
)

{'node_ip_address': '127.0.0.1',
 'raylet_ip_address': '127.0.0.1',
 'redis_address': None,
 'object_store_address': 'tcp://127.0.0.1:62107',
 'raylet_socket_name': 'tcp://127.0.0.1:60000',
 'webui_url': '127.0.0.1:8265',
 'session_dir': 'C:\\Users\\chjinche\\AppData\\Local\\Temp\\ray\\session_2022-04-12_11-09-10_987504_12152',
 'metrics_export_port': 65182,
 'gcs_address': '127.0.0.1:64921',
 'address': '127.0.0.1:64921',
 'node_id': '6e24b2b24eb87667d62d9d8d6ca0b4a809195d4448bdbee2d9a4f8d1'}

# Tree of Actors Pattern

This is a common pattern used in the Ray libraries [Ray Tune](https://docs.ray.io/en/latest/tune/index.html), [Ray Train](https://docs.ray.io/en/latest/train/train.html), and [RLlib](https://docs.ray.io/en/latest/rllib/index.html) to train models in parallel or to run distributed hyperparameter optimization (HPO).

In the tree-of-actors pattern, a collection of worker actors is managed by a supervisor actor. For example, you may want to train multiple models at the same time while still being able to checkpoint or inspect their state.

<img src="https://docs.ray.io/en/latest/_images/tree-of-actors.svg" width="40%" height="20%">
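The structure in the diagram can first be sketched in plain Python. In this hypothetical version, `ToySupervisor` owns one `ToyWorker` per model type and gives each worker its own single-thread executor as a stand-in for a worker actor. The supervisor fans each call out to all workers and collects the results, and `checkpoint()` covers the "inspect their state" part. The class names and the step counter are illustrative assumptions, not part of Ray's API.

```python
from concurrent.futures import ThreadPoolExecutor


class ToyWorker:
    """Plain-Python stand-in for a worker actor: owns one model's training state."""

    def __init__(self, model_type: str):
        self.model_type = model_type
        self.steps = 0

    def train_step(self) -> int:
        self.steps += 1  # placeholder for one unit of real training
        return self.steps

    def checkpoint(self) -> dict:
        return {"model": self.model_type, "steps": self.steps}


class ToySupervisor:
    """Owns the workers; each worker gets a single-thread executor as its 'mailbox'."""

    def __init__(self, model_types):
        self._workers = {
            m: (ToyWorker(m), ThreadPoolExecutor(max_workers=1)) for m in model_types
        }

    def _call(self, method: str) -> dict:
        # Fan out: send the same method call to every worker, then gather all results.
        futures = {m: ex.submit(getattr(w, method)) for m, (w, ex) in self._workers.items()}
        return {m: f.result() for m, f in futures.items()}

    def train(self, n_steps: int) -> None:
        for _ in range(n_steps):
            self._call("train_step")

    def checkpoint(self) -> dict:
        return self._call("checkpoint")

    def shutdown(self) -> None:
        for _, ex in self._workers.values():
            ex.shutdown(wait=True)


sup = ToySupervisor(["lr", "cl", "nn"])
sup.train(3)
ckpt = sup.checkpoint()  # {'lr': {'model': 'lr', 'steps': 3}, ...}
sup.shutdown()
```

In the Ray example that follows, the workers become `@ray.remote` actors and the executor submissions become `.remote()` calls.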

Let's implement a simple example to illustrate this pattern.

In [3]:
STATES = ["RUNNING", "DONE"]  # states a Worker can report

class Model:

    def __init__(self, m: str):
        self._model = m

    def train(self):
        # Simulated training: about 2 minutes in total, with a progress message after 1 minute
        time.sleep(60)
        print("training process for 1 min...")
        time.sleep(60)
        print("training process ended.")

# Factory function to return an instance of a model type
def model_factory(m: str):
    return Model(m)

### Create a Worker Actor

In [4]:
@ray.remote
class Worker(object):
    def __init__(self, m: str):
        # type of a model: lr, cl, or nn
        self._model = m

    def state(self) -> str:
        # Simulated: randomly reports one of STATES instead of tracking real progress
        return random.choice(STATES)

    # Do the work for this model
    def work(self) -> None:
        model_factory(self._model).train()
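`Worker.state()` above is simulated with `random.choice`. A worker that reports its real status might look like the plain-Python class below (hypothetical, including the extra `"IDLE"` state): `work()` sets `RUNNING` before training and `DONE` after, under a lock. One caveat carries over to Ray: a default actor runs one method at a time, so a `state()` call sent while `work()` is running waits until `work()` returns and can only ever observe `DONE`. Observing `RUNNING` requires concurrent method calls, for example a threaded actor configured with Ray's `max_concurrency` option; the usage code below imitates that with a plain thread.

```python
import threading
from typing import Callable


class TrackedWorker:
    """Hypothetical worker that tracks its real status instead of using random.choice."""

    def __init__(self, m: str):
        self._model = m
        self._state = "IDLE"  # hypothetical state before the first work() call
        self._lock = threading.Lock()

    def state(self) -> str:
        with self._lock:
            return self._state

    def work(self, train: Callable[[], None]) -> None:
        # `train` stands in for model_factory(self._model).train()
        with self._lock:
            self._state = "RUNNING"
        train()
        with self._lock:
            self._state = "DONE"


# Usage: run work() on a separate thread so state() can be queried concurrently.
started, release = threading.Event(), threading.Event()


def fake_train() -> None:
    started.set()   # signal that training has begun
    release.wait()  # block until the caller lets training finish


w = TrackedWorker("lr")
before = w.state()
t = threading.Thread(target=w.work, args=(fake_train,))
t.start()
started.wait()
during = w.state()
release.set()
t.join()
after = w.state()
```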

### Create a Supervisor Actor

In [5]:
from datetime import datetime

@ray.remote
class Supervisor:
    def __init__(self):
        # Create three worker actors, one per model type
        self.workers = [Worker.remote(name) for name in ["lr", "cl", "nn"]]

    def work(self):
        # Start work() on every worker without waiting for it to finish
        for w in self.workers:
            w.work.remote()

    def add_worker(self, m: str):
        self.workers.append(Worker.remote(m))

    def delete_worker(self):
        # Drop the supervisor's handle to the first worker. In the run below,
        # the deleted worker still finished the work already queued on it.
        print(f'deleting... {datetime.now()}')
        if self.workers:
            del self.workers[0]
        print(f'deleted. {datetime.now()}')

    def terminate(self):
        # Forcefully stop every remaining worker actor
        for w in self.workers:
            ray.kill(w)

    def state(self):
        # Block until every worker has reported its state
        return ray.get([w.state.remote() for w in self.workers])

Create an actor instance for the supervisor; its constructor creates the three worker actors.

In [6]:
sup = Supervisor.remote()  # also creates the three Worker actors

In [7]:
# Launch remote actors as workers
sup.work.remote()

ObjectRef(16310a0f0a45af5cf3a3a25f573c0b77dbfcf2920100000001000000)

In [8]:
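# Add a new worker while the others have been training for less than 1 min,
# then call work() again: the new worker starts training, and each existing
# worker queues a second training run behind its current one.
sup.add_worker.remote('new')
sup.work.remote()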

ObjectRef(32d950ec0ccf9d2af3a3a25f573c0b77dbfcf2920100000001000000)

 pid=1324)[0m training process for 1 min...
 pid=13572)[0m training process for 1 min...
 pid=19000)[0m training process for 1 min...


In [9]:
# Delete the first worker after it has been training for about 1 min.
sup.delete_worker.remote()

ObjectRef(e0dc174c83599034f3a3a25f573c0b77dbfcf2920100000001000000)

 pid=8484)[0m deleting... 2022-04-12 11:13:35.508059
 pid=8484)[0m deleted. 2022-04-12 11:13:35.508059
 pid=10020)[0m training process for 1 min...
 pid=1324)[0m training process ended.
 pid=13572)[0m training process ended.
 pid=19000)[0m training process ended.
 pid=10020)[0m training process ended.
 pid=1324)[0m training process for 1 min...
 pid=19000)[0m training process for 1 min...
 pid=13572)[0m training process for 1 min...
 pid=1324)[0m training process ended.
 pid=13572)[0m training process ended.
 pid=19000)[0m training process ended.


In [10]:
# Delete the next worker (originally the second) after training has finished, i.e. after more than 2 min.
sup.delete_worker.remote()

ObjectRef(f4402ec78d3a2607f3a3a25f573c0b77dbfcf2920100000001000000)

 pid=8484)[0m deleting... 2022-04-12 11:17:15.098386
 pid=8484)[0m deleted. 2022-04-12 11:17:15.098386


In [None]:
# Terminate all remaining workers; ray.get waits until the supervisor has issued the kills
ray.get(sup.terminate.remote())

In [None]:
# terminate supervisor
ray.kill(sup)

In [None]:
ray.shutdown()

### Look at the Ray Dashboard
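While Ray is running (that is, before `ray.shutdown()`), the dashboard is served at the `webui_url` reported by `ray.init()`: http://127.0.0.1:8265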