# Evaluate the box below to initialize the web UI.

In [1]:
%%html
<script>
  // Toggle the visibility of every notebook code-input area and relabel the
  // toggle button to match. Reads and flips the global `code_shown` flag,
  // which the document-ready handler in this <script> element initialises.
  function code_toggle() {
    if (code_shown) {
      // jQuery only honours numeric durations (or the names 'fast'/'slow');
      // the string '500' fell back to the default speed, so pass a number.
      $('div.input').hide(500);
      $('#toggleButton').val('Show Code');
    } else {
      $('div.input').show(500);
      $('#toggleButton').val('Hide Code');
    }
    code_shown = !code_shown;
  }

  // Once the DOM is ready, start with all code inputs hidden and record that
  // state in the global flag that code_toggle() reads and flips.
  $( document ).ready(function(){
    code_shown=false;
    $('div.input').hide()
  });
</script>
<form action="javascript:code_toggle()"><input type="submit" id="toggleButton" value="Show Code"></form>

<button id="do_run_all">Click to run all</button>
<script>
// Forward clicks on the custom button to the element with id "run_all_cells"
// (presumably the notebook's own "Run All" menu entry -- confirm for the
// Jupyter version in use).
$("#do_run_all").click(
    function () {
        $("#run_all_cells").click();
    }
);
</script>

In [None]:
import ipywidgets as widgets
import os
import pandas as pd
import pprint
import qgrid
import ray
import subprocess
import sys
import tempfile
import time
import math

from IPython.display import display

# Initialise Ray against the Redis address taken from the REDIS_ADDRESS
# environment variable; a KeyError is raised here if the variable is unset.
ray.init(redis_address=os.environ["REDIS_ADDRESS"])

## Summary Statistics

In [None]:
# Ask the global state for the first/last timestamps of the job and the number
# of tasks, then report the elapsed time and the task count in one sentence.
earliest, latest, number = ray.global_state.job_length()
dur = latest - earliest
print("".join([
    "This job executed for ",
    str(dur),
    " seconds and a total of ",
    str(number),
    " tasks were run.",
]))

## Object search

In [None]:
# Text box in which the user types the ID of the object to look up.
object_search = widgets.Text(
    description="Search for an object:",
    placeholder="Object ID",
    value="",
    disabled=False,
)
display(object_search)


def handle_submit(sender):
    """Pretty-print the object-table entry for the ID currently in the box.

    ``sender`` is the argument passed by the widget's submit callback; it is
    not used because the text is read from ``object_search`` directly.
    """
    entry = ray.global_state.object_table(object_search.value)
    pprint.pprint(entry)


# Run the lookup whenever the user submits the text box.
object_search.on_submit(handle_submit)

## Task search

