In [None]:
import os
import re
import pprint
from collections import defaultdict

# Line patterns recognised in a benchmark log. All of them are applied with
# ``Pattern.match`` and therefore have to start at the beginning of a line.
RUN_RE = re.compile(r"Running ((.*) intermediates, )?(\d+) child.*, (\d+) stream.*")
THROUGHPUT_RE = re.compile(r"Found sustainable candidate \((\d+) events/s.\)*")
BENCHMARK_RE = re.compile(r"BENCHMARK: WINDOWS: (.*) - AGG_FNS: (\w+)( - (DISTRIBUTED|SINGLE_NODE))?")
NUM_KEYS_RE = re.compile(r"BM NUM_KEYS: (\d+)")


def parse_log_file(log_file):
    """Extract the sustainable throughput of every run recorded in one log file.

    The file is scanned line by line for four kinds of lines:

    * a BENCHMARK line selects the current benchmark ``(windows, agg_fn, mode)``;
      the mode defaults to ``"DISTRIBUTED"`` when the line does not name one,
    * a "Running ..." line opens a run ``(num_children, num_streams)``,
    * a "BM NUM_KEYS" line opens a run ``(1, num_keys)``; once such a line has
      been seen, all further "Running ..." lines are ignored,
    * a "Found sustainable candidate" line closes the open run and records its
      throughput for the current benchmark.

    Candidate lines that cannot be attributed (no benchmark seen yet, or no run
    open) are reported on stdout and skipped, so the result never contains a
    ``None`` benchmark or run key.

    Args:
        log_file: Path of the log file to read.

    Returns:
        ``{(windows, agg_fn, mode): {(num_children, num_streams): events_per_s}}``.
    """
    sustainable_throughputs = {}

    current_bm = None
    current_run = None
    last_recorded_run = None
    has_keys = False
    with open(log_file) as f:
        for line in f:
            # The four patterns start with different literal prefixes, so at
            # most one of them can match any given line.
            benchmark_match = BENCHMARK_RE.match(line)
            if benchmark_match is not None:
                curr_mode = benchmark_match.group(4)
                if curr_mode is None:
                    curr_mode = "DISTRIBUTED"
                current_bm = (benchmark_match.group(1), benchmark_match.group(2), curr_mode)
                sustainable_throughputs.setdefault(current_bm, {})
                continue

            run_match = RUN_RE.match(line)
            if run_match is not None:
                # In a keys log the run is identified by its NUM_KEYS line.
                if not has_keys:
                    if current_run is not None:
                        print(f"Did not find candidate line for {current_run}")
                    current_run = (int(run_match.group(3)), int(run_match.group(4)))
                continue

            keys_match = NUM_KEYS_RE.match(line)
            if keys_match is not None:
                has_keys = True
                current_run = (1, int(keys_match.group(1)))
                continue

            throughput_match = THROUGHPUT_RE.match(line)
            if throughput_match is None:
                continue

            if current_bm is None:
                # Previously a KeyError: there is no benchmark to file this under.
                print(f"Ignoring candidate line before any benchmark line: {line.strip()}")
                continue
            if current_run is None:
                # Previously stored under a None run key, which breaks sorting
                # and tuple unpacking of the run keys further down the notebook.
                print(f"Did not find run line after {last_recorded_run}")
                continue

            sustainable_throughputs[current_bm][current_run] = int(throughput_match.group(1))
            last_recorded_run = current_run
            current_run = None

    if current_run is not None:
        print(f"Did not find candidate line for {current_run}")

    return sustainable_throughputs
                
def get_all_throughputs(log_path):
    """Parse all ``*.log`` files directly inside ``log_path`` and merge the results.

    Files are processed in sorted name order. When two files report the same
    benchmark and run, the value from the file that sorts later wins.

    Args:
        log_path: Directory containing the log files.

    Returns:
        A ``defaultdict(dict)`` mapping each benchmark to its
        ``{run: throughput}`` dictionary.
    """
    merged = defaultdict(dict)
    log_names = [name for name in sorted(os.listdir(log_path)) if name.endswith(".log")]
    for name in log_names:
        print(f"Parsing {name}")
        parsed = parse_log_file(os.path.join(log_path, name))
        for bm in parsed:
            merged[bm].update(parsed[bm])
    return merged

