In [None]:
import pandas as pd
import matplotlib
import matplotlib.pyplot as plt
import matplotlib.dates as mdates
import matplotlib.patches as patches
import matplotlib.gridspec as gridspec
from matplotlib import lines, markers
import seaborn as sns
import numpy as np
import os
import glob 
import re
from collections import defaultdict
from scipy import stats
import subprocess
from itertools import cycle
from math import floor, ceil, sqrt
plt.style.use('ggplot')
sns.choose_colorbrewer_palette('qualitative')
from ipywidgets import interact, interactive, fixed, interact_manual
import ipywidgets as widgets
%matplotlib inline
matplotlib.rcParams['pdf.fonttype'] = 42
matplotlib.rcParams['ps.fonttype'] = 42
matplotlib.rcParams['hatch.linewidth'] = 0.2
matplotlib.rcParams['xtick.labelsize'] = 10



In [None]:
class ExperimentParameter:
    """Description of one measured experiment parameter.

    Holds the lookup key, the units, a display label and name (both default
    to the key) and the functions used to aggregate the parameter across
    components, queries and executions.
    """

    def __init__(self, key, units, label=None, name=None, timeSeries=True, componentAggregation=np.sum, queryAggregation=np.sum, executionAggregation=np.mean):
        self.key, self.units = key, units
        # Label and name fall back to the key when they are not given.
        self.label = label if label is not None else key
        self.name = name if name is not None else key
        self.timeSeries = timeSeries
        self.componentAggregation = componentAggregation
        self.queryAggregation = queryAggregation
        self.executionAggregation = executionAggregation

    def __str__(self):
        """Return the label followed by the units in parentheses."""
        return '{} ({})'.format(self.label, self.units)

In [None]:
# Fractions (not percent) of each run's duration at the start and at the end
# whose values loadData() replaces by NaN.
WARMUP_PERCENTAGE = 0.1
COOLDOWN_PERCENTAGE = 0.1
# Session state filled in by the widgets of this cell:
# OUTPUT_DIRECTORY by selectOutput, COMMIT by selectCommit,
# REPORT_FOLDER by on_button_clicked and DATA by loadData.
OUTPUT_DIRECTORY=''
COMMIT = ''
REPORT_FOLDER=''
DATA = None
# Y-axis label for each parameter shown in the synthetic plots.
PARAMETER_LABELS = {'rate': 'Rate (t/s)', 'latency': 'Latency (s)', 'cpu': 'CPU (%)', 'memory': 'Memory (MB)'}


# Directory that receives a second copy of every saved figure (next to REPORT_FOLDER).
TEXT_FIGURES_PATH='data/output'

def selectOutput(outputType):
    """Select ``data/<outputType>`` as output directory and offer its commits.

    Updates the global OUTPUT_DIRECTORY and renders a selector, bound to
    selectCommit, listing the sub-directories of that directory with the
    most recently modified one first.
    """
    global OUTPUT_DIRECTORY
    OUTPUT_DIRECTORY = 'data/{}'.format(outputType)
    availableCommits = sortedLsByTime(OUTPUT_DIRECTORY)
    interact(selectCommit, commitID=availableCommits)
    
def sortedLsByTime(path):
    """Return the directories directly under ``path``, most recently modified first.

    Every entry of ``path`` is stat'ed for the ordering, but only directories
    are returned. If anything goes wrong the error is printed and an empty
    list is returned instead.
    """
    def modifiedAt(entry):
        return os.stat(os.path.join(path, entry)).st_mtime

    try:
        newestFirst = sorted(os.listdir(path), key=modifiedAt, reverse=True)
        return [name for name in newestFirst if os.path.isdir(os.path.join(path, name))]
    except Exception as e:
        print(e)
        return []
    
def selectCommit(commitID):
    """Remember the selected commit in the global COMMIT.

    Only the part after the last ``/`` is stored; an empty selection leaves
    COMMIT unchanged.
    """
    global COMMIT
    if not commitID:
        return
    COMMIT = commitID.rsplit('/', 1)[-1]
        
