# Playing with Ray
## Step 1: Installing Ray Software
... and its dependencies

In [None]:
!pip install ray[all]
!pip install -U ipywidgets

## Step 2: Using Ray Python API (Ray Core)

In [None]:
import ray

## Step 3: Create or connect to a Ray Cluster
`ray.init()` is not mandatory: if it has not been called yet, it is called automatically, without arguments, the first time we use the Ray API.

Without arguments, `ray.init()` *creates* a new Ray cluster on the local host.

But it may also be used to connect to an existing Ray cluster, or to modify the default parameters as we do below:

In [None]:
def ray_init(local_cluster=True, *,
             address="ray://node-2.ezmeral.edrusb.org:10015",
             dashboard_host="0.0.0.0",
             dashboard_port=8265):
    """Start a local Ray cluster or connect to an existing remote one.

    Args:
        local_cluster: when True, start a new Ray cluster on this host with its
            dashboard served on ``dashboard_host:dashboard_port``; when False,
            connect to the existing cluster at ``address``.
        address: cluster address used when ``local_cluster`` is False (the
            ``ray://`` scheme selects a Ray Client connection).
        dashboard_host: interface the local dashboard binds to; "0.0.0.0"
            makes it reachable from other hosts.
        dashboard_port: TCP port of the local dashboard.
    """
    if local_cluster:
        ray.init(dashboard_host=dashboard_host, dashboard_port=dashboard_port)
    else:
        ray.init(address=address)

In [None]:
ray_init(True)

### Let's check the cluster available resources:

In [None]:
print(ray.cluster_resources())

In [None]:
!pip install jq

In [None]:
import jq

# ray.cluster_resources() returns a plain dict, so a key lookup is enough (no
# jq needed); query the cluster once and reuse the result for both figures.
resources = ray.cluster_resources()
print("Total CPU = {}".format(resources.get("CPU")))
total_ram = resources.get("memory", 0)
print("Total RAM = {:.1f} GiB".format(total_ram / (1024**3)))

### What GPUs do we have available?

In [None]:
# ray.get_gpu_ids() only reports the GPUs available to the calling process
# (here the notebook driver); the GPUs of the whole cluster are counted in
# ray.cluster_resources() under the "GPU" key (absent when there are none).
print("GPUs assigned to this driver: {}".format(ray.get_gpu_ids()))
print("GPUs in the cluster: {}".format(ray.cluster_resources().get("GPU", 0)))

## Step 4: Playing with Ray tasks (aka remote function)
Let's create a function that we "decorate" so that it runs on the Ray cluster:

In [None]:
import time

In [None]:
@ray.remote(num_cpus=0.25)
def my_function(name, x):
    """Ray task: sleep ``x`` seconds, logging before and after, then return ``x``.

    Each invocation reserves 0.25 logical CPU, so several of these tasks can
    be scheduled on the same CPU at once.
    """
    going_msg = "[{}] going to sleep {} seconds".format(name, x)
    done_msg = "[{}] exiting from sleep after {} seconds".format(name, x)
    print(going_msg)
    time.sleep(x)
    print(done_msg)
    return x  # the value the caller retrieves with ray.get()

To invoke the function, we call the `remote()` method added by the `@ray.remote` decorator, passing it the arguments intended for the function:

In [None]:
obj_ref = my_function.remote("coucou", 10)

This call returns immediately, well before the execution ends (the function sleeps for 10 seconds).

We can get the result (this makes the current thread synchronize with the function, waiting for it to complete if it has not already done so). To do this, we call `ray.get()` on the Ray object reference returned by the previous call:

In [None]:
print(ray.get(obj_ref))

Let's see more clearly that calling a remote function returns much earlier than its execution ends, and that we can therefore run several "tasks" in parallel. From a loop, we fire this function `num_tasks` times (30 here):

In [None]:
# Submit num_tasks remote tasks and keep their ObjectRefs. .remote() returns
# immediately, so the time measured here is only the submission overhead.
num_tasks = 30

# let's measure the execution time of what follows:
start = time.time()

tret = [my_function.remote(x, 10) for x in range(num_tasks)]

step1 = time.time()
print("launching {} tasks took {} seconds".format(num_tasks, step1 - start))

At this point, none of the tasks has completed: they all run in parallel, assuming the cluster has enough resources to do so. Otherwise, some tasks stay pending until resources become available.


In [None]:
print("tasks are now launched, starting gather their result")

# ray.get() blocks until the referenced task has finished. Iterating the refs
# in submission order prints each result as soon as it is available, while
# the remaining tasks keep running in parallel on the cluster.
for ref in tret:
    print(ray.get(ref))

