In [1]:
%matplotlib inline
import importlib
import numpy as np
import matplotlib.pyplot as plt
import concurrent.futures
import time
import pandas as pd
from tqdm.auto import tqdm

import utils
import manager
import plotting
from config import REDIS_URL

from collections import deque

# Re-import the local helper modules so that edits made on disk are picked up
# without restarting the kernel.
for _module in (plotting, manager, utils):
    importlib.reload(_module)

plotting.bokeh_output_notebook()

In [2]:
# Create the manager handle used by every later cell; its repr (shown below)
# reports the broker URL and the number of registered workers.
m = manager.Manager()
m

<Manager: broker='redis://uaf-1.t2.ucsd.edu:50963', workers=0>

In [3]:
# Start from a clean slate: stop whatever workers the manager currently knows about,
# pause briefly (presumably to give them time to shut down — TODO confirm),
# then launch 5 local workers. The call returns the executor and its futures (shown below).
m.stop_all_workers(progress_bar=False)
time.sleep(0.5)
utils.start_local_workers(5)

(<concurrent.futures.process.ProcessPoolExecutor at 0x7f0d24870710>,
 [<Future at 0x7f0c7fd04080 state=running>,
  <Future at 0x7f0c7fd04c18 state=pending>,
  <Future at 0x7f0c7fd04cc0 state=pending>,
  <Future at 0x7f0c7fd04d68 state=pending>,
  <Future at 0x7f0c7fd04e48 state=pending>])

In [4]:
def f(x):
    """Toy task: wait 0.2 s, burn some CPU on square roots, then echo the argument back."""
    # NOTE(review): math is imported locally, presumably so the function stays
    # self-contained when it is shipped to a worker — confirm.
    import math
    time.sleep(0.2)
    for value in range(50000):
        math.sqrt(value)
    return x
# Map f over 100 inputs through the manager (blocking=True presumably waits until
# every result is back — TODO confirm), then draw the per-task time-flow plot.
results = m.remote_map(f,range(100),blocking=True)
plotting.plot_timeflow_bokeh(results)

HBox(children=(IntProgress(value=0), HTML(value='')))




In [5]:
# Snapshot of the worker table including resource statistics; the output below has
# one row per worker, with both worker_* and node_* columns.
df = m.get_worker_info(include_stats=True)
df

Unnamed: 0_level_0,addr,age,id,idle,node,query_t,node_cpu_time,node_iowait_time,node_read_bytes,node_recv_bytes,node_sent_bytes,node_t,node_write_bytes,worker_busy,worker_cpu_time,worker_mem_used,worker_read_bytes,worker_tasks,worker_time_elapsed,worker_write_bytes
name,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1,Unnamed: 7_level_1,Unnamed: 8_level_1,Unnamed: 9_level_1,Unnamed: 10_level_1,Unnamed: 11_level_1,Unnamed: 12_level_1,Unnamed: 13_level_1,Unnamed: 14_level_1,Unnamed: 15_level_1,Unnamed: 16_level_1,Unnamed: 17_level_1,Unnamed: 18_level_1,Unnamed: 19_level_1,Unnamed: 20_level_1
namin__uaf-10.t2.ucsd.edu__0.3818539,169.228.130.74:7154,5,1710,5,uaf-10.t2.ucsd.edu,1567403000.0,30190078.89,2086159.64,22417008039424,57537502267729,3708369269907,1567403000.0,16067371244032,False,0.3,99872768,0,20,4.275158,0
namin__uaf-10.t2.ucsd.edu__0.3818541,169.228.130.74:7156,5,1711,5,uaf-10.t2.ucsd.edu,1567403000.0,30190078.89,2086159.64,22417008039424,57537502267729,3708369269907,1567403000.0,16067371244032,False,0.31,99876864,0,20,4.293509,0
namin__uaf-10.t2.ucsd.edu__0.3818542,169.228.130.74:7158,5,1712,5,uaf-10.t2.ucsd.edu,1567403000.0,30190078.89,2086159.64,22417008039424,57537502267729,3708369269907,1567403000.0,16067371244032,False,0.31,99880960,0,20,4.29571,0
namin__uaf-10.t2.ucsd.edu__0.3818543,169.228.130.74:7160,5,1713,5,uaf-10.t2.ucsd.edu,1567403000.0,30190078.89,2086159.64,22417008039424,57537502267729,3708369269907,1567403000.0,16067371244032,False,0.28,99880960,0,20,4.267194,0
namin__uaf-10.t2.ucsd.edu__0.3818540,169.228.130.74:7152,5,1709,5,uaf-10.t2.ucsd.edu,1567403000.0,30190078.89,2086159.64,22417008039424,57537502267729,3708369269907,1567403000.0,16067371244032,False,0.28,99872768,0,20,4.270944,0