def loadData(folder):
    """Load every ``<experiment>_<variant>/<parameter>.csv`` below ``folder`` into the global DATA.

    Each CSV is read without a header as the columns
    rep, parallelism, node, idx, t, value. For every (rep, parallelism) run
    the timestamps are shifted to start at 0 and the values that fall into
    the warm-up / cool-down windows (WARMUP_PERCENTAGE / COOLDOWN_PERCENTAGE
    of the run's duration) are replaced by NaN. The parameter (file name up
    to its first dot), the experiment and the variant (both taken from the
    directory name) are added as columns.

    Files that cannot be read and directories whose name is not of the form
    ``<experiment>_<variant>`` are reported and skipped.

    Args:
        folder: Report directory holding one sub-directory per experiment run.

    Raises:
        ValueError: If no CSV file could be loaded from ``folder``.
    """
    global DATA
    RUN_KEYS = ['rep', 'parallelism']

    # Both helpers use groupby().transform instead of a mutating
    # groupby().apply: since pandas 2.0 apply prepends the group keys to the
    # index, which makes 'rep'/'parallelism' ambiguous (index level and
    # column) for every later groupby on the frame.
    def removeWarmupCooldown(df):
        tmax = df.groupby(RUN_KEYS)['t'].transform('max')
        warmup = np.floor(tmax * WARMUP_PERCENTAGE)
        cooldown = np.ceil(tmax - tmax * COOLDOWN_PERCENTAGE)
        df.loc[(df['t'] < warmup) | (df['t'] > cooldown), 'value'] = np.nan
        return df

    def subtractMin(df, key):
        df[key] -= df.groupby(RUN_KEYS)[key].transform('min')
        return df

    def readCsv(file):
        if not file:
            return pd.DataFrame()
        df = pd.read_csv(file, names=('rep', 'parallelism', 'node', 'idx', 't', 'value'))
        df = df.astype({'rep': int, 'parallelism': int, 'value': float, 't': int})
        df = subtractMin(df, key='t')
        df = removeWarmupCooldown(df)
        return df

    dataFrames = []
    for experimentDir in os.listdir(folder):
        experimentPath = os.path.join(folder, experimentDir)
        if not os.path.isdir(experimentPath):
            continue
        nameParts = experimentDir.split('_')
        if len(nameParts) != 2:
            # Cannot tell experiment and variant apart; do not abort the whole load.
            print(f'Skipping {experimentDir}: expected a directory named <experiment>_<variant>')
            continue
        experimentName, experimentVariant = nameParts
        for dataFile in glob.glob(os.path.join(experimentPath, '*.csv')):
            parameter = os.path.basename(dataFile).split('.')[0]
            try:
                df = readCsv(dataFile)
            except Exception as e:
                print(f'Failed to read {dataFile}: {e}')
                continue
            df['parameter'] = parameter
            df['experiment'] = experimentName
            df['variant'] = experimentVariant
            dataFrames.append(df)
    if not dataFrames:
        raise ValueError(f'No CSV data could be loaded from {folder}')
    DATA = pd.concat(dataFrames, sort=False)
        
def on_button_clicked(b):
    """Button callback: load the data of the currently selected commit.

    Sets the global REPORT_FOLDER to ``<OUTPUT_DIRECTORY>/<COMMIT>``, loads
    it through loadData and prints a confirmation. ``b`` (the clicked
    button) is not used.
    """
    global REPORT_FOLDER
    REPORT_FOLDER = '{}/{}'.format(OUTPUT_DIRECTORY, COMMIT)
    loadData(REPORT_FOLDER)
    print('Done!')
    

# Render the output-type selector; selectOutput in turn renders the commit selector.
interact(selectOutput, outputType={'local': 'output'})

# Nothing is loaded until this button is clicked (see on_button_clicked).
button = widgets.Button(description="Load Data")
button.on_click(on_button_clicked)
display(button)


