In [None]:
from pyspark.sql import SparkSession

# Create (or reuse) the Spark session, configured with 16g of driver memory.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "16g")
    .getOrCreate()
)

# Show the session summary as the cell output.
spark

In [None]:
%load_ext autoreload
%autoreload 2

import numpy as np
import matplotlib.pyplot as plt
import multiprocessing.pool
import pandas as pd
import pyspark.sql.functions as F
import pyspark.sql.types as T

from cycler import cycler
from pyspark.sql.functions import col, lit

from common import *

# Pandas display options for the notebook output.
# NOTE(review): 'display.max_rows' was previously set twice in a row (100, then
# 20); only the last value ever took effect, so the dead first call is gone. If
# that first call was meant to be 'display.max_columns', add it here.
pd.set_option('display.max_rows', 20)
pd.set_option('display.max_colwidth', None)

# Font sizes applied to every figure drawn in this notebook.
for rc_group, rc_params in (
    ('axes', {'labelsize': 15, 'titlesize': 15}),
    ('xtick', {'labelsize': 14}),
    ('ytick', {'labelsize': 14}),
    ('legend', {'fontsize': 15}),
):
    plt.rc(rc_group, **rc_params)

# Throughput

In [None]:
THRP_PREFIX = "main/ycsb"

IGNORE_CACHE = False


def _load_thrp_index():
    """Build the throughput experiment index as a pandas frame.

    The workload columns "wl:hot", "wl:mh" and "wl:mp" are cast to int32 so
    that later cells can compare them against plain integers.
    """
    index_pdf = get_index(spark, THRP_PREFIX).toPandas().convert_dtypes()
    return index_pdf.astype({"wl:hot": "int32", "wl:mh": "int32", "wl:mp": "int32"})


# NOTE(review): from_cache_or_compute comes from `common`; presumably it reuses
# the parquet file unless ignore_cache is set -- confirm there.
thrp_index_df = from_cache_or_compute(
    f'{THRP_PREFIX}/index.parquet',
    _load_thrp_index,
    ignore_cache=IGNORE_CACHE,
)
thrp_index_df

In [None]:
def compute_throughput(prefix):
    """Return the throughput measured for a single experiment prefix.

    Calls ``throughput`` with ``start_offset_sec=10`` and ``duration_sec=50``
    and returns the ``throughput`` field of the first row of its result.
    """
    measurement = throughput(spark, prefix, start_offset_sec=10, duration_sec=50)
    first_row = measurement.first()
    return first_row.throughput


def compute_all_throughputs(index_df):
    """Compute the throughput of every prefix listed in ``index_df``.

    Args:
        index_df: pandas frame with a "prefix" column plus experiment metadata.

    Returns:
        A pandas frame with "prefix" and "throughput" columns, merged with
        ``index_df`` on "prefix" so each measurement carries its metadata.
    """
    prefixes = index_df["prefix"].tolist()

    # One compute_throughput call per prefix, spread over a thread pool.
    with multiprocessing.pool.ThreadPool() as workers:
        measured = workers.map(compute_throughput, prefixes)

    per_prefix = pd.DataFrame({"prefix": prefixes, "throughput": measured})
    return per_prefix.merge(index_df, on="prefix")

In [None]:
IGNORE_CACHE = False


def _compute_throughput_frame():
    """Measure the throughput of every experiment in the throughput index."""
    return compute_all_throughputs(thrp_index_df)


throughput_df = from_cache_or_compute(
    f'{THRP_PREFIX}/throughput.parquet',
    _compute_throughput_frame,
    ignore_cache=IGNORE_CACHE,
)

throughput_df

## Plot

In [None]:
from matplotlib.ticker import MultipleLocator

# Systems in plotting order. The order matters: every panel walks the same
# property cycle, so the n-th config always gets the n-th line style/colour.
configs = [
    "ddr_ts.conf",
    "ddr_only.conf",
    "baseline.conf",
    "baselinex.conf",
    "calvin.conf",
]

# Configs listed here are skipped entirely (and do not consume a cycle entry).
ignored_configs = []

config_to_label = {
    'baseline.conf': 'SLOG',
    'baselinex.conf': 'SLOG (slow)',
    'ddr_only.conf': 'Detock (w/o opportunistic ordering)',
    'ddr_ts.conf': 'Detock',
    "calvin.conf": 'Calvin',
}

mp_pcts = sorted(throughput_df["wl:mp"].unique())
hots = sorted(throughput_df["wl:hot"].unique(), reverse=True)

