# 

This notebook contains the scripts to run all experiments and generate all figures in the paper.
Please follow these steps:
1. Run `pipenv install` to install the python dependencies specified in `Pipfile` and `Pipfile.lock`.
2. Ensure `go version` is `1.20.6`.
3. Update the values of the variables in the following cell.
4. Run all cells in this notebook with the python environment created in Step 1.
5. The results will be saved in the directory `results/2023-bigdata/output/`.

In [None]:
# 
# User-editable configuration. Every path may start with `~`; the notebook
# expands it with Path.expanduser() before use and asserts that it exists.
project_path = "~/projects/lollipop" # root of the repository
path_hive_comments = "~/hive-comments.txt"
path_wikipedia_growth = "~/wikipedia-growth.txt"
path_eth_transfers = "~/eth-transfers-t200m.txt.p"
path_flickr = "~/flickr-growth.txt"
path_roadnet = "~/roadNet-CA.txt.shuffled" # shuffled with random edge weights added # ignore this for now

# settings for i5-12600K
# `threads`: thread counts iterated over by ScalabilityTest.
threads = [2, 4, 6] # ignore this for now
# `default_thread`: default value of BasicTestCase.threads (passed as `-t=`).
default_thread = 8
# `cpu_affinity_masks`: thread count -> CPU ids handed to os.sched_setaffinity
# when run_algorithm is called with cpu_affinity=True.
cpu_affinity_masks = {2: range(4), 4: range(8), 6: range(12)} # ignore this for now

In [None]:
import subprocess as sp
from pathlib import Path
import shutil
import re
import time
import statistics
import os, signal
from dataclasses import dataclass
import pandas as pd
import numpy as np

import matplotlib as mpl
import matplotlib.pyplot as plt
import matplotlib.ticker as mtick
import matplotlib.dates as mdates

mpl.rcParams['figure.dpi'] = 600

In [None]:
# Hard-coded vertex ids used to build the (s, t) pairs of the test cases.
# For each graph, entry i of `<graph>_pagerank` becomes the sink `t` and entry i
# of `<graph>_transpose_pagerank` becomes the source `s` of test case i.
# NOTE(review): judging by the names these are presumably the top-ranked
# vertices by PageRank on the graph / its transpose — confirm with the authors.
hive_comments_pagerank = [5880, 38806, 85033, 6379, 499, 43668, 27029, 3943, 396, 383716]
hive_comments_transpose_pagerank = [38806, 5880, 613937, 51025, 328593, 554966, 543979, 392586, 383716, 448577]
eth_transfers_pagerank = [3979551, 109769957, 73084091, 514110, 359557, 60806159, 13873739, 21585619, 28827290, 33082862]
eth_transfers_transpose_pagerank = [109421, 7724, 101776590, 3979551, 335165, 109769957, 1948, 87368785, 4875253, 13001]
wikipedia_pagerank = [73, 259, 9479, 6276, 1710, 864, 2169, 110, 10312, 69]
wikipedia_transpose_pagerank = [205298, 437592, 117461, 1184369, 1121429, 1550595, 205299, 205305, 1255505, 178961]
flickr_pagerank = [377, 657, 3160, 390, 786, 1467, 23, 994, 61, 376]
flickr_transpose_pagerank = [784, 27327, 3160, 3169, 6965, 67, 185034, 798, 110435, 657]
# RoadNet has a single list; its first two entries are used as s and t.
roadnet_pagerank = [225438, 287362, 562818, 241926, 521168]

In [None]:
# Locate the Go toolchain. `shutil.which` returns None when `go` is not on
# PATH, so check for that before wrapping the result in a Path (Path(None)
# would fail with an unhelpful TypeError).
go = shutil.which("go")
assert go is not None, "`go` executable not found on PATH"
go = Path(go)

# Expand `~` in the user-supplied locations.
project_path = Path(project_path).expanduser()
path_hive_comments = Path(path_hive_comments).expanduser()
path_wikipedia_growth = Path(path_wikipedia_growth).expanduser()
path_eth_transfers = Path(path_eth_transfers).expanduser()
path_flickr = Path(path_flickr).expanduser()
path_roadnet = Path(path_roadnet).expanduser()

# Fail early and name the offending path, rather than failing in the middle
# of an experiment.
assert go.exists(), f"go executable does not exist: {go}"
assert project_path.exists(), f"project_path does not exist: {project_path}"
assert path_hive_comments.exists(), f"path_hive_comments does not exist: {path_hive_comments}"
assert path_wikipedia_growth.exists(), f"path_wikipedia_growth does not exist: {path_wikipedia_growth}"
assert path_eth_transfers.exists(), f"path_eth_transfers does not exist: {path_eth_transfers}"
assert path_flickr.exists(), f"path_flickr does not exist: {path_flickr}"
assert path_roadnet.exists(), f"path_roadnet does not exist: {path_roadnet}"

# TODO: check hashes of graph files

# Go package executed with `go run` by run_algorithm.
push_relabel_code = project_path / "cmd" / "lp-push-relabel"
assert push_relabel_code.exists(), f"push-relabel command not found: {push_relabel_code}"

In [None]:
def create_results_directory(top_dir: Path):
    """Create the result directory tree for this paper under ``top_dir``.

    The layout is ``top_dir/2023-bigdata/{log,ts,output}``. Directories that
    already exist are left untouched, and missing parents (including
    ``top_dir`` itself) are created, so the call works on a fresh checkout.

    Args:
        top_dir: Directory that will contain the ``2023-bigdata`` folder.

    Returns:
        A tuple ``(results_dir, output_dir, log_dir, ts_dir)`` of paths.
    """
    results_dir = top_dir / "2023-bigdata"
    log_dir = results_dir / "log"
    ts_dir = results_dir / "ts"
    output_dir = results_dir / "output"
    # parents=True also creates `results_dir` and any missing ancestor.
    for directory in (log_dir, ts_dir, output_dir):
        directory.mkdir(parents=True, exist_ok=True)
    return results_dir, output_dir, log_dir, ts_dir

# Make sure <project>/results exists, then create the per-paper subdirectories.
(project_path / "results").mkdir(exist_ok=True)
results_dir, output_dir, log_dir, ts_dir = create_results_directory(project_path / "results")
# Fixed locations that run_algorithm_ts checks after a run and moves into
# `ts_dir`; presumably written by the Go program — TODO confirm.
path_timeseries_output = project_path / "results" / "push-relabel-timeseries.csv"
path_timeseries_flow_output = project_path / "results" / "push-relabel-timeseries-flow.csv"

In [None]:
@dataclass(kw_only=True)
class Graph:
    """An input graph file plus the per-graph flags passed to the solver.

    NOTE(review): `pw` / `pt` are forwarded verbatim as `-pw=` / `-pt=`;
    presumably they select the weight / timestamp column of the edge list —
    confirm against the Go command's flag definitions.
    """
    name: str  # label used in test-case names, result tables and plot legends
    path: Path  # graph file, passed as `-g=`
    pw: int | None  # value of `-pw=`; the flag is omitted when falsy
    pt: int | None  # value of `-pt=`; the flag is omitted when falsy
    undirected: bool = False  # adds the `-u` flag when True