In [None]:
def dependency_graph(task_id):
    """Plot ``task_id`` and its chain of ancestor tasks as a bokeh figure.

    Starting from ``task_id`` the function follows ``ParentTaskID`` links in
    the Ray task table until it reaches a task whose parent is not in the
    table (or a task it has already visited), lays the resulting nodes out
    with networkx and shows them with a hover tool exposing the task ID,
    function name, parent ID and return object IDs.

    Args:
        task_id: key of the task in ``ray.global_state.task_table()``.

    Raises:
        ValueError: if ``task_id`` is not present in the task table.
    """
    import time
    from collections import defaultdict

    import networkx as nx
    from bokeh.io import output_notebook
    from bokeh.layouts import gridplot
    from bokeh.models import ColumnDataSource, HoverTool
    from bokeh.plotting import figure, show
    from bokeh.resources import CDN

    output_notebook(resources=CDN)

    task_profiles = ray.global_state.task_profiles(start=0, end=time.time())
    task_info = ray.global_state.task_table()

    if task_id not in task_info:
        raise ValueError(
            "Task ID {!r} was not found in the task table".format(task_id))

    DG = nx.DiGraph()
    DG.add_node(task_id)

    # Walk up the parent chain.  ``visited`` protects against cycles.
    edges = []
    visited = set()
    current_id = task_id
    while current_id not in visited:
        visited.add(current_id)
        try:
            parent_id = task_info[current_id]["TaskSpec"]["ParentTaskID"]
        except KeyError:
            # No parent recorded for this task: the chain ends here.
            break
        if parent_id not in task_info:
            break
        DG.add_node(parent_id)
        edges.append([current_id, parent_id])
        current_id = parent_id

    DG.add_edges_from(edges)

    # Build one row per node.  Every column must receive exactly one entry
    # per node, otherwise the ColumnDataSource columns end up with different
    # lengths; each optional field therefore gets its own fallback.
    columns = defaultdict(list)
    for node_id, position in nx.shell_layout(DG).items():
        spec = task_info[node_id]["TaskSpec"]
        columns["x"].append(position[0])
        columns["y"].append(position[1])
        columns["task_id"].append(spec["TaskID"])

        try:
            function_name = task_profiles[node_id]["function_name"]
        except KeyError:
            function_name = "None"
        columns["function_name"].append(function_name)

        try:
            parent = spec["ParentTaskID"]
        except KeyError:
            parent = "None"
        columns["parent_id"].append(parent)

        try:
            return_ids = [oid.hex() for oid in spec["ReturnObjectIDs"]]
        except KeyError:
            return_ids = []
        columns["return_object"].append(return_ids)

    source = ColumnDataSource(data=dict(
        x=columns["x"],
        y=columns["y"],
        parent_id=columns["parent_id"],
        task_id=columns["task_id"],
        function_name=columns["function_name"],
        return_object=columns["return_object"],
    ))

    hover = HoverTool(tooltips=[
        ("TaskID", "@task_id"),
        ("Function", "@function_name"),
        ("ParentID", "@parent_id"),
        ("Return ObjectID", "@return_object")
    ])

    p = figure(
        x_range=(-5, 5),
        y_range=(-5, 5),
        height=700,
        width=700,
        tools=[hover, "pan", "wheel_zoom", "box_zoom", "save"],
        toolbar_sticky=False
    )

    p.xaxis.visible = False
    p.yaxis.visible = False

    # Nodes were added child-first, so a single polyline through the rows in
    # order connects each task to its parent.
    p.line(x="x", y="y", source=source)
    p.circle(x="x", y="y", source=source, size=40, color="#B3B3B3")

    show(gridplot(p, ncols=1, plot_width=500, plot_height=500, toolbar_location="below"))
 
 
# Text box in which the user types the ID of the task to look up.
task_search = widgets.Text(
    description="Search for a task:",
    placeholder="Task ID",
    value="",
    disabled=False,
)
display(task_search)


def handle_submit(sender):
    """Print the task-table entry for the typed ID and draw its ancestry.

    ``sender`` is the argument passed by the widget's submit callback; it is
    not used because the text is read from ``task_search`` directly.
    """
    requested_id = task_search.value
    pprint.pprint(ray.global_state.task_table(requested_id))
    dependency_graph(requested_id)


# Run the lookup whenever the user submits the text box.
task_search.on_submit(handle_submit)

## Error search

In [None]:
# Matches of the most recent error search, keyed by task ID.
results = dict()


def handle_submit(sender):
    """Show a grid of all recorded errors whose traceback contains the query.

    The query is the current text of ``error_search``.  ``sender`` is the
    argument passed by the widget's submit callback and is not used.
    """
    msg = error_search.value

    # Start from a clean slate: without this, matches from earlier searches
    # stayed in ``results`` and showed up in every later result table.
    results.clear()
    for task_id, data in ray.global_state.error_info().items():
        if msg in data["traceback"]:
            results[task_id] = data

    # One row per task: the dict-of-dicts is column-oriented, so transpose.
    df_t = pd.DataFrame(results).T
    df_t.index.name = "TaskID"
    display(qgrid.QGridWidget(df=df_t))


error_search = widgets.Text(
    value="",
    placeholder="e.g. - division by zero",
    description="Error message",
    disabled=False
)
display(error_search)
error_search.on_submit(handle_submit)

In [None]:
class _EventRecursionContextManager(object):
    def __init__(self):
        self.should_recurse = True
   
    def __enter__(self):
        self.should_recurse = False
       
    def __exit__(self, *args):
        self.should_recurse = True

# Labels of the two selection modes offered by the dropdown built in
# get_sliders(); the dropdown value is compared against these constants to
# decide whether the range selects a share of the job's time or of its tasks.
total_time_value = '% total time'
total_tasks_value = '% total tasks'
   