# Explicit x ticks (% multi-home) shown on every panel.
MH_TICKS = [0, 5, 15, 25, 50, 75, 100]

# One row per HOT value and one column per MP percentage. The grid is sized
# from the data and never squeezed, so axes[r, c] is valid whatever the index
# contains (a hard-coded 2x3 grid raises IndexError for any other shape).
fig, axes = plt.subplots(
    len(hots), len(mp_pcts), figsize=(11, 7), sharey=True, squeeze=False
)

pc = cycler(linestyle=['-', ':', '--', (0, (5, 8)), '-.']) + cycler(color=["red", "blue", "k", "dimgray", "g"])
for ax in axes.flat:
    ax.set_prop_cycle(pc)

for r, hot in enumerate(hots):
    for c, mp_pct in enumerate(mp_pcts):
        ax = axes[r, c]

        for config in configs:
            if config in ignored_configs:
                continue
            mask = (
                (throughput_df["config_name"] == config) &
                (throughput_df["wl:mp"] == mp_pct) &
                (throughput_df["wl:hot"] == hot)
            )
            # Only the first panel contributes legend entries.
            label = config_to_label[config] if r == 0 and c == 0 else '_nolegend_'
            # Derive the plotted column (thousand txn/s) on a fresh frame
            # instead of assigning into a filtered slice.
            filtered = (
                throughput_df[mask]
                .sort_values("wl:mh")
                .assign(throughput2=lambda df: df["throughput"] / 1000)
            )
            filtered.plot(
                ax=ax,
                x="wl:mh",
                y="throughput2",
                label=label,
                marker='.',
                legend=False,
            )

        # Axis formatting is independent of the config, so apply it once per
        # panel after all lines are drawn.
        ax.set_title(f"HOT = {1/hot}, MP = {mp_pct}%")
        ax.set_ylabel("thousand txn/s")
        ax.set_xscale("symlog", linthresh=25)
        ax.set_ylim(0, 125)
        ax.set_xticks(MH_TICKS)
        ax.set_xticklabels(MH_TICKS, rotation=0)

        # Realign the last tick label ("100") to the left.
        tick_label_100 = ax.xaxis.get_majorticklabels()[-1]
        tick_label_100.set_horizontalalignment("left")

        ax.minorticks_off()
        # Only the bottom row carries the x-axis label.
        ax.set_xlabel("% multi-home" if r == len(hots) - 1 else None)


fig.tight_layout()
fig.legend(bbox_to_anchor=(0, 1, 1, 0), loc='lower left', mode='expand', ncol=3)

fig.savefig('output/micro-throughput.pdf', bbox_inches='tight')
fig.savefig('output/micro-throughput.png', bbox_inches='tight')


# Deadlocks

In [None]:
# Keep only the wl:hot = 100, wl:mp = 100 runs of the two Detock configs.
deadlocks_index_df = thrp_index_df.loc[
    (thrp_index_df["wl:hot"] == 100)
    & (thrp_index_df["wl:mp"] == 100)
    & thrp_index_df["config_name"].isin(["ddr_ts.conf", "ddr_only.conf"])
]
deadlocks_index_df

In [None]:
IGNORE_CACHE = False

import pickle

# Multi-home percentages on the x axis, and the two configs being compared.
mh_pcts = [5, 15, 25, 50, 75, 100]

configs = ["ddr_ts.conf", "ddr_only.conf"]

config_to_label = {
    'ddr_only.conf': 'Detock (w/o opportunistic ordering)',
    'ddr_ts.conf': 'Detock'
}

# Collect data
DEADLOCKS_PATH = f'{THRP_PREFIX}/deadlocks.pickle'

# `data` holds one (deadlocks, num_txns) pair per entry of `configs`, in the
# same order; each member of the pair is a list with one item per `mh_pcts`
# value. A cached file is only valid for the `configs`/`mh_pcts` it was built with.
data = []
if not IGNORE_CACHE and isfile(DEADLOCKS_PATH):
    # NOTE(review): pickle.load can execute arbitrary code. This is acceptable
    # only because the file is a cache written by this very cell; never point
    # DEADLOCKS_PATH at a file from an untrusted source.
    with open(DEADLOCKS_PATH, 'rb') as f:
        data = pickle.load(f)