In [6]:
class RollingMonitor(object):
    """Rolling window of cluster-wide metrics derived from successive worker snapshots.

    Each call to :meth:`update` fetches a snapshot from
    ``manager.get_worker_info(include_stats=True, ...)`` and, once a previous
    snapshot exists, appends one sample to every deque in ``self.dqs``.
    Counters such as ``worker_cpu_time`` or ``node_recv_bytes`` are treated as
    cumulative, so rates are computed from the difference between the current
    and the previous snapshot divided by the elapsed ``query_t``.

    Parameters
    ----------
    manager : object
        Anything exposing ``get_worker_info(include_stats=..., min_delay=...)``
        that returns a DataFrame with the columns read in :meth:`push_deques`.
    maxlen : int, optional
        Number of samples kept per series (default 150); older samples are
        discarded by the deques.
    """

    # Names of the rolling series, in the order they are created in ``dqs``.
    SERIES = (
        "timestamps",
        "timestamps_pd",
        "num_workers_total",
        "num_workers_busy",
        "worker_tasks",
        "worker_mem_used_gb",
        "worker_cpu_time_frac",
        "node_cpu_time_frac",
        "node_iowait_time_frac",
        "worker_read_mbps",
        "worker_write_mbps",
        "node_recv_mbps",
        "node_sent_mbps",
    )

    def __init__(self, manager, maxlen=150):
        self.manager = manager
        self.curr_info = self.prev_info = pd.DataFrame()
        self.curr_update_time = self.prev_update_time = time.time()
        self.dqs = {name: deque([], maxlen) for name in self.SERIES}

    def update(self, min_delay=0.5):
        """Fetch a new snapshot and push one sample if a baseline snapshot exists.

        Rows containing NaN are dropped. If nothing is left (no workers, or no
        complete rows) no sample is recorded and the baseline is cleared, so
        the next non-empty snapshot starts a fresh baseline instead of being
        diffed across the gap.
        """
        df = self.manager.get_worker_info(include_stats=True, min_delay=min_delay).dropna()
        if df.empty:
            # An empty frame may not even have a "query_t" column, and its mean
            # would be NaN anyway; skip it rather than poison the series.
            self.curr_info = self.prev_info = df
            return
        self.curr_info = df
        self.curr_update_time = df["query_t"].mean()
        if not self.prev_info.empty:
            self.push_deques()
        self.prev_info = self.curr_info
        self.prev_update_time = self.curr_update_time

    def push_deques(self):
        """Append one sample, computed from ``curr_info`` vs ``prev_info``, to every deque.

        Does nothing when the two snapshots are less than 1e-6 s apart or when
        either snapshot is empty.
        """
        dt = self.curr_update_time - self.prev_update_time
        if dt < 1e-6:
            return

        ci = self.curr_info
        pi = self.prev_info
        if ci.empty or pi.empty:
            return

        # Counter differences are only meaningful for members present in BOTH
        # snapshots: a member that just appeared (or vanished) would otherwise
        # have its whole lifetime counter booked as activity of this interval.
        # NOTE(review): assumes the frame index identifies the worker (the
        # get_worker_info output is indexed by worker name) — confirm.
        ci_both = ci[ci.index.isin(pi.index)]
        pi_both = pi[pi.index.isin(ci.index)]

        # Node-level columns repeat on every worker of a node: keep one row per node.
        cinodes = ci.drop_duplicates(["node"])
        pinodes = pi.drop_duplicates(["node"])
        cinodes_both = cinodes[cinodes["node"].isin(pinodes["node"])]
        pinodes_both = pinodes[pinodes["node"].isin(cinodes["node"])]

        n_workers_both = len(ci_both)
        n_nodes_both = len(cinodes_both)

        def delta(curr, prev, column):
            # Change of a summed cumulative counter between the two snapshots.
            return curr[column].sum() - prev[column].sum()

        def per_member(value, count):
            # Average over `count` members; 0.0 when there is nobody to average over.
            return value / count if count else 0.0

        sample = dict(
            timestamps=self.curr_update_time,
            timestamps_pd=pd.to_datetime(self.curr_update_time, unit="s", utc=True),
            # Instantaneous quantities use every worker in the current snapshot.
            num_workers_total=len(ci),
            num_workers_busy=ci["worker_busy"].sum(),
            worker_mem_used_gb=ci["worker_mem_used"].sum() / 1e9,
            # Interval quantities use only the members seen in both snapshots.
            worker_tasks=delta(ci_both, pi_both, "worker_tasks"),
            worker_cpu_time_frac=per_member(
                delta(ci_both, pi_both, "worker_cpu_time") / dt, n_workers_both),
            worker_read_mbps=delta(ci_both, pi_both, "worker_read_bytes") / dt / 1e6,
            worker_write_mbps=delta(ci_both, pi_both, "worker_write_bytes") / dt / 1e6,
            node_cpu_time_frac=per_member(
                delta(cinodes_both, pinodes_both, "node_cpu_time") / dt, n_nodes_both),
            node_iowait_time_frac=per_member(
                delta(cinodes_both, pinodes_both, "node_iowait_time") / dt, n_nodes_both),
            node_sent_mbps=delta(cinodes_both, pinodes_both, "node_sent_bytes") / dt / 1e6,
            node_recv_mbps=delta(cinodes_both, pinodes_both, "node_recv_bytes") / dt / 1e6,
        )
        for name, value in sample.items():
            self.dqs[name].append(value)
        