@dataclass(kw_only=True)
class MaxFlowTestCase:
    """A graph together with one source/sink pair."""
    graph: Graph
    st_index: int  # position of this (s, t) pair in the graph's pair list
    s: int  # source vertex, passed as `-S=`
    t: int  # sink vertex, passed as `-T=`

    def get_name(self) -> str:
        """Return ``"<graph name>-<st_index>"``, used as a result column label."""
        return f"{self.graph.name}-{self.st_index}"

@dataclass(kw_only=True)
class BasicTestCase(MaxFlowTestCase):
    """A max-flow test case with a thread count; the type accepted by run_algorithm."""
    threads: int = default_thread  # passed as `-t=`

@dataclass(kw_only=True)
class StaticTestCase(BasicTestCase):
    """Marker type for runs parsed by get_static_result; adds no fields."""
    pass

@dataclass(kw_only=True)
class DynamicTestCase(BasicTestCase):
    """Test case with optional rate limiting and a deletion window."""
    target_ingest_rate: int = 0  # passed as `-dr=`; omitted when 0
    delete_window: int = 0  # passed as `-w=`; omitted when 0

@dataclass(kw_only=True)
class TimeseriesTestCase(DynamicTestCase):
    """Dynamic test case that issues queries at a fixed interval.

    Exactly one of `interval_time` / `interval_edge` must be truthy; this is
    checked on construction.
    """
    interval_time: int | None = None  # passed as `-dt=`
    interval_edge: int | None = None  # passed as `-de=`
    track_progress: bool = False  # adds `-debug=2` and enables progress parsing

    def __post_init__(self) -> None:
        # Truthiness-based exclusive-or: 0 and None both count as "not set".
        assert (self.interval_time and not self.interval_edge) or \
            (not self.interval_time and self.interval_edge), \
            "Exactly one of dt or de must set"

# Graph descriptors. `pw` / `pt` are passed through as the `-pw=` / `-pt=` flags.
graph_hive_comments = Graph(name="Hive-comments", path=path_hive_comments, pw=None, pt=1)
graph_wikipedia_growth = Graph(name="Wikipedia-growth", path=path_wikipedia_growth, pw=None, pt=2)
graph_eth_transfers = Graph(name="Eth-transfers-t200m", path=path_eth_transfers, pw=1, pt=None) # TODO: fix pw and pt
graph_flickr_growth = Graph(name="Flickr-growth", path=path_flickr, pw=None, pt=2)
graph_roadnet = Graph(name="RoadNet-CA", path=path_roadnet, pw=1, pt=None)

# One test case per (s, t) pair. Note the unpacking order `(t, s)`: the sink
# comes from the `*_pagerank` list and the source from `*_transpose_pagerank`.
tc_hive_comments = [MaxFlowTestCase(graph=graph_hive_comments, st_index=i, s=s, t=t) for i, (t, s) in enumerate(zip(hive_comments_pagerank, hive_comments_transpose_pagerank))]
tc_wikipedia_growth = [MaxFlowTestCase(graph=graph_wikipedia_growth, st_index=i, s=s, t=t) for i, (t, s) in enumerate(zip(wikipedia_pagerank, wikipedia_transpose_pagerank))]
tc_eth_transfers = [MaxFlowTestCase(graph=graph_eth_transfers, st_index=i, s=s, t=t) for i, (t, s) in enumerate(zip(eth_transfers_pagerank, eth_transfers_transpose_pagerank))]
tc_flickr_growth = [MaxFlowTestCase(graph=graph_flickr_growth, st_index=i, s=s, t=t) for i, (t, s) in enumerate(zip(flickr_pagerank, flickr_transpose_pagerank))]
# RoadNet uses a single pair taken from the first two entries of its list.
tc_roadnet = [MaxFlowTestCase(graph=graph_roadnet, st_index=0, s=roadnet_pagerank[0], t=roadnet_pagerank[1])]

In [None]:
@dataclass
class StaticResult:
    """Timings parsed from the log of a static run (see get_static_result)."""
    test_case: StaticTestCase
    time_alg: int  # integer following "Termination: " in the log
    time_general: int  # integer following "Total including streaming: " in the log

@dataclass
class TimeseriesResult:
    """Measurements of one time-series run (see get_ts_result)."""
    test_case: TimeseriesTestCase

    ingest_rate_actual: int  # last "TotalRate <n>" value in the log
    time_termination: int  # integer following "Termination: " in the log
    latency_mean: float  # mean of the "Latency" column of `timeseries`
    query_count: int  # number of rows in `timeseries`
    timeseries: pd.DataFrame  # per-query CSV output of the run
    timeseries_flow: pd.DataFrame | None  # flow CSV; None when no flow path was given
    progress: pd.DataFrame | None  # parsed progress events; None unless track_progress

def get_static_result(tc: StaticTestCase, path_log: Path) -> StaticResult:
    """Parse the log of a static run into a StaticResult.

    The log must contain exactly one "Termination", one "Total including
    streaming" and one "Maximum flow is" line; otherwise an AssertionError
    is raised. The maximum-flow value is validated but not returned.
    """
    assert isinstance(tc, StaticTestCase)

    log_text = path_log.read_text()

    def single_int(pattern: str) -> int:
        # The pattern must match exactly once in the log.
        matches = re.findall(pattern, log_text)
        assert len(matches) == 1
        return int(matches[0])

    algorithm_time = single_int(r"Termination: ([0-9]+)")
    overall_time = single_int(r"Total including streaming: ([0-9]+)")
    single_int(r"Maximum flow is ([0-9]+)")  # presence check only

    return StaticResult(test_case=tc, time_alg=algorithm_time, time_general=overall_time)