In [None]:
# Directory that holds one sub-directory of logs per benchmark series. It can be
# overridden through the BENCHMARK_LOG_DIR environment variable so the notebook
# is not tied to a single machine; the previous location stays the default.
BASE_LOG_DIR = os.environ.get("BENCHMARK_LOG_DIR", "/Users/law/repos/ma/benchmark-runs")


def merge_paths(paths, base_dir=None):
    """Load and merge the throughputs of several log directories.

    Directories are processed in the given order; for a benchmark and run that
    appear in more than one directory, the value from the later directory wins.
    The merged result is pretty-printed before it is returned.

    Args:
        paths: Directory names, relative to the base directory.
        base_dir: Directory the names are resolved against. Defaults to
            ``BASE_LOG_DIR``.

    Returns:
        A ``defaultdict(dict)`` mapping each benchmark to its
        ``{run: throughput}`` dictionary.
    """
    root = BASE_LOG_DIR if base_dir is None else base_dir
    merged_tp = defaultdict(dict)
    for path in paths:
        tp = get_all_throughputs(os.path.join(root, path))
        for bm, tps in tp.items():
            merged_tp[bm].update(tps)
    pprint.pprint(merged_tp)
    return merged_tp

# Load every benchmark series. Each *_PATHS list names the log directories
# (relative to BASE_LOG_DIR) that are merged into the matching *_TP mapping.
print("CONCURRENT")
CONCURRENT_PATHS = ["concurrent_tumbling_20"]
CONCURRENT_TP = merge_paths(CONCURRENT_PATHS)

print("MATRIX")
MATRIX_PATHS = ["matrix_dist_all", "matrix_single_all"]
MATRIX_TP = merge_paths(MATRIX_PATHS)

print("COUNT")
COUNT_PATHS = ["count_window"]
COUNT_TP = merge_paths(COUNT_PATHS)

print("ROOT")
ROOT_PATHS = ["root_tp"]
ROOT_TP = merge_paths(ROOT_PATHS)

print("NUM KEYS")
KEYS_PATHS = ["num_keys"]
KEYS_TP = merge_paths(KEYS_PATHS)

print("SESSIONS")
SESSION_PATHS = ["sessions"]
SESSION_TP = merge_paths(SESSION_PATHS)

In [None]:
def print_throughputs(all_throughputs):
    """Print a per-child throughput summary for every benchmark.

    Benchmarks and their runs are listed in sorted order. For a run
    ``(num_children, num_streams)`` the reported figure is
    ``throughput * num_streams // num_children``.

    Args:
        all_throughputs: Mapping ``{benchmark: {(num_children, num_streams): throughput}}``.
    """
    for benchmark in sorted(all_throughputs):
        print(f"Benchmark {benchmark}")
        runs = all_throughputs[benchmark]
        for run in sorted(runs):
            num_children, num_streams = run
            streams_per_child = num_streams // num_children
            per_child = runs[run] * num_streams // num_children
            print(
                f"Total sustainable throughput for {num_children} child(ren) with "
                f"{streams_per_child} stream(s) each "
                f"is {per_child: >7d} events/s per child."
            )
        print()
    

# Print the summary of each loaded series in turn (KEYS_TP is not printed here).
for throughputs_by_benchmark in (CONCURRENT_TP, MATRIX_TP, COUNT_TP, ROOT_TP, SESSION_TP):
    print_throughputs(throughputs_by_benchmark)

# Plots