def get_sliders(update):
    """Build, link and display the widgets used to select a slice of the job.

    Five widgets are created: start/end time boxes, a percentage range
    slider, a task-count box and a dropdown choosing whether the range is a
    share of the job's total time or of its total tasks.  A change to any of
    them is propagated to the others.  In task mode a positive task count
    selects the first N tasks (slider anchored at 0) and a negative count the
    last N tasks (slider anchored at 100).

    Args:
        update: a falsy value to only keep the widgets in sync, or a callable
            ``update(smallest, largest, num_tasks, tasks)`` invoked after
            every change with the job bounds returned by
            ``ray.global_state.job_length()`` and the task profiles matching
            the current selection.

    Returns:
        The tuple ``(start_box, end_box, range_slider, breakdown_opt)``.

    Raises:
        ValueError: if an event from an unknown widget is received or the
            dropdown holds an unexpected value.
    """
    start_box = widgets.FloatText(
        description='Start Time:',
        disabled=True,
    )
    end_box = widgets.FloatText(
        description='End Time:',
        disabled=True,
    )
    range_slider = widgets.IntRangeSlider(
        value=[70, 100],
        min=0,
        max=100,
        step=1,
        description='%:',
        continuous_update=False,
        orientation='horizontal',
        readout=True,
        readout_format='.0i%',
    )
    num_tasks_box = widgets.IntText(
        description='Num Tasks:',
        disabled=False
    )

    breakdown_opt = widgets.Dropdown(
        options=[total_time_value, total_tasks_value],
        value=total_tasks_value,
        description="Selection Options:"
    )

    INIT_EVENT = 'INIT'
    out_recursion = _EventRecursionContextManager()

    def percent(part, whole):
        """Return ``part`` as a percentage of ``whole``; 0 if ``whole`` is 0.

        The guard keeps the widgets usable for a job with no tasks or with
        zero duration instead of raising ZeroDivisionError.
        """
        if not whole:
            return 0.
        return (100. * part) / whole

    def update_wrapper(event):
        # Assigning to a widget below fires its observer again; ignore those
        # nested notifications so one user action is handled exactly once.
        if not out_recursion.should_recurse:
            return

        with out_recursion:
            smallest, largest, num_tasks = ray.global_state.job_length()
            diff = largest - smallest

            if event == INIT_EVENT:
                if breakdown_opt.value == total_tasks_value:
                    # Default selection: the last (up to) 10000 tasks.
                    num_tasks_box.value = -min(10000, num_tasks)
                    range_slider.value = (int(100 - percent(-num_tasks_box.value, num_tasks)), 100)
                else:
                    low, high = map(lambda x: x / 100., range_slider.value)
                    start_box.value = round(diff * low, 2)
                    end_box.value = round(diff * high, 2)
            elif event['owner'] == start_box:
                # Clamp the start time into [0, end] before moving the slider.
                if start_box.value > end_box.value:
                    start_box.value = end_box.value
                elif start_box.value < 0:
                    start_box.value = 0

                low, high = range_slider.value
                range_slider.value = (int(percent(start_box.value, diff)), high)
            elif event['owner'] == end_box:
                # Clamp the end time into [start, job duration].
                if start_box.value > end_box.value:
                    end_box.value = start_box.value
                elif end_box.value > diff:
                    end_box.value = diff

                low, high = range_slider.value
                range_slider.value = (low, int(percent(end_box.value, diff)))
            elif event['owner'] == breakdown_opt:
                if breakdown_opt.value == total_tasks_value:
                    start_box.disabled = True
                    end_box.disabled = True
                    num_tasks_box.disabled = False
                    # Same default as at initialisation.  The count must be
                    # negative because the slider is anchored at 100, i.e.
                    # it selects the *last* N tasks.
                    num_tasks_box.value = -min(10000, num_tasks)
                    range_slider.value = (int(100 - percent(-num_tasks_box.value, num_tasks)), 100)
                else:
                    start_box.disabled = False
                    end_box.disabled = False
                    num_tasks_box.disabled = True
                    range_slider.value = (int(percent(start_box.value, diff)),
                                          int(percent(end_box.value, diff)))
            elif event['owner'] == range_slider:
                low, high = map(lambda x: x / 100., range_slider.value)
                if breakdown_opt.value == total_tasks_value:
                    old_low, old_high = event['old']
                    new_low, new_high = event['new']
                    if old_low != new_low:
                        # Lower handle moved: select the last N tasks.
                        range_slider.value = (new_low, 100)
                        num_tasks_box.value = -(100. - new_low) / 100. * num_tasks
                    else:
                        # Upper handle moved: select the first N tasks.
                        range_slider.value = (0, new_high)
                        num_tasks_box.value = new_high / 100. * num_tasks
                else:
                    start_box.value = round(diff * low, 2)
                    end_box.value = round(diff * high, 2)
            elif event['owner'] == num_tasks_box:
                if num_tasks_box.value > 0:
                    range_slider.value = (0, int(percent(num_tasks_box.value, num_tasks)))
                elif num_tasks_box.value < 0:
                    range_slider.value = (100 + int(percent(num_tasks_box.value, num_tasks)), 100)
            else:
                raise ValueError('Unknown event owner!')

            if not update:
                return

            # Fetch the task profiles matching the (now consistent) selection
            # and hand them to the caller's callback.
            low, high = map(lambda x: x / 100., range_slider.value)

            if breakdown_opt.value == total_time_value:
                tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)
            elif breakdown_opt.value == total_tasks_value:
                if range_slider.value[0] == 0:
                    tasks = ray.global_state.task_profiles(num_slice=int(num_tasks * high), fwd=True)
                else:
                    tasks = ray.global_state.task_profiles(num_slice=int(num_tasks * (high - low)), fwd=False)
            else:
                raise ValueError('Value "{}" is not a legal breakdown value'.format(breakdown_opt.value))

            update(smallest, largest, num_tasks, tasks)

    for widget in (range_slider, breakdown_opt, start_box, end_box, num_tasks_box):
        widget.observe(update_wrapper, names='value')

    # Populate the widgets (and, if requested, the plot) once up front.
    update_wrapper(INIT_EVENT)

    display(start_box, end_box, range_slider, num_tasks_box, breakdown_opt)

    return start_box, end_box, range_slider, breakdown_opt