def get_ts_result(tc: TimeseriesTestCase, path_log: Path, path_timeseries: Path, path_timeseries_flow: Path | None) -> TimeseriesResult:
    """Collect the measurements of one time-series run.

    Args:
        tc: The test case that produced the files.
        path_log: Log file of the run.
        path_timeseries: CSV with one row per query; must contain a
            "Latency" column.
        path_timeseries_flow: Optional flow CSV; skipped when falsy.

    Returns:
        A TimeseriesResult. `progress` is a DataFrame sorted by time when
        `tc.track_progress` is set, otherwise None.

    Raises:
        AssertionError: if the log does not contain exactly one
            "Termination" line or no "TotalRate" line, or if the timeseries
            CSV is missing.
    """
    assert isinstance(tc, TimeseriesTestCase)

    log = path_log.read_text()

    time_termination = re.findall(r"Termination: ([0-9]+)", log)
    assert len(time_termination) == 1
    time_termination = int(time_termination[0])

    # The rate may be reported several times; the last report is used.
    ingest_rate_actual = re.findall(r"TotalRate ([0-9]+)", log)
    assert len(ingest_rate_actual) >= 1
    ingest_rate_actual = int(ingest_rate_actual[-1])

    assert path_timeseries.exists()
    timeseries = pd.read_csv(path_timeseries)

    latency_mean = timeseries.loc[:, 'Latency'].mean()
    query_count = timeseries.shape[0]

    timeseries_flow = None
    if path_timeseries_flow:
        timeseries_flow = pd.read_csv(path_timeseries_flow)

    progress = None
    if tc.track_progress:
        # Three kinds of events share one table; columns that do not apply
        # to an event kind are left as None.
        flow_rows = [
            (int(ts), "flow", int(sent), int(received), None)
            for ts, sent, received in re.findall(
                r"Current Progress, Time: ([0-9]+), SourceSent: ([0-9]+), SinkReceived: ([0-9]+)", log)
        ]
        query_start_rows = [
            (int(ts), "query-triggered", None, None, int(entry))
            for ts, entry in re.findall(r"ExecuteQuery start, Time: ([0-9]+), entry: ([0-9]+)", log)
        ]
        query_end_rows = [
            (int(ts), "query-done", None, None, int(entry))
            for ts, entry in re.findall(r"ExecuteQuery end, Time: ([0-9]+), entry: ([0-9]+)", log)
        ]

        progress = pd.DataFrame(
            flow_rows + query_start_rows + query_end_rows,
            columns=["time", "type", "source-sent", "sink-received", "entry"],
        )
        # sort_values returns a new frame, so the result has to be re-bound
        # (the original call discarded it). A stable sort keeps the
        # flow/start/end order of events with equal timestamps.
        progress = progress.sort_values("time", kind="stable")

    return TimeseriesResult(test_case=tc, ingest_rate_actual=ingest_rate_actual,
                            time_termination=time_termination, latency_mean=latency_mean,
                            query_count=query_count,
                            timeseries=timeseries, timeseries_flow=timeseries_flow,
                            progress=progress)

In [None]:
def run_algorithm(tc: BasicTestCase, repeat_index: int = 0, correctness: bool = True, add_flags: list[str] | None = None, suffix: str = "", cpu_affinity: bool = False, no_skip: bool = False) -> Path:
    """Run the push-relabel command for one test case and return its log path.

    The command is `go run <push_relabel_code>` with flags derived from `tc`.
    Output (stdout and stderr) is written to a `_running-` prefixed file in
    `log_dir`, which is renamed to the final log name only after the process
    exits with code 0. On failure the temporary file is left in place.

    Args:
        tc: Test case providing the graph, thread count and s/t vertices.
        repeat_index: Index embedded in the log file name.
        correctness: Adds the `-c` flag when True.
        add_flags: Extra command-line flags appended verbatim; None means none.
        suffix: Text inserted in the log file name before the repeat index.
        cpu_affinity: Pin the child to `cpu_affinity_masks[tc.threads]`.
        no_skip: Run even if the final log file already exists.

    Returns:
        Path of the log file (pre-existing if the run was skipped).

    Raises:
        AssertionError: if the process exits with a non-zero return code or
            the CPU affinity of the child does not match the requested mask.
        KeyboardInterrupt: re-raised after the child has been terminated.
    """
    assert isinstance(tc, BasicTestCase)
    # None stands for "no extra flags"; avoids a list default shared across calls.
    add_flags = [] if add_flags is None else add_flags

    # compile command and log filename
    cmd = [go, "run", push_relabel_code, "-nc", f"-t={tc.threads}", f"-g={tc.graph.path}"]
    log_filename = f"push-relabel-t={tc.threads}-g={tc.graph.path.name}"
    if tc.graph.undirected:
        cmd.append("-u")
        log_filename += "-u"
    if tc.graph.pw:
        cmd += [f"-pw={tc.graph.pw}"]
        log_filename += f"-pw={tc.graph.pw}"
    if tc.graph.pt:
        cmd += [f"-pt={tc.graph.pt}"]
        log_filename += f"-pt={tc.graph.pt}"

    cmd += [f"-S={tc.s}", f"-T={tc.t}"]
    log_filename += f"-S={tc.s}-T={tc.t}"

    if correctness:
        cmd += ["-c"]
        log_filename += "-c"
    if cpu_affinity:
        # Affinity only affects the log name; it is applied in preexec_fn.
        log_filename += "-cpuaff"

    cmd += add_flags
    log_filename += f"{suffix}.{repeat_index}.log"

    # Run command
    log_path = log_dir / log_filename
    print(f"Command: {cmd}")
    print(f"Log path: {log_path}")

    if (not no_skip) and log_path.exists():
        print("Skipped as the log file exists")
        return log_path

    path_log_temp = log_dir / f"_running-{log_filename}"
    with open(path_log_temp, "w+t") as log_file:
        def preexec_fn():
            # Own process group, so the whole group can be signalled on Ctrl-C.
            os.setpgrp()
            if cpu_affinity:
                os.sched_setaffinity(os.getpid(), cpu_affinity_masks[tc.threads])
        process = sp.Popen(cmd, cwd=project_path, stdout=log_file, stderr=sp.STDOUT, preexec_fn=preexec_fn)
        time.sleep(0.5)
        if cpu_affinity:
            assert set(cpu_affinity_masks[tc.threads]) == set(os.sched_getaffinity(process.pid)), \
                f"{cpu_affinity_masks[tc.threads]} != {os.sched_getaffinity(process.pid)}"
        try:
            returncode = process.wait()
        except KeyboardInterrupt:
            os.killpg(os.getpgid(process.pid), signal.SIGTERM)
            process.kill()
            process.wait()  # reap the child so it does not linger as a zombie
            raise
        if returncode != 0:
            # Raised explicitly so the check survives `python -O`.
            raise AssertionError(f"Return code is none-zero: {returncode}")

    path_log_temp.rename(log_path)
    print("Log saved")

    return log_path

def run_algorithm_static(tc: StaticTestCase, **kwargs) -> StaticResult:
    """Run a static test case and parse its log.

    Keyword arguments are forwarded unchanged to run_algorithm.
    """
    assert isinstance(tc, StaticTestCase)

    log_path = run_algorithm(tc=tc, **kwargs)
    parsed = get_static_result(tc=tc, path_log=log_path)
    return parsed

