# Experiment Result Analysis

Evaluated systems:

- Nos
- Bsl-Cocytus
- Bsl-PQ
- Bsl-Split
- Bsl-Repl

## Preparation

In [None]:
import glob
import os
import pandas as pd
import numpy as np

from pathlib import Path
from typing import *

EVALUATED_SYSTEMS = [
    'nos',
    'cocytus',
    'pq',
    'split',
    'repl'
]

EVALUATED_VALUE_SIZES = [
    '64',
    '256',
    '1k',
    '4k'
]

CLUSTER_SIZE = 32

from enum import Enum

class Dist(Enum):
    '''
    Key distribution.
    '''
    UNIFORM = 0
    ZIPF = 1

In [3]:
def collect_dir(exp: str) -> Tuple[float, float, pd.Series]:
    '''
    Collect csv-format performance dumps from a directory.

    ### Args:
        `exp`: Absolute path to the experiment data directory.

    ### Returns:
        (thpt_max, thpt_mean, latencies)    
    '''
    
    throughputs = []
    latencies = []

    for f in glob.glob(f'{exp}/*.csv'):
        df = pd.read_csv(f)
        
        # Drop the first and the last row
        df.drop(df.head(1).index, inplace=True)
        df.drop(df.tail(1).index, inplace=True)

        # Get throughput data, pending merge
        throughput = df['throughput'].sort_index(ascending=False).reset_index(drop=True)

        # Get latency data, merge instantly
        latency = df[['get_avg', 'get_p50', 'get_p99', 'get_p999', 'put_avg', 'put_p50', 'put_p99', 'put_p999']]
        latency = latency.mean(axis=0)
        
        throughputs.append(throughput)
        latencies.append(latency)
    
    if len(throughputs) == 0 or len(latencies) == 0:
        raise Exception("No data found")

    # Merge throughputs into a table
    throughput_table = pd.concat(throughputs, axis=1).sort_index(ascending=False).reset_index(drop=True).replace(np.nan, 0)
    throughputs = throughput_table.sum(axis=1).astype("int") / 1e6

    # Merge latencies
    latency_table = pd.concat(latencies, axis=1)
    # print(exp, latency_table)
    latencies = latency_table.mean(axis=1).astype("int") / 1e3
    return throughputs.max(), throughputs.mean(), latencies

In [4]:
def get_system_dir(system: str, config: str) -> str:
    if system == 'repl':
        config = '42' if config == '62' else '63'
    return f'{system}-{config}'


def fix_glob_pattern(dir: str, op: str) -> str:
    if op == 'put':
        dir = dir.replace('split-lb', 'split')
    return dir

## Section 6.2: Microbenchmark

1. 100%-GET and 100%-PUT
    * peak throughputs
    * latency-throughput curves
2. YCSB throughputs
3. Sensitivity analysis
4. Centralization test
5. Dynamic constraint adapt

In [None]:
def collect_100getput(config: str, dist: Dist = Dist.ZIPF, output_dir: str = 'res'):
    '''
    Section 6.2.1(a)
    ----------------
    
    Collect performances of 100%-GET/100%-PUT microbenchmark.

    ### Args:
        `config`: Erasure coding configuration, e.g., `42`, `62`, and `63`.
        `dist`:   Key distribution.
    '''
    
    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'
    infix = '' if dist == Dist.UNIFORM else 'zipf-'

    # Prevent unnecessary re-run
    output_file = data_dir / output_dir / f'getput-{infix}{config}.xlsx'

    # Common-case performance
    commoncase_dir = data_dir / 'normal'

    # Ensure the directory exists
    present_systems = []
    for system in EVALUATED_SYSTEMS:
        system_dir = commoncase_dir / get_system_dir(system, config)
        if system_dir.is_dir():
            present_systems.append(system)
        else:
            print(f'Directory {system_dir} does not exist!')

    results = {}    
    for op in ['get', 'put']:
        df_thpt = None
        
        for value_sz in EVALUATED_VALUE_SIZES:
            row_thpt = {}

            for system in present_systems:
                system_dir = commoncase_dir / get_system_dir(system, config)

                op_long = 'updateonly' if op == 'put' else 'readonly'
                glob_pattern = fix_glob_pattern(f'{system_dir}/{op_long}-{infix}{value_sz}-*', op)
                g = list(glob.glob(glob_pattern))

                if len(g) == 0:
                    print(glob_pattern)
                    raise Exception(f'No data found for ({op}, {system}, {value_sz})')
                
                exp_dir = g[0]
                thpt_max, _, _ = collect_dir(exp_dir)
                
                row_thpt[system] = thpt_max
            
            df_thpt = pd.concat([df_thpt, pd.DataFrame(row_thpt, index=[value_sz])])
        
        results[op] = df_thpt
    
    # Dump
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        for op in ['get', 'put']:
            df_thpt = results[op]
            df_thpt.to_excel(writer, sheet_name=f'{op}-thpt')


