# 1. Initialization

### Setup

In [1]:
import ray
import pandas as pd
import time 
import numpy as np
import binascii
import redis
import pprint
import json
import qgrid
import matplotlib.pyplot as plt
pp = pprint.PrettyPrinter() # for printing dicts and lists in a manner easy for the eyes
from misc import *

In [2]:
# Start Ray for this notebook with 3 CPUs; the returned address info is this cell's output.
ray.init(num_cpus=3)

Waiting for redis server at 127.0.0.1:45165 to respond...
Waiting for redis server at 127.0.0.1:47448 to respond...
Starting local scheduler with 3 CPUs and 0 GPUs.


{'local_scheduler_socket_names': ['/tmp/scheduler5652293'],
 'node_ip_address': '127.0.0.1',
 'object_store_addresses': [ObjectStoreAddress(name='/tmp/plasma_store25119255', manager_name='/tmp/plasma_manager97084075', manager_port=51877)],
 'redis_address': '127.0.0.1:45165'}

### Function definitions

In [3]:
@ray.remote
def example(x):
    """Remote task: sleep for a random fraction of a second, then return a normal sample.

    The argument ``x`` is accepted but not used; it only lets callers submit
    one task per input value.
    """
    delay = np.random.random()
    time.sleep(delay)
    sample = np.random.randn()
    return sample

@ray.remote
def example2(x):
    """Remote task that ignores its argument and always returns the string "hi"."""
    greeting = "hi"
    return greeting

@ray.remote
class TestCls:
    """Minimal actor with a single pass-through method."""

    def __init__(self):
        self.g = 1

    def to_go(self, x):
        """Return ``x`` unchanged."""
        echoed = x
        return echoed

    
@ray.remote
class Outer:
    """Actor that creates a nested ``TestCls`` actor and exposes a method that always fails."""

    def __init__(self):
        self.f = 1
        # An actor handle created from inside another actor's constructor.
        self.test = TestCls.remote()

    def to_go2(self, x):
        """Return ``x`` multiplied by two."""
        doubled = x * 2
        return doubled

    def error(self):
        """Always raise ``ZeroDivisionError``."""
        return 1 / 0

### Generate data in Redis

In [4]:
# Generate data in Redis by running remote tasks: submit each batch, then block on it.
example_futures = [example.remote(x) for x in range(4)]
results = ray.get(example_futures)

example2_futures = [example2.remote(x) for x in range(20)]
results2 = ray.get(example2_futures)

# Generate data for actor tasks.
actor = TestCls.remote()
actor_future = actor.to_go.remote(1)
actor_results = ray.get([actor_future])

# Submit an actor method that raises. The call is not passed to ray.get, so the
# returned object ID is this cell's output.
err_actor = Outer.remote()
err_actor.error.remote()

ObjectID(af81aa6e6ad5c7106dbbc5b90c69365cde076085)

Remote function error failed with:

Traceback (most recent call last):
  File "/Users/michellemarzoev/.local/lib/python3.6/site-packages/ray-0.1.1-py3.6-macosx-10.7-x86_64.egg/ray/worker.py", line 1754, in process_task
    worker.actors[task.actor_id().id()], *arguments)
  File "<ipython-input-3-c197885e48f2>", line 29, in error
ZeroDivisionError: division by zero


You can inspect errors by running

    ray.error_info()

If this driver is hanging, start a new one with

    ray.init(redis_address="127.0.0.1:45165")



### Connect to Redis

In [5]:
# Redis address ("host:port") that the running Ray worker is connected to.
redis_address = ray.worker.global_worker.redis_address
addr, port = redis_address.split(":")

# Responses are decoded to str as latin-1; the Worker Information cell re-encodes
# key suffixes with latin-1 to get the raw bytes back.
rc = redis.StrictRedis(
    host=addr,
    port=port,
    decode_responses=True,
    encoding='latin-1',
    encoding_errors='replace',
)

# 2. Jobs Data


### Remote Functions Information

In [6]:
# Turn the function table into a list of records, each tagged with its own function ID.
fn_table = ray.global_state.function_table()
fn_list = []
for fn_id in fn_table:
    record = fn_table[fn_id]
    record["function_id"] = fn_id
    fn_list.append(record)

# Install the qgrid notebook extension (overwriting any existing copy) before first use.
qgrid.nbinstall(overwrite=True)
fn_df = pd.DataFrame(fn_list)
qgrid.show_grid(fn_df)

### Task Information