def run_algorithm_ts(tc: TimeseriesTestCase, repeat_index: int = 0, stability: bool = False, no_skip: bool = False, **kwargs) -> TimeseriesResult:
    """Run a time-series test case and collect its log and CSV outputs.

    The flags derived from `tc` are also appended to the log file name, so
    each configuration gets its own log. After the run, the timeseries CSV at
    `path_timeseries_output` (and, with `stability`, the flow CSV at
    `path_timeseries_flow_output`) is moved into `ts_dir` under the log's name.
    If a CSV for this configuration already exists in `ts_dir` and `no_skip`
    is False, the stored files are reused instead.

    Args:
        tc: Test case to run.
        repeat_index: Index embedded in the log file name.
        stability: Adds the `-Stability` flag and collects the flow CSV.
            Cannot be combined with `tc.track_progress`.
        no_skip: Re-run and re-collect even if outputs already exist.
        **kwargs: Forwarded to run_algorithm; `add_flags` and `suffix` are
            appended after the flags/suffix generated here.

    Returns:
        The parsed TimeseriesResult.

    Raises:
        AssertionError: on an invalid test case or missing output files.
    """
    assert isinstance(tc, TimeseriesTestCase)
    assert (tc.interval_time and not tc.interval_edge) or (not tc.interval_time and tc.interval_edge), "Exactly one of dt or de must set"
    assert not tc.track_progress or not stability

    # Remove leftovers of earlier runs so a stale file is never picked up.
    path_timeseries_output.unlink(missing_ok=True)
    if stability:
        path_timeseries_flow_output.unlink(missing_ok=True)

    add_flags = ["-tquery"]
    if tc.interval_time:
        add_flags += [f"-dt={tc.interval_time}"]
    if tc.interval_edge:
        add_flags += [f"-de={tc.interval_edge}"]
    if tc.target_ingest_rate:
        add_flags += [f"-dr={tc.target_ingest_rate}"]
    if tc.delete_window:
        add_flags += [f"-w={tc.delete_window}"]
    if tc.track_progress:
        add_flags += ["-debug=2"]

    if stability:
        add_flags.append("-Stability")

    suffix = "".join(add_flags)

    kwargs["add_flags"] = add_flags + (kwargs.get("add_flags") or [])
    kwargs["suffix"] = suffix + kwargs.get("suffix", "")
    path_log = run_algorithm(tc=tc, repeat_index=repeat_index, no_skip=no_skip, **kwargs)

    stem = path_log.name[:-4]  # log name without the trailing ".log"
    path_timeseries = ts_dir / f"{stem}.csv"
    path_timeseries_flow = ts_dir / f"{stem}-flow.csv" if stability else None

    if not no_skip and path_timeseries.exists():
        print(f"Timeseries already exists: {path_timeseries}")
        if stability:
            assert path_timeseries_flow.exists()
    else:
        assert path_timeseries_output.exists(), "No timeseries output found"
        path_timeseries_output.rename(path_timeseries)
        print(f"Timeseries saved to: {path_timeseries}")

        if stability:
            assert path_timeseries_flow_output.exists()
            path_timeseries_flow_output.rename(path_timeseries_flow)

    return get_ts_result(tc=tc, path_log=path_log, path_timeseries=path_timeseries, path_timeseries_flow=path_timeseries_flow)

In [None]:
class SaturationTest:
    """Measure the achieved ingest rate for several query intervals.

    For every query interval and every graph, each test case is run as a
    TimeseriesTestCase and the maximum, median and minimum of the achieved
    ingest rates are stored in `self.results` (rows: intervals; columns:
    "<graph>-top", "<graph>-mid", "<graph>-bot").
    """

    def __init__(self, testcases: list[list[BasicTestCase]], query_intervals: list[int]) -> None:
        """Store the inputs and allocate the (empty) result table.

        `testcases` holds one list of test cases per graph; the first test
        case of each list provides the graph name used in column labels.
        """
        self.testcases = testcases
        self.query_intervals = sorted(query_intervals)
        # (label, rate, vertical label offset in points): reference rates
        # drawn as horizontal lines by plot().
        self.common_rates = [("Ethereum Transactions", 12, 4), ("VisaNet Transactions", 6000, -11), ("Worldwide Emails Sent/Received", 10000, 4)]

        indices = self.query_intervals
        columns = []
        for tcs in testcases:
            columns += [f"{tcs[0].graph.name}-top", f"{tcs[0].graph.name}-mid", f"{tcs[0].graph.name}-bot"]
        self.results = pd.DataFrame(index=indices, columns=columns)

    def run(self) -> pd.DataFrame:
        """Run every (interval, test case) combination and return the result table."""
        for interval in self.query_intervals:
            for tcs_graph in self.testcases:
                throughputs = []
                for tc in tcs_graph:
                    # Re-create the test case as a time-series case with this interval.
                    tc_ts = TimeseriesTestCase(interval_time=interval, **tc.__dict__)
                    result = run_algorithm_ts(tc=tc_ts)
                    throughputs.append(result.ingest_rate_actual)
                self.results.at[interval, f"{tcs_graph[0].graph.name}-top"] = max(throughputs)
                self.results.at[interval, f"{tcs_graph[0].graph.name}-mid"] = statistics.median(throughputs)
                self.results.at[interval, f"{tcs_graph[0].graph.name}-bot"] = min(throughputs)
        return self.results

    def plot(self, figsize) -> plt.Figure:
        """Plot median throughput per graph with a min-max band on log-log axes.

        Uses the values currently in `self.results`; call run() first.
        Returns the created figure.
        """
        fig, ax = plt.subplots(1, 1, figsize=figsize)
        x = self.results.index.astype(int)
        ax.set_xscale('log')
        ax.set_yscale('log')
        ax.set_xticks(self.results.index)
        ax.set_xticklabels(self.results.index)
        ax.set(ylim=[1e1, 1e7])
        # Largest interval on the left, smallest on the right.
        ax.invert_xaxis()
        for tcs in self.testcases:
            name = tcs[0].graph.name
            ax.plot(x, self.results.loc[:, f"{name}-mid"], marker = '.', markersize = 5, label=f"{name}")
            ax.fill_between(x, self.results.loc[:, f"{name}-top"].astype(int), self.results.loc[:, f"{name}-bot"].astype(int), alpha=0.4)
        ax.xaxis.grid(alpha=0.3)
        ax.yaxis.grid(alpha=0.3)
        fig.legend(bbox_to_anchor=(0.51, 0.92), loc='center', ncol=3, framealpha=1, prop={'size': 7.5})

        # Horizontal reference lines, each labelled in the line's own colour.
        for label, rate, y_offset in self.common_rates:
            lines = ax.plot([min(x), max(x)], [rate]*2)
            ax.annotate(f"{label}", (max(x), rate), xytext=(1, y_offset), textcoords='offset points', color=lines[0].get_color())
        
        ax.set_xlabel(f"Query Interval (days)", labelpad=6)
        ax.set_ylabel(f"Throughput (events/second)", labelpad=6)

        return fig

# Saturation experiment: the first `n` (s, t) pairs of four graphs, queried at
# six intervals. The figure is written to the output directory and the result
# table is displayed as the cell output.
n = 10
saturation_test = SaturationTest(testcases=[tc_eth_transfers[:n], tc_hive_comments[:n], tc_wikipedia_growth[:n], tc_flickr_growth[:n]], query_intervals=[1024, 256, 64, 16, 4, 1])
result = saturation_test.run()
fig = saturation_test.plot((5,3))
fig.savefig(output_dir / "saturation.pdf", bbox_inches='tight')
result

