# Experiments

## Common

In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt

colors_cycle = plt.rcParams['axes.prop_cycle'].by_key()['color']
markers_cycle = ['o', '^', 'D', '*', 'P']

In [None]:
def plot_df(df, title, colors, markers, ax=None, **kwargs):
    """Draw every column of ``df`` as a line on one axes and return that axes.

    Columns, markers and colors are consumed in lockstep, so plotting stops at
    the shortest of the three sequences.

    Args:
        df: frame whose columns are plotted one by one via ``Series.plot``.
        title: axes title; it is capitalized before use.
        colors: one color per column.
        markers: one marker per column.
        ax: axes to draw on; when ``None`` the axes returned by the first
            ``plot`` call is reused for the remaining columns.
        **kwargs: forwarded unchanged to every ``Series.plot`` call.

    Returns:
        The axes that was drawn on, with a dashed grid enabled.
    """
    # '*' and 'P' get a larger marker size than the default of 6.
    enlarged_sizes = {'*': 8, 'P': 7}

    for column, marker, color in zip(df.columns, markers, colors):
        series_style = dict(
            title=title.capitalize(),
            ax=ax,
            c=color,
            marker=marker,
            markersize=enlarged_sizes.get(marker, 6),
        )
        ax = df[column].plot(**series_style, **kwargs)

    ax.grid(True, linestyle='--')
    return ax

def plot_dfs(dfs, subtitles, title, dim, axs=None, colors=None, markers=None, figsize=None, **kwargs):
    """Plot several frames side by side, one frame per axes.

    Args:
        dfs: frames to plot; the column count of ``dfs[0]`` decides how many
            default colors/markers are taken.
        subtitles: one axes title per frame.
        title: figure title, used only when a new figure is created. ``None``
            skips the figure title.
        dim: ``(rows, cols)`` of the subplot grid, used only when a new figure
            is created.
        axs: existing axes to draw on (a single axes, a sequence or an array of
            any shape). When ``None`` a new figure with a ``dim`` grid is
            created.
        colors: one color per column; defaults to the leading entries of
            ``colors_cycle``.
        markers: one marker per column, ``None`` for no markers, or ``True``
            for the leading entries of ``markers_cycle``.
        figsize: size of the new figure; ignored when ``axs`` is given.
            Defaults to 6.4 x 4.8 inches per grid cell.
        **kwargs: forwarded to ``plot_df`` for every frame.

    Returns:
        ``axs`` as passed in, or the flattened array of newly created axes.
    """
    n_series = len(dfs[0].columns)
    if colors is None:
        colors = colors_cycle[:n_series]
    if markers is None:
        markers = [''] * n_series
    elif markers is True:
        markers = markers_cycle[:n_series]

    if axs is None:
        if figsize is None:
            figsize = (6.4 * dim[1], 4.8 * dim[0])
        fig = plt.figure(figsize=figsize)
        # squeeze=False always yields a 2-D array, so a 1x1 grid no longer
        # returns a bare Axes that has no .flatten().
        axs = fig.subplots(
            nrows=dim[0],
            ncols=dim[1],
            sharex=True,
            sharey=False,
            squeeze=False,
        ).flatten()
        if title is not None:
            fig.suptitle(title.capitalize())
        fig.set_dpi(300)

    # Accept a single axes, a list or an array of any shape; the caller's
    # object is still what gets returned.
    flat_axs = np.asarray(axs).flatten()

    for (df, subtitle, ax) in zip(dfs, subtitles, flat_axs):
        plot_df(df, title=subtitle, ax=ax, markers=markers, colors=colors, legend=False, **kwargs)

    # Only the first axes carries a legend.
    handles, labels = flat_axs[0].get_legend_handles_labels()
    flat_axs[0].legend(handles, labels, loc='upper left')
    return axs