# Do collection
collect_100getput('42', Dist.UNIFORM)
collect_100getput('62', Dist.UNIFORM)
collect_100getput('63', Dist.UNIFORM)

In [None]:
def collect_ltc(output_dir: str = 'res'):
    '''
    Section 6.2.1(b)
    ----------------

    Collect latency-throughput curve data.
    '''

    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'
    ltc_dir = data_dir / 'ltc'

    def do_collect_op(systems: List[str], op: str):
        output_file = data_dir / output_dir / f'ltc-{op}.xlsx'
        results = {}
        op_long = 'readonly' if op == 'get' else 'updateonly'

        for system in systems:
            df_res = None
            
            glob_pattern = f'{ltc_dir}/{system}/{op_long}-*.csv'
            for f in glob.glob(glob_pattern):
                offered_load = int(f.split('-')[-1].split('.')[0])

                df = pd.read_csv(f)
        
                # Drop the first and the last row
                df.drop(df.head(1).index, inplace=True)
                df.drop(df.tail(1).index, inplace=True)

                # Get throughput data, pending merge
                throughput = df['throughput'].sort_index(ascending=False).reset_index(drop=True)
                throughput = throughput.max()

                # Get latency data, merge instantly
                latency = df[[f'{op}_avg', f'{op}_p50', f'{op}_p99']]
                latency = latency.mean(axis=0)

                row = { 'thpt': offered_load + throughput / 1e6 }
                row[f'{op}_avg'] = latency[f'{op}_avg'] / 1e3
                row[f'{op}_p50'] = latency[f'{op}_p50'] / 1e3
                row[f'{op}_p99'] = latency[f'{op}_p99'] / 1e3
            
                df_res = pd.concat([df_res, pd.DataFrame(row, index=[offered_load])])
            
            # Sort by index
            df_res.sort_index(inplace=True)
            
            results[system] = df_res

        with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
            for system in systems:
                df = results[system]
                df.to_excel(writer, sheet_name=system)

    # Read latency curve.
    # 3 types: Nos, Split, Split-LB.
    do_collect_op(['nos', 'split', 'split-lb'], 'get')

    # Write latency curve.
    # 5 typs:  Nos, Split, Cocytus, PQ, Repl.
    do_collect_op(['nos', 'split', 'cocytus', 'pq', 'repl'], 'put')


collect_ltc()

In [None]:
def collect_ycsb(config: str, output_dir: str = 'res'):
    '''
    Section 6.2.2
    -------------

    Collect performances of YCSB benchmark.

    ### Args:
        `config`: Erasure coding configuration, e.g., `42`, `62`, and `63`.
    '''
    
    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'

    # Prevent unnecessary re-run.
    output_file = data_dir / output_dir / f'ycsb-{config}.xlsx'
    if output_file.is_file():
        return

    # Ensure the directory exists.
    present_systems = []
    for system in EVALUATED_SYSTEMS:
        system_dir = data_dir / 'ycsb' / get_system_dir(system, config)
        if system_dir.is_dir():
            present_systems.append(system)
        else:
            print(f'Directory {system_dir} does not exist!')

    df_thpt = None
    for workload in ['ycsb-a', 'ycsb-b', 'ycsb-d']:
        row_thpt = {}
        for system in present_systems:
            system_dir = data_dir / 'ycsb' / get_system_dir(system, config)

            glob_pattern = f'{system_dir}/{workload}-*'
            g = list(glob.glob(glob_pattern))

            if len(g) == 0:
                raise Exception(f'No data found for ({workload}, {system})')
            
            exp_dir = g[0]
            thpt_max, _, _ = collect_dir(exp_dir)
            
            row_thpt[system] = thpt_max
            
        df_thpt = pd.concat([df_thpt, pd.DataFrame(row_thpt, index=[workload])])
    
    # Dump
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        df_thpt.to_excel(writer, sheet_name='thpt')


# Do collection
collect_ycsb('42')
collect_ycsb('62')
collect_ycsb('63')