In [None]:
class ProgressExperiment():
    """Record and plot flow progress over time for a few test cases.

    Each test case is run as a TimeseriesTestCase with progress tracking
    enabled; the parsed progress tables are kept in `self.results` as
    `(subplot title, progress DataFrame)` pairs.
    """

    def __init__(self, testcases: list[tuple[BasicTestCase, str]], query_interval: int) -> None:
        """Store `(test case, subplot title)` pairs and the query interval."""
        self.testcases = testcases
        self.query_interval = query_interval
        self.results = None  # filled by run()

    def run(self) -> list[tuple[str, pd.DataFrame]]:
        """Run all test cases and return `(title, progress)` pairs."""
        self.results = []
        for tc, name in self.testcases:
            tc_ts = TimeseriesTestCase(interval_time=self.query_interval, track_progress=True, **tc.__dict__)
            result = run_algorithm_ts(tc=tc_ts)
            self.results.append((name, result.progress))
        return self.results

    def plot(self, figsize) -> plt.Figure:
        """Draw one subplot per test case; runs the experiment first if needed.

        Each subplot shows the "source-sent" and "sink-received" series of
        the flow events on a log y-axis, with vertical lines where queries
        were triggered (red) and finished (green).
        """
        results = self.results if self.results is not None else self.run()
        fig, axs = plt.subplots(len(results), 1, figsize=figsize)
        if len(results) == 1:
            # plt.subplots returns a bare Axes for a single subplot.
            axs = [axs]

        for ax, (name, progress) in zip(axs, results):
            flow_progress = progress.loc[progress["type"] == "flow"]
            # Take the maximum explicitly rather than relying on the last row
            # being the latest event, so the limit does not depend on row order.
            x_max = progress["time"].max()
            # Headroom above the largest value on the log axis.
            y_max = max(flow_progress["source-sent"]) * 2

            ax.plot("time", "source-sent", data=flow_progress, marker='.', markersize=5, label="Source Sent")
            ax.plot("time", "sink-received", data=flow_progress, marker='.', markersize=5, label="Sink Received")
            ax.vlines(progress.loc[progress["type"] == "query-triggered"]["time"], 0, y_max, label="Query Triggered", color="red", linewidth=0.5)
            ax.vlines(progress.loc[progress["type"] == "query-done"]["time"], 0, y_max, label="Query Finished", color="green", linewidth=0.5)
            ax.yaxis.grid(alpha=0.5)
            ax.set(xlim=[0, x_max], ylim=[1, y_max], yscale="log")
            ax.title.set(text=name, size=10)

        axs[-1].set_xlabel("Algorithm Time (ms)")

        # All subplots share the same four series, so one legend is enough.
        handles, labels = axs[-1].get_legend_handles_labels()
        fig.legend(handles, labels, bbox_to_anchor=(1, 0.96), loc='upper left', ncol=1, prop={'size': 10})
        fig.text(-0.03, 0.56, "Flow Amount", va='center', rotation='vertical')
        fig.tight_layout(pad=0.5)
        return fig

# Progress experiment on three (s, t) pairs with a query interval of 128.
# plot() runs the experiment itself when no results are stored yet.
progress_experiment = ProgressExperiment([
    (tc_hive_comments[0], f"Subplot 1: Hive-comments, Most Popular (s,t)"),
    (tc_hive_comments[4], f"Subplot 2: Hive-comments, 5th Most Popular (s,t)"),
    (tc_wikipedia_growth[0], f"Subplot 3: Wikipedia-growth, Most Popular (s,t)")
    ], 128)
# result = progress_experiment.run()
fig = progress_experiment.plot((7,5))
fig.savefig(output_dir / "progress.pdf", bbox_inches='tight')

In [None]:
class ScalabilityTest():
    """Compare static and time-series runs across the thread counts in `threads`.

    Per thread count, every test case is run once as a StaticTestCase and once
    as a TimeseriesTestCase (both with CPU affinity). The raw result objects
    are stored in `self.results` together with per-row means and speedups
    relative to the first row of the table.
    """

    def __init__(self, testcases: list[BasicTestCase], interval_time: int | None = None, interval_edge: int | None = None) -> None:
        """Build the per-thread test cases and allocate the result table."""
        # Outer list: one entry per thread count; inner list: one entry per test case.
        self.testcases_ts = [
            [
                TimeseriesTestCase(interval_time=interval_time, interval_edge=interval_edge, threads=t, **tc.__dict__) 
                for tc in testcases
            ]
            for t in threads
        ]
        self.testcases_static = [
            [
                StaticTestCase(threads=t, **tc.__dict__) 
                for tc in testcases
            ]
            for t in threads
        ]

        # NOTE(review): `threads` is wrapped in another list here, so pandas
        # receives a list of arrays as index — verify that this yields the
        # intended row labels for the `.at[tc.threads, ...]` writes below.
        indices = [threads]
        columns = ["speedup-static-alg", "speedup-static-total", "speedup-ts-latency-each", "speedup-ts-latency-all", "speedup-ts-total-runtime"]
        columns += ["static-alg", "static-total", "ts-latency", "ts-total-runtime"]
        # Columns holding the raw result object of each test case.
        self.columns_ts = [f"ts-{tc.get_name()}" for tc in testcases]
        self.columns_static = [f"static-{tc.get_name()}" for tc in testcases]
        columns += self.columns_ts + self.columns_static
        self.results = pd.DataFrame(index=indices, columns=columns)
    
    def run(self) -> pd.DataFrame:
        """Run the time-series cases, then the static cases, and return the table."""
        self._run_timeseries()
        self._run_static()
        return self.results

    def _run_static(self) -> None:
        """Run all static cases and fill the static mean and speedup columns."""
        for tc_row in self.testcases_static:
            for tc in tc_row:
                result = run_algorithm_static(tc=tc, cpu_affinity=True)
                self.results.at[tc.threads, f"static-{tc.get_name()}"] = result
        
        for i, row in self.results.iterrows():
            # mean
            algs, totals = [], []
            for c in self.columns_static:
                algs.append(row[c].time_alg)
                totals.append(row[c].time_general)
            self.results.at[i, "static-alg"] = statistics.mean(algs)
            self.results.at[i, "static-total"] = statistics.mean(totals)

            # speedup
            # Baseline is the first row of the table; speedups are combined
            # across test cases with a geometric mean.
            row_0 = self.results.iloc[0]
            speedup_static_alg = []
            speedup_static_total = []
            for c in self.columns_static:
                speedup_static_alg.append(row_0[c].time_alg / row[c].time_alg)
                speedup_static_total.append(row_0[c].time_general / row[c].time_general)
            self.results.at[i, "speedup-static-alg"] = statistics.geometric_mean(speedup_static_alg)
            self.results.at[i, "speedup-static-total"] = statistics.geometric_mean(speedup_static_total)
    
    def _run_timeseries(self) -> None:
        """Run all time-series cases and fill the latency/runtime mean and speedup columns."""
        for tc_row in self.testcases_ts:
            for tc in tc_row:
                result = run_algorithm_ts(tc=tc, cpu_affinity=True)
                self.results.at[tc.threads, f"ts-{tc.get_name()}"] = result

        for i, row in self.results.iterrows():
            # mean
            latency_means = []
            latencies_all = []  # collected but not used afterwards
            total_runtimes = []
            for c in self.columns_ts:
                result: TimeseriesResult = row[c]
                latency_means.append(result.latency_mean)
                latencies_all += result.timeseries.loc[:, "Latency"].tolist()
                total_runtimes.append(result.time_termination)
            self.results.at[i, "ts-latency"] = statistics.mean(latency_means)
            self.results.at[i, "ts-total-runtime"] = statistics.mean(total_runtimes)

            # speedup
            # Baseline is the first row of the table.
            row_0 = self.results.iloc[0]
            speedup_latency_each, speedup_latency_mean, speedup_total_runtime = [], [], []
            for c in self.columns_ts:
                speedup_latency_mean.append(row_0[c].latency_mean / row[c].latency_mean)
                speedup_total_runtime.append(row_0[c].time_termination / row[c].time_termination)
                # Per-query speedup: element-wise ratio of the two latency columns.
                speedup_latency_each += list(row_0[c].timeseries.loc[:, "Latency"] / row[c].timeseries.loc[:, "Latency"])
                # Both runs must have produced the same number of queries.
                assert(row_0[c].timeseries.shape == row[c].timeseries.shape)

            self.results.at[i, "speedup-ts-latency-each"] = statistics.geometric_mean(speedup_latency_each)
            self.results.at[i, "speedup-ts-latency-all"] = statistics.geometric_mean(speedup_latency_mean)
            self.results.at[i, "speedup-ts-total-runtime"] = statistics.geometric_mean(speedup_total_runtime)