## Task timeline

In [None]:
def task_timeline():
    """Display controls that export the selected tasks as a catapult trace.

    Shows the range-selection widgets from ``get_sliders``, a dropdown
    choosing between the basic and the per-task-breakdown view, and a button.
    Clicking the button dumps the selected task profiles to a temporary JSON
    file, converts it to HTML with catapult's ``trace2html`` and opens the
    result in Google Chrome.
    """
    path_input = widgets.Button(description="View task timeline")

    breakdown_basic = "Basic"
    breakdown_task = "Task Breakdowns"

    # Build the dropdown from the same names it is compared against below so
    # the labels cannot drift apart.
    breakdown_opt = widgets.Dropdown(
        options=[breakdown_basic, breakdown_task],
        value=breakdown_basic,
        description="View options:",
        disabled=False,
    )

    start_box, end_box, range_slider, time_opt = get_sliders(False)
    display(breakdown_opt)
    display(path_input)

    def find_trace2html():
        """Return the path of catapult's trace2html script in the home dir."""
        trace2html = os.path.join(os.path.expanduser("~"), "catapult", "tracing", "bin", "trace2html")
        assert os.path.exists(trace2html), "Could not find catapult, please clone it into your home directory from https://github.com/catapult-project/catapult/tree/master/tracing"
        return trace2html

    def make_temp_path(suffix):
        """Create an empty temporary file and return its path.

        ``tempfile.mktemp`` only returns a name, which another process can
        claim before it is used; ``mkstemp`` creates the file atomically.
        The descriptor is closed at once because the file is later written
        by path.
        """
        fd, path = tempfile.mkstemp(suffix=suffix)
        os.close(fd)
        return path

    def handle_submit(sender):
        """Button callback: dump, convert and open the trace."""
        if breakdown_opt.value == breakdown_basic:
            breakdown = False
        elif breakdown_opt.value == breakdown_task:
            breakdown = True
        else:
            raise ValueError('Unexpected breakdown value "{}"'.format(breakdown_opt.value))

        tmp = make_temp_path(".json")
        tmp2 = make_temp_path(".html")

        low, high = map(lambda x: x / 100., range_slider.value)

        smallest, largest, num_tasks = ray.global_state.job_length()
        diff = largest - smallest

        if time_opt.value == total_time_value:
            tasks = ray.global_state.task_profiles(start=smallest + diff * low, end=smallest + diff * high)
        elif time_opt.value == total_tasks_value:
            # A slider anchored at 0 selects the first tasks, otherwise the
            # last ones.
            if range_slider.value[0] == 0:
                tasks = ray.global_state.task_profiles(num_slice=int(num_tasks * high), fwd=True)
            else:
                tasks = ray.global_state.task_profiles(num_slice=int(num_tasks * (high - low)), fwd=False)
        else:
            raise ValueError('Unexpected time value "{}"'.format(time_opt.value))

        print('{} tasks to trace'.format(len(tasks)))
        print("Dumping task profiling data to " + tmp)
        ray.global_state.dump_catapult_trace(tmp, tasks, breakdowns=breakdown)
        print("Converting chrome trace to " + tmp2)
        trace2html = find_trace2html()
        subprocess.check_output(["python2", trace2html, tmp, '--output', tmp2])
        print("Opening html file in browser...")
        # NOTE(review): ``open -a`` is macOS-specific; other platforms need a
        # different launcher.
        subprocess.Popen(["open", "-a", "Google Chrome", tmp2])

    path_input.on_click(handle_submit)