In [None]:
def collect_sensitivity(output_dir: str = 'res'):
    '''
    Section 6.2.3
    -------------

    Collect performances of EC parameter sensitivity tests.
    '''

    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'

    output_file = data_dir / output_dir / f'sensitivity.xlsx'
    sensitivity_dir = data_dir / 'sensitivity'

    present_systems = list(x for x in EVALUATED_SYSTEMS if x != 'pq')

    # Enumerate configurations
    df = { 4: None, 6: None }
    for k in [4, 6]:
        df_thpt = None
        for p in range(1, k):
            row_thpt = {}
            for system in present_systems:
                exp_dir = sensitivity_dir / system / f'{k}{p}'
                thpt_max, _, _ = collect_dir(exp_dir)

                row_thpt[system] = thpt_max
        
            df_thpt = pd.concat([df_thpt, pd.DataFrame(row_thpt, index=[p])])

        df[k] = df_thpt

    # Dump
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        df[4].to_excel(writer, sheet_name='k4')
        df[6].to_excel(writer, sheet_name='k6')
    

# Do collection
collect_sensitivity()

In [16]:
def collect_centralized(output_dir: str = 'res'):
    '''
    Section 6.2.4
    -------------

    Collect performances with centralized MDS.
    '''

    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'

    output_file = data_dir / output_dir / f'centralized.xlsx'
    centralized_dir = data_dir / 'centralized'

    present_systems = ['mds'] + EVALUATED_SYSTEMS

    # Enumerate configurations
    df_thpt = None
    for exp in ['read', 'update']:
        row_thpt = {}
        for system in present_systems:
            system_dir = centralized_dir / system

            glob_pattern = f'{system_dir}/{exp}only-*'
            g = list(glob.glob(glob_pattern))

            if len(g) == 0:
                print(glob_pattern)
                raise Exception(f'No data found for ({exp}, {system})')
            
            exp_dir = g[0]
            thpt_max, _, _ = collect_dir(exp_dir)
            row_thpt[system] = thpt_max
    
        df_thpt = pd.concat([df_thpt, pd.DataFrame(row_thpt, index=[exp])])

    # Dump
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        df_thpt.to_excel(writer, sheet_name='thpt')
    

# Do collection
collect_centralized()

In [5]:
def collect_slowdown(output_dir: str = 'res'):
    '''
    Section 6.2.5
    -------------

    Collect performances with dynamic slowdown.
    '''

    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'

    output_file = data_dir / output_dir / 'slowdown.xlsx'
    slowdown_dir = data_dir / 'slowdown'

    present_systems = ['cocytus', 'nos', 'repl', 'split', 'repl+', 'split+', 'dynbackup', 'repl-handoff']

    results = {}

    # Collect curve data, one sheet per system
    for sys in present_systems:
        system_dir = f'{slowdown_dir}/ycsb-d-{sys}'
        
        reclen = 999999
        for f in glob.glob(f'{system_dir}/*.csv'):
            df = pd.read_csv(f)
            reclen = min(reclen, df.shape[0])

        thpt = None
        lats = None

        for f in glob.glob(f'{system_dir}/*.csv'):
            df = pd.read_csv(f)
            df = df.truncate(after=reclen)

            if thpt is None:
                thpt = df['throughput']
            else:
                thpt = thpt.add(df['throughput'], fill_value=0)
            
            latency = df[['get_p50', 'get_p99', 'put_p50', 'put_p99']]
            lats = pd.concat([lats, latency])
        
        lats = lats.groupby(lats.index).mean().astype(int)
        total = pd.concat([thpt / 1e6, lats / 1e3], axis=1)
        results[sys] = total

    # Dump
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        for sys in results:
            results[sys].to_excel(writer, sheet_name=sys)


# Do collection
collect_slowdown()

## Section 6.3: Twemcache

5. GET/PUT latency-throughput curves