end = time.time()
print("\nexecution of {} tasks took {} seconds\n".format(num_tasks, end - start))

## Step 5: Ray Actors

First, let's create a helper function that displays the object status, to keep the code simple and readable:

In [None]:
def see_counter_value(tag, value):
    """Print one line reporting a counter: "<tag> counter value is <value>"."""
    line = f"{tag} counter value is {value}"
    print(line)


see_counter_value("example1", 199)

Let's first see how the class is used without Ray. We wrap this in a function so that we can repeat the same code with the Ray decorator afterwards.

In [None]:
def without_ray():
    '''
    Demonstrate a plain (non-Ray) Python Counter class.

    The class is defined inside this function so that the same code can be
    repeated right after, at module level, with the @ray.remote decorator.
    '''
    class Counter:
        def __init__(self):
            self.value = 0

        def increment(self):
            self.value += 1
            return self.value

        def get_counter(self):
            return self.value

    object1 = Counter()
    # see_counter_value takes (tag, value): the label must come first.
    see_counter_value("before increment", object1.get_counter())
    object1.increment()
    see_counter_value("after increment", object1.get_counter())


without_ray()

Now see **with ray**: 

Very little change is required to create an actor template from this class definition:

In [None]:
@ray.remote
class Counter:
    """Ray actor wrapping an integer counter that starts at 0.

    Methods are invoked remotely (``handle.method.remote()``) and their
    results are fetched with ``ray.get``.
    """

    def __init__(self):
        self.value = 0

    def increment(self):
        """Add one to the counter and return the new value."""
        self.value = self.value + 1
        return self.value

    def get_counter(self):
        """Return the current counter value."""
        return self.value

In [None]:
object1 = Counter.remote()

In [None]:
for _ in range(100):
    # see_counter_value takes (tag, value): the label must come first.
    see_counter_value("before increment", ray.get(object1.get_counter.remote()))
    # The returned ObjectRef is not awaited: Ray executes calls submitted by the
    # same caller to an actor in submission order, so the following
    # get_counter call observes the increment.
    object1.increment.remote()
    see_counter_value("after increment", ray.get(object1.get_counter.remote()))

In [None]:
del object1

## Step 6: Playing with Ray Objects

We saw previously that a remote function returns a reference to a Ray object. We can also make Ray objects from almost anything with `ray.put()`, such as the Python list below.

Note also that the references returned by remote functions (see above) and by `ray.put()` (see below) are stored in global variables. This is not a good practice: as long as these references live, the objects they point to consume resources on the Ray cluster. It is better to use anonymous/temporary references, or named references at an inner level such as the body of a function or of a class method.

Anyway, for the sake of clarity, we keep doing what should not be done in a real program.

In [None]:
my_table = ["citron", "fraise", "orange"]

remote_ref = ray.put(my_table)

print(ray.get(remote_ref)[1])

In [None]:
my_table.append("tomate")

print(my_table)

We modified the Python object, not the Ray object.

The Ray object is immutable: it cannot change, so it is easy to distribute (copy) wherever it is needed on the cluster (data locality):

In [None]:
# ray.get() hands back a local, deserialized copy of the stored object:
# appending to that copy leaves the object held by Ray untouched, so the print
# below still shows the list as it was when ray.put() was called.
ray.get(remote_ref).append("tomate")

print(ray.get(remote_ref))

# Annex: Experimentations

In [None]:
@ray.remote(num_cpus=0.5)
def heater(inter):
    """CPU-burning Ray task used to load the cluster.

    Returns 1 plus the alternating sum of squares over ``range(inter)``:
    even ``x`` add ``x*x``, odd ``x`` subtract it.
    """
    total = 1
    for x in range(inter):
        square = x * x
        total = total + square if x % 2 == 0 else total - square
    return total

# Time one heater task end to end: submission, remote execution and fetch.
start = time.time()
ret_obj = heater.remote(100000000)
ray.get(ret_obj)  # blocks until the remote computation has finished
end = time.time()
print(f"\nexecution of {1} tasks took {end - start} seconds\n")

In [None]:
num_threads = 10  # number of concurrent heater *tasks* (not OS threads)

# Submit every task first so they run in parallel, then wait for all of them:
# ray.get() on a list blocks until every referenced task has finished.
tret = [heater.remote(100000000 + x) for x in range(num_threads)]
results = ray.get(tret)

In [None]:
# Cancel every heater task submitted above; this is meant for interrupting them
# if the previous cell is still waiting.
for ref in tret:
    ray.cancel(ref)