In [None]:
from matplotlib import rcParams
import matplotlib.pyplot as plt
# Notebook-wide plot defaults.
rcParams.update({'figure.autolayout': True, 'pgf.rcfonts' : False, 'font.size': 14, 'lines.linewidth': 3})
# Matplotlib 3.6 renamed the bundled seaborn style sheets to "seaborn-v0_8-*"
# and later releases dropped the old names, so use whichever name this
# installation provides. If neither is listed, the original name is tried so
# Matplotlib reports the problem itself.
SEABORN_DEEP_STYLE = next(
    (name for name in ("seaborn-v0_8-deep", "seaborn-deep") if name in plt.style.available),
    "seaborn-deep",
)
plt.style.use(SEABORN_DEEP_STYLE)
# Matplotlib format strings (colour, line style, marker).
FORMATS = ["b-o", "g--x", "r-^", "c-<", "m-+", "k-*",]

### Plot Sustainable Throughput

In [None]:
def plot_throughput_group(child_streams, throughputs, title):
    """Plot one throughput curve per aggregation function on a log-scaled x-axis.

    The figure is saved to ``/tmp/plots/num_keys.pdf`` and then shown.

    Args:
        child_streams: Run configurations ``(num_children, num_streams)`` in
            plotting order. The second element of each is used as the x value,
            so every series in ``throughputs`` needs one value per configuration.
        throughputs: Mapping ``{agg_fn: [throughput, ...]}``; the values are
            plotted as-is against an axis labelled "events/s in mio.".
        title: Currently unused; kept for the callers. The axis label and the
            output file name are fixed to the number-of-keys plot.
    """
    markers = ["x", "^", "o"]
    sorted_throughputs = sorted(throughputs.items())
    print(f"sorted_tps: {sorted_throughputs}")

    # Derive the x values from the run configurations instead of assuming the
    # six key counts 1 .. 100000; for those configurations the result is the same.
    x_values = [num_streams for _, num_streams in child_streams]

    for index, (agg_fn, tp) in enumerate(sorted_throughputs):
        # Reuse markers cyclically so more than three series do not fail.
        marker = markers[index % len(markers)]
        plt.plot(x_values, tp, marker=marker, label=agg_fn.replace("M_", "").lower())

    plt.legend()
    plt.ylabel("events/s in mio.")
    plt.xlabel("# keys")
    plt.xscale("log")
    plt.ylim(ymin=0)
    # savefig does not create missing directories.
    os.makedirs("/tmp/plots", exist_ok=True)
    plt.savefig(f"/tmp/plots/num_keys.pdf")
    plt.show()


def group_throughput_mode(bms, num_child_streams):
    """Group total throughputs by ``(windows, mode)`` and aggregation function.

    For every requested run configuration the recorded throughput is multiplied
    by the configuration's stream count and divided by one million.

    Args:
        bms: Iterable of ``((windows, agg_fn, mode), {run: throughput})`` pairs.
        num_child_streams: Run configurations ``(num_children, num_streams)`` to
            extract, in output order. Each must exist in every benchmark.

    Returns:
        A ``defaultdict(dict)`` of the form
        ``{(windows, mode): {agg_fn: [scaled_throughput, ...]}}``.
    """
    groups = defaultdict(dict)
    for benchmark, run_throughputs in bms:
        windows, agg_fn, mode = benchmark
        group_key = (windows, mode)
        print(f"Adding benchmark {benchmark} to group {group_key}")
        groups[group_key][agg_fn] = [
            (run[1] * run_throughputs[run]) / 1_000_000 for run in num_child_streams
        ]
    return groups
    
def group_throughput_benchmarks(bms, num_child_streams):
    """Split the grouped throughputs into distributed and centralized results.

    Args:
        bms: Mapping ``{(windows, agg_fn, mode): {run: throughput}}``.
        num_child_streams: Run configurations passed on to ``group_throughput_mode``.

    Returns:
        ``{"distributed": {...}, "centralized": {...}}`` where each value maps
        ``(windows, mode)`` to ``{agg_fn: [throughput, ...]}``. Groups with mode
        ``"DISTRIBUTED"`` land in the first, ``"SINGLE_NODE"`` in the second;
        any other mode is dropped.
    """
    all_groups = group_throughput_mode(sorted(bms.items()), num_child_streams)

    label_by_mode = {"DISTRIBUTED": "distributed", "SINGLE_NODE": "centralized"}
    groups = {"distributed": {}, "centralized": {}}
    for group_key, tps in all_groups.items():
        label = label_by_mode.get(group_key[1])
        if label is not None:
            groups[label][group_key] = tps
    return groups