else:
    for config in configs:
        deadlocks = []
        num_txns = []
        for mh in mh_pcts:
            mask = (deadlocks_index_df["config_name"] == config) & (deadlocks_index_df["wl:mh"] == mh)
            # Raises IndexError if the index has no run for this (config, mh) pair.
            prefix = deadlocks_index_df.loc[mask, "prefix"].iloc[0]

            # Only the deadlocks recorded with replica == 0 are kept.
            deadlocks_df = deadlocks_csv(spark, prefix).where(col("replica") == 0).toPandas()

            deadlocks.append(deadlocks_df["vertices"])
            num_txns.append(committed(spark, prefix))

        data.append((deadlocks, num_txns))

    with open(DEADLOCKS_PATH, 'wb') as f:
        pickle.dump(data, f)
        print(f'Saved to: {DEADLOCKS_PATH}')

# Plot
fig, axes = plt.subplots(2, 1, sharex=True, figsize=(5, 7.5))
colors = ['steelblue', 'tomato']
hatches = ['', '/']

# Each multi-home percentage gets one slot per config plus one empty slot that
# separates it from the next group.
group_width = len(configs) + 1

for i, c in enumerate(configs):
    # num_txns is stored in the cache but is not used by either panel.
    deadlocks, num_txns = data[i]
    data_cnt = [d.count() for d in deadlocks]
    pos = [j * group_width + i for j in range(len(data_cnt))]

    # Top panel: number of deadlocks per multi-home percentage.
    axes[0].bar(pos, data_cnt, label=config_to_label[c], hatch=hatches[i], facecolor=colors[i], edgecolor='k')

    # Bottom panel: distribution of the "vertices" values (deadlock size).
    box = axes[1].boxplot(
        deadlocks,
        flierprops={ 'markersize': 1 },
        medianprops={ 'color': 'black' },
        positions=pos,
        manage_ticks=False,
        patch_artist=True,
    )
    for b in box['boxes']:
        b.set_facecolor(colors[i])
        b.set_hatch(hatches[i])
        b.set_edgecolor('k')

# Centre one tick under each group of bars/boxes.
ticks = []
ticklabels = []
for i, mh in enumerate(mh_pcts):
    start = i * group_width
    end = start + len(configs)
    ticks.append((start + end - 1) / 2)
    ticklabels.append(f'{mh}')

axes[0].set_ylabel('number of deadlocks')
axes[0].set_xticks(ticks)
axes[0].set_xticklabels(ticklabels)

axes[1].set_xlabel("% multi-home")
axes[1].set_ylabel('size of a deadlock')
axes[1].set_yscale("log")

fig.legend(bbox_to_anchor=(0, 1, 1, 0), loc='lower left', mode='expand', ncol=1)
fig.tight_layout()
fig.savefig('output/micro-deadlocks.pdf', bbox_inches='tight')

# Latency

In [None]:
LAT_PREFIX = "main/ycsb-latency"

IGNORE_CACHE = False


def _load_lat_index():
    """Build the latency experiment index as a pandas frame.

    The workload columns "wl:hot", "wl:mh" and "wl:mp" are cast to int32.
    """
    index_pdf = get_index(spark, LAT_PREFIX).toPandas().convert_dtypes()
    return index_pdf.astype({"wl:hot": "int32", "wl:mh": "int32", "wl:mp": "int32"})


lat_index_df = from_cache_or_compute(
    f'{LAT_PREFIX}/index.parquet',
    _load_lat_index,
    ignore_cache=IGNORE_CACHE
)

# Spark aggregate expressions for the approximate median and 99th percentile of
# the "latency" column; shared by the latency cells below.
percentile_cols = [
    F.percentile_approx("latency", quantile).alias(column_name)
    for column_name, quantile in (("percentile_50", 0.5), ("percentile_99", 0.99))
]

lat_index_df

In [None]:
IGNORE_CACHE = False


def _single_home_percentiles():
    """Latency percentiles per prefix for rows whose "regions" array has one entry.

    The per-prefix percentiles are collected into pandas and merged with the
    latency index on "prefix".
    """
    samples = latency(
        spark,
        lat_index_df["prefix"],
        sample=1.0,
        start_offset_sec=10,
    )
    one_region = samples.where(F.size("regions") == 1)
    per_prefix = one_region.groupBy("prefix").agg(*percentile_cols).toPandas()
    return per_prefix.merge(lat_index_df, on="prefix")


latency_pct_sh_df = from_cache_or_compute(
    f'{LAT_PREFIX}/latency_sh.parquet',
    _single_home_percentiles,
    ignore_cache=IGNORE_CACHE,
)

In [None]:
IGNORE_CACHE = False

# For MH transactions, we resample the results so that the numbers of
# transactions are even across the regions.