In [None]:
def get(**kwargs):
    """Return the rows of DATA whose columns equal the given values.

    Every keyword is a column name; its value is compared as a string
    literal and all conditions are AND-ed together.

    Raises:
        ValueError: If no keyword argument is given.
    """
    if not kwargs:
        raise ValueError('Need at least one argument!')
    conditions = (f'({column} == "{wanted}")' for column, wanted in kwargs.items())
    return DATA.query(' & '.join(conditions))


def percentageDiff(value, reference):
    """Return by how many percent ``value`` deviates from ``reference``."""
    delta = value - reference
    return 100 * delta / reference

def get95CI(data):
    """Return 1.96 times the standard deviation of ``data`` (``np.std``) divided by sqrt(len(data))."""
    scaledSpread = 1.96 * np.std(data)
    return scaledSpread / np.sqrt(len(data))

def expandSyntheticCols(df):
    """Split the dotted synthetic ``variant`` string into separate columns.

    ``variant`` is expected to consist of five dot-separated parts:
    ``<variant>.<sourceParallelism>.<sinkParallelism>.<provenanceOverlap>.<provenanceSize>``.

    Args:
        df: Frame with a string column ``variant``; it is not modified.

    Returns:
        A copy of ``df`` in which ``variant`` holds only the first part and
        the other four parts are added as numeric columns.
    """
    NUMERIC_COLS = ['sourceParallelism', 'sinkParallelism', 'provenanceOverlap', 'provenanceSize']
    df = df.copy()
    # Raw string: '\.' in a plain literal is an invalid escape sequence.
    df[['variant'] + NUMERIC_COLS] = df.variant.str.split(r'\.', expand=True)
    df[NUMERIC_COLS] = df[NUMERIC_COLS].apply(pd.to_numeric)
    return df

    
# Parameters that are classified as kind 'system'; every other parameter is 'user'.
SYSTEM_PARAMETERS = ['gc_count_old', 'gc_count_young', 'gc_time_old', 'gc_time_young', 'memory']

if DATA is None:
    raise RuntimeError('No data loaded: select a commit and click "Load Data" first.')

# loadData() always builds a fresh frame without a 'kind' column, so finding
# one means this cell already processed the current data. Running the steps
# again would divide the latency by 1e3 a second time and derive
# 'transparent' from the already renamed variants.
if 'kind' not in DATA.columns:
    DATA['kind'] = 'user'
    DATA.loc[DATA['parameter'].isin(SYSTEM_PARAMETERS), 'kind'] = 'system'
    # Extract transparent/non-transparent variant ending in 1 or 2
    # (must happen before the variants are renamed below)
    DATA['transparent'] = DATA['variant'].str.contains('[a-zA-Z]+2[a-zA-Z]?$', regex=True)
    # Normalize the variant names; raw strings keep '\d' and '\.' valid regex escapes.
    DATA['variant'] = DATA['variant'].str.replace(r'^GL\d$', 'GL', regex=True)
    DATA['variant'] = DATA['variant'].str.replace('LIN', 'ANK', regex=True)
    DATA['variant'] = DATA['variant'].str.replace('^ANK[12]$', 'ANK-1', regex=True)
    DATA['variant'] = DATA['variant'].str.replace('^ANK[12]S$', 'ANK-N', regex=True)
    DATA['variant'] = DATA['variant'].str.replace(r'^ANK\.', 'ANK-1.', regex=True)
    DATA['variant'] = DATA['variant'].str.replace(r'^ANKS\.', 'ANK-N.', regex=True)

    # Negative measurements are treated as missing.
    DATA.loc[DATA['value'] < 0, 'value'] = np.nan
    DATA.loc[DATA['parameter'] == 'latency', 'value'] /= 1e3 # Convert latency to seconds

# Report the loaded commit, then one summary row per
# (experiment, variant, transparent, parallelism) combination.
print(f'=> Commit: {COMMIT}')
print(100 * '-')

