In [None]:
import functools
import os
import pathlib
import sys
from collections import namedtuple
from dataclasses import dataclass
from multiprocessing import Pool, cpu_count
from pprint import pprint
from typing import List, Optional, Tuple

import numpy as np
import pandas as pd

# Add analysis to the path (the notebook's parent directory, presumably where
# `parsers` and `aggregation` live); must happen before importing them.
parent_dir = os.path.join(os.path.abspath(''), "..")
sys.path.append(parent_dir)

import parsers
from aggregation import *
import matplotlib.pyplot as plt
from matplotlib import rc
import portion

# Figure styling: serif font at 19pt, with text rendered through LaTeX.
rc('font', family='serif', size=19)
rc('text', usetex=True)

# Results are read from ../archive; ../working is handed to the parser as its
# working directory (what it stores there is defined in `parsers`).
path_to_results = os.path.normpath(os.path.join(parent_dir, "archive"))
working_dir = os.path.normpath(os.path.join(parent_dir, "working"))
print(path_to_results)
data = parsers.main(path_to_results, working_dir=working_dir)
print("Finished")

In [None]:
# Immutable (time, cpu) pair; `cpu` holds either a cumulative CPU reading or a
# derived utilization value depending on where the entry is built.
CpuEntry = namedtuple('CpuEntry', ['time', 'cpu'])
  
def extract_intervals(iterable, predicate):
    """Yield a closed index interval for each maximal run of items matching `predicate`.

    Both endpoints of every yielded `portion.closed(first, last)` are indices of
    matching items. For example, predicate results [F, T, T, F, T] yield
    [1, 2] and then [4, 4].

    Args:
        iterable: Any iterable; items are consumed once, in order.
        predicate: Callable returning truthy for items that belong to a run.

    Yields:
        `portion.closed(first_index, last_index)` per run, in order of appearance.
    """
    run_start = None
    run_end = None
    for idx, item in enumerate(iterable):
        if predicate(item):
            if run_start is None:
                run_start = idx
            run_end = idx
        elif run_start is not None:
            # Close the run at its last *matching* index, not at the current
            # (non-matching) one, so mid-stream runs match runs at the end.
            yield portion.closed(run_start, run_end)
            run_start = None
            run_end = None
    if run_start is not None:
        # A run that extends to the end of the input.
        yield portion.closed(run_start, run_end)


def _container_cpu_entries(container) -> List[CpuEntry]:
    """Normalize a container's radvisor (preferred) or moby stats into CpuEntry(read, cpu.total); [] if it has neither."""
    if container.radvisor:
        return [CpuEntry(time=entry.read, cpu=entry.cpu.total) for entry in container.radvisor[0].values()]
    if container.moby:
        return [CpuEntry(time=entry.read, cpu=entry.cpu.total) for entry in container.moby]
    return []


def _sample_utilization(entries: List[CpuEntry], sampling_period: float) -> List[CpuEntry]:
    """Turn cumulative CPU readings into utilization averaged over fixed-width time bins.

    Each read's utilization is (CPU delta / timestamp delta) relative to the
    previous read. Timestamps are divided by 1e6 and binned with width
    `sampling_period * 1e3`, so the code treats them as milliseconds (assumes
    raw reads are nanoseconds — TODO confirm). Empty bins and bins whose mean
    utilization is not > 0 are dropped, so consecutive returned samples are not
    necessarily adjacent in time.

    Requires len(entries) >= 2.
    """
    utilization = [
        CpuEntry(time=entry.time / 1E6, cpu=float(cpu_delta) / time_delta)
        for cpu_delta, time_delta, entry in zip(
            find_deltas([e.cpu for e in entries]),
            find_deltas([e.time for e in entries]),
            entries[1:])
    ]
    times = [e.time for e in utilization]
    cpu_df = pd.DataFrame({'cpu': [e.cpu for e in utilization], 'time': times})
    bins = pd.interval_range(start=min(times), end=max(times), freq=sampling_period * 1E3)
    cpu_df['sampling_intervals'] = pd.cut(x=cpu_df['time'], bins=bins, include_lowest=True)
    # observed=True skips empty bins up front; they would be filtered out by
    # the `> 0` test below anyway (their mean is NaN).
    binned = cpu_df.groupby('sampling_intervals', observed=True).mean()
    return [CpuEntry(cpu=row["cpu"], time=row["time"]) for _, row in binned.iterrows() if row["cpu"] > 0]