# Rows whose "regions" array has more than one entry, tagged with a "region"
# (coordinator shifted right by 24 bits) and a per-(prefix, region) "key".
mh_df = (
    latency(
        spark,
        lat_index_df["prefix"],
        sample=1.0,
        start_offset_sec=5,
        duration_sec=20,
    )
    .withColumn("region", F.shiftright(F.col("coordinator"), 24))
    .withColumn("key", F.concat(F.col("prefix"), F.col("region")))
    .where(F.size("regions") > 1)
    .cache()
)

# Sampling fraction per key: 200 / row count, capped at 1.
sampling_frac = (
    mh_df
    .groupBy("key")
    .count()
    .withColumn("frac", F.least(F.lit(1), 200/F.col("count")))
    .select("key", "frac")
)


def _multi_home_percentiles():
    """Sample mh_df per key, then compute latency percentiles per prefix."""
    fractions = sampling_frac.toPandas().set_index("key").to_dict()["frac"]
    resampled = mh_df.sampleBy("key", fractions, seed=1000)
    per_prefix = resampled.groupBy("prefix").agg(*percentile_cols).toPandas()
    return per_prefix.merge(lat_index_df, on="prefix")


latency_pct_mh_df = from_cache_or_compute(
    f'{LAT_PREFIX}/latency_mh.parquet',
    _multi_home_percentiles,
    ignore_cache=IGNORE_CACHE,
)

In [None]:
# Display the per-prefix latency percentiles of the resampled multi-region rows
# (merged with the latency index) for inspection.
latency_pct_mh_df

## Plot

In [None]:
import matplotlib.ticker as ticker 

def plot_one_latency(df, label, config, hot, mp_pct, mh_pcts, ax):
    """Draw the p50 and p99 latency curves of one configuration on ``ax``.

    Args:
        df: pandas frame with "config_name", "wl:hot", "wl:mp", "wl:mh",
            "percentile_50" and "percentile_99" columns.
        label: legend label template; ``label.format('p50')`` and
            ``label.format('p99')`` name the two curves, so a template without
            a ``{}`` placeholder (e.g. '_nolegend_') is used unchanged.
        config: value of "config_name" to select.
        hot: value of "wl:hot" to select; the title shows ``1/hot``.
        mp_pct: value of "wl:mp" to select.
        mh_pcts: "wl:mh" values to keep; also used as the x ticks.
        ax: matplotlib axes to draw on. Its title, axis labels, x scale and
            tick locators are overwritten.
    """
    selected = (
        (df["config_name"] == config)
        & (df["wl:hot"] == hot)
        & (df["wl:mp"] == mp_pct)
        & df["wl:mh"].isin(mh_pcts)
    )
    curve = df[selected].sort_values("wl:mh")

    # One line per percentile, median first so the property cycle order is
    # p50 then p99.
    for column, tag in (("percentile_50", 'p50'), ("percentile_99", 'p99')):
        curve.plot(ax=ax, x="wl:mh", y=column, label=label.format(tag), legend=False)

    ax.set_title(f"HOT = {1/hot}, MP = {mp_pct}%")
    ax.set_ylabel("latency (ms)")
    ax.set_xlabel("% multi-home")

    # Symlog x axis with explicit ticks at the plotted percentages.
    ax.set_xscale("symlog", linthresh=15)
    ax.set_xticks(mh_pcts)
    ax.set_xticklabels(mh_pcts, rotation=0)

    # Major y ticks every 50, with automatic minor ticks in between.
    ax.yaxis.set_major_locator(ticker.MultipleLocator(50))
    ax.yaxis.set_minor_locator(ticker.AutoMinorLocator())