print(f'{"Experiment": <20}{"Variant": <20}{"Transparent": <15}{"Reps": <7}{"Parallelism": <15}{"Duration"}')
print(100 * '-')
for label, group in DATA.groupby(['experiment', 'variant', 'transparent', 'parallelism']):
    reps = group['rep'].nunique()
    # Logical latency has dummy timestamps, so it is left out of the duration (in minutes)
    duration = group.loc[group['parameter'] != 'logical-latency', 't'].max() / 60
    print(f'{label[0]: <20}{label[1]: <20}{label[2]: < 15}{reps: <7}{label[3]: <15}{duration:3.1f} min')

In [None]:
@interact(code=['lr', 'sg', 'carlocal', 'carcloud', 'synthetic1', 'synthetic2'])
def selectFigureCode(code):
    """Store the selected figure code in the global FIGURE_CODE.

    FIGURE_CODE becomes part of the file names of the saved figures and
    selects the delay constant in preprocessLogicalLatency.
    """
    global FIGURE_CODE
    FIGURE_CODE = code

## Comparison with the State-of-the-Art

In [None]:
def soaComparison2(figsize=(6.5,3.5)):
    """Plot rate, latency, memory and CPU of every variant as a 2x2 grid of bar charts.

    DATA must hold exactly one experiment at exactly one parallelism.
    Transparent runs are shown as separate, hatched ``<variant>/T`` bars.
    Every bar from the third one on is annotated with its percentage
    difference to the second bar (GL); the percentage differences to the
    first bar (NP) are printed per panel. The figure is saved as PDF to
    REPORT_FOLDER and TEXT_FIGURES_PATH.

    Args:
        figsize: Figure size passed to ``plt.subplots``.
    """
    ORDER = ['NP', 'GL', 'ANK-1', 'ANK-1/T', 'ANK-N', 'ANK-N/T']
    COLORS = ['C0', 'C1', 'C2', 'C2', 'C3', 'C3']
    GROUP_KEYS = ['rep', 'parallelism', 'variant', 'transparent']
    # (parameter, panel title, aggregation across nodes)
    PANELS = [('rate', 'Rate (t/s)', np.mean),
              ('latency', 'Latency (s)', np.mean),
              ('memory', 'Memory (MB)', np.mean),
              ('cpu', 'CPU Utilization (%)', np.sum)]

    def aggregateAll(df, timeFunc=np.mean, nodeFunc=np.mean):
        # First aggregate over time per node instance, then across the nodes of a run.
        assert df.parallelism.nunique() == 1
        assert df.experiment.nunique() == 1
        data = df.groupby(GROUP_KEYS + ['node', 'idx']).aggregate({'value': timeFunc})\
                 .groupby(level=GROUP_KEYS).aggregate({'value': nodeFunc})\
                 .reset_index()
        data.columns = GROUP_KEYS + ['value']
        return data

    def dataFor(parameter, timeFunc=np.mean, nodeFunc=np.mean):
        df = aggregateAll(get(parameter=parameter), timeFunc, nodeFunc)
        # Transparent GL runs are dropped; copy so that the rename below
        # does not write into a slice of the aggregated frame.
        df = df[(df.variant != 'GL') | (df.transparent == False)].copy()
        df.loc[df.transparent == True, 'variant'] += '/T'
        return df

    assert DATA.parallelism.nunique() == 1
    assert DATA.experiment.nunique() == 1

    fig, axes = plt.subplots(ncols=2, nrows=2, figsize=figsize, sharey=False, sharex=True, squeeze=False)
    axes = axes.flatten()
    # Absolute values
    for ax, (parameter, title, nodeFunc) in zip(axes, PANELS):
        sns.barplot(x='variant', y='value', data=dataFor(parameter, nodeFunc=nodeFunc), ax=ax, order=ORDER, palette=COLORS)
        ax.set_ylabel('')
        ax.set_xlabel('')
        ax.set_title(title, size=12)

    for ax in axes:
        # NOTE(review): assumes ax.patches holds one bar per ORDER entry, in that order -- confirm for the seaborn version in use.
        bars = ax.patches
        heights = [p.get_height() for p in bars]
        bars[3].set_hatch('/////')
        bars[5].set_hatch('/////')
        # Differences to NP (first bar) are only reported on stdout ...
        fromNP = [f'{variant}: {percentageDiff(height, heights[0]):+2.1f}%'
                  for variant, height in zip(ORDER[1:], heights[1:])]
        print(f'Percent from NP => [{ax.get_title()}]:', ', '.join(fromNP))
        # ... while differences to GL (second bar) are written into the bars.
        for p, height in zip(bars[2:], heights[2:]):
            diff = percentageDiff(height, heights[1])
            ax.text(p.get_x()+p.get_width()/2,
                    height - (height*0.7),
                    f'{diff:+2.1f}%',
                    ha='center', rotation=90, size=9, family='sans', weight='bold', color='#ffffff')

    fig.tight_layout()
    fig.autofmt_xdate(ha='right', rotation=25)
    fig.savefig(f'{REPORT_FOLDER}/eval_soa_comp_{FIGURE_CODE}.pdf', pad_inches=.1, bbox_inches='tight')
    fig.savefig(f'{TEXT_FIGURES_PATH}/eval_soa_comp_{FIGURE_CODE}.pdf', pad_inches=.1, bbox_inches='tight')