# Scalability experiment on the first five Wikipedia-growth pairs; the test is
# only constructed here, the run itself is currently disabled.
scalability_test = ScalabilityTest(testcases=tc_wikipedia_growth[:5], interval_time=128)
# scalability_test.run()  

In [None]:
class SlidingWindowTest:
    """Compare runs without and with a deletion window.

    For each graph and query interval, every test case is run twice: once
    as-is ("a" columns) and once with `delete_window` set ("d" columns).
    `self.results` has one row per graph and holds mean termination time
    ("runtime-<interval>-a/d") and mean latency ("latency-<interval>-a/d").
    """

    def __init__(self, testcases: list[BasicTestCase], delete_window: int, query_intervals: list[int]) -> None:
        """Prepare the time-series test cases and the empty result table.

        `testcases` holds one list of test cases per graph.
        """
        self.delete_window = delete_window
        self.query_intervals = query_intervals

        # Per graph: a list of (interval, time-series test cases) pairs.
        prepared = []
        for graph_cases in testcases:
            per_interval = []
            for interval in query_intervals:
                ts_cases = [
                    TimeseriesTestCase(interval_time=interval, **base.__dict__)
                    for base in graph_cases
                ]
                per_interval.append((interval, ts_cases))
            prepared.append(per_interval)
        self.testcases_ts = prepared

        self.columns_runtime = [
            f"runtime-{i}-{variant}" for i in query_intervals for variant in ("a", "d")
        ]
        self.columns_latency = [
            f"latency-{i}-{variant}" for i in query_intervals for variant in ("a", "d")
        ]
        graph_names = [graph_cases[0].graph.name for graph_cases in testcases]
        self.results = pd.DataFrame(index=graph_names, columns=self.columns_runtime+self.columns_latency)

    def run(self) -> pd.DataFrame:
        """Run all cases, store NaN-ignoring means and return them divided by 1000."""
        for per_interval in self.testcases_ts:
            for interval, cases in per_interval:
                runtime_add, latency_add = [], []
                runtime_del, latency_del = [], []
                for case_add in cases:
                    assert interval == case_add.interval_time

                    # Same case, but with the deletion window enabled.
                    case_del = TimeseriesTestCase(**case_add.__dict__)
                    case_del.delete_window = self.delete_window

                    outcome_add = run_algorithm_ts(tc=case_add)
                    outcome_del = run_algorithm_ts(tc=case_del)

                    runtime_add.append(outcome_add.time_termination)
                    latency_add.append(outcome_add.latency_mean)
                    runtime_del.append(outcome_del.time_termination)
                    latency_del.append(outcome_del.latency_mean)
                print(latency_add)
                cells = (
                    (case_add.graph.name, f"runtime-{interval}-a", runtime_add),
                    (case_add.graph.name, f"latency-{interval}-a", latency_add),
                    (case_del.graph.name, f"runtime-{interval}-d", runtime_del),
                    (case_del.graph.name, f"latency-{interval}-d", latency_del),
                )
                for graph_name, column, samples in cells:
                    self.results.at[graph_name, column] = np.nanmean(samples)
        in_seconds = self.results / 1000 # convert to seconds
        self.results = in_seconds
        return in_seconds

    def save(self) -> None:
        """Run the experiment and write both tables to `tab-deletions.txt`.

        Each table is a comment line, a header row and one row per graph, with
        cells separated by " & " and rows terminated by " \\\\".
        """
        results = self.run()

        def render_table(title: str, columns: list[str]) -> str:
            # c[8:] drops the 8-character "runtime-" / "latency-" prefix.
            header = "& " + " & ".join([c[8:] for c in columns]) + " \\\\\n"
            body = "".join(
                index + " & " + " & ".join([f"{v:.2f}" for v in row.values]) + " \\\\\n"
                for index, row in results.loc[:, columns].iterrows()
            )
            return title + header + body

        tab_total = render_table("% Total Runtime\n", self.columns_runtime)
        tab_latency = render_table("% Average Latency\n", self.columns_latency)

        (output_dir / "tab-deletions.txt").write_text(tab_total + "\n" + tab_latency)

# Sliding-window experiment: the first `n` pairs of four graphs, a deletion
# window of 120 and three query intervals. Note that save() calls run() again
# internally before writing the tables.
n = 10
sliding_window = SlidingWindowTest(testcases=[tc_eth_transfers[:n], tc_hive_comments[:n], tc_wikipedia_growth[:n], tc_flickr_growth[:n]], delete_window=120, query_intervals=[128, 64, 32])
result = sliding_window.run()
sliding_window.save()
result

In [None]:
def get_averaged_ts(dfs: list[pd.DataFrame]) -> pd.DataFrame:
    """Average several timeseries frames row by row.

    Rows are matched by position, not by index label. Every numeric column of
    the first frame (except ``"Date"``) is replaced by the element-wise mean of
    that column over all frames; non-numeric columns are taken from the first
    frame. ``"Date"`` is converted with ``pd.to_datetime``.

    Args:
        dfs: Non-empty list of frames that each contain a ``"Date"`` column
            and the numeric columns of the first frame.

    Returns:
        A new frame with a fresh 0..n-1 index, where n is the length of the
        shortest input frame. The inputs are not modified.

    Raises:
        ValueError: If ``dfs`` is empty.
        KeyError: If a frame lacks ``"Date"`` or one of the averaged columns.
    """
    if not dfs:
        raise ValueError("get_averaged_ts() needs at least one DataFrame")

    first = dfs[0]
    # NOTE(review): assumes row i of every frame describes the same query
    # point; frames of unequal length are truncated to the shortest one.
    length = min(len(df) for df in dfs)
    output_df = first.iloc[:length].reset_index(drop=True)

    numeric_columns = [
        column
        for column in first.select_dtypes(include="number").columns
        if column != "Date"
    ]
    for column in numeric_columns:
        per_run = [df[column].to_numpy(dtype=float)[:length] for df in dfs]
        # Whole-column replacement, so integer columns can become float means.
        output_df[column] = np.mean(per_run, axis=0)

    output_df["Date"] = pd.to_datetime(output_df["Date"])
    return output_df

