In [82]:
# %pylab inline
import pywren
import pickle
import seaborn as sns
import pandas as pd
import numpy as np
sns.set_style('whitegrid')
import os
import matplotlib.patches as mpatches
from itertools import count
#########################
from bokeh.core.properties import value
from bokeh.io import output_notebook, show, push_notebook
import bokeh.plotting.figure
from bokeh.models import ColumnDataSource, HoverTool, WheelZoomTool
output_notebook()

In [79]:
# run pywren
def my_function(key):
    import time
    time.sleep(1)
    return key

pwex = pywren.default_executor()
futures1 = pwex.map(my_function, range(5))
pywren.wait(futures1)

futures2 = pwex.map(my_function, range(5))
pywren.wait(futures2)

([<pywren.future.ResponseFuture at 0x103afae48>,
  <pywren.future.ResponseFuture at 0x103afadd8>,
  <pywren.future.ResponseFuture at 0x1115d51d0>,
  <pywren.future.ResponseFuture at 0x110716198>,
  <pywren.future.ResponseFuture at 0x1108ded30>],
 [])

In [80]:
# collect info from pywren futures
def collect_execution_info(futures):
    results = [f.result() for f in futures]
    run_statuses = [f.run_status for f in futures]
    invoke_statuses = [f.invoke_status for f in futures]
    # need to analyze both run_statuses and invoke_statuses
#    print(invoke_statuses)
    return {'results' : results,'run_statuses' : run_statuses, 'invoke_statuses' : invoke_statuses}

info = collect_execution_info(futures1 + futures2)

In [89]:
# visualization
def visualize_execution(info):
    # preparing data
    run_df = pd.DataFrame(info['run_statuses'])
    invoke_df = pd.DataFrame(info['invoke_statuses'])
    info_df = pd.concat([run_df, invoke_df], axis=1)
    
    def remove_duplicate_columns(df):
        Cols = list(df.columns)
        for i,item in enumerate(df.columns):
            if item in df.columns[:i]: Cols[i] = "toDROP"
        df.columns = Cols
        return df.drop("toDROP",1)

    info_df = remove_duplicate_columns(info_df)
    
    total_tasks = len(info_df)
    y = np.arange(total_tasks)
    
###################################################
# Intermediate Data Handling

    # calculated deltas
    # should ANNOTATE NAMES with deltas
    time_offset = np.min(info_df.host_submit_time)
    fields = [('host submit', info_df.host_submit_time - time_offset), 
              ('job start', info_df.start_time - info_df.host_submit_time), 
              ('setup done', info_df.setup_time), 
              ('job done', info_df.end_time - info_df.start_time - info_df.setup_time), 
              ('results returned', info_df.download_output_timestamp - info_df.end_time),
              ('server_info', info_df.server_info)]
    
    # is inaccurate if jobIDs do not start at 0
    jobIDs = list(range(len(info_df.host_submit_time)))
    jobIDs = [str(i) for i in jobIDs]
    
    # gets the host IP address
    server_info = list(fields[5][1])
    unames = list(map(lambda x:x['uname'], server_info))
    sep = ' '
    unames = [unames[int(jobID)].split(sep, 2)[1][3:] for jobID in jobIDs]
    mapping = {uname : count(1) for uname in set(unames)}   
    unames_dup = ['{} ({})'.format(x, next(mapping[x])) for x in unames]
    unames_jobID = [unames_dup[int(jobID)] + " {Task " + jobID + "}" for jobID in jobIDs]
    
    counts = dict((x,unames.count(x)) for x in set(unames))
    print(counts)
    print(counts[max(counts, key=counts.get)])
###################################################
# Bokeh Visualization
    
    # set up data sources
    
    stages = ["time to submit to host", "time to start job", "time to finish setup", "time to finish job", "time to return results"]
    data = {'host ID' : unames_jobID,
            #'jobIDs' : jobIDs,
            'time to submit to host' : list(fields[0][1]),
            'time to start job' : list(fields[1][1]),
            'time to finish setup' : list(fields[2][1]),
            'time to finish job' : list(fields[3][1]),
            'time to return results' : list(fields[4][1])}
    
    # should change colors
    # try to make first bar transparent
#    colors = ["#ffffff", "#ffccff", "#ff99ff", "#ff66ff", "#ff33ff"]
#    colors = ["#33ff00", "#330055", "#33ff00", "#330055", "#33ff00"]
    colors = ["#ffffff", "#00ff00", "#0000ff", "#800080", "#000000"]

    # plot structure
    # auto enabled wheel_zoom, but should be 'xwheel_zoom' ALSO
    p = bokeh.plotting.figure(y_range=unames_jobID, plot_height=400, plot_width=800, x_range=(-15, 70),
                              title="Pywren Execution GANTT Chart", active_scroll = 'wheel_zoom')

    # plot contents
    stacked = p.hbar_stack(stages, y='host ID', height=0.9, color=colors, source=ColumnDataSource(data),
                 legend=[value(x) for x in stages])

    # plot details
    # in order to get xwheel_zoom, scroll over the x axis; this is not intuitive
    p.legend.location = "top_left"
    p.xaxis.axis_label = "Wallclock Time (sec)"
    p.yaxis.axis_label = "Host ID (reuse #) {Task ID}"
    hover = bokeh.models.HoverTool(tooltips=[("host ID", "@{host ID}"),
                                             ("host submit delta", "@{time to submit to host}"),
                                             ("job start delta", "@{time to start job}"),
                                             ("setup done delta", "@{time to finish setup}"),
                                             ("job done delta", "@{time to finish job}"),
                                             ("results returned delta", "@{time to return results}")])
    p.add_tools(hover)

    # show plot
    bokeh.io.show(p)
    
    # find out what Amazon does with my tasks are distrubuted across WHICH containers

    # split by # OR use regex
    # what's the maximum numbers of cores per IP
    # we could find a dynamic maximum number of cores used and fill unused with whitespace
    # plot with regex
    # change legend names to reflect time
    

visualize_execution(info)

{'10-23-44-8': 2, '10-25-192-4': 4, '10-13-147-184': 4}
4