soaComparison2()

## Preprocess & Store Logical Latency Data

Normalizes the logical latency of the loaded experiment by the delay constant of the selected query and stores the result as a CSV file, named after the experiment's commit code, in `LOGICAL_LATENCY_OUTPUT_DIRECTORY`.

In [None]:
LOGICAL_LATENCY_OUTPUT_DIRECTORY='data/output'

def preprocessLogicalLatency():
    """Normalize the logical latency of the loaded experiment and store it as CSV.

    The non-transparent ``logical-latency`` rows are divided by the delay
    constant registered for FIGURE_CODE and written, without the bookkeeping
    columns, to ``logical-latency-agg-<COMMIT>.csv`` in
    LOGICAL_LATENCY_OUTPUT_DIRECTORY.
    """
    MILLIS_PER_SECOND = 1000
    SECONDS_PER_HOUR = 60 * 60
    # Delay constant of every query, in milliseconds
    DELAY_CONSTANTS = {
        'lr': (120+30) * MILLIS_PER_SECOND,
        'sg': 48 * SECONDS_PER_HOUR * MILLIS_PER_SECOND,
        'carlocal': 6 * MILLIS_PER_SECOND,
        'carcloud': 300 * MILLIS_PER_SECOND,
    }
    delayConstant = DELAY_CONSTANTS[FIGURE_CODE]
    latency = get(parameter='logical-latency').copy()
    assert latency.parallelism.nunique() == 1
    nonTransparent = latency[latency.transparent == False]
    normalized = nonTransparent.drop(columns=['kind', 'transparent', 'parameter', 'idx', 'parallelism'])
    normalized['value'] = normalized['value'] / delayConstant
    outputFile = f'{LOGICAL_LATENCY_OUTPUT_DIRECTORY}/logical-latency-agg-{COMMIT}.csv'
    normalized.to_csv(outputFile, index=False)
    print(f'Saved {outputFile}')
preprocessLogicalLatency()

## Plot Logical Latency Comparison

Loads logical latency dataframes from multiple experiments and plots a comparison. **The commit codes of the dataframes need to be specified!**