In [None]:
def plot_throughput_benchmarks(bms, num_child_streams):
    """Plot every distributed benchmark group for the given run configurations.

    The grouped data is printed first. Centralized groups are computed but not
    plotted.
    """
    groups = group_throughput_benchmarks(bms, num_child_streams)
    print(groups)
    distributed_groups = groups.get('distributed', {})
    for group in distributed_groups:
        plot_throughput_group(num_child_streams, distributed_groups[group], group)
        
def plot_multi_child_throughputs(bms):
    """Plot the runs with n children and n streams for n in 1, 2, 4, 8."""
    # The (16, 16) configuration is currently not included.
    configurations = [(count, count) for count in (1, 2, 4, 8)]
    plot_throughput_benchmarks(bms, configurations)
    
def plot_single_child_throughputs(bms):
    """Plot the runs with a single child and 1, 2, 4 or 8 streams."""
    configurations = [(1, stream_count) for stream_count in (1, 2, 4, 8)]
    plot_throughput_benchmarks(bms, configurations)
    
def plot_num_keys(bms):
    """Plot the runs keyed ``(1, num_keys)`` for 1, 10, ..., 100000 keys."""
    configurations = [(1, 10 ** exponent) for exponent in range(6)]
    plot_throughput_benchmarks(bms, configurations)

           
# Only the number-of-keys plot is produced here. The commented-out calls are
# the alternative invocations for the other series.
# plot_single_child_throughputs(MATRIX_TP)
# plot_multi_child_throughputs(MATRIX_TP)
# plot_multi_child_throughputs(COUNT_TP)
# plot_multi_child_throughputs(ROOT_TP)
plot_num_keys(KEYS_TP)

In [None]:
def plot_both_scale(dist_avg, central_avg, dist_median, central_median):
    """Plot distributed against centralized average throughput for 1, 2, 4, 8 children.

    The figure is saved to ``/tmp/paper_plots/scale_avg.pdf`` and then shown.
    The rc settings applied here are global and therefore also affect figures
    created afterwards.

    Args:
        dist_avg: Four distributed throughput values, one per child count.
        central_avg: Four centralized throughput values, one per child count.
        dist_median: Currently not plotted; kept for the caller.
        central_median: Currently not plotted; kept for the caller.
    """
    markers = ["x", "^", "o"]

    font_size = 16
    plt.rc('font', family='serif', serif='Times')
    plt.rc('xtick', labelsize=font_size)
    plt.rc('ytick', labelsize=font_size)
    plt.rc('axes', labelsize=font_size)
    plt.rc('figure', autolayout=False)
    plt.rc('font', size=font_size)
    plt.rc('lines', linewidth=4)
    plt.rc('lines', markersize=8)
    plt.rc('lines', markeredgewidth=2)
    # Matplotlib 3.6 renamed the bundled seaborn style sheets to "seaborn-v0_8-*"
    # and later releases dropped the old names, so use whichever name exists.
    # If neither is listed, the original name is tried so Matplotlib reports it.
    style_name = next(
        (name for name in ("seaborn-v0_8-deep", "seaborn-deep") if name in plt.style.available),
        "seaborn-deep",
    )
    plt.style.use(style_name)
    plt.rc('figure', figsize=(4.5, 3))

    fig = plt.figure()
    plt_avg = fig.add_subplot(1, 1, 1)
    plt_avg.plot([1, 2, 4, 8], dist_avg, marker=markers[0], ms=10)
    plt_avg.plot([1, 2, 4, 8], central_avg, marker=markers[1])

    plt_avg.set_xticks(range(1, 9))
    plt_avg.set_ylabel("events/s in million")
    plt_avg.set_xlabel("# children")
    plt_avg.set_ylim(ymin=0, ymax=8)

    # savefig does not create missing directories.
    os.makedirs("/tmp/paper_plots", exist_ok=True)
    plt.savefig(f"/tmp/paper_plots/scale_avg.pdf", bbox_inches="tight")
    fig.show()