In [None]:
def load_df(path):
    """Read an experiment CSV and add the derived columns used by the plots.

    Added or rewritten columns:
        type: last three characters of the raw ``workload`` value.
        workload: raw value without that suffix, mapped to its display name.
        cardinality: 'Unique' when keys >= elements, 'High' when
            10_000 < keys < elements, otherwise 'Low'.
        distribution: 'Zipf' when ``zipf`` > 0, 'Heavy Hitter' when
            ``heavy hitter`` > 0, otherwise 'Uniform'.
        latency: row-wise median of ``t1`` .. ``t9``, ignoring zero entries.
        throughput, time per element, throughput per thread: ratios derived
            from ``latency``, ``elements`` and ``threads``.

    Args:
        path: anything ``pandas.read_csv`` accepts.

    Returns:
        The loaded frame with the columns above added.
    """
    frame = pd.read_csv(path)

    # Split the raw workload name into its three-character suffix and a
    # display name.
    full_names = frame['workload']
    display_names = {
        'Folklore': 'Folklore*',
        'FineGrainedLocking': 'Locking',
        'Leap': 'Leapfrog',
        'Partitioned': 'Partition',
        'ThreadLocal': 'Thread Local'
    }
    frame['type'] = full_names.str[-3:]
    frame['workload'] = full_names.str[:-3].replace(display_names)

    # Encode each category as a small integer first, then map it to its label.
    n_keys = frame['keys']
    n_elements = frame['elements']
    is_unique = n_keys >= n_elements
    is_high = (n_keys < n_elements) & (n_keys > 10_000)
    cardinality_code = is_unique * 2 + is_high * 1
    frame['cardinality'] = cardinality_code.replace({2: 'Unique', 1: 'High', 0: 'Low'})

    distribution_code = ((frame['zipf'] > 0) * 2) + ((frame['heavy hitter'] > 0) * 1)
    frame['distribution'] = distribution_code.replace({2: 'Zipf', 1: 'Heavy Hitter', 0: 'Uniform'})

    # Zero timings are turned into NaN so the median skips them.
    timing_columns = [f't{run}' for run in range(1, 10)]
    frame['latency'] = frame[timing_columns].replace(0, np.nan).median(axis=1)

    frame['throughput'] = frame['elements'] / frame['latency']
    frame['time per element'] = frame['latency'] / frame['elements']
    frame['throughput per thread'] = frame['throughput'] / frame['threads']

    return frame

In [None]:
# Thread counts used as x positions in the plots: powers of two from 1 to 32.
THREADS = [2 ** exponent for exponent in range(6)]
# Tick labels for THREADS; the label for 2 is left blank
# (presumably to keep it from crowding its neighbours - confirm).
THREADS_LABELS = ['' if count == 2 else count for count in THREADS]

## Scaling Experiment

In [None]:
raw_scaling_df = load_df('../data/scaling_experiment.csv')

# One row per (workload, cardinality, distribution, type) and one column per
# (metric, thread count); pivot_table averages duplicate measurements.
scaling_df = pd.pivot_table(
    raw_scaling_df,
    values=['latency', 'throughput', 'throughput per thread', 'time per element'],
    index=['workload', 'cardinality', 'distribution', 'type'],
    columns='threads',
)

# Speedup of each thread count relative to the single-threaded latency.
for thread in THREADS:
    scaling_df[('speedup', thread)] = (
        scaling_df[('latency', 1)] / scaling_df[('latency', thread)]
    )