def get_load_interval(host: parsers.TestHost, sampling_period=0.1, min_length: int=5, load_lower_bound=0.6) -> Optional[portion.Interval]:
    """Find the time ranges during which every monitored container on `host` is under load.

    For each container with radvisor or moby CPU stats, utilization is sampled
    into `sampling_period`-second bins (see `_sample_utilization`). Runs of at
    least `min_length` consecutive samples with utilization >= `load_lower_bound`
    become load intervals. These are intersected across all such containers,
    and the surviving pieces are unioned.

    Args:
        host: Host whose `containers()` are inspected.
        sampling_period: Bin width in seconds.
        min_length: Minimum number of consecutive samples in a load run.
        load_lower_bound: Utilization threshold (inclusive) for "under load".

    Returns:
        A `portion.Interval` (possibly empty) in the sampled time unit, or None
        if the host has no containers, or none of them has at least two CPU reads.
    """
    containers = host.containers()
    if not containers:
        return None

    load_interval_lists = []
    for container in containers:
        entries = _container_cpu_entries(container)
        if len(entries) < 2:
            # Without two reads no delta can be formed; skip it like a
            # container that has no radvisor/moby data at all.
            continue
        samples = _sample_utilization(entries, sampling_period)
        load_runs = extract_intervals(samples, lambda e: e.cpu >= load_lower_bound)
        load_interval_lists.append([
            portion.closed(samples[run.lower].time, samples[run.upper].time)
            for run in load_runs
            # `run` is a closed range of sample indices, so this keeps runs of
            # at least `min_length` samples.
            if run.upper - run.lower >= min_length - 1
        ])

    if not load_interval_lists:
        # Containers exist but none had usable CPU data: report "no data"
        # rather than an unbounded "always under load" interval.
        return None

    def combine_interval_lists(a, b):
        """Pairwise-intersect two interval lists, dropping empty intersections."""
        combined_product_intervals = (i & j for i in a for j in b)
        return [interval for interval in combined_product_intervals if not interval.empty]

    # Keep only the time ranges where *every* container was under load.
    common_intervals = functools.reduce(combine_interval_lists, load_interval_lists)

    # Merge the surviving pieces into a single (possibly disjoint) interval.
    return functools.reduce(lambda acc, interval: acc | interval, common_intervals, portion.empty())


In [None]:
# Print the load interval of every host in replicas[1] of one test, skipping
# hosts for which get_load_interval has nothing to report (returns None).
test = "i-rc-100"
for host in data[test].replicas[1].hosts.values():
    result = get_load_interval(host)
    if result is not None:
        pprint((host.replica_id, host.id))
        pprint(result)
print("Done")

In [None]:
def cpu_threshold_ts_intervals(host: parsers.TestHost, min_length: int=8, cpu_threshold: float=10, lower_threshold: bool=True, sampling_period: float=1.0) -> List[Tuple[float, float]]:
    """Return (first, last) timestamps of sustained runs above or below a CPU threshold.

    The host's CPU usage is aggregated with `aggregate_cpu` at `sampling_period`.
    Consecutive samples form a run while the threshold condition holds:
      - lower_threshold=True:  cpu > cpu_threshold
      - lower_threshold=False: cpu < cpu_threshold
    A NaN cpu value never satisfies either comparison, so it ends a run.

    Args:
        host: Host passed to `aggregate_cpu`.
        min_length: Runs must have strictly more than this many samples.
        cpu_threshold: Threshold compared against the 'cpu' column (exclusive).
        lower_threshold: Direction of the comparison (see above).
        sampling_period: Forwarded to `aggregate_cpu` (default keeps the
            previous hard-coded 1.0).

    Returns:
        One (start_time, end_time) tuple per qualifying run, in row order of
        `aggregate_cpu`'s output, using its 'time' column values.
    """
    cpu_avg = aggregate_cpu(host, sampling_period=sampling_period)

    def in_range(cpu):
        return cpu > cpu_threshold if lower_threshold else cpu < cpu_threshold

    runs = []
    current_run = []
    # Column-wise zip avoids building a Series per row as `iterrows` does.
    for cpu, time in zip(cpu_avg['cpu'], cpu_avg['time']):
        if in_range(cpu):
            current_run.append(time)
        elif current_run:
            runs.append(current_run)
            current_run = []
    if current_run:
        runs.append(current_run)
    return [(run[0], run[-1]) for run in runs if len(run) > min_length]