def plot_scale_throughput_benchmarks(bms, num_child_streams):
    """Plot the scale-out comparison for the ``TUMBLING,1000`` benchmark.

    Looks up the ``M_AVG`` and ``M_MEDIAN`` series of the distributed and the
    centralized ``TUMBLING,1000`` group and hands them to ``plot_both_scale``.
    A missing group or series raises ``KeyError``.
    """
    groups = group_throughput_benchmarks(bms, num_child_streams)

    distributed = groups['distributed'][('TUMBLING,1000', 'DISTRIBUTED')]
    print(f"groups: {distributed}")
    dist_avg = distributed["M_AVG"]

    centralized = groups['centralized'][('TUMBLING,1000', 'SINGLE_NODE')]
    central_avg = centralized["M_AVG"]

    dist_median = distributed["M_MEDIAN"]
    central_median = centralized["M_MEDIAN"]
    plot_both_scale(dist_avg, central_avg, dist_median, central_median)
    
def plot_sessions(bms):
    """Plot the scale-out comparison for n children with n streams, n in 1, 2, 4, 8.

    Note: a later cell defines another function with this same name, which
    replaces this one once that cell has run.
    """
    configurations = [(count, count) for count in (1, 2, 4, 8)]
    plot_scale_throughput_benchmarks(bms, configurations)
    
# Scale-out plot for the matrix series. This relies on the plot_sessions defined
# in this cell; a later cell rebinds the name to a different function.
plot_sessions(MATRIX_TP)

In [None]:
# Values drawn on the "% out-of-order events" axis for 1, 2, 4 and 8 input
# streams. NOTE(review): presumably measured percentages - confirm the source.
OUT_OF_ORDER = [0, 40.9, 61.3, 91.4]

def plot_throughput_by_agg_fn(child_streams, throughputs):
    """Plot distributed against centralized ``MAX`` throughput with out-of-order share.

    Only the ``"MAX"`` aggregation function is drawn. Its two series are plotted
    against 1, 2, 4 and 8 input streams on the left axis; ``OUT_OF_ORDER`` is
    drawn on a second y-axis. The figure is saved to
    ``/tmp/plots/count_scale.pdf`` and then shown.

    Args:
        child_streams: Currently unused; kept for the callers.
        throughputs: Iterable of ``(mode, {agg_fn: [throughput, ...]})`` pairs.
            After sorting by mode, the ``"MAX"`` entry must end up with exactly
            two series: distributed first, centralized second.

    Raises:
        ValueError: If ``"MAX"`` does not have exactly two series.
    """
    print(throughputs)
    data = defaultdict(list)
    # Sort on the mode only: comparing whole tuples falls back to comparing the
    # dictionaries when two entries share a mode, which raises TypeError.
    for mode, run in sorted(throughputs, key=lambda entry: entry[0]):
        for agg_fn, tps in run.items():
            data[agg_fn].append(tps)

    print(data)

    fig, ax1 = plt.subplots()
    for agg_fn, series in data.items():
        if agg_fn != "MAX": continue
        # Unpack only the plotted function so that other aggregation functions
        # with a different number of series cannot abort the plot.
        dist, single = series
        ax1.plot([1, 2, 4, 8], dist, label="distributed max", marker="o", ms=7)
        ax1.plot([1, 2, 4, 8], single, label="centralized max", marker="^", ms=8)
        ax1.set_ylabel("events/s in mio.")
        ax1.set_xlabel("# input streams")
        ax1.set_ylim(ymin=0, ymax=1.1 * max(dist))
        ax1.set_xlim(xmin=0.5)

    ax2 = ax1.twinx()
    ax2.plot([1, 2, 4, 8], OUT_OF_ORDER, color="crimson", ls="--")
    ax2.set_ylabel("% out-of-order events")
    ax2.set_ylim(ymin=0)
    ax2.set_xlim(xmin=0.5)
    ax2.set_xticks(range(1, 9))
    ax2.set_xticklabels((1, 2, "", 4, "", "", "", 8))

    ax1.legend(["distributed max", "centralized max"], loc="center right")
    ax2.legend(["out-of-orderness"], loc="upper center")

    # savefig does not create missing directories.
    os.makedirs("/tmp/plots", exist_ok=True)
    fig.savefig(f"/tmp/plots/count_scale.pdf", bbox_inches="tight")
    fig.show()