In [None]:
def plotLogicalLatency(commits):
    """Compare the stored logical latency of several experiments in one bar plot.

    Reads ``logical-latency-agg-<commit>.csv`` from
    LOGICAL_LATENCY_OUTPUT_DIRECTORY for every commit, maps node names and
    experiment names to their short labels, averages the values per
    (variant, rep, experiment, node type) and draws one panel per variant.
    The figure is saved to TEXT_FIGURES_PATH.

    Args:
        commits: Commit codes of the stored logical-latency CSV files.
    """
    NODE_TYPES={'V0': 'SINK', 'V1': 'SOURCE', 'A0': 'SINK-L', 'A1': 'SOURCE-L', '2': 'EDGE'}
    QUERY_DICT = {'CarLocalQueries': 'OA', 'CarCloudQueries': 'VT', 'LinearRoadCombined': 'LR', 'SmartGridCombined': 'SG'}
    QUERY_ORDER = ['LR', 'SG', 'VT', 'OA']
    NODE_ORDER = ['SINK', 'SOURCE', 'EDGE', 'SINK-L', 'SOURCE-L']

    def nodeTypeOf(nodeName):
        # The node type is encoded in the part after the last '-'
        return NODE_TYPES[nodeName.split('-')[-1]]

    def queryLabelOf(experimentName):
        return QUERY_DICT[experimentName]

    frames = []
    for commit in commits:
        frames.append(pd.read_csv(f'{LOGICAL_LATENCY_OUTPUT_DIRECTORY}/logical-latency-agg-{commit}.csv'))
    df = pd.concat(frames, ignore_index=True, sort=False)
    df['nodeType'] = df.node.apply(nodeTypeOf)
    df['experiment'] = df.experiment.apply(queryLabelOf)
    averaged = pd.pivot_table(df, values=['value'], index=['variant', 'rep', 'experiment', 'nodeType'])
    df = averaged.reset_index()
    g = sns.catplot(data=df, kind='bar',
                    x='nodeType', y='value', hue='experiment', col='variant',
                    order=NODE_ORDER, hue_order=QUERY_ORDER, col_order=['ANK-1', 'ANK-N'],
                    legend_out=False, aspect=1.4, height=2.5)
    g.set_xlabels('')
    g.set_ylabels('Provenance Latency (U)')
    g.set_titles('{col_name}')
    g.add_legend(title='Query', ncol=2)
    g.fig.autofmt_xdate(ha='right', rotation=25)
    g.fig.savefig(f'{TEXT_FIGURES_PATH}/eval_logical_latency.pdf', pad_inches=.1, bbox_inches='tight')

# ----
# Configure: Change commit codes accordingly!
# ----
plotLogicalLatency(['3c79e14_120_1630', '9a51bd9_118_1455', '8175c59_115_1251', '8175c59_119_0803'])

## Synthetic-1 Plot (varying overlap and provenance size)

In [None]:
def synthetic1():
    """Plot the synthetic experiment with varying provenance size and overlap.

    Draws one row per parameter (rate, latency, cpu, memory) and one column
    per variant (ANK-1, ANK-N), with provenance size on the x axis and the
    provenance overlap as hue. The figure is saved as PDF to REPORT_FOLDER
    and TEXT_FIGURES_PATH.
    """
    ROW_ORDER=['rate', 'latency', 'cpu', 'memory']
    RUN_KEYS = ['rep', 'provenanceOverlap', 'provenanceSize', 'variant', 'parameter']

    def average(df):
        # Everything that is not plotted must be constant across the data.
        for fixedColumn in ('transparent', 'sourceParallelism', 'sinkParallelism', 'parallelism'):
            assert df[fixedColumn].nunique() == 1
        # Mean over time per node instance first, then mean across the nodes of a run.
        perNode = df.copy().groupby(RUN_KEYS + ['node', 'idx']).aggregate({'value': np.mean})
        perRun = perNode.groupby(level=RUN_KEYS).aggregate({'value': np.mean}).reset_index()
        perRun.columns = RUN_KEYS + ['value']
        return perRun

    g = sns.catplot(data=average(expandSyntheticCols(DATA)), kind='bar',
                    x='provenanceSize', y='value', hue='provenanceOverlap',
                    row='parameter', col='variant',
                    row_order=ROW_ORDER, col_order=['ANK-1', 'ANK-N'],
                    sharey='row', height=1.5, aspect=1.8, legend=False)
    g.set_axis_labels('Provenance (#tuples)', ROW_ORDER).set_titles("{row_name} | {col_name}")
    # Show the variant only above the first row and the parameter label only
    # left of the first column.
    for (i, j), ax in np.ndenumerate(g.axes):
        row, col = ax.get_title().split('|')
        ax.set_title(col.strip() if i == 0 else '')
        if j == 0:
            ax.set_ylabel(PARAMETER_LABELS[row.strip()])
    for ax in g.axes.flat:
        ax.ticklabel_format(axis='y', style='sci', scilimits=(0, 3), useMathText=True)

    g.fig.align_ylabels(g.axes[:, 0])
    g.add_legend(title='Provenance Overlap (%)', loc='lower center', ncol=3)
    g.fig.tight_layout()
    g.fig.subplots_adjust(bottom=0.2)
    for targetFolder in (REPORT_FOLDER, TEXT_FIGURES_PATH):
        g.fig.savefig(f'{targetFolder}/eval_synthetic_comp_{FIGURE_CODE}.pdf', pad_inches=.1, bbox_inches='tight')