In [11]:
# json_normalize lives in the top-level pandas namespace since pandas 1.0 and the
# pandas.io.json path is deprecated; fall back to the old location on older pandas.
try:
    from pandas import json_normalize
except ImportError:
    from pandas.io.json import json_normalize

tt = ray.global_state.task_table()
tt_list = list(tt.values())

# Replace the return object IDs with their hex strings before flattening.
for d in tt_list:
    d['TaskSpec']['ReturnObjectIDs'] = [oid.hex() for oid in d['TaskSpec']['ReturnObjectIDs']]

# Flatten the nested task dicts into one row per task.
task_df = json_normalize(tt_list)
qgrid.show_grid(task_df)

### Task - Worker Placement Information

In [12]:
# Build a worker_id -> {task_id, function_name} table from the event logs in Redis.
# NOTE: the table is keyed by worker, so a later task on the same worker replaces
# the earlier one.
event_names = rc.keys("event_log*")
results = dict()
for event_name in event_names:
    event_list = rc.lrange(event_name, 0, -1)
    for event in event_list:
        event_dict = json.loads(event)
        task_id = ""
        worker_id = ""
        function_name = ""
        # This loop must run once per decoded event. When it sat outside the
        # `for event` loop, only the last event of each log was inspected.
        for element in event_dict:
            # element[3] is the extra-info payload of the log entry.
            if "task_id" in element[3] and "worker_id" in element[3]:
                task_id = element[3]["task_id"]
                worker_id = element[3]["worker_id"]
                function_name = element[3]["function_name"]
            if task_id != "" and worker_id != "" and function_name != "":
                results[worker_id] = {}
                results[worker_id]["task_id"] = task_id
                results[worker_id]["function_name"] = function_name
results_table = pd.DataFrame.from_dict(results)
qgrid.show_grid(results_table.T)

### Task Profiles

In [26]:
task_profiles, events = ray.global_state.task_profiles()

# (span label in the profile log, column name in the result table), in output order.
span_columns = (
    ("ray:task:execute", "execute"),
    ("ray:task:store_outputs", "store"),
    ("ray:acquire_lock", "acquire_lock"),
)

profiles_dict = dict()
for task_id, profiles in task_profiles.items():
    for profile in profiles:
        # Each log entry is indexed as [timestamp, label, phase]; phase 1 = start, 2 = end.
        overall_start = profile[0][0]
        overall_end = profile[-1][0]
        overall_dur = overall_end - overall_start

        # Last timestamp seen for every (label, phase) pair in this profile.
        marks = {}
        for log in profile:
            marks[(log[1], log[2])] = log[0]

        # A task is only reported if all three spans have both a start and an end;
        # -1 is the "not seen" sentinel.
        durations = {}
        for label, column in span_columns:
            begin = marks.get((label, 1), -1)
            finish = marks.get((label, 2), -1)
            if begin == -1 or finish == -1:
                break
            durations[column] = finish - begin
        else:
            row = dict()
            remainder = overall_dur
            for _, column in span_columns:
                row[column] = durations[column]
                remainder = remainder - durations[column]
            row["total"] = overall_dur
            row["other"] = remainder
            # A later profile of the same task replaces the earlier one.
            profiles_dict[task_id] = row

results_table = pd.DataFrame.from_dict(profiles_dict)
qgrid.show_grid(results_table.T)



In [24]:
# Sum each phase's duration over all profiled tasks. The accumulators must start
# at 0; starting at -1 makes every total one unit too small.
total_acq = 0
total_exec = 0
total_store = 0
total_other = 0
total = 0
for value in profiles_dict.values():
    total_exec += value["execute"]
    total_acq += value["acquire_lock"]
    total_store += value["store"]
    total_other += value["other"]
    total += value["total"]

# The pie chart below is disabled by wrapping it in a string literal, which is
# echoed as the cell output. If it is re-enabled, guard against total == 0
# (no profiled tasks) before dividing.
'''
labels = 'Acquire', 'Execute', 'Store', 'Other'
sizes = [total_acq/total, total_exec/total, total_store/total, total_other/total]
explode = (0, 0.1, 0, 0)
plt.pie(sizes, explode=explode, labels=labels,
        autopct='%1.1f%%', shadow=True, startangle=140)
plt.axis('equal')
plt.show()
'''