def plot_tumbling_scale(bms):
    """Plot the tumbling-window groups for n children with n streams, n in 1, 2, 4, 8."""
    num_child_streams = [(count, count) for count in (1, 2, 4, 8)]
    grouped = group_throughput_benchmarks(bms, num_child_streams)
    # Keep only groups whose window description starts with "TUMBLING" and
    # reduce each group key to its mode.
    tumbling = []
    for sub_groups in grouped.values():
        for (windows, mode), tps in sub_groups.items():
            if windows.startswith("TUMBLING"):
                tumbling.append((mode, tps))
    plot_throughput_by_agg_fn(num_child_streams, tumbling)
    
def plot_tumbling_single_node(bms):
    """Plot the tumbling-window groups for one child with 1, 2, 4 or 8 streams."""
    num_child_streams = [(1, stream_count) for stream_count in (1, 2, 4, 8)]
    grouped = group_throughput_benchmarks(bms, num_child_streams)
    # Keep only groups whose window description starts with "TUMBLING" and
    # reduce each group key to its mode.
    tumbling = []
    for sub_groups in grouped.values():
        for (windows, mode), tps in sub_groups.items():
            if windows.startswith("TUMBLING"):
                tumbling.append((mode, tps))
    plot_throughput_by_agg_fn(num_child_streams, tumbling)
    


    
# Only the count-window series is plotted here. The commented-out calls are
# the matrix-series variants.
# plot_tumbling_scale(MATRIX_TP)
# plot_tumbling_single_node(MATRIX_TP)
plot_tumbling_scale(COUNT_TP)

In [None]:
def plot_concurrent_throughput_group(num_windows, throughputs):
    """Plot throughput against the number of concurrent windows on a log x-axis.

    One curve is drawn per ``(agg_fn, mode)`` pair in sorted order; ``M_MEDIAN``
    series are skipped. The figure is saved to
    ``/tmp/plots/concurrent_decomposable.png`` and ``.pdf`` and then shown.

    Args:
        num_windows: X values, one per entry of each series.
        throughputs: Iterable of ``((agg_fn, mode), [throughput, ...])`` pairs.
    """
    markers = ['o', '^', 'x', '*']
    print(throughputs)
    plotted = 0
    for (agg_fn, mode), tps in sorted(throughputs):
        if agg_fn == "M_MEDIAN": continue
        mode_str = "distributed" if mode == "DISTRIBUTED" else "centralized"
        # Reuse markers cyclically so more than four series do not fail.
        marker = markers[plotted % len(markers)]
        plotted += 1
        plt.plot(num_windows, tps, marker=marker, ms=8, label=f"{mode_str} - {agg_fn.replace('M_', '').lower()}")

    plt.ylabel("events/s in mio.")
    plt.xlabel("# concurrent windows")
    plt.legend()
    plt.xscale("log")
    plt.ylim(ymin=0)
    # savefig does not create missing directories.
    os.makedirs("/tmp/plots", exist_ok=True)
    plt.savefig(f"/tmp/plots/concurrent_decomposable.png", bbox_inches="tight")
    plt.savefig(f"/tmp/plots/concurrent_decomposable.pdf", bbox_inches="tight")
    plt.show()
    
