In [None]:
import pandas as pd
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from os import path, walk, listdir
from os.path import isfile
from statistics import fmean
import re
from tqdm import tqdm

# Constants

In [None]:
# Assumed benchmark run length: one hour, expressed in seconds.
RUNTIME_SECONDS = 60 * 60

# Per-algorithm benchmark result directories.
DIR_CLASSIC = "../bench_data/classic/"
DIR_MODIFIED = "../bench_data/modified/"

# Patterns that pull run metadata out of a CSV's path,
# e.g. a path containing "5_nodes", "modified", "60_minutes" and "run_2".
REGEX_NODES = r"\d+_nodes"
REGEX_VERSION = r"(modified)|(classic)"
REGEX_DURATION = r"\d+_minutes"
REGEX_RUN = r"run_\d+"

# Utility Functions

In [None]:
def find_all_csvs(rootpath : str) -> list[str]:
    """Return absolute, normalised paths of every ``.csv`` file below `rootpath`.

    Every subdirectory of `rootpath` (at any depth, as discovered by os.walk)
    is listed. Files sitting directly inside `rootpath` itself are not
    returned, because only subdirectories are listed. Only regular files whose
    name ends in ".csv" are kept, so a directory or a file such as "notes_csv"
    is never handed to pandas.read_csv.
    """
    csvs = []
    for root, dirnames, _files in walk(rootpath):
        for dir_name in dirnames:
            dir_path = path.join(root, dir_name)
            for name in listdir(dir_path):
                candidate = path.join(dir_path, name)
                if name.endswith('.csv') and isfile(candidate):
                    csvs.append(path.normpath(path.abspath(candidate)))
    return csvs

def find_all_csvs_and_join(rootpaths : list[str]) -> list[str]:
    """Concatenate find_all_csvs(root) for each root, keeping the order of `rootpaths`."""
    return [csv_file for root in rootpaths for csv_file in find_all_csvs(root)]

In [None]:
# Every benchmark CSV below bench_data. Forward slashes are accepted by both
# Windows and POSIX path APIs; a backslash is a literal filename character on POSIX.
FILES = find_all_csvs("../bench_data/")

In [None]:
def open_csv(path : str) -> pd.DataFrame:
    """Load one benchmark CSV, dropping rows whose leaderId is -1.

    Those -1 rows are the initial dummy entries. Index labels of the remaining
    rows are kept as read.
    """
    raw = pd.read_csv(path)
    is_real_entry = raw.leaderId.ne(-1)
    return raw[is_real_entry]

In [None]:
def find_node_count(df : pd.DataFrame) -> int:
    """Number of distinct serverId values in `df` (a NaN id counts as one value)."""
    return df["serverId"].nunique(dropna=False)