synthetic1()

## Synthetic-2 Plot (Varying parallelism and #queries)

In [None]:
def synthetic2():
    """Plot the synthetic experiment with varying source and sink parallelism.

    Draws one row per parameter (rate, latency, cpu, memory) and one column
    per variant (ANK-1, ANK-N) on a logarithmic y axis, with the source
    parallelism on the x axis (labelled '#queries') and the sink parallelism
    as hue. The figure is saved as PDF to REPORT_FOLDER and
    TEXT_FIGURES_PATH.
    """
    def average(df):
        assert df.provenanceOverlap.nunique() == 1
        assert df.provenanceSize.nunique() == 1
        assert df.parallelism.nunique() == 1
        data = df.copy()
        # Mean over time per node instance first, then mean across the nodes of a run.
        data = data.groupby(['rep', 'sourceParallelism', 'sinkParallelism', 'variant', 'parameter', 'node', 'idx']).aggregate({'value': np.mean})\
                    .groupby(level=['rep', 'sourceParallelism', 'sinkParallelism', 'variant', 'parameter']).aggregate({'value': np.mean})\
                    .reset_index()
        data.columns = ['rep', 'sourceParallelism', 'sinkParallelism', 'variant', 'parameter', 'value']
        return data
    ROW_ORDER=['rate', 'latency', 'cpu', 'memory']
    # x and y must be passed by keyword: seaborn >= 0.12 accepts only `data` positionally.
    g = sns.catplot(x='sourceParallelism', y='value', kind='bar', row='parameter', data=average(expandSyntheticCols(DATA)),
                sharey='row', hue='sinkParallelism', col_order=['ANK-1', 'ANK-N'], row_order=ROW_ORDER,
                height=1.5, aspect=1.2,col='variant',legend=False)

    # Only the x label is set here; the y label of every row is assigned in the loop below.
    g.set_xlabels('#queries').set_titles("{row_name} | {col_name}")
    for i, axes_row in enumerate(g.axes):
        for j, axes_col in enumerate(axes_row):
            row, col = axes_col.get_title().split('|')

            # Show the variant only above the first row ...
            if i == 0:
                axes_col.set_title(col.strip())
            else:
                axes_col.set_title('')

            # ... and the parameter label only left of the first column.
            if j == 0:
                axes_col.set_ylabel(PARAMETER_LABELS[row.strip()])

    for ax in g.axes.flat:
        ax.set_yscale('log')
    g.fig.align_ylabels(g.axes[:, 0])
    g.add_legend(title='Parallelism', loc='lower center', ncol=5)
    g.fig.tight_layout()
    g.fig.subplots_adjust(bottom=0.2)
    g.fig.savefig(f'{REPORT_FOLDER}/eval_synthetic_comp_{FIGURE_CODE}.pdf', pad_inches=.1, bbox_inches='tight')
    g.fig.savefig(f'{TEXT_FIGURES_PATH}/eval_synthetic_comp_{FIGURE_CODE}.pdf', pad_inches=.1, bbox_inches='tight')
    
synthetic2()