In [None]:
def plot_scaling_df(df, colors=None, markers=True):
    """Draw the 2 x 5 scaling figure: throughput on top, speedup below.

    Each column of the grid is one (cardinality, distribution) combination.
    The speedup row also shows a dashed black 'Ideal' line (speedup equal to
    the thread count).

    Args:
        df: frame indexed by (workload, cardinality, distribution) with
            (metric, threads) columns that include 'throughput' and 'speedup'
            (as produced by taking one ``type`` slice of ``scaling_df``).
        colors: one color per workload, or ``None`` for the ``plot_dfs``
            default.
        markers: one marker per workload, or ``True`` for the ``plot_dfs``
            default marker cycle.
    """
    # (distribution, cardinality, subtitle) for each column of the grid.
    panels = [
        ('Uniform', 'Low', 'low cardinality (uniform)'),
        ('Uniform', 'High', 'high cardinality (uniform)'),
        ('Zipf', 'High', 'high cardinality (zipfian)'),
        ('Heavy Hitter', 'High', 'high cardinality (heavy hitter)'),
        ('Uniform', 'Unique', 'unique keys (uniform)'),
    ]
    # (metric, y-axis label) for each row of the grid.
    rows = [
        ('throughput', 'throughput (elements/sec)'),
        ('speedup', 'speedup'),
    ]
    subtitles = [subtitle for _, _, subtitle in panels]

    dim = (len(rows), len(panels))
    linewidth = 2
    fig = plt.figure(figsize=(2.4 * dim[1], 3.2 * dim[0]), layout='constrained')
    fig.set_dpi(300)
    axs = fig.subplots(
        nrows=dim[0],
        ncols=dim[1],
        sharex=True,
        sharey=False,
    )

    # The ideal line is drawn before the data so it is the first legend entry
    # of the speedup row.
    for ax in axs.flatten()[dim[1]:]:
        ax.plot(THREADS, THREADS, label='Ideal', linestyle='--', color='black', linewidth=linewidth)

    for row_axs, (metric, ylabel) in zip(axs, rows):
        plot_dfs(
            [
                df.xs(distribution, level=2).xs(cardinality, level=1).T.loc[metric]
                for distribution, cardinality, _ in panels
            ],
            subtitles,
            None,
            (1, dim[1]),
            axs=row_axs,
            colors=colors,
            markers=markers,
            xlabel='threads'.capitalize(),
            ylabel=ylabel.capitalize(),
            logx=False,
            logy=False,
            linewidth=linewidth,
        )

    for (i, ax) in enumerate(axs.flatten()):
        ax.set_xticks(THREADS, THREADS_LABELS)
        ax.set_xticks([], [], minor=True)
        # Subtitles are kept on the top row only.
        if i >= dim[1]:
            ax.set_title('')
        # The y label is kept on the left-most column only.
        if i % dim[1] != 0:
            ax.set_ylabel('')

### Graphs

In [None]:
# Rows of type 'Map', restricted to the listed workloads (selected through the
# transposed frame so the workload level can be indexed as columns).
ticketing_scaling_df = (
    scaling_df
    .xs('Map', level=3)
    .T[['Cuckoo', 'Dash', 'Folklore*', 'Iceberg', 'Leapfrog']]
    .T
)

plot_scaling_df(ticketing_scaling_df)
plt.savefig('../figures/ticketing_scaling.svg')

In [None]:
# Rows of type 'Pau', restricted to the listed workloads.
pau_scaling_df = (
    scaling_df
    .xs('Pau', level=3)
    .T[['Atomic', 'Locking', 'Thread Local']]
    .T
)

plot_scaling_df(pau_scaling_df)
plt.savefig('../figures/update_scaling.svg')

In [None]:
# Rows of type 'E2E', restricted to the listed workloads.
end_to_end_df = (
    scaling_df
    .xs('E2E', level=3)
    .T[['Atomic', 'Thread Local', 'Partition']]
    .T
)

# Entries 0, 2 and 3 of the color/marker cycles are used, skipping entry 1
# (presumably so 'Atomic' and 'Thread Local' look the same as in the update
# figure - confirm).
plot_scaling_df(
    end_to_end_df,
    colors=(colors_cycle[0:1] + colors_cycle[2:4]),
    markers=(markers_cycle[0:1] + markers_cycle[2:4]),
)
plt.savefig('../figures/e2e_scaling.svg')

### Speedup Table