In [7]:
# Fresh monitor bound to the manager. All series are empty here: a sample is only
# pushed once update() has a previous snapshot to diff against.
rm = RollingMonitor(m)
rm.dqs

{'node_cpu_time_frac': deque([]),
 'node_iowait_time_frac': deque([]),
 'node_recv_mbps': deque([]),
 'node_sent_mbps': deque([]),
 'num_workers_busy': deque([]),
 'num_workers_total': deque([]),
 'timestamps': deque([]),
 'timestamps_pd': deque([]),
 'worker_cpu_time_frac': deque([]),
 'worker_mem_used_gb': deque([]),
 'worker_read_mbps': deque([]),
 'worker_tasks': deque([]),
 'worker_write_mbps': deque([])}

In [8]:
# Live-updating Bokeh plots in the cells below are based on the official "Continuous Updating" example:
# https://github.com/bokeh/bokeh/blob/master/examples/howto/notebook_comms/Continuous%20Updating.ipynb

In [9]:
from bokeh.io import push_notebook, show, output_notebook
from bokeh.models.sources import ColumnDataSource
from bokeh.models import HoverTool
from bokeh.plotting import figure 
from bokeh.layouts import row, gridplot
# Load BokehJS into the notebook so that show() / push_notebook() render inline.
output_notebook()

In [10]:
# Back the whole dashboard with a single data source built from the monitor's deques.
data = rm.dqs
source = ColumnDataSource(data)

# Four datetime-axis figures, later arranged as a 2x2 grid.
ps = [
    figure(tools="pan,wheel_zoom,box_zoom,reset,hover", x_axis_type='datetime')
    for _ in range(4)
]

extra = dict(line_width=2)

# (panel index, data column, legend text, line color — None keeps bokeh's default)
line_specs = [
    (0, "num_workers_total", "# total workers", "navy"),
    (0, "num_workers_busy", "# busy workers", "firebrick"),
    (0, "worker_tasks", "# tasks", "olive"),
    (1, "worker_read_mbps", "total worker read (MB/s)", "navy"),
    (1, "worker_write_mbps", "total worker write (MB/s)", "blue"),
    (1, "node_recv_mbps", "total node net recv (MB/s)", "firebrick"),
    (1, "node_sent_mbps", "total node net sent (MB/s)", "red"),
    (2, "worker_mem_used_gb", "total worker mem used (GB)", None),
    (3, "worker_cpu_time_frac", "avg worker CPU time fraction", "firebrick"),
    (3, "node_cpu_time_frac", "avg node CPU time fraction", "navy"),
    (3, "node_iowait_time_frac", "avg node IOwait time fraction", "red"),
]
for panel, column, label, color in line_specs:
    line_kwargs = dict(extra) if color is None else dict(extra, color=color)
    # r1 ends up bound to the last renderer; the update cell reaches the shared source through it.
    r1 = ps[panel].line(x="timestamps_pd", y=column, source=source, legend=label, **line_kwargs)

for p in ps:
    p.legend.location = "top_left"
    p.legend.click_policy = "hide"
    p.plot_height = 200
    p.plot_width = 450

p = gridplot([ps[:2], ps[2:]])
# notebook_handle=True returns the handle that push_notebook() needs for in-place updates.
target = show(p, notebook_handle=True)

In [12]:
# Poll the cluster a fixed number of times, refreshing the live dashboard after each sample.
N_UPDATES = 5
for _ in range(N_UPDATES):
    rm.update(min_delay=1.5)
    # Every line glyph was created with source=source, so replacing its data
    # updates all four panels at once.
    source.data = rm.dqs
    push_notebook(handle=target)