"\nlabels = 'Acquire', 'Execute', 'Store', 'Other'\nsizes = [total_acq/total, total_exec/total, total_store/total, total_other/total]\nexplode = (0, 0.1, 0, 0)\nplt.pie(sizes, explode=explode, labels=labels,\n        autopct='%1.1f%%', shadow=True, startangle=140)\nplt.axis('equal')\nplt.show()\n"

### Event Profiles 

In [15]:
from collections import defaultdict

event_list = []

# Get and decode all task timing/event logs.
# NOTE: only the first entry of each Redis list is decoded here.
for key in rc.keys("event_log*"):
    content = rc.lrange(key, 0, -1)
    event_list.append(json.loads(content[0]))

# event_dict stores timing info: one NaN-filled array per (label, start/stop)
# pair, with one slot per decoded log.
event_dict = defaultdict(lambda: np.full(len(event_list), np.nan))

# info_dict stores metadata - such as function names and task id - one slot per log.
info_dict = defaultdict(lambda: [None] * len(event_list))

for i, task_event in enumerate(event_list):
    for event in task_event:
        # Named `timestamp`, not `time`: a global called `time` would shadow the
        # `time` module imported at the top of the notebook and break any later
        # re-run of cells that call time.sleep().
        timestamp, label, startstop, info = event
        event_dict[(label, startstop)][i] = timestamp
        if info:
            for k in info:
                info_dict[k][i] = info[k]

edf = pd.DataFrame(dict(event_dict))
# Relabel the start/stop codes (1/2) in the column labels as 'start'/'end'.
edf = edf.rename(columns={1: 'start', 2: 'end'})
edf

Unnamed: 0_level_0,ray:acquire_lock,ray:acquire_lock,ray:get_task,ray:get_task,ray:import_remote_function,ray:import_remote_function,ray:submit_task,ray:submit_task,ray:task,ray:task,ray:task:execute,ray:task:execute,ray:task:get_arguments,ray:task:get_arguments,ray:task:store_outputs,ray:task:store_outputs,ray:wait_for_function,ray:wait_for_function
Unnamed: 0_level_1,start,end,start,end,start,end,start,end,start,end,start,end,start,end,start,end,start,end
0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
1,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
2,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
3,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
4,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
5,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
6,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
7,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
8,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0
9,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,,,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0,1497513000.0


### Stragglers

In [16]:
# Report the x tasks with the longest "ray:task:execute" span.
event_names = rc.keys("event_log*")
x = 10  # number of stragglers to report

# Longest execution duration observed per task ID.
exec_durations = dict()

for event_name in event_names:
    event_list = rc.lrange(event_name, 0, -1)
    for event in event_list:
        event_dict = json.loads(event)
        task_id = ""
        exec_start = None
        exec_end = None
        for element in event_dict:
            # Entries are indexed as [timestamp, label, phase, info]; phase 1 = start, 2 = end.
            if element[1] == "ray:task:execute" and element[2] == 1:
                exec_start = element[0]
            if element[1] == "ray:task:execute" and element[2] == 2:
                exec_end = element[0]
            if "task_id" in element[3]:
                task_id = element[3]["task_id"]
        # Skip logs without a complete execute span or without a task ID.
        if exec_start is None or exec_end is None or task_id == "":
            continue
        exec_dur = exec_end - exec_start
        # If a task shows up more than once, keep its slowest execution.
        if task_id not in exec_durations or exec_dur > exec_durations[task_id]:
            exec_durations[task_id] = exec_dur

# Select the x slowest tasks in one pass, instead of evicting entries from a
# dict while iterating over it.
slowest = sorted(exec_durations, key=exec_durations.get, reverse=True)[:x]
stragglers = {tid: exec_durations[tid] for tid in slowest}

# `columns=slowest` keeps the slowest-first order regardless of dict ordering.
results_table = pd.DataFrame(stragglers, index = [0], columns=slowest)
qgrid.show_grid(results_table.T)


### Reconstructed Task Information

In [17]:
# Count how many times each task ID shows up again after its first appearance in
# the event logs. NOTE(review): this treats every repeat appearance as a
# reconstruction - confirm that matches how Ray logs re-executed tasks.
event_names = rc.keys("event_log*")
attempted = dict()      # task IDs seen at least once
reconstructed = dict()  # task ID -> number of repeat appearances
for event_name in event_names:
    event_list = rc.lrange(event_name, 0, -1)
    for event in event_list:
        event_dict = json.loads(event)
        task_id = ""
        for element in event_dict:
            if "task_id" in element[3]:
                task_id = element[3]["task_id"]
        if task_id != "":
            if task_id in attempted:
                # Seen before: count this appearance as a repeat.
                reconstructed[task_id] = reconstructed.get(task_id, 0) + 1
            else:
                # First sighting. This branch must pair with `task_id in attempted`,
                # otherwise `attempted` is never filled and nothing is counted.
                attempted[task_id] = True