In [None]:
# Print the body rows of a LaTeX table: one row per (cardinality,
# distribution), holding the Partition latency divided by the Atomic and the
# Thread Local latency at 1, 4, 16 and 32 threads.
for card in ['Low', 'High', 'Unique']:
    print('\\midrule')
    for dist in ['Uniform', 'Zipf', 'Heavy Hitter']:
        part_lat = end_to_end_df.loc['Partition', card, dist]['latency']
        atomic_lat = end_to_end_df.loc['Atomic', card, dist]['latency']
        tl_lat = end_to_end_df.loc['Thread Local', card, dist]['latency']

        # Per thread count: ratio against Atomic first, then Thread Local.
        vals = [
            part_lat[n_threads] / baseline[n_threads]
            for n_threads in (1, 4, 16, 32)
            for baseline in (atomic_lat, tl_lat)
        ]

        # Ratios above 1.1 are set in bold, ratios within [0.9, 1.1] are
        # underlined, everything else is printed plain.
        val_strs = []
        for val in vals:
            plain = f'{val:.2f}'
            if val > 1.1:
                val_strs.append('\\textbf{' + plain + '}')
            elif val >= 0.9:
                val_strs.append('\\underline{' + plain + '}')
            else:
                val_strs.append(plain)

        row_label = '{} ({})'.format(
            {'Low': 'Low cardinality', 'High': 'High cardinality', 'Unique': 'Unique keys'}[card],
            {'Uniform': 'uniform', 'Zipf': 'zipfian', 'Heavy Hitter': 'heavy hitter'}[dist],
        )
        print(' & '.join([row_label] + val_strs) + ' \\\\')

## Fuzzy Ticketing Ablation

In [None]:
fuzzy_df = load_df('../data/fuzzy_ticketing_experiment.csv')

dim = (1, 2)
fig = plt.figure(figsize=(2.4 * dim[1], 3.2 * dim[0]), layout='constrained')
fig.set_dpi(300)
axs = fig.subplots(
    nrows=dim[0],
    ncols=dim[1],
)

# One bar chart per key count. Each chart compares the latency of the first
# matching 'FolkloreUnfuzzy' row ('Atomic' bar) with that of the first
# matching 'Folklore*' row ('Fuzzy' bar).
for panel, (n_keys, panel_title) in enumerate([
    (1000, 'Low cardinality (uniform)'),
    (10000000, 'High cardinality (uniform)'),
]):
    heights = [
        fuzzy_df[(fuzzy_df['workload'] == workload) & (fuzzy_df['keys'] == n_keys)].reset_index().loc[0]['latency']
        for workload in ('FolkloreUnfuzzy', 'Folklore*')
    ]
    axs[panel].bar(['Atomic', 'Fuzzy'], heights)
    # Only the left-most chart carries the y label.
    if panel == 0:
        axs[panel].set_ylabel('Latency (sec)')
    axs[panel].set_title(panel_title)
    axs[panel].grid(axis='y')

plt.savefig('../figures/fuzzy_ticketer.svg')

## Latency Breakdown Ablation

In [None]:
# Adapted from https://stackoverflow.com/questions/22787209/how-to-have-clusters-of-stacked-bars