def _deltas_in_cpu_windows(hosts, all_host_intervals, **threshold_kwargs) -> List:
    """Collect read deltas whose timestamp lies inside any CPU-threshold window of its host.

    Windows come from `cpu_threshold_ts_intervals(host, **threshold_kwargs)`;
    bounds are inclusive. Each delta is counted at most once.
    """
    selected = []
    for host, host_intervals in zip(hosts, all_host_intervals):
        windows = cpu_threshold_ts_intervals(host, **threshold_kwargs)
        for container_list in host_intervals:
            for interval, timestamp in container_list:
                if any(lower <= timestamp <= upper for lower, upper in windows):
                    selected.append(interval)
    return selected


def describe_intervals(test: parsers.Test, top=0.10, cpu_threshold=50, idle_cpu_threshold=30) -> List[str]:
    """Summarize read-delta statistics for one test as printable text blocks.

    `host_collection_intervals(host)` is used as a list of per-container lists
    of (read delta, timestamp) pairs. Four `DataFrame.describe` tables are
    produced:
      1. all read deltas;
      2. per container, deltas strictly above its (1 - top) quantile;
      3. deltas taken while the host's CPU is > `cpu_threshold`;
      4. deltas taken while the host's CPU is < `idle_cpu_threshold`.

    Returns:
        Header line, the four tables as strings, and a footer line.
    """
    output = [f"==== {test.id} ===="]

    # Materialize: each of these is iterated more than once below, which would
    # silently yield nothing the second time if `flatten` returned an iterator.
    hosts = list(flatten(replica.hosts.values() for replica in test.replicas))
    all_host_intervals = [host_collection_intervals(host) for host in hosts]
    all_container_intervals = list(flatten(all_host_intervals))
    all_intervals = list(flatten(all_container_intervals))

    # First, describe all intervals for the test
    intervals = [interval for interval, _ in all_intervals]
    intervals_df = pd.DataFrame({'Read deltas (ms)': intervals})
    output.append(str(intervals_df.describe(include='all')))

    # Second, describe top percentage of intervals (quantile taken per container)
    top_percent = []
    for container_list in all_container_intervals:
        container_intervals = [interval for interval, _ in container_list]
        if not container_intervals:
            continue  # np.quantile raises on an empty array
        limit = np.quantile(container_intervals, 1 - top)
        top_percent.extend(i for i in container_intervals if i > limit)
    top_percent_df = pd.DataFrame({f'Top {top*100:.1f}% container read deltas (ms)': top_percent})
    output.append(str(top_percent_df.describe(include='all')))

    # Third, describe all intervals under load
    under_load = _deltas_in_cpu_windows(hosts, all_host_intervals, cpu_threshold=cpu_threshold)
    under_load_df = pd.DataFrame({f'Read deltas under load (> {cpu_threshold:.1f}% CPU) (ms)': under_load})
    output.append(str(under_load_df.describe(include='all')))

    # Fourth, describe all intervals at idle
    at_idle = _deltas_in_cpu_windows(hosts, all_host_intervals, cpu_threshold=idle_cpu_threshold, lower_threshold=False)
    at_idle_df = pd.DataFrame({f'Read deltas at idle (< {idle_cpu_threshold:.1f}% CPU) (ms)': at_idle})
    output.append(str(at_idle_df.describe(include='all')))

    output.append("=================")
    return output

In [None]:
# Summarize every test in parallel. imap_unordered yields each summary as soon
# as its worker finishes, so printed order need not follow `tests`.
tests = ["d-rc-50", "d-rc-100", "d-mc-50", "d-mc-100", "i-rc-50", "i-rc-100", "i-mc-50", "i-mc-100", "ii-rc-s", "ii-rc-b", "ii-mc-s", "ii-mc-b"]
num = len(tests)
with Pool(cpu_count()) as pool:
    summaries = pool.imap_unordered(describe_intervals, (data[name] for name in tests))
    for done, output in enumerate(summaries, start=1):
        print("\n".join(output))
        print(f"{done}/{num} done")