# Throughput series per (aggregation function, mode) pair, in million events/s.
bm_throughputs = defaultdict(list)
# Distinct window counts seen across the benchmarks; sorted below for the x-axis.
bm_num_windows = set()
# The first element of a benchmark key is split on "," and its second field is
# parsed as an int. Benchmarks are visited in ascending order of that number,
# so every series is appended in x-axis order.
for benchmark, run_throughputs in sorted(CONCURRENT_TP.items(), key=lambda x: int(x[0][0].split(",")[1])):
    print(f"Benchmark {benchmark}")
    num_windows = int(benchmark[0].split(",")[1])
    bm_num_windows.add(num_windows)
    agg_fn = benchmark[1]
    mode = benchmark[2]
    # NOTE(review): every run of the benchmark is appended, so a series only
    # lines up with the sorted window counts if each benchmark has exactly one
    # run and every (agg_fn, mode) pair covers every window count - confirm.
    for throughput in run_throughputs.values(): 
        bm_throughputs[(agg_fn, mode)].append(throughput / 1_000_000)

# print(bm_throughputs)

plot_concurrent_throughput_group(sorted(bm_num_windows), bm_throughputs.items())

# for (agg_fn, mode), tps in sorted(bm_throughputs.items()):
#     plot_concurrent_throughput_group(sorted(bm_num_windows), (agg_fn.replace('M_', '').lower(), mode, tps), f"{agg_fn.replace('M_', '')} - {mode}")
    # plt.savefig(f"/tmp/plots/concurrent_max_tumbling.png")
    # plt.close()

In [None]:
def plot_both_sessions(child_streams, dist, central):
    """Plot distributed against centralized throughput over three tree heights.

    Both series are printed, plotted against their index and labelled with the
    fixed ticks "2 (3)", "3 (7)" and "4 (15)". The figure is saved to
    ``/tmp/plots/session_tp.pdf`` and then shown.

    Args:
        child_streams: Currently unused; kept for the caller.
        dist: Distributed throughput values, one per tick.
        central: Centralized throughput values, one per tick.
    """
    print(dist)
    print(central)

    plt.plot(dist, marker="x", label="distributed max")
    plt.plot(central, marker="^", label="centralized max")

    plt.legend()
    plt.xticks(range(3), ("2 (3)", "3 (7)", "4 (15)"))
    plt.ylabel("events/s in mio.")
    plt.xlabel("height of balanced tree (# total nodes)")
    plt.ylim(ymin=0)
    # savefig does not create missing directories.
    os.makedirs("/tmp/plots", exist_ok=True)
    plt.savefig(f"/tmp/plots/session_tp.pdf")
    plt.show()

def plot_session_throughput_benchmarks(bms, num_child_streams):
    """Plot the ``MAX`` series of the first distributed and first centralized group.

    The grouped data is printed first. An empty distributed or centralized
    group raises ``IndexError``; a group without ``"MAX"`` raises ``KeyError``.
    """
    groups = group_throughput_benchmarks(bms, num_child_streams)
    print(groups)
    first_distributed = list(groups['distributed'].values())[0]
    dist = first_distributed["MAX"]
    first_centralized = list(groups['centralized'].values())[0]
    central = first_centralized["MAX"]
    plot_both_sessions(num_child_streams, dist, central)
    
def plot_sessions(bms):
    """Plot the session benchmark for n children with n streams, n in 2, 4, 8.

    Note: this rebinds the name of an earlier ``plot_sessions`` definition in
    this notebook.
    """
    configurations = [(count, count) for count in (2, 4, 8)]
    plot_session_throughput_benchmarks(bms, configurations)
    
# Session-window plot; uses the plot_sessions defined directly above in this cell.
plot_sessions(SESSION_TP)