def plot_latency(
    df,
    mp_pcts=[0, 50, 100],
    mh_pcts=[0, 5, 10, 15, 25, 50, 75, 100],
    hots=[10000, 100],
    figsize=(11, 7),
    legend=True,
    fig_axes=None,
):
    """Plot p50/p99 latency of Detock and SLOG on a grid of panels.

    The grid has one row per entry of ``hots`` and one column per entry of
    ``mp_pcts``; panel ``[r, c]`` is drawn by ``plot_one_latency``.

    Args:
        df: latency percentile frame passed through to ``plot_one_latency``.
        mp_pcts: "wl:mp" values, one per column (only read, never mutated).
        mh_pcts: "wl:mh" values plotted on the x axis of every panel.
        hots: "wl:hot" values, one per row.
        figsize: figure size, used only when a new figure is created.
        legend: when true, add a figure-level legend above the panels.
        fig_axes: optional ``(fig, axes)`` pair to draw into. ``axes`` must
            support ``axes[r, c]`` indexing; every axes in it gets the
            property cycle below, including rows this function does not plot.

    Returns:
        The ``(fig, axes)`` pair that was drawn on.
    """
    # (config_name, legend label) in plotting order.
    systems = (
        ("ddr_ts.conf", 'Detock'),
        ("baseline.conf", 'SLOG'),
    )

    if not fig_axes:
        fig, axes = plt.subplots(len(hots), len(mp_pcts), figsize=figsize, sharey=True)
        # Force a 2-D array even when the grid has a single row or column.
        axes = np.array(axes).reshape((len(hots), len(mp_pcts)))
    else:
        fig, axes = fig_axes

    # Colour and marker change together per system; the line style alternates
    # between the two curves (p50, p99) of each system.
    color_and_marker = cycler(color='rkgb') + cycler(marker=['.', 'x', '+', 'o'])
    pc = color_and_marker * cycler(linestyle=['--', '-'])
    for axes_row in axes:
        for ax in axes_row:
            ax.set_prop_cycle(pc)

    for config, system_label in systems:
        for r, hot in enumerate(hots):
            for c, mp_pct in enumerate(mp_pcts):
                # Only the first panel contributes legend entries.
                if r == 0 and c == 0:
                    label = system_label + ' {}'
                else:
                    label = '_nolegend_'
                plot_one_latency(df, label, config, hot, mp_pct, mh_pcts, axes[r, c])

    if legend:
        fig.legend(bbox_to_anchor=(0, 1, 1, 0), loc='lower left', mode='expand', ncol=3)

    return fig, axes


In [None]:
# Systems drawn on the multi-home panel, in property-cycle order.
configs = [
    "ddr_ts.conf",
    "baseline.conf",
]

config_to_label = {
    'ddr_ts.conf': 'Detock',
    'baseline.conf': 'SLOG',
    'calvin.conf': "Calvin",
}

# Three stacked panels: rows 0 and 1 are filled by plot_latency (one per
# default `hots` value), row 2 shows the multi-home latencies.
fig, axes = plt.subplots(3, 1, figsize=(6, 8.5), sharex=True, sharey=True)
axes = np.array(axes).reshape((3, 1))

# figsize is not passed: plot_latency ignores it when fig_axes is given.
plot_latency(
    latency_pct_sh_df,
    mp_pcts=[100],
    mh_pcts=[0, 5, 10, 15, 25, 50, 75],
    legend=False,
    fig_axes=(fig, axes),
)

for config in configs:
    # The Detock/SLOG legend entries already come from the first panel, so
    # these lines must not add new ones. The label is given explicitly here:
    # this cell used to read a `label` global that only existed because an
    # earlier plotting cell leaked it.
    plot_one_latency(
        latency_pct_mh_df,
        label='_nolegend_',
        config=config,
        hot=100,
        mp_pct=100,
        mh_pcts=[5, 10, 15, 25, 50, 75, 100],
        ax=axes[2, 0],
    )

# NOTE(review): the same Calvin series (from latency_pct_sh_df, hot=100,
# mp_pct=100) is drawn on all three panels, including the HOT = 0.0001 and the
# multi-home one -- confirm this is intended.
for i in range(3):
    # Only the first panel contributes the Calvin legend entries.
    label = config_to_label["calvin.conf"] + ' {}' if i == 0 else '_nolegend_'
    plot_one_latency(
        latency_pct_sh_df,
        label=label,
        config="calvin.conf",
        hot=100,
        mp_pct=100,
        mh_pcts=[0, 5, 10, 15, 25, 50, 75, 100],
        ax=axes[i, 0],
    )


# plot_one_latency sets a generic title and x label on every call; override
# them with the final per-panel captions.
axes[0, 0].set_title("Single-Home, HOT = 0.0001")
axes[0, 0].set_xlabel(None)
axes[1, 0].set_title("Single-Home, HOT = 0.01")
axes[1, 0].set_xlabel(None)
axes[2, 0].set_title("Multi-Home, HOT = 0.01")
last_tick_label = axes[2, 0].xaxis.get_majorticklabels()[-1]
last_tick_label.set_horizontalalignment("left")


fig.legend(bbox_to_anchor=(0, 1, 1, 0), mode='expand', ncol=3, loc='lower left')
fig.tight_layout()
fig.savefig('output/micro-latency.pdf', bbox_inches='tight')
fig.savefig('output/micro-latency.png', bbox_inches='tight')