In [3]:
def collect_twemcache(cluster: str, output_dir: str = 'res'):
    '''
    Section 6.3
    -----------

    Collect Twemcache trace throughput-latency curves.
    '''

    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'
    twemcache_dir = data_dir / 'twemcache' / cluster
    
    results = {}
    for system in EVALUATED_SYSTEMS:
        system_dir = twemcache_dir / system
        df_res = None
        
        # Closed-loop throughput (regarded as the maximum)
        closed_loop_dir = f'{system_dir}/closed'
        closed_thpt, _, closed_lat = collect_dir(closed_loop_dir)

        # Open-loop throughputs
        glob_pattern = f'{system_dir}/*.csv'
        for f in glob.glob(glob_pattern):
            offered_load = int(f.split('/')[-1].split('.')[0])
            if offered_load > closed_thpt:
                closed_thpt = 0

            df = pd.read_csv(f)

            # Drop the first and the last row
            df.drop(df.head(1).index, inplace=True)
            df.drop(df.tail(1).index, inplace=True)

            # Get throughput data, pending merge
            throughput = df['throughput'].sort_index(ascending=False).reset_index(drop=True)
            throughput = throughput.max()

            # Get latency data, merge instantly
            latency = df[['get_p50', 'get_p99', 'put_p50', 'put_p99']]
            latency = latency.mean(axis=0)

            row = { 'thpt': offered_load + throughput / 1e6 }
            row['get_p50'] = latency['get_p50'] / 1e3
            row['get_p99'] = latency['get_p99'] / 1e3
            row['put_p50'] = latency['put_p50'] / 1e3
            row['put_p99'] = latency['put_p99'] / 1e3
        
            df_res = pd.concat([df_res, pd.DataFrame(row, index=[offered_load])])
            
        if closed_thpt > 0:
            row = { 'thpt': closed_thpt }
            row['get_p50'] = closed_lat['get_p50']
            row['get_p99'] = closed_lat['get_p99']
            row['put_p50'] = closed_lat['put_p50']
            row['put_p99'] = closed_lat['put_p99']
            df_res = pd.concat([df_res, pd.DataFrame(row, index=[closed_thpt])])

        # Sort by index
        df_res.sort_index(inplace=True)
        results[system] = df_res


    output_file = data_dir / output_dir / f'twemcache-{cluster}.xlsx'
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        for system in EVALUATED_SYSTEMS:
            df_thpt = results[system]
            df_thpt.to_excel(writer, sheet_name=system)


CLUSTERS = ['04', '12', '27', '31']
for cluster in CLUSTERS:
    collect_twemcache(cluster)

## Section 6.4: Recovery

6. Recovery throughputs
7. Degraded read latencies

In [None]:
def collect_recovery(output_dir: str = 'res'):
    '''
    Section 6.4(a)
    --------------

    Collect recovery throughput.
    '''
    
    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'

    # Prevent unnecessary re-run
    output_file = data_dir / output_dir / f'recovery.xlsx'
    if output_file.is_file():
        return

    # Recovery performance
    recovery_dir = data_dir / 'recovery'

    # Ensure the directory exists
    for system in EVALUATED_SYSTEMS:
        system_dir = recovery_dir / system
        if not system_dir.is_dir():
            raise Exception(f'Directory {system_dir} does not exist')

    AMPLIFICATION = {
        'nos': 1.75,
        'cocytus': 1.5,
        'pq': 1.5,
        'repl': 3,
        'split': 1.5,
    }
    TOTAL_OBJECTS = 50e6 / 32

    df_res = None
    for value_sz in EVALUATED_VALUE_SIZES:
        row = {}
        for system in EVALUATED_SYSTEMS:
            system_dir = recovery_dir / system

            df = pd.read_csv(f'{system_dir}/{value_sz}.csv')
        
            # Drop the first and the last row
            df.drop(df.head(1).index, inplace=True)
            df.drop(df.tail(1).index, inplace=True)

            # Get throughput data, pending merge
            throughput = df['throughput'].sort_index(ascending=False).reset_index(drop=True)
            throughput = throughput.max()

            row[system] = TOTAL_OBJECTS / throughput * AMPLIFICATION[system]
        
        df_res = pd.concat([df_res, pd.DataFrame(row, index=[value_sz])])

    # Dump
    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        df_res.to_excel(writer, sheet_name='recovery')


# Do collection
collect_recovery()

In [None]:
def collect_degraded():
    '''
    Section 6.4(b)
    --------------

    Degraded read latency collector placeholder.
    '''

    raise NotImplementedError("degraded read latency is hand-collected!")

## Section 6.5: Memory Consumption

8. Memory consumption

In [None]:
def collect_memory(output_dir: str = 'res'):
    notebook_path = Path(os.getcwd())
    data_dir = notebook_path.parent / 'data'

    data_file = data_dir / 'memory.txt'
    output_file = data_dir / output_dir / 'memory.xlsx'

    present_systems = list(x for x in EVALUATED_SYSTEMS if x != 'pq')
    results = {}
    for line in open(data_file, 'r').readlines():
        parts = line.strip().split()
        sys = parts[0]
        conf = parts[1]
        value_size = parts[2]
        mem = int(parts[3])

        if sys.startswith('bsl-'):
            sys = sys[4:]

        if conf not in results:
            results[conf] = pd.DataFrame(columns=present_systems, index=EVALUATED_VALUE_SIZES)
        
        actual_mem_gb = mem * CLUSTER_SIZE / 1e6
        results[conf].at[value_size, sys] = actual_mem_gb

    with pd.ExcelWriter(output_file, engine='openpyxl') as writer:
        for config in results:
            df = results[config]
            df.to_excel(writer, sheet_name=config)


collect_memory()