In [None]:
def find_quorum_size(df : pd.DataFrame) -> int:
    """Majority quorum: the smallest server count strictly greater than half of find_node_count(df)."""
    nodes = find_node_count(df)
    return (nodes // 2) + 1

In [None]:
def _search_or_raise(pattern : str, file : str) -> str:
    """Return the first substring of `file` matching `pattern`.

    Raises:
        ValueError: if `pattern` does not occur in `file`. The message names
            both, so a mis-named benchmark file is easy to track down (calling
            ``.group()`` on a failed search would raise an opaque AttributeError).
    """
    match = re.search(pattern, file)
    if match is None:
        raise ValueError(f"pattern {pattern!r} not found in path {file!r}")
    return match.group()

# NOTE(review): patterns are searched in the whole (usually absolute) path, so a
# parent directory whose name contains e.g. "classic" would be matched first.

def get_node_count(file : str) -> int:
    """Node count N taken from the first "<N>_nodes" in `file` (REGEX_NODES)."""
    num, _ = _search_or_raise(REGEX_NODES, file).split("_", maxsplit=1)
    return int(num)

def get_duration_minutes(file : str) -> int:
    """Duration N taken from the first "<N>_minutes" in `file` (REGEX_DURATION)."""
    num, _ = _search_or_raise(REGEX_DURATION, file).split("_", maxsplit=1)
    return int(num)

def get_algorithm_type(file : str) -> str:
    """First "modified" or "classic" occurring in `file` (REGEX_VERSION)."""
    return _search_or_raise(REGEX_VERSION, file)

def get_run_id(file : str) -> str:
    """Run identifier such as "run_3" found in `file` (REGEX_RUN)."""
    return _search_or_raise(REGEX_RUN, file)

# Work per Leader

## Committed entries

In [None]:
def plot_leader_commit_counts(df : pd.DataFrame, title=None):
    """Show a bar chart of the share of rows in `df` attributed to each leaderId.

    Each bar is (rows with that leaderId) / (all rows in `df`), so the bars sum
    to at most 1; the y-axis is fixed to [0, 1]. `title`, if given, is set
    before the figure is shown.
    """
    share_per_leader = df["leaderId"].value_counts() / len(df)
    sns.barplot(share_per_leader)
    plt.ylim(0, 1)
    if title is not None:
        plt.title(title)
    plt.show()

In [None]:
# One leader-share bar chart per benchmark CSV, titled with the run metadata
# parsed from its path.
for file in FILES:
    algo, nodes, minutes, run = (
        get_algorithm_type(file),
        get_node_count(file),
        get_duration_minutes(file),
        get_run_id(file),
    )
    plot_leader_commit_counts(open_csv(file), f"{algo} {run}: {nodes} nodes for {minutes} min")

# Performance

## Entry Throughput and Commit Latency

In [None]:
def find_replicated_count(df : pd.DataFrame, node_count : int) -> int:
    """Largest per-server log length reached by at least `node_count` servers.

    A per-server log length is the number of rows for that serverId. A length
    qualifies when at least `node_count` servers have a log at least that long;
    the largest qualifying length is returned, or 0 if none qualifies. With
    node_count <= 0 every length qualifies, so the longest log is returned.
    If the servers' logs share a common prefix (assumed — TODO confirm), this
    is the number of leading entries held by at least `node_count` servers.
    """
    log_lengths = df.groupby('serverId').size().tolist()
    qualifying = [
        length for length in log_lengths
        if sum(other >= length for other in log_lengths) >= node_count
    ]
    return max(qualifying, default=0)

def find_committed_count(df : pd.DataFrame) -> int:
    """find_replicated_count evaluated at the majority-quorum size of `df`."""
    quorum = find_quorum_size(df)
    return find_replicated_count(df, quorum)

def find_fully_replicated_count(df : pd.DataFrame) -> int:
    """find_replicated_count requiring every distinct server in `df`."""
    all_servers = find_node_count(df)
    return find_replicated_count(df, all_servers)

In [None]:
def split_logs_by(df : pd.DataFrame, column : str) -> list[pd.DataFrame]:
    """Split `df` into one sub-frame per distinct value of `column`.

    Sub-frames keep the original row order and index labels and are returned in
    order of first appearance of each value. A NaN value yields an empty frame,
    because NaN never compares equal to itself.
    """
    return [df[df[column] == value] for value in df[column].unique()]


In [None]:
def find_replication_delays(df : pd.DataFrame, num_nodes : int):
    """Per-entry delay until `num_nodes` servers have stored that entry.

    Entries are matched across servers by position: entry i is the i-th row of
    each server's log. This assumes the servers' logs share a common order —
    TODO confirm against the benchmark's CSV writer. For every position i below
    find_replicated_count(df, num_nodes), the delay is the num_nodes-th smallest
    storageTime at that position minus the smallest creationTime at that position.

    Returns:
        A list with one delay per position. An element is None (and a message
        is printed) if fewer than `num_nodes` servers have an entry at that
        position. For num_nodes >= 1 this should not happen, because the range
        stops at a length that at least `num_nodes` server logs reach.
    """
    logs = split_logs_by(df, "serverId")
    # Extract the two columns once as NumPy arrays. Per-row `log.iloc[i][col]`
    # is slow, and it builds a whole row Series, which may upcast the
    # timestamps' dtype.
    creation_cols = [log["creationTime"].to_numpy() for log in logs]
    storage_cols = [log["storageTime"].to_numpy() for log in logs]

    last_replicated = find_replicated_count(df, num_nodes)

    delays = []
    for i in range(last_replicated):
        creation_time = min(col[i] for col in creation_cols if len(col) > i)
        storage_times = sorted(col[i] for col in storage_cols if len(col) > i)

        if len(storage_times) < num_nodes:
            delays.append(None)
            print(f"None at {i}. Need {num_nodes}, have {len(storage_times)}")
        else:
            delays.append(storage_times[num_nodes - 1] - creation_time)
    return delays
        

In [None]:
def _summarise_run(file : str) -> dict:
    """Build the summary record (one process_data row) for a single benchmark CSV."""
    df = open_csv(file)
    minutes = get_duration_minutes(file)
    # Rates use this run's own duration (parsed from its path) rather than one
    # global runtime, so runs of different lengths stay comparable.
    run_seconds = minutes * 60
    committed = find_committed_count(df)
    fully_replicated = find_fully_replicated_count(df)

    def per_second(count):
        return count / run_seconds if run_seconds > 0 else float("nan")

    return {
        "path": file,
        "algorithm": get_algorithm_type(file),
        "nodeCount": get_node_count(file),
        "duration": minutes,
        "runId": get_run_id(file),
        "fullReplicationDelays": find_replication_delays(df, find_node_count(df)),
        "commitDelays": find_replication_delays(df, find_quorum_size(df)),
        "fullReplicationCount": fully_replicated,
        "commitCount": committed,
        "fullReplicationRate": per_second(fully_replicated),
        "commitRate": per_second(committed),
        "longest_logs": find_replicated_count(df, 1),
    }

def process_data(paths : list[str], csv : str) -> pd.DataFrame:
    """Compute per-run replication statistics for every benchmark CSV in `paths`.

    Args:
        paths: CSV files to analyse. Algorithm, node count, duration (minutes)
            and run id are parsed from each path.
        csv: intended location for a processed-results cache. Currently unused:
            writing it is disabled, and the list-valued delay columns would not
            round-trip through CSV anyway.

    Returns:
        One row per input file, sorted by algorithm, nodeCount and runId, with columns:
        - path, algorithm, nodeCount, duration, runId;
        - fullReplicationDelays / commitDelays (per-entry delay lists);
        - fullReplicationCount / commitCount;
        - fullReplicationRate / commitRate (entries per second of the run's
          duration; NaN for a zero duration);
        - longest_logs.
    """
    columns = [
        "path", "algorithm", "nodeCount", "duration", "runId",
        "fullReplicationDelays", "commitDelays",
        "fullReplicationCount", "commitCount",
        "fullReplicationRate", "commitRate",
        "longest_logs",
    ]
    # Build all records first and construct the frame once; passing `columns`
    # keeps the schema even when `paths` is empty.
    records = [_summarise_run(file) for file in tqdm(paths)]
    summary = pd.DataFrame(records, columns=columns)
    return summary.sort_values(by=["algorithm", "nodeCount", "runId"])


# Where a processed-results cache would live; passed through to process_data.
processed_path = "../bench_data/processed.csv"
# Collect every benchmark CSV and summarise each run into one table.
paths = find_all_csvs("../bench_data/")
DATA = process_data(paths, processed_path)
print(DATA.dtypes)
display(DATA)


In [None]:
# Per-file check: committed and fully-replicated counts relative to the
# longest server log.
for file in FILES:
    df = open_csv(file)
    # Part of the path below the bench_data folder, used as a compact label.
    # Computed outside the f-string: reusing double quotes inside an f-string
    # expression is a SyntaxError before Python 3.12 (PEP 701).
    label = file.split('bench_data', maxsplit=1)[1]
    longest = find_replicated_count(df, 1)
    print(f"{label}:\t committed {find_committed_count(df)}/{longest},\t fully replicated {find_fully_replicated_count(df)}/{longest}")

In [None]:
def get_col_from_all(data : pd.DataFrame, colname : str, algo : str = None, nodes : int = None):
    """Select column `colname` from a processed-runs table, optionally filtered.

    Args:
        data: table with "algorithm" and "nodeCount" columns (e.g. DATA from process_data).
        colname: column to extract.
        algo: keep only rows whose "algorithm" equals this value (None = all rows).
        nodes: keep only rows whose "nodeCount" equals this value (None = all rows).

    Returns:
        For the list-valued delay columns ("fullReplicationDelays",
        "commitDelays"), one flat list concatenating every selected row's list.
        For any other column, the filtered column as a Series.
    """
    filtered = data
    if algo is not None:
        filtered = filtered[filtered["algorithm"] == algo]
    if nodes is not None:
        filtered = filtered[filtered["nodeCount"] == nodes]

    if colname in ("fullReplicationDelays", "commitDelays"):
        # Flatten in one pass; sum(lists, start=[]) copies the accumulated list
        # on every addition, which is quadratic in the total length.
        return [delay for row_delays in filtered[colname] for delay in row_delays]
    return filtered[colname]

In [None]:
# Overall: commit-latency ECDF per algorithm, pooled over every node count.
modified_commit_delays = get_col_from_all(DATA, "commitDelays", algo="modified")
classic_commit_delays = get_col_from_all(DATA, "commitDelays", algo="classic")
plt.figure(figsize=(10, 5))
ax = plt.gca()
sns.ecdfplot(modified_commit_delays, log_scale=True, label="modified", ax=ax)
sns.ecdfplot(classic_commit_delays, log_scale=True, label="classic", ax=ax)
ax.set_title("Overall commit latency of classic vs modified")
ax.set_xlabel("Latency [milliseconds]")
# Same fixed x-range as the per-node-count latency cells, so panels are comparable.
ax.set_xlim(1e-7, 1000)
ax.set_yticks(np.arange(0, 1.1, 0.1))
ax.xaxis.minorticks_on()
ax.grid(True)
ax.legend()
plt.show()

In [None]:
# Commit-latency ECDF per algorithm, restricted to 3-node clusters.
node_count = 3
modified_commit_delays = get_col_from_all(DATA, "commitDelays", algo="modified", nodes=node_count)
classic_commit_delays = get_col_from_all(DATA, "commitDelays", algo="classic", nodes=node_count)
plt.figure(figsize=(10, 5))
ax = plt.gca()
sns.ecdfplot(modified_commit_delays, log_scale=True, label="modified", ax=ax)
sns.ecdfplot(classic_commit_delays, log_scale=True, label="classic", ax=ax)
# The title names the node count so this panel is distinguishable from the pooled one.
ax.set_title(f"Commit latency of classic vs modified ({node_count} nodes)")
ax.set_xlabel("Latency [milliseconds]")
# Same fixed x-range as the other latency cells, so panels are comparable.
ax.set_xlim(0.0000001, 1000)
ax.set_yticks(np.arange(0, 1.1, 0.1))
ax.xaxis.minorticks_on()
ax.grid(True)
ax.legend()
plt.show()

In [None]:
# Commit-latency ECDF per algorithm, restricted to 5-node clusters.
node_count = 5
modified_commit_delays = get_col_from_all(DATA, "commitDelays", algo="modified", nodes=node_count)
classic_commit_delays = get_col_from_all(DATA, "commitDelays", algo="classic", nodes=node_count)
plt.figure(figsize=(10, 5))
ax = plt.gca()
sns.ecdfplot(modified_commit_delays, log_scale=True, label="modified", ax=ax)
sns.ecdfplot(classic_commit_delays, log_scale=True, label="classic", ax=ax)
# The title names the node count so this panel is distinguishable from the pooled one.
ax.set_title(f"Commit latency of classic vs modified ({node_count} nodes)")
ax.set_xlabel("Latency [milliseconds]")
# Same fixed x-range as the other latency cells, so panels are comparable.
ax.set_xlim(0.0000001, 1000)
ax.set_yticks(np.arange(0, 1.1, 0.1))
ax.xaxis.minorticks_on()
ax.grid(True)
ax.legend()
plt.show()

In [None]:
# Commit-latency ECDF per algorithm, restricted to 7-node clusters.
node_count = 7
modified_commit_delays = get_col_from_all(DATA, "commitDelays", algo="modified", nodes=node_count)
classic_commit_delays = get_col_from_all(DATA, "commitDelays", algo="classic", nodes=node_count)
plt.figure(figsize=(10, 5))
ax = plt.gca()
sns.ecdfplot(modified_commit_delays, log_scale=True, label="modified", ax=ax)
sns.ecdfplot(classic_commit_delays, log_scale=True, label="classic", ax=ax)
# The title names the node count so this panel is distinguishable from the pooled one.
ax.set_title(f"Commit latency of classic vs modified ({node_count} nodes)")
ax.set_xlabel("Latency [milliseconds]")
# Same fixed x-range as the other latency cells, so panels are comparable.
ax.set_xlim(0.0000001, 1000)
ax.set_yticks(np.arange(0, 1.1, 0.1))
ax.xaxis.minorticks_on()
ax.grid(True)
ax.legend()
plt.show()

In [None]:
# Commit-latency ECDF per algorithm, restricted to 9-node clusters.
node_count = 9
modified_commit_delays = get_col_from_all(DATA, "commitDelays", algo="modified", nodes=node_count)
classic_commit_delays = get_col_from_all(DATA, "commitDelays", algo="classic", nodes=node_count)
plt.figure(figsize=(10, 5))
ax = plt.gca()
sns.ecdfplot(modified_commit_delays, log_scale=True, label="modified", ax=ax)
sns.ecdfplot(classic_commit_delays, log_scale=True, label="classic", ax=ax)
# The title names the node count so this panel is distinguishable from the pooled one.
ax.set_title(f"Commit latency of classic vs modified ({node_count} nodes)")
ax.set_xlabel("Latency [milliseconds]")
# Same fixed x-range as the other latency cells, so panels are comparable.
ax.set_xlim(0.0000001, 1000)
ax.set_yticks(np.arange(0, 1.1, 0.1))
ax.xaxis.minorticks_on()
ax.grid(True)
ax.legend()
plt.show()

In [None]:
# Commit-latency ECDF per algorithm, restricted to 15-node clusters.
node_count = 15
modified_commit_delays = get_col_from_all(DATA, "commitDelays", algo="modified", nodes=node_count)
classic_commit_delays = get_col_from_all(DATA, "commitDelays", algo="classic", nodes=node_count)
plt.figure(figsize=(10, 5))
ax = plt.gca()
sns.ecdfplot(modified_commit_delays, log_scale=True, label="modified", ax=ax)
sns.ecdfplot(classic_commit_delays, log_scale=True, label="classic", ax=ax)
# The title names the node count so this panel is distinguishable from the pooled one.
ax.set_title(f"Commit latency of classic vs modified ({node_count} nodes)")
ax.set_xlabel("Latency [milliseconds]")
# Same fixed x-range as the other latency cells, so panels are comparable.
ax.set_xlim(0.0000001, 1000)
ax.set_yticks(np.arange(0, 1.1, 0.1))
ax.xaxis.minorticks_on()
ax.grid(True)
ax.legend()
plt.show()