## Ray Distributed Objects and a Comparison to Parsl and FuncX.
This is the actor demo from the accompanying blog post.

In [1]:
import ray
import json

The following utility functions recognize the top-level topic and the subtopic of arXiv paper titles.
For example, the title

'Spectral Measures on Locally Fields [math.FA]'

comes from the top-level topic `math` and the subtopic `FA` (Functional Analysis), which in our grouping of the arXiv categories we call "analysis".

In [2]:
def read_config(subj, basepath):
    """Load ``config_<subj>.json`` from the directory *basepath*.

    Returns a ``(subject, loadset, subtopics)`` triple: the file's
    ``'subject'`` and ``'loadset'`` values, and a list of 2-tuples built
    from the first two items of each entry in its ``'supertopics'`` list.
    """
    config_path = basepath + "/config_" + subj + ".json"
    with open(config_path, 'rb') as handle:
        raw = handle.read()
    config = json.loads(raw)
    # Tuple elements are evaluated left to right, so the keys are looked
    # up in the same order as before: subject, loadset, supertopics.
    return (
        config['subject'],
        config['loadset'],
        [(entry[0], entry[1]) for entry in config['supertopics']],
    )

In [11]:
def make_dict(subtopics):
    """Invert *subtopics* into a ``{code: name}`` lookup table.

    Each element of *subtopics* is a ``(name, codes)`` pair, and every code
    in *codes* maps to *name*.  If a code is listed under several names,
    the one appearing last wins.
    """
    return {code: pair[0] for pair in subtopics for code in pair[1]}


`split_titles` is the most important library function: given a general topic and a list of titles, it returns the subtopic of the first title it can classify, or an empty string if none can be classified.

In [15]:
def split_titles(topic, disp_title, basepath="../scdoc-new"):
    """Return the category of the first classifiable title in *disp_title*.

    Each title is expected to contain an arXiv category tag in square
    brackets, e.g. ``'Spectral Measures on Locally Fields [math.FA]'``.
    The tag is looked up in the code -> name map built from
    ``config_<topic>.json`` under *basepath*.

    Args:
        topic: config name, e.g. ``'math'`` (subtopics of math) or
            ``'all4'`` (top-level categories).
        disp_title: iterable of title strings.
        basepath: directory holding the config files; defaults to the
            directory this function always used.

    Returns:
        The category name for the first title whose tag is known, or ``''``
        if no title could be classified.  Titles without a ``[`` are
        skipped; unknown tags are printed as ``ti <tag>`` and skipped.
    """
    _subject, _loadset, subtopics = read_config(topic, basepath)
    # category code (e.g. 'math.FA') -> category name
    dic = make_dict(subtopics)
    for ti in disp_title:
        start = ti.find('[')
        if start < 0:
            continue
        # Text between the first '[' and the following ']'.  If the closing
        # bracket is missing, take the rest of the title instead of silently
        # dropping its last character.
        tag = ti[start + 1:].partition(']')[0]
        if tag in dic:
            return dic[tag]
        print('ti', tag)
    return ''

In [16]:
# Example: classify one title against the 'math' config.
# `x` is reused by the 'all4' example below.
x = 'Spectral Measures on Locally Fields [math.FA]'
split_titles('math', [x])

'analysis'

When `split_titles` is called with `"all4"` as the topic, it returns the top-level category of the title.

In [18]:
# Same title, classified against the 'all4' config (top-level categories).
split_titles('all4', [x])

'math'

In [17]:
import time

`SubClassifier` is an actor class, and one instance is created for each top-level category. Each instance stores the titles it receives in a separate list for each subcategory of its top-level topic.

`compute_subclass` is our dummy worker function. It takes a handle to a `SubClassifier` actor and a title, and pushes the title to that actor.

`Classifier` creates an actor that sorts titles by top-level category and launches an invocation of `compute_subclass` for each one.

More details in the blog post.

In [7]:
@ray.remote
class SubClassifier:
    """Actor that collects the titles belonging to one top-level topic.

    Titles are grouped into one list per subtopic listed in the topic's
    ``config_<topic>.json`` (read from the current directory), and a
    per-subtopic count is kept alongside.
    """

    def __init__(self, topic):
        self.topic = topic
        _subject, _loadset, subtopics = read_config(topic, "./")
        toplist = [name for name, _codes in subtopics]
        # subtopic name -> titles received for it
        self.topics = {top: [] for top in toplist}
        # subtopic name -> number of titles received for it
        self.counter = {top: 0 for top in toplist}

    def send(self, title):
        """Classify *title*, record it under its subtopic, and return the subtopic.

        If ``split_titles`` returns ``''`` or a name that is not in this
        actor's config, the title is not recorded (previously this raised
        ``KeyError``).  The subtopic string is returned in every case.
        """
        subt = split_titles(self.topic, [title])
        if subt in self.topics:
            self.topics[subt].append(title)
            self.counter[subt] += 1
        return subt

    def get_classification(self):
        """Return the ``(titles_by_subtopic, count_by_subtopic)`` dicts."""
        return self.topics, self.counter


In [None]:
# The argument passed to time.sleep() in compute_subclass below sets the amount of simulated "work" per title.

In [8]:
@ray.remote
def compute_subclass(subc_ptr, title, work=3.0):
    """Simulate *work* seconds of processing, then forward *title* to an actor.

    Args:
        subc_ptr: handle to a SubClassifier actor.
        title: the title string to forward.
        work: seconds to sleep before forwarding. This sets the amount of
            simulated "work" and defaults to the previously hard-coded 3.0.

    Returns:
        The ObjectRef of the ``subc_ptr.send`` call; this task does not wait
        for that call to finish.
    """
    time.sleep(work)
    return subc_ptr.send.remote(title)

In [None]:
# Read by Classifier.send.  After dispatching a title, send keeps going while
# fewer than `queue_size` extra tasks are queued; otherwise it waits for all
# pending compute_subclass tasks before returning.
queue_size = 0 #this size yields sequential execution (waits after every title)

In [9]:
@ray.remote
class Classifier:
    """Actor that routes each title to the SubClassifier for its category.

    In this notebook it is created with ``topic='all4'``, so ``split_titles``
    yields the top-level category.  That category is also used as the name
    of the SubClassifier actor that receives the title.
    """

    def __init__(self, topic):
        self.topic = topic
        # ObjectRefs of compute_subclass tasks launched since the last drain.
        self.future_list = []
        # Number of tasks queued beyond the first since the last drain.
        self.future_count = 0

    def send(self, title):
        """Classify *title* and dispatch it to the matching SubClassifier actor.

        Uses the module-level ``queue_size``.  Once more than ``queue_size``
        tasks are pending, it blocks until all of them have finished.
        Returns a fixed status string.
        """
        subt = split_titles(self.topic, [title])
        if subt != '':
            try:
                cl = ray.get_actor(subt)
            except ValueError:
                # No actor with this name yet: create a detached one so it
                # outlives this driver and can be inspected later.
                cl = SubClassifier.options(name=subt, lifetime="detached").remote(subt)

            self.future_list.append(compute_subclass.remote(cl, title))
            if self.future_count < queue_size:
                self.future_count += 1
            else:
                # Drain: wait for every pending compute_subclass task
                # (previously a typo, `self_future_list`, raised NameError here).
                ray.get(self.future_list)
                self.future_list = []
                self.future_count = 0
        return "item sent to subcategory"



`Server` is the backend class for the Ray Serve client. It forwards each request body (a title) to the `Classifier` actor.

In [10]:
class Server:
    """Ray Serve backend that forwards each request body to the Classifier actor."""

    def __init__(self):
        self.counter = 0
        try:
            cl = ray.get_actor('all4')
        except ValueError:
            # First deployment: create the named, detached Classifier actor.
            cl = Classifier.options(name='all4', lifetime="detached").remote('all4')
        else:
            print(cl)
        self.classifier = cl

    async def __call__(self, starlette_request):
        """Classify the UTF-8 request body and return ``{'status': <result>}``."""
        payload_bytes = await starlette_request.body()
        arg = payload_bytes.decode("utf-8")
        # Await the ObjectRef rather than calling ray.get(), which would block
        # this coroutine's event loop while Classifier.send runs.
        v = await self.classifier.send.remote(arg)
        return {'status': v}


In [None]:
from ray import serve
import requests

# Start Ray Serve listening on all network interfaces and expose the Server
# backend at POST /classify.
# NOTE(review): this is the client-style Serve API (serve.start() returning a
# client with create_backend/create_endpoint); confirm it exists in the
# installed Ray version.
client = serve.start(http_options={"host":'0.0.0.0'})
client.create_backend("sciml:v0", Server)
client.create_endpoint(
    "classify",
    backend="sciml:v0",
    route="/classify",
    methods=["POST"])

2021-03-26 10:53:45,166	INFO services.py:1172 -- View the Ray dashboard at http://127.0.0.1:8265


In [12]:
# POST one title to the /classify endpoint and print the JSON reply.
# NOTE: 44.234.63.1 is a hard-coded public address (presumably the author's
# cluster); replace it with the host running your Serve instance.
sent = 'Spectral Measures on Locally Fields [math.FA]'
resp = requests.post(
    "http://44.234.63.1:8000/classify", data=sent, timeout=60)
# Fail loudly on an HTTP error instead of a confusing JSON decode error.
resp.raise_for_status()
print(resp.json())

[2m[36m(pid=17905)[0m 2021-03-19 17:59:33,566	INFO router.py:249 -- Endpoint classify doesn't exist, waiting for registration.


{'status': 'analysis'}


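If you run the code on a cluster, the actors persist, because they are created with `lifetime="detached"`. The next cell shows how many titles each actor holds, and the full lists if its `print` line is uncommented. If the `ray.kill` line is uncommented, it also kills each actor.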

In [13]:
# Report how many titles each persistent SubClassifier actor has recorded.
subject, loadset, subtopics = read_config('all4', "./")
toplist = [name for name, _codes in subtopics]
total = 0
for top in toplist:
    try:
        actor = ray.get_actor(top)
    except ValueError:
        # ray.get_actor raises ValueError when no actor has this name.
        # Other errors now surface instead of being reported as "no actor".
        print('no actor for topic ', top)
        continue
    sub, loads, subtop = read_config(top, "./")
    stoplist = [name for name, _codes in subtop]
    topics, counter = ray.get(actor.get_classification.remote())
    local_tot = sum(counter[t] for t in stoplist)
    total += local_tot
    print('topic =', top, 'number =', local_tot)
    # print(topics)
    # ray.kill(actor)
print('total =', total)
        

topic = physics number = 62
topic = math number = 28
topic = bio number = 11
topic = compsci number = 32
topic = finance number = 7
total = 140


In [15]:
# Kill the detached 'all4' Classifier actor if it exists.
try:
    actor = ray.get_actor('all4')
except ValueError:
    # ray.get_actor raises ValueError when no actor has this name.
    print('no all4')
else:
    ray.kill(actor)