def plot_clustered_stacked(dfall, labels, title, ax, idx,**kwargs):
    """Draw several stacked-bar frames on one axes as clusters of stacked bars.

    Every frame in ``dfall`` is drawn as a stacked bar chart on ``ax``; the
    bars are then narrowed and shifted so that, for each index entry, the bars
    of all frames sit next to each other.

    Args:
        dfall: list of frames; the column and index counts are taken from
            ``dfall[0]`` (all frames are assumed to share them - confirm).
        labels: one minor tick label per frame, repeated for every index entry.
        title: axes title.
        ax: axes to draw on.
        idx: the legend is drawn only when this equals -3.
        **kwargs: forwarded to every ``DataFrame.plot`` call.
    """
    n_df = len(dfall)
    n_col = len(dfall[0].columns)
    n_ind = len(dfall[0].index)

    for df in dfall : # draw each frame as stacked bars on the shared axes
        ax = df.plot(kind='bar',
                      linewidth=0,
                      stacked=True,
                      ax=ax,
                      legend=False,
                      grid=False,
                      **kwargs)  # legend and grid are suppressed here

    minor_ticks = []
    h,l = ax.get_legend_handles_labels() # handles whose bars get repositioned
    for i in range(0, n_df * n_col, n_col): # one step per frame; len(h) = n_col * n_df
        for j, pa in enumerate(h[i:i+n_col]):
            for rect in pa.patches: # one rectangle per index entry
                # i / n_col is the position of the frame in dfall: each frame
                # is moved right by that many bar widths, then everything is
                # moved left by one bar width plus 0.04.
                x = rect.get_x() + 1 / float(n_df + 1) * i / float(n_col) - 1 / float(n_df + 1) - 0.04
                # Bars of one cluster share the unit slot, leaving one bar
                # width as a gap.
                w = 1 / float(n_df + 1)
                rect.set_x(x)
                rect.set_width(w)
                minor_ticks += [x + w / 2]
    # Stacked segments share the same centre, so duplicates are dropped.
    minor_ticks = sorted(set(minor_ticks))
    major_ticks = []
    for i in range(n_ind):
        # Major tick: midpoint of the two central bars of cluster i.
        # NOTE(review): for n_df == 1 ``mid`` is -1 and the indices below no
        # longer address cluster i - confirm callers pass at least two frames.
        mid = n_df // 2 - 1
        left = minor_ticks[n_df * i + mid]
        right = minor_ticks[n_df * i + mid + 1]
        major_ticks += [(left + right) / 2]
    ax.tick_params(axis='x', which='major', length=0)
    ax.set_xticks(major_ticks, labels=dfall[0].index, rotation=0)
    ax.set_xticks(minor_ticks, labels=np.tile(labels, n_ind), minor=True, rotation=0)
    # The y axis is labelled as a percentage of 1.
    ax.set_yticks([0, 0.25, 0.5, 0.75, 1], labels=['0%', '25%', '50%', '75%', '100%'], rotation=0)
    ax.set_title(title)
    ax.margins(x=0, y=0)
    # Push the major (index) labels below the minor (per-frame) labels.
    for t in ax.get_xticklabels(minor=False):
        t.set_y(-0.12)
    if idx == -3:
        # One legend entry per column, taken from the first frame's handles.
        l1 = ax.legend(h[:n_col], list(map(lambda l: l.capitalize(), l[:n_col])), loc='lower right')

In [None]:
raw_breakdown_df = load_df('../data/scaling_experiment.csv')
# Take an explicit copy of the filtered rows: the column assignments below
# would otherwise target a slice of the loaded frame and trigger pandas'
# SettingWithCopyWarning.
raw_breakdown_df = raw_breakdown_df[raw_breakdown_df['type'] == 'E2E'].copy()

# Turn the per-phase times into fractions of their sum.
total_time = raw_breakdown_df['initialization'] + raw_breakdown_df['ticketing'] + raw_breakdown_df['update'] + raw_breakdown_df['materialization']
raw_breakdown_df['initialization'] /= total_time
raw_breakdown_df['ticketing'] /= total_time
raw_breakdown_df['update'] /= total_time
raw_breakdown_df['materialization'] /= total_time

# sort=False keeps the phases in the order listed in ``values``; only the
# 'Atomic' and 'Thread Local' workloads are kept.
breakdown_df = pd.pivot_table(raw_breakdown_df, values=['initialization', 'ticketing', 'update', 'materialization'],
                              index=['workload', 'type', 'cardinality', 'distribution'], columns=['threads'],
                              sort=False).T[['Atomic', 'Thread Local']].T

In [None]:
dim = (3, 1)
fig = plt.figure(figsize=(4.5, 6.0), layout='constrained')
axs = fig.subplots(
    nrows=dim[0],
    ncols=dim[1],
    sharex=True,
    sharey=False,
).flatten()
fig.set_dpi(300)

# One stacked-bar panel per cardinality, uniform distribution only.
uniform_panels = [
    breakdown_df.xs('Uniform', level=3).xs(cardinality, level=2).xs('E2E', level=1).T
    for cardinality in ('Low', 'High', 'Unique')
]
panel_titles = [
    'low cardinality (uniform)',
    'high cardinality (uniform)',
    'unique keys (uniform)',
]
for position, (df, subtitle, ax) in enumerate(zip(uniform_panels, panel_titles, axs)):
    # The last argument is the panel position counted from the end
    # (-3 for the first of three panels).
    plot_clustered_stacked(
        [df.xs(thread, level=1).T for thread in THREADS],
        THREADS,
        subtitle.capitalize(),
        ax,
        position - len(axs),
    )
axs.flatten()[dim[0] - 1].set_xlabel('')
plt.savefig('../figures/breakdown.svg')