task_timeline()

## Task time series

In [None]:
from bokeh.layouts import gridplot
from bokeh.plotting import figure, show, helpers
from bokeh.resources import CDN
from bokeh.io import output_notebook, push_notebook
output_notebook(resources=CDN)
from bokeh.models import Range1d, ColumnDataSource
import numpy as np

def task_time_series():
    """Show a histogram of task counts over time for the selected range.

    The figure is displayed once and then refreshed in place, through a
    notebook handle, every time the widgets created by ``get_sliders``
    change.
    """
    time_series_fig = figure(title="Task Time Series",
                             tools=["save", "hover", "wheel_zoom", "box_zoom", "pan"],
                             background_fill_color="#FFFFFF", x_range=[0, 1], y_range=[0, 1])

    time_series_source = ColumnDataSource(data=dict(
        left=[],
        right=[],
        top=[]
    ))

    time_series_fig.quad(left='left', right='right', top='top', bottom=0,
                         source=time_series_source, fill_color="#B3B3B3", line_color="#033649")

    time_series_fig.xaxis.axis_label = 'Time in seconds'
    time_series_fig.yaxis.axis_label = 'Number of concurrent tasks'

    handle = show(gridplot(time_series_fig, ncols=1, plot_width=500, plot_height=500, toolbar_location="below"),
         notebook_handle=True)

    def time_series_data(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Return ``(left, right, top)`` bar edges/heights for ``tasks``.

        Time is split into ``granularity``-second buckets spanning the scores
        of ``tasks``; the height of each bar is the number of task profiles
        Ray reports for that bucket.  Edges are expressed relative to
        ``abs_earliest``.
        """
        granularity = 1

        if len(tasks) == 0:
            return [], [], []

        scores = [data["score"] for data in tasks.values()]
        earliest = min(scores)
        latest = max(scores)

        # Always use at least one bucket so tasks that all share one score
        # are still shown (ceil(0) would otherwise yield no buckets).
        num_buckets = max(1, math.ceil((latest - earliest) / granularity))

        # ``i`` is a bucket index; scaling by ``granularity`` happens below,
        # so the range itself must step by 1.
        buckets = []
        for i in range(num_buckets):
            start = i * granularity + earliest
            end = ((i + 1) * granularity) + earliest
            t = ray.global_state.task_profiles(start=start, end=end)
            buckets.append(len(t))

        # Expand the counts into samples so np.histogram can produce the bar
        # edges; one extra empty bin is kept on the left.
        distr = []
        for x in range(len(buckets)):
            distr.extend([earliest - abs_earliest + granularity * x] * buckets[x])

        bins = [earliest - abs_earliest + (i - 1) * granularity for i in range(len(buckets) + 2)]
        hist, bin_edges = np.histogram(distr, bins=bins)

        left = bin_edges[:-1]
        right = bin_edges[1:]
        top = hist

        return left, right, top

    def time_series_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Callback for ``get_sliders``: redraw the histogram for ``tasks``."""
        left, right, top = time_series_data(abs_earliest, abs_latest, abs_num_tasks, tasks)

        time_series_source.data = {'left': left, 'right': right, 'top': top}

        # Fit the axes to the data, falling back to the unit range when empty.
        x_range = (max(0, min(left)) if len(left) else 0, max(right) if len(right) else 1)
        y_range = (0, max(top) + 1 if len(top) else 1)

        x_range = helpers._get_range(x_range)
        time_series_fig.x_range.start = x_range.start
        time_series_fig.x_range.end = x_range.end

        y_range = helpers._get_range(y_range)
        time_series_fig.y_range.start = y_range.start
        time_series_fig.y_range.end = y_range.end

        push_notebook(handle=handle)

    get_sliders(time_series_update)

task_time_series()

## Cluster usage

In [None]:
import time
import math
import pandas as pd
import random
import numpy as np
from bokeh.io import show, output_notebook, push_notebook
from bokeh.resources import CDN
from bokeh.models import (
    ColumnDataSource,
    HoverTool,
    LinearColorMapper,
    BasicTicker,
    PrintfTickFormatter,
    ColorBar,
)
from bokeh.plotting import figure, helpers
output_notebook(resources=CDN)
 
def cluster_usage():
    """Show a heat map of the number of tasks per node over time.

    Each cell of the map is one node (y axis) during one time bucket (x
    axis), coloured by the number of task profiles Ray reports for it.  The
    figure is displayed once and refreshed in place whenever the widgets
    created by ``get_sliders`` change.
    """
    source = ColumnDataSource(data={"node_ip_address":['127.0.0.1'], "time":[0], "num_tasks":[0]})

    colors = ["#FFFFFF", "#E8E8E8", "#DCDCDC", "#D3D3D3", "#B8B8B8", "#A8A8A8", "#696969", "#383838", "#000000"]
    mapper = LinearColorMapper(palette=colors, low=0, high=2)

    TOOLS = "hover,save,xpan,box_zoom,reset,xwheel_zoom"

    p = figure(title="Cluster Usage", y_range=list(set(source.data['node_ip_address'])),
               x_axis_location="above", plot_width=900, plot_height=500,
               tools=TOOLS, toolbar_location='below')

    p.grid.grid_line_color = None
    p.axis.axis_line_color = None
    p.axis.major_tick_line_color = None
    p.axis.major_label_text_font_size = "10pt"
    p.axis.major_label_standoff = 0
    p.xaxis.major_label_orientation = math.pi / 3

    p.rect(x="time", y="node_ip_address", width=1, height=1,
           source=source,
           fill_color={'field': 'num_tasks', 'transform': mapper},
           line_color=None)

    color_bar = ColorBar(color_mapper=mapper, major_label_text_font_size="8pt",
                         ticker=BasicTicker(desired_num_ticks=len(colors)),
                         label_standoff=6, border_line_color=None, location=(0, 0))
    p.add_layout(color_bar, 'right')

    p.select_one(HoverTool).tooltips = [
         ('Node IP Address', '@node_ip_address'),
         ('Number of tasks running', '@num_tasks'),
         ('Time', '@time')
    ]

    p.xaxis.axis_label = "Time in seconds"
    p.yaxis.axis_label = "Node IP Address"

    handle = show(p, notebook_handle=True)

    def heat_map_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Callback for ``get_sliders``: rebuild the heat map for ``tasks``.

        The plot is left untouched when there is nothing to show.
        """
        granularity = 1

        if len(tasks) == 0:
            return

        scores = [data["score"] for data in tasks.values()]
        earliest = min(scores)
        latest = max(scores)

        # Always use at least one bucket so tasks that all share one score
        # are still shown (ceil(0) would otherwise yield no buckets).
        num_buckets = max(1, math.ceil((latest - earliest) / granularity))

        worker_info = ray.global_state.workers()
        num_tasks = []
        nodes = []
        times = []

        # ``i`` is a bucket index; scaling by ``granularity`` happens below,
        # so the range itself must step by 1.
        for i in range(num_buckets):
            start = i * granularity + earliest
            end = (i + 1) * granularity + earliest
            # NOTE(review): rounding the window outwards makes neighbouring
            # buckets overlap, so a task can be counted in more than one
            # bucket -- confirm this is intended.
            t = ray.global_state.task_profiles(start=math.floor(start), end=math.ceil(end))

            # Count the tasks of this bucket per node.
            node_to_num = dict()
            for task_id, data in t.items():
                node = worker_info[data["worker_id"]]["node_ip_address"]
                node_to_num[node] = node_to_num.get(node, 0) + 1

            for node_ip, counter in node_to_num.items():
                num_tasks.append(counter)
                nodes.append(node_ip)
                times.append(earliest - abs_earliest + i * granularity)

        # Bail out before touching the figure: replacing the y range with an
        # empty one would break a plot that is not going to be refreshed.
        if len(num_tasks) == 0:
            return

        # Sorted so the node order on the axis is stable between updates.
        p.y_range = helpers._get_range(sorted(set(nodes)))

        mapper.low = min(min(num_tasks), 0)
        mapper.high = max(max(num_tasks), 1)

        source.data = {"node_ip_address": nodes, "time": times, "num_tasks": num_tasks}
        push_notebook(handle=handle)

    get_sliders(heat_map_update)

cluster_usage()

## Task duration distribution

In [None]:
import time
from bokeh.models import Range1d
from bokeh.layouts import gridplot

def task_completion_time_distribution():
    """Show a histogram of task durations for the selected range.

    A task's duration is ``store_outputs_end - get_task_start`` from its
    profile.  The figure is displayed once and refreshed in place whenever
    the widgets created by ``get_sliders`` change.
    """
    p = figure(title="Task Completion Time Distribution", tools=["save", "hover", "wheel_zoom", "box_zoom", "pan"],
               background_fill_color="#FFFFFF", x_range=(0, 1), y_range=(0, 1))

    source = ColumnDataSource(data={
        'top': [],
        'left': [],
        'right': []
    })
    p.quad(top='top', bottom=0, left='left', right='right', source=source,
           fill_color="#B3B3B3", line_color="#033649")

    p.xaxis.axis_label = 'Time in seconds'
    # Bars count the tasks falling into each duration bin; the previous label
    # ('Number of concurrent tasks') belonged to the time-series plot.
    p.yaxis.axis_label = 'Number of tasks'

    handle = show(gridplot(p, ncols=1, plot_width=500, plot_height=500, toolbar_location="below"),
                  notebook_handle=True)

    def task_completion_time_update(abs_earliest, abs_latest, abs_num_tasks, tasks):
        """Callback for ``get_sliders``: redraw the histogram for ``tasks``.

        The plot is left unchanged when ``tasks`` is empty.
        """
        if len(tasks) == 0:
            return

        distr = [data["store_outputs_end"] - data["get_task_start"]
                 for data in tasks.values()]

        top, bin_edges = np.histogram(distr, bins='auto')
        left = bin_edges[:-1]
        right = bin_edges[1:]

        source.data = {'top': top, 'left': left, 'right': right}

        # Fit the axes to the data, falling back to the unit range when empty.
        x_range = (min(left) if len(left) else 0, max(right) if len(right) else 1)
        y_range = (0, max(top) + 1 if len(top) else 1)

        x_range = helpers._get_range(x_range)
        p.x_range.start = x_range.start
        p.x_range.end = x_range.end

        y_range = helpers._get_range(y_range)
        p.y_range.start = y_range.start
        p.y_range.end = y_range.end

        push_notebook(handle=handle)

    get_sliders(task_completion_time_update)

task_completion_time_distribution()

In [None]:
# import json
# import heapq
# import qgrid 
# import pandas as pd
# from pandas.io.json import json_normalize

# table = ray.global_state.task_table()
# stragglers = dict()

# def handle_submit(sender):
#     if num_slowest.value is "":
#         num_slow = sys.maxsize
#     else: 
#         num_slow = int(num_slowest.value) 
        
#     tasks = ray.global_state.task_profiles(num = num_slow)
#     longest = []
#     heapq.heapify(longest)
#     l_size = 0 
#     for task_id, data in tasks.items():
#         dur = data["store_outputs_end"] - data["get_arguments_start"]
#         heapq.heappush(longest, (dur, task_id))
#         l_size += 1 
#         if l_size > num_slow: 
#             shortest, shortest_id = heapq.heappop(longest)
#     for x,y in longest:
#         stragglers[y] = dict()
#         stragglers[y]["Duration(s)"] = x 
        
#     df_o = pd.DataFrame.from_dict(stragglers)
#     df = df_o.T
#     df.index.name = "TaskID"
#     df.columns = ["Duration (s)"]
#     grid = qgrid.QGridWidget(df=df)
#     display(grid)
     


# num_slowest = widgets.Text(
#     value="",
#     placeholder="e.g. - 10 ",
#     description="Top _ stragglers:",
#     disabled=False
# )
# display(num_slowest)
# path_input = widgets.Button(description="View table")
# display(path_input)
# path_input.on_click(handle_submit)