# A dict of scalars needs an explicit index to become a DataFrame; transpose so
# each task gets its own row, like the other task tables in this notebook.
results_table = pd.DataFrame(reconstructed, index=["reconstructions"])
qgrid.show_grid(results_table.T)

# 3. System State


### Node Information

In [18]:
# The global state API returns clients grouped by node IP address. Flatten them
# into one row per client, tagging each with the IP it was listed under.
ctable = ray.global_state.client_table()

client_list = []
for node_ip in ctable:
    node_clients = ctable[node_ip]
    for client in node_clients:
        client["node_ip_address"] = node_ip
    client_list.extend(node_clients)

client_df = pd.DataFrame(client_list)
qgrid.show_grid(client_df)

### Object Store

In [19]:
# Build a DataFrame with one row per object in the object table, keyed by the
# object ID's hex string.
object_dict = {}
for oid, v in ray.global_state.object_table().items():
    object_dict[oid.hex()] = v

object_df = pd.DataFrame(object_dict).T
qgrid.show_grid(object_df)

### Object - Worker Placement Information 

In [20]:
# Group object IDs by the manager ("ManagerIDs" entry) that holds them.
object_table = ray.global_state.object_table()
location_to_objects = dict()

for object_id, object_descriptor in object_table.items():
    manager_ids = object_descriptor["ManagerIDs"]
    if manager_ids is None:
        continue
    # Convert the ID once per object, using .hex() as the Object Store cell does.
    # Re-parsing str(object_id) inside the location loop raises IndexError on the
    # second location, because by then the name holds the already-stripped string.
    object_hex = object_id.hex()
    for location in manager_ids:
        location_to_objects.setdefault(location, []).append(object_hex)

# Locations usually hold different numbers of objects. orient="index" pads the
# shorter lists; the transpose restores one column per location.
table = pd.DataFrame.from_dict(location_to_objects, orient="index").T
qgrid.show_grid(table)

### Worker Information

In [21]:
# Collect the Redis hash stored under every worker key.
workers = rc.keys("Worker*")
prefix_len = len('Workers:')

worker_info = dict()
for worker in workers:
    # The key suffix was decoded as latin-1 by the Redis client; re-encode it to
    # recover the raw ID bytes. hex_identifier comes from `misc` - presumably it
    # renders those bytes as hex (TODO confirm).
    raw_worker_id = worker[prefix_len:].encode('latin-1')
    label = 'Workers:{}'.format(hex_identifier(raw_worker_id))
    worker_info[label] = rc.hgetall(worker)

table = pd.DataFrame.from_dict(worker_info)
qgrid.show_grid(table.T)

# 4. Error Information

### Error Profiles


In [22]:
# For every task whose event log contains a traceback, record the worker it ran
# on, the traceback, and the start time of its "ray:task:execute" span.
event_names = rc.keys("event_log*")
error_profiles = dict()
for event_name in event_names:
    event_list = rc.lrange(event_name, 0, -1)
    for event in event_list:
        event_dict = json.loads(event)
        task_id = ""
        error_traceback = ""
        worker_id = ""
        start_time = -1  # -1 = no execute-start entry seen yet
        # This loop must run once per decoded event. When it sat outside the
        # `for event` loop, only the last event of each log was inspected.
        for element in event_dict:
            # Entries are indexed as [timestamp, label, phase, info]; phase 1 = start.
            if element[1] == "ray:task:execute" and element[2] == 1:
                start_time = element[0]
            if "task_id" in element[3] and "worker_id" in element[3]:
                task_id = element[3]["task_id"]
                worker_id = element[3]["worker_id"]
            if "traceback" in element[3]:
                error_traceback = element[3]["traceback"]
            if task_id != "" and worker_id != "" and error_traceback != "":
                if start_time != -1:
                    error_profiles[task_id] = dict()
                    error_profiles[task_id]["worker_id"] = worker_id
                    error_profiles[task_id]["traceback"] = error_traceback
                    error_profiles[task_id]["start_time"] = start_time
table = pd.DataFrame.from_dict(error_profiles)
qgrid.show_grid(table.T)