class RateLimitingTest():
    """Benchmark result latency while the ingestion rate is throttled.

    One ``TimeseriesTestCase`` is created for every combination of query
    interval, target ingest rate and basic test case. ``run`` executes each of
    them through ``run_algorithm_ts`` and stores the outcome in
    ``self.results``, a DataFrame indexed by ``(query interval, ingest rate)``
    with one column per test case plus ``"ingest-rate"`` and
    ``"latency-average"``.
    """

    def __init__(self, testcases: list[BasicTestCase], delete_window: int, query_intervals: list[int], ingest_rates: list[int]) -> None:
        """Build the grid of timeseries test cases and an empty results table.

        Args:
            testcases: Basic test cases; each is expanded into one
                ``TimeseriesTestCase`` per (interval, rate) pair.
            delete_window: Forwarded to every ``TimeseriesTestCase``.
            query_intervals: Values passed as ``interval_time``.
            ingest_rates: Values passed as ``target_ingest_rate``.
        """
        self.delete_window = delete_window
        self.testcases_basic = testcases
        self.query_intervals = query_intervals
        self.ingest_rates = ingest_rates
        # Nested as [query interval][ingest rate][basic test case].
        self.testcases_ts = [
            [
                [
                    TimeseriesTestCase(interval_time=interval, delete_window=delete_window, target_ingest_rate=rate, **tc.__dict__)
                    for tc in testcases
                ]
                for rate in ingest_rates
            ]
            for interval in query_intervals
        ]

        indices = pd.MultiIndex.from_product([query_intervals, ingest_rates])
        columns = [tc.get_name() for tc in testcases] + ["ingest-rate", "latency-average"]
        self.results = pd.DataFrame(index=indices, columns=columns)

    def run(self) -> pd.DataFrame:
        """Execute every test case and return the filled results table.

        The full result object of each run is stored in the cell addressed by
        ``(interval_time, target_ingest_rate)`` and the test case's name.
        """
        for testcases_intervals in self.testcases_ts:
            for testcases_rates in testcases_intervals:
                for tc in testcases_rates:
                    key = (tc.interval_time, tc.target_ingest_rate)
                    result = run_algorithm_ts(tc=tc)
                    self.results.at[key, tc.get_name()] = result
                    # NOTE(review): these two cells are overwritten for every
                    # test case sharing the key, so with several test cases
                    # they hold the values of the last one executed.
                    self.results.at[key, "ingest-rate"] = result.ingest_rate_actual
                    self.results.at[key, "latency-average"] = result.latency_mean
        return self.results

    def plot_1(self, y_max, x_min, figsize, legend_box) -> plt.Figure:
        """Plot the averaged latency timeseries for every (interval, rate) pair.

        The figure has one row per query interval and one column per ingest
        rate. Requires ``run`` to have been called first.

        Args:
            y_max: Currently unused (only referenced by commented-out code).
            x_min: Currently unused (only referenced by commented-out code).
            figsize: Figure size forwarded to ``plt.subplots``.
            legend_box: Currently unused.

        Returns:
            The created figure.
        """
        tc_names = [tc.get_name() for tc in self.testcases_basic]
        # Result columns are keyed by get_name() (see __init__), so the lookup
        # must use the same key rather than the raw ``name`` attribute.
        timeseries = self.results.iloc[0][tc_names[0]].timeseries
        timestamps, e = pd.to_datetime(timeseries["Date"]), timeseries["EdgeCount"]
        min_ts, max_ts, max_e = min(timestamps), max(timestamps), max(e)
        print(f"min(ts)={min_ts} max(ts)={max_ts} max(e)={max_e}")

        # squeeze=False keeps ``axs`` two-dimensional even when there is only
        # one query interval or one ingest rate, so axs[row, col] always works.
        fig, axs = plt.subplots(len(self.query_intervals), len(self.ingest_rates), sharex=True, sharey=True, figsize=figsize, squeeze=False)

        for row_index, interval in enumerate(self.query_intervals):
            for column_index, rate in enumerate(self.ingest_rates):
                ax = axs[row_index, column_index]

                print(f"{interval}, {rate}")
                results = self.results.loc[(interval, rate), tc_names].values.tolist()
                ts = get_averaged_ts([r.timeseries for r in results])
                print(ts)

                x, y1 = ts["Date"], ts["Latency"]
                # ax.patch.set_visible(False)
                # ax.set(xlim=[x_min, max_ts], ylim=[0,y_max], zorder=2, xticks=[])
                ax.plot(x, y1, marker='.', markersize=5, color="chocolate", label="Latency (ms) (left)")
                ax.fill_between(x, y1, alpha=0.2, facecolor="red", edgecolor=None)

        fig.tight_layout(pad=0.5)
        return fig

    def plot_2(self, figsize) -> plt.Figure:
        """Plot latency against target ingest rate, one subplot per interval.

        For each query interval, the ``"Latency"`` values of all test cases at
        a given rate are pooled. The line shows their median and the shaded
        band spans the 20th to 80th percentile, all divided by 1000. The x
        axis is the target ingest rate divided by 1e6 and is inverted.
        Requires ``run`` to have been called first.

        Args:
            figsize: Figure size forwarded to ``plt.subplots``.

        Returns:
            The created figure.
        """
        # squeeze=False keeps ``axs`` indexable as an array even with a single
        # query interval (plt.subplots would otherwise return a bare Axes).
        fig, axs = plt.subplots(len(self.query_intervals), 1, sharex=True, sharey=True, figsize=figsize, squeeze=False)
        tc_names = [tc.get_name() for tc in self.testcases_basic]

        for row_index, interval in enumerate(self.query_intervals):
            ax = axs[row_index, 0]
            x, y_mid, y_top, y_bottom = [r / 1e6 for r in self.ingest_rates], [], [], []
            for rate in self.ingest_rates:
                results = self.results.loc[(interval, rate), tc_names]
                latencies = []
                for result in results:
                    latencies += result.timeseries.loc[:, "Latency"].values.tolist()
                # rates = [result.ingest_rate_actual for result in results]
                # x.append(statistics.mean(rates) / 1e6)
                y_mid.append(statistics.median(latencies) / 1000)
                y_top.append(np.percentile(latencies, 80) / 1000)
                y_bottom.append(np.percentile(latencies, 20) / 1000)

            ax.plot(x, y_mid, marker='.', markersize=5, color="chocolate", label="Latency (ms)")
            ax.fill_between(x, y_bottom, y_top, alpha=0.2)
            # ax.set_ylabel(f"{interval} days", labelpad=7)
            # ax.set_ylabel(f"Result Latency (seconds)", labelpad=7)
            ax.annotate(f"Query Interval: {interval} days", xy=(0.99, 0.96), xycoords='axes fraction', horizontalalignment='right', verticalalignment='top')

        # Set on the last subplot only; the subplots were created with
        # sharex/sharey, so the axis limits are shared between them.
        ax.set(ylim=[0, 6])
        ax.invert_xaxis()
        axs[-1, 0].set_xlabel("Ingestion Rate (million events/second)", labelpad=7)
        fig.text(-0.03, 0.56, "Result Latency (seconds)", va='center', rotation='vertical')

        fig.tight_layout(pad=0.5)
        return fig

