In [1]:
"""
A simple test of Ray
This example uses placement_group API to spread work around
"""

import os
import platform
import ray
import time
ray.init(ignore_reinit_error=True)

@ray.remote
class Actor():
    def __init__(self) -> None:
        self.pid = os.getpid()
        self.hostname = platform.node()
        self.ip = ray._private.services.get_node_ip_address()

    def ping(self):
        
        print(f"{self.pid} {self.hostname} {self.ip} - ping")
        time.sleep(1)
        return f"{self.pid} {self.hostname} {self.ip} - pong"

@ray.remote
def main():
   
    # Get list of nodes to use
    print(f"Found {len(actors)} Worker nodes in the Ray Cluster:")

    # Setup one Actor per node
    print(f"Setting up {len(actors)} Actors...")
    actor = []
    for a in actors:
        node_ip_str = f"node:{ray._private.services.get_node_ip_address()}"
        actor.append(Actor.remote())
        time.sleep(1)

    # Ping-Pong test
    for _ in range(2):
        messages = [a.ping.remote() for a in actors]
        for msg in ray.get(messages):
            time.sleep(1)
            print(f"Received back message {msg}")
            time.sleep(1)

            
actors = [Actor.remote() for _ in range(2)]
print(actors)

if __name__ == "__main__":
    main.remote()

2021-06-29 21:30:22,285	INFO services.py:1317 -- View the Ray dashboard at [1m[32mhttp://127.0.0.1:8272[39m[22m


[Actor(Actor,8935859c430befc29f4d5eea01000000), Actor(Actor,5b638c802999d4c482bc9c6501000000)]
[2m[36m(pid=12776)[0m Found 2 Worker nodes in the Ray Cluster:
[2m[36m(pid=12776)[0m Setting up 2 Actors...
[2m[36m(pid=12775)[0m 12775 Anthonys-MacBook-Air.local 192.168.0.139 - ping
[2m[36m(pid=12778)[0m 12778 Anthonys-MacBook-Air.local 192.168.0.139 - ping


2021-06-29 21:30:35,920	ERROR worker.py:79 -- Unhandled error (suppress with RAY_IGNORE_UNHANDLED_ERRORS=1): [36mray::main()[39m (pid=12776, ip=192.168.0.139)
  File "python/ray/_raylet.pyx", line 535, in ray._raylet.execute_task
  File "/Users/smanthony/opt/anaconda3/lib/python3.7/site-packages/ray/util/tracing/tracing_helper.py", line 330, in _function_with_tracing
    return function(*args, **kwargs)
  File "<ipython-input-1-1a7d26a0607a>", line 42, in main
  File "/Users/smanthony/opt/anaconda3/lib/python3.7/site-packages/ray/_private/client_mode_hook.py", line 62, in wrapper
    return func(*args, **kwargs)
TypeError: wait() expected a list of ray.ObjectRef, got list containing <class 'list'>


In [None]:
# The original code used the resources option in Ray 
# to allocate one Actor per node. It makes sense that you can't cr
# eate multiple actors in the same way in a notebook because you onl
# y have one available node. You could try removing the resources option i
# n the 'actor.append(Actor.options(resources={f"{node_ip_str}": 1}).remote())' line 
# and see if that solves it, i.e. just change it to 'Actor.remote()'.