## Resizing Ablation

In [None]:
raw_resizing_df = load_df('../data/resizing_experiment.csv')

# A run is flagged as resized when its capacity is smaller than its key count.
raw_resizing_df['resized'] = raw_resizing_df['capacity'] < raw_resizing_df['keys']

# One row per (workload, cardinality, resized) and one column per
# (metric, thread count), restricted to the three listed workloads.
resizing_df = (
    pd.pivot_table(
        raw_resizing_df,
        values=['throughput', 'latency'],
        index=['workload', 'cardinality', 'resized'],
        columns=['threads'],
    )
    .T[['Atomic', 'Thread Local', 'Partition']]
    .T
)

In [None]:
dim = (1, 2)
linewidth = 2
fig = plt.figure(figsize=(2.4 * dim[1], 3.2 * dim[0]), layout='constrained')
fig.set_dpi(300)
axs = fig.subplots(
    nrows=dim[0],
    ncols=dim[1],
)

# Both passes draw on the same two axes: runs with resized == False are drawn
# dashed first, runs with resized == True are drawn solid on top. No figsize
# is passed because plot_dfs only uses it when it creates the figure itself.
for resized, line_style in ((False, {'linestyle': '--'}), (True, {})):
    plot_dfs(
        [
            resizing_df.xs(resized, level=2).xs(cardinality, level=1).T.loc['throughput']
            for cardinality in ('High', 'Unique')
        ],
        [
            'high cardinality',
            'unique keys',
        ],
        '',
        (1, 2),
        axs=axs,
        colors=(colors_cycle[0:1] + colors_cycle[2:4]),
        markers=(markers_cycle[0:1] + markers_cycle[2:4]),
        xlabel='threads'.capitalize(),
        ylabel='throughput'.capitalize(),
        linewidth=linewidth,
        **line_style,
    )

for (i, ax) in enumerate(axs.flatten()):
    ax.set_xticks(THREADS, THREADS_LABELS)
    ax.set_xticks([], [], minor=True)
    # The y label is kept on the left-most axes only.
    if i % dim[1] != 0:
        ax.set_ylabel('')

# Each workload was drawn twice, so the legend keeps only the first three
# handles (the ones from the first, dashed pass).
handles, labels = axs.flatten()[0].get_legend_handles_labels()
axs.flatten()[0].legend(handles[:3], labels[:3], loc='upper left')
plt.savefig('../figures/resizing.svg')

## Memory Ablation

In [None]:
raw_memory_df = pd.read_csv('../data/memory_experiment.csv')

# Drop the three-character suffix of the workload name and apply the display
# names; workloads not listed here keep their raw name.
raw_memory_df['workload'] = raw_memory_df['workload'].str[:-3].replace({
    'Folklore': 'Folklore*',
    'FineGrainedLocking': 'Locking',
})

# 'Unique' when keys >= elements, 'High' when 10_000 < keys < elements,
# otherwise 'Low'.
raw_memory_df['cardinality'] = (
    (raw_memory_df['keys'] >= raw_memory_df['elements']) * 2
    + ((raw_memory_df['keys'] < raw_memory_df['elements']) & (raw_memory_df['keys'] > 10_000)) * 1
).replace({2: 'Unique', 1: 'High', 0: 'Low'})

memory_df = pd.pivot_table(
    raw_memory_df,
    values=['memory'],
    index=['workload', 'cardinality'],
    columns='threads',
)

# Print one LaTeX table row per cardinality: the memory value of Atomic,
# ThreadLocal and Partitioned at 1, 8 and 32 threads.
for card in ['Low', 'High', 'Unique']:
    vals = [
        memory_df.loc[workload, card]['memory'][n_threads]
        for n_threads in (1, 8, 32)
        for workload in ('Atomic', 'ThreadLocal', 'Partitioned')
    ]
    val_strs = ['{:.3f}'.format(val) for val in vals]

    row_label = {'Low': 'Low cardinality', 'High': 'High cardinality', 'Unique': 'Unique keys'}[card]
    print(' & '.join([row_label] + val_strs) + ' \\\\')