## Last Step: Shutting down the Ray Cluster
`ray.shutdown()` disconnects us from the cluster. Since **ray.init()** also created the cluster, this call also tears down the cluster it created.

In [None]:
ray.shutdown()

In [None]:
# After ray.shutdown() this call is expected to fail; the deliberately broad
# except keeps the cell from aborting and just reports the error.
try:
    print(ray.cluster_resources())
except Exception as err:
    print(f"exception caught: {err}")

`ray.init()` has many options. Inline help is available, as well as more documentation at
http://ray.io

In [None]:
help(ray.init)

# Modin = pandas on Ray

In [None]:
! pip install modin

In [None]:
import modin.pandas as pd
import ray

In [None]:
ray_init(True)

In [None]:
df = pd.read_parquet("/home/jovyan/Data/table-adresses-reu.parquet")

In [None]:
df.size

In [None]:
df.columns

In [None]:
df.shape

In [None]:
# memory_usage is a method: it must be called to get the per-column memory
# usage in bytes (without parentheses the cell only displays the bound method).
df.memory_usage()

In [None]:
df.tail(3)

In [None]:
df.at[1000, 'id_brut_bv_reu']

In [None]:
df.loc[3]

In [None]:
df.plot(x='longitude' , y='latitude' , kind='scatter')

In [None]:
del df
ray.shutdown()

# Ray Serve

## Restarting the Ray Cluster

In [None]:
ray_init(True)

## Creating the serving engine (listening on port TCP/8000)

In [None]:
from ray import serve

serve.start(http_options = { 'host':'0.0.0.0', 'port':8000 })

## Model 1

In [None]:
!pip install py-spy

In [None]:
import requests
from starlette.requests import Request
from typing import Dict

In [None]:
# 1: Define a Ray Serve application.
@serve.deployment
class MyModelDeployment:
    """Serve deployment that greets the first name posted as a JSON string.

    The greeting template (e.g. "Salut {} !") is given at bind time and is
    filled with the decoded request body via ``str.format``.
    """

    def __init__(self, msg_format: str):
        # Per-replica model state; a real model would load its weights here.
        self._format = msg_format

    async def __call__(self, request: Request) -> Dict:
        payload: str = await request.json()
        greeting = self._format.format(payload)
        return {"result": greeting}

In [None]:
app = MyModelDeployment.bind("Salut {} !")

In [None]:
# 2: Deploy the application locally.
app_name="salutations"

### Deploying the application and requesting predictions

In [None]:
serve.run(target=app, name=app_name, route_prefix='/'+app_name)

### Inference

In [None]:
first_name = input("Quel est votre prénom: ")

In [None]:
response = requests.post("http://localhost:8000/"+app_name, json=first_name)

In [None]:
response.text

### Undeploying the application

In [None]:
serve.delete(app_name)

In [None]:
del app

## Model 2

In [None]:
!pip install torch
!pip install transformers

In [None]:
from ray import serve
import requests
from starlette.requests import Request
from transformers import pipeline

In [None]:
@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 0.25, "num_gpus": 0})
class Translator:
    """Serve deployment translating English text to French with t5-small."""

    def __init__(self):
        # Build the Hugging Face translation pipeline once, when the replica starts.
        self.model = pipeline("translation_en_to_fr", model="t5-small")

    def translate(self, text: str) -> str:
        """Run the pipeline on ``text`` and return only the translated string."""
        outputs = self.model(text)
        first = outputs[0]
        return first["translation_text"]

    async def __call__(self, http_request: Request) -> str:
        """Translate the JSON-encoded English string sent in the request body."""
        # NOTE: translate() is synchronous, so inference runs inside this
        # coroutine and blocks the replica's event loop while it executes.
        return self.translate(await http_request.json())

In [None]:
translator_app = Translator.bind()

In [None]:
app2_name="translator"

### Deploying the application

In [None]:
serve.run(target=translator_app, name=app2_name, route_prefix='/'+app2_name)

In [None]:
ray.serve.status()

### Inference

In [None]:
english_text = "Success is the ability to go from one failure to another with no loss of enthusiasm."

In [None]:
response = requests.post("http://127.0.0.1:8000/"+app2_name, json=english_text)
french_text = response.text

In [None]:
print(french_text)

### Undeploying application

In [None]:
serve.delete(app2_name)

In [None]:
del translator_app

## Stopping the serving engine

In [None]:
ray.serve.shutdown()

## Stopping the Ray Cluster

In [None]:
ray.shutdown()