# Target ingest rates sweep from 300000 down to 50000 in steps of 50000,
# followed by one extra point at 25000.
rate_limiting_test = RateLimitingTest(
    testcases=tc_hive_comments[:1],
    delete_window=0,
    query_intervals=[64, 32],
    ingest_rates=[*range(300000, 0, -50000), 25000],
)
# rate_limiting_test = RateLimitingTest(testcases=tc_wikipedia_growth[:1], delete_window=0, query_intervals=[512, 256, 128, 64, 32], ingest_rates=range(5000000, 2000000, -250000))
results = rate_limiting_test.run()
# fig = rate_limiting_test.plot_1(0, 0, (6, 4), None)
fig = rate_limiting_test.plot_2((5, 3))
fig.savefig(output_dir / "rate-limiting.pdf", bbox_inches='tight')
results

In [None]:
class StabilityTest:
    """Compare dynamic and static results of one test case over time.

    ``run`` executes the test case twice through ``run_algorithm_ts`` (once
    with ``stability=True``) and combines latency and vertex-count columns
    into one DataFrame indexed by date. ``plot`` draws the share of "same"
    vertices and the latencies on a twin-axis figure.
    """

    def __init__(self, tc: BasicTestCase, interval_time: int, interval_edge: int) -> None:
        """Wrap ``tc`` in a ``TimeseriesTestCase`` and define the result columns.

        Args:
            tc: Basic test case whose attributes are forwarded unchanged.
            interval_time: Passed to ``TimeseriesTestCase`` as ``interval_time``.
            interval_edge: Passed to ``TimeseriesTestCase`` as ``interval_edge``.
        """
        self.testcase = TimeseriesTestCase(interval_time=interval_time, interval_edge=interval_edge, **tc.__dict__)
        self.count_columns = ["total-flow-vertex-dynamic", "total-flow-vertex-static", "same-flow-vertex-dynamic", "same-flow-vertex-static"]
        self.percentage_columns = ["same-percent-dynamic", "same-percent-static"]
        self.columns = ["latency-dynamic", "latency-static"] + self.percentage_columns + self.count_columns
        # Source column names in ``timeseries_flow``, in the same order as
        # ``count_columns``.
        self.count_columns_original = ["VertexNumInFlowDynamic", "VertexNumInFlowStatic", "VertexFlowSameDynamic", "VertexFlowSameStatic"]
        # Filled by run(); plot() reuses it instead of repeating the benchmark.
        self.result = None

    def run(self) -> pd.DataFrame:
        """Execute both runs and return the combined, date-indexed table.

        The table is also stored in ``self.result``. Calling ``run`` again
        re-executes the benchmark and replaces the stored table.
        """
        result_stability = run_algorithm_ts(tc=self.testcase, stability=True)
        result_latency = run_algorithm_ts(tc=self.testcase)

        latency_ts = result_latency.timeseries
        index = pd.to_datetime(latency_ts.loc[:, "Date"])
        table = pd.DataFrame(index=index, columns=self.columns)

        # Assign by position (.values). Assigning the Series itself would
        # align its index labels against the date index of ``table`` and
        # leave NaN wherever the labels differ.
        table.loc[:, "latency-dynamic"] = latency_ts.loc[:, "Latency"].values
        # Only the last len(table) rows of the stability timeseries are used.
        counts = result_stability.timeseries_flow.tail(table.shape[0]).loc[:, self.count_columns_original + ["StaticLatency"]].values
        table.loc[:, self.count_columns + ["latency-static"]] = counts
        table.loc[:, "same-percent-dynamic"] = table.loc[:, "same-flow-vertex-dynamic"] / table.loc[:, "total-flow-vertex-dynamic"]
        table.loc[:, "same-percent-static"] = table.loc[:, "same-flow-vertex-static"] / table.loc[:, "total-flow-vertex-static"]
        print(result_latency.ingest_rate_actual)

        # Publish only the fully populated table.
        self.result = table
        return self.result

    def plot(self, y_min_percentage, y_max_latency, figsize) -> plt.Figure:
        """Plot same-vertex percentages (left axis) and latencies (right axis).

        Uses the table from a previous ``run`` call when one exists, so the
        benchmark is not executed a second time; otherwise calls ``run``.

        Args:
            y_min_percentage: Lower limit of the percentage axis (upper is 1).
            y_max_latency: Upper limit of the latency axis (lower is 0).
            figsize: Figure size forwarded to ``plt.subplots``.

        Returns:
            The created figure.
        """
        result = self.result if self.result is not None else self.run()
        x = result.index
        fig, ax = plt.subplots(figsize=figsize)
        ax.plot(result.index, result.loc[:, "same-percent-dynamic"], marker='.', markersize=5, label="Same Vertices, Dynamic (Left)")
        ax.plot(result.index, result.loc[:, "same-percent-static"], marker='.', markersize=5, label="Same Vertices, Static (Left)")

        ax.yaxis.set_major_formatter(mtick.PercentFormatter(1))
        ax.tick_params(axis='x', labelrotation=45)
        # The first data point is excluded from both the ticks and the x range.
        ax.set_xticks(x[1:])
        ax.xaxis.set_major_formatter(mdates.DateFormatter("%Y-%m"))
        ax.set(xlim=[x[1], x[-1]], ylim=[y_min_percentage, 1], ylabel="% of Same Vertices", zorder=1)
        ax.patch.set_visible(False)
        ax.xaxis.grid(alpha=0.3)
        ax.yaxis.grid(alpha=0.3)
        ax.set_xlabel("Graph Evolution (Date)", labelpad=7)

        # Second y axis for the latencies, which are divided by 1000 for display.
        ax = ax.twinx()
        ax.plot(result.index, result.loc[:, "latency-dynamic"] / 1000, marker='x', markersize=5, linestyle='dashed', label="Latency, Dynamic (Right)")
        ax.plot(result.index, result.loc[:, "latency-static"] / 1000, marker='x', markersize=5, linestyle='dashed', label="Latency, Static (Right)")
        ax.set(ylim=[0, y_max_latency], ylabel="Result Latency (s)", zorder=1)

        fig.legend(bbox_to_anchor=(0.51, 1.03), loc='center', ncol=2, framealpha=1, prop={'size': 7.5})
        fig.tight_layout(pad=0.5)
        return fig

stability_test = StabilityTest(tc_wikipedia_growth[0], interval_time=128, interval_edge=None)
# plot() obtains its data through run(), so a separate run() call beforehand
# would only repeat the benchmark. Plot once, then display the table that the
# run stored on the instance.
fig = stability_test.plot(y_min_percentage=0, y_max_latency=10, figsize=(6, 2.5))
fig.savefig(output_dir / "stability.pdf", bbox_inches='tight')
result = stability_test.result
result