In [None]:
import ray
import time
import os
import cmlapi
import json

# Set the setup variables needed by CML APIv2
HOST = os.getenv("CDSW_API_URL").split(":")[0] + "://" + os.getenv("CDSW_DOMAIN")
USERNAME = os.getenv("CDSW_PROJECT_URL").split("/")[6]  # args.username  # "vdibia"
API_KEY = os.getenv("CDSW_APIV2_KEY")
PROJECT_NAME = os.getenv("CDSW_PROJECT")
PROJECT_ID=os.getenv("CDSW_PROJECT_ID")

cml = cmlapi.default_client(url=HOST,cml_api_key=API_KEY)

def set_environ(Cml,Item,Value):
  Project=Cml.get_project(os.getenv("CDSW_PROJECT_ID"))
  if Project.environment=='':
    Project_Environment={}
  else:
    Project_Environment=json.loads(Project.environment)
  Project_Environment[Item]=Value
  Project.environment=json.dumps(Project_Environment)
  Cml.update_project(Project,project_id=os.getenv("CDSW_PROJECT_ID"))

def get_environ(Cml,Item):
  Project=Cml.get_project(os.getenv("CDSW_PROJECT_ID"))
  Project_Environment=json.loads(Project.environment)
  return Project_Environment[Item]

In [None]:
runtime_env = {
    #"env_vars": {"__MODIN_AUTOIMPORT_PANDAS__": "1"}
}


ray.init(address=get_environ(cml,"RAY_ADDRESS"),runtime_env=runtime_env)

In [None]:
@ray.remote
class MessageActor(object):
    def __init__(self):
        self.messages = []
    
    def add_message(self, message):
        self.messages.append(message)
    
    def get_and_clear_messages(self):
        messages = self.messages
        self.messages = []
        return messages


# Define a remote function which loops around and pushes
# messages to the actor.
@ray.remote
def worker(message_actor, j):
    for i in range(100):
        time.sleep(1)
        message_actor.add_message.remote(
            "Message {} from worker {}.".format(i, j))


# Create a message actor.
message_actor = MessageActor.remote()

# Start 3 tasks that push messages to the actor.
[worker.remote(message_actor, j) for j in range(3)]

# Periodically get the messages and print them.
for _ in range(100):
    new_messages = ray.get(message_actor.get_and_clear_messages.remote())
    print("New messages:", new_messages)
    time.sleep(1)

In [None]:
def fibonacci_local(sequence_size):
    fibonacci = []
    for i in range(0, sequence_size):
        if i < 2:
            fibonacci.append(i)
            continue
        fibonacci.append(fibonacci[i-1]+fibonacci[i-2])
    return sequence_size

# Ray task
@ray.remote
def fibonacci_distributed(sequence_size):
    fibonacci = []
    for i in range(0, sequence_size):
        if i < 2:
            fibonacci.append(i)
            continue
        fibonacci.append(fibonacci[i-1]+fibonacci[i-2])
    return sequence_size

In [None]:
%%time
fibonacci_local(100000)

In [None]:
%%time
fibonacci_distributed.remote(100000)