In [13]:
import math
import os
import sys
import traceback
import warnings
from pprint import PrettyPrinter
from typing import Dict, List

import matplotlib.pyplot as plt
from barazmoon.twitter import twitter_workload_generator

pp = PrettyPrinter(indent=4)

# Make the project's `experiments` package importable. The repository root can be
# overridden with the IPA_BASE_DIR environment variable; it defaults to the original
# hard-coded location.
base_dir = os.environ.get("IPA_BASE_DIR", "/home/cc/ipa/")
# Append only once so re-running this cell does not keep growing sys.path.
if base_dir not in sys.path:
    sys.path.append(base_dir)
print("sys.path:", sys.path)

import experiments.utils.drawing
from experiments.utils.constants import FIGURES_PATH, FINAL_RESULTS_PATH
from experiments.utils.drawing import draw_cumulative, draw_temporal_final, draw_temporal_final2
from experiments.utils.parser import AdaptationParser


sys.path: ['/home/cc/miniconda3/envs/central/lib/python39.zip', '/home/cc/miniconda3/envs/central/lib/python3.9', '/home/cc/miniconda3/envs/central/lib/python3.9/lib-dynload', '', '/home/cc/.local/lib/python3.9/site-packages', '/home/cc/miniconda3/envs/central/lib/python3.9/site-packages', '/home/cc/ipa/load_tester', '/home/cc/ipa/', '/home/cc/ipa/', '/home/cc/ipa/']


In [15]:
# Load types and series names
BURSTY = "Bursty"
STEADY_LOW = "Steady Low"
STEADY_HIGH = "Steady High"
FLUCTUATING = "Fluctuating"

# One row per series included in the comparison: (series id, load type, method name).
# Series 2/3/5, 7/8/10, 12/13/15 and 17/18/20 were previously listed as the FA2-low,
# FA2-high and RIM runs of each load type and are currently disabled.
_SERIES_SPEC = (
    (1, BURSTY, "IPA"),
    (4, BURSTY, "IPA-QL"),
    (6, STEADY_LOW, "IPA"),
    (9, STEADY_LOW, "IPA-QL"),
    (11, STEADY_HIGH, "IPA"),
    (14, STEADY_HIGH, "IPA-QL"),
    (16, FLUCTUATING, "IPA"),
    (19, FLUCTUATING, "IPA-QL"),
)

series_load_type = {series_id: load for series_id, load, _ in _SERIES_SPEC}
series_names = {series_id: method for series_id, _, method in _SERIES_SPEC}

metaseries = 21
pipeline_name = "video"

# Group the series ids by load type: load types in a fixed order, series ids in the
# order they were defined.
load_series = {}
for load_type in [BURSTY, STEADY_LOW, STEADY_HIGH, FLUCTUATING]:
    load_series[load_type] = []
for serie, load_type in series_load_type.items():
    load_series[load_type].append(serie)

# Result directory of each selected series:
#   <FINAL_RESULTS_PATH>/metaseries/<metaseries>/series/<series id>
metaseries_dir = os.path.join(FINAL_RESULTS_PATH, "metaseries", str(metaseries), "series")
series_paths = {sid: os.path.join(metaseries_dir, str(sid)) for sid in series_names}

print(series_names.keys())


dict_keys([1, 4, 6, 9, 11, 14, 16, 19])


In [16]:
import importlib

# Build one AdaptationParser per series whose result directory exists. Start from an
# empty dict so the cell works on a fresh kernel and a re-run does not keep stale parsers.
loaders: Dict[int, AdaptationParser] = {}
for series, series_path in series_paths.items():
    if not os.path.exists(series_path):
        warnings.warn(f"Series path does not exist for series {series}: {series_path}")
        continue
    try:
        loaders[series] = AdaptationParser(
            series_path=series_path, model_name="video", type_of="router_pipeline"
        )
    except Exception as e:
        warnings.warn(f"Failed to initialize AdaptationParser for series {series} at {series_path}: {e}")

# If no loaders are available, stop here instead of failing in a later cell.
if not loaders:
    raise FileNotFoundError("No valid series data found. Please check the series paths.")


In [17]:
# Show which series ended up with a working AdaptationParser (rendered as the cell output)
{series_id: parser for series_id, parser in loaders.items()}


{1: <experiments.utils.parser.AdaptationParser at 0x7fdae8037370>,
 4: <experiments.utils.parser.AdaptationParser at 0x7fda12b15b20>,
 6: <experiments.utils.parser.AdaptationParser at 0x7fda12b15c40>,
 9: <experiments.utils.parser.AdaptationParser at 0x7fdae065da30>,
 11: <experiments.utils.parser.AdaptationParser at 0x7fda12de6730>,
 14: <experiments.utils.parser.AdaptationParser at 0x7fdae8045160>,
 16: <experiments.utils.parser.AdaptationParser at 0x7fdae8045070>,
 19: <experiments.utils.parser.AdaptationParser at 0x7fda12b15b50>}

In [18]:
# Per-series experiment settings, taken from each series' first config file ("0.yaml").
accuracy_methods: Dict[int, str] = {}
adaptation_intervals: Dict[int, int] = {}
simulation_modes: Dict[int, str] = {}
configs: Dict[int, Dict] = {}

for series, loader in loaders.items():
    try:
        configs_exp = loader.load_configs()
        config = configs_exp.get("0.yaml")
        if config is None:
            warnings.warn(f"No '0.yaml' config found for series {series}. Skipping.")
            continue
        # Print the header only for series whose config is actually shown.
        print(f"Series: {series} Config:")
        pp.pprint(config)
        configs[series] = config
        accuracy_methods[series] = config.get("accuracy_method", "default_method")
        adaptation_intervals[series] = config.get("adaptation_interval", 1)
        simulation_modes[series] = config.get("simulation_mode", "default_mode")
    except Exception as e:
        warnings.warn(f"Failed to load config for series {series}: {e}")

# Series that produced a usable config; the sent-load cell iterates over these.
available_series = set(configs.keys())
print(configs.keys())
if not available_series:
    raise ValueError("No series configurations loaded successfully.")


Series: 1 Config:
{   'accuracy_method': 'multiply',
    'adaptation_interval': 10,
    'allocation_mode': 'base',
    'alpha': 2,
    'backup_predictor_duration': 2,
    'backup_predictor_type': 'max',
    'baseline_mode': None,
    'batching_cap': 8,
    'benchmark_duration': 1,
    'beta': 1,
    'central_queue': True,
    'debug_mode': False,
    'distrpution_time': 30,
    'drop_limit': 10,
    'from_storage': [False, False],
    'gamma': 1e-06,
    'initial_active_model': ['yolov5n', 'resnet18'],
    'initial_batch': [1, 1],
    'initial_cpu_allocation': [1, 1],
    'initial_replica': [1, 1],
    'latency_margin': 0,
    'logs_enabled': False,
    'lowest_model_accuracy': 0.05,
    'metadata': 'bursty - ipa - cpu type: compute_cascadelake_r_ib',
    'metaseries': 21,
    'mode': 'exponential',
    'model_name': ['yolo', 'resnet-human'],
    'monitoring_duration': 2,
    'nodes': [   {   'cpu_request': '1',
                     'data_type': 'image',
                     'max_batch

In [19]:
# Rebuild the workload sent to each series from its first workload config, using the
# same twitter workload generator, so it can be compared with the received load.
sent_loads: Dict[int, List[float]] = {}

# sorted(): a set has no meaningful order; keep warnings and processing deterministic.
for series in sorted(available_series):
    config = configs.get(series)
    if not config:
        continue
    workload_configs = config.get("workload_config", [])
    if not workload_configs:
        warnings.warn(f"No workload configuration found for series {series}. Skipping workload generation.")
        continue
    try:
        # Indexing is inside the try so a malformed workload_config only skips this series.
        workload_config = workload_configs[0]
        start = workload_config["start"]
        end = workload_config["end"]
        damping_factor = workload_config.get("damping_factor", 1.0)
        sent_loads[series] = twitter_workload_generator(
            days=f"{start}-{end}", damping_factor=damping_factor
        )
    except Exception as e:
        warnings.warn(f"Failed to generate sent load for series {series}: {e}")


In [20]:
# Empty accumulator for per-series results; simulation_modes was already filled in by
# the config cell.
results_all = list()


In [None]:
# Read every series' adaptation log; a series whose log cannot be loaded is reported
# with a warning and left out of adaptation_logs.
adaptation_logs: Dict[int, Dict] = {}

for series in loaders:
    loader = loaders[series]
    try:
        adaptation_log = loader.load_adaptation_log()
    except Exception as e:
        warnings.warn(f"Failed to load adaptation log for series {series}: {e}")
        continue
    adaptation_logs[series] = adaptation_log


In [None]:
# Derive each series' change data from its adaptation log; later cells read the
# per-node metrics, received/predicted load and time intervals from it.
series_changes = {}
for series in series_names.keys():
    # Series without a parser or adaptation log were already reported by earlier cells;
    # skip them here instead of raising KeyError.
    if series not in loaders or series not in adaptation_logs:
        warnings.warn(f"No loader or adaptation log for series {series}. Skipping series changes.")
        continue
    try:
        series_changes[series] = loaders[series].series_changes(
            adaptation_log=adaptation_logs[series]
        )
    except Exception as e:
        warnings.warn(f"Failed to compute series changes for series {series}: {e}")


## Comparing Predicted Load with the Received Load

In [None]:
# Received vs. sent vs. predicted load for one representative series. The load should be
# roughly similar among the series, so the first series that has both change data and a
# regenerated sent load is used.
comparable_series = [s for s in series_changes if s in sent_loads]
if not comparable_series:
    warnings.warn("No series has both series changes and a sent load; skipping the load comparison plot.")
else:
    first_sereis = comparable_series[0]
    # "recieved_load" (sic) is the key returned by the parser's series_changes.
    recieved_load = series_changes[first_sereis]["recieved_load"]
    recieved_load_x = list(range(len(recieved_load)))
    predicted_load = series_changes[first_sereis]["predicted_load"]
    predicted_load_x = series_changes[first_sereis]["time_interval"]
    sent_load = sent_loads[first_sereis]
    sent_load_x = list(range(len(sent_load)))

    fig, ax = plt.subplots(figsize=(5, 2))
    ax.plot(recieved_load_x, recieved_load, label="received_load")
    ax.plot(sent_load_x, sent_load, label="sent_load")
    ax.plot(predicted_load_x, predicted_load, label="predicted_load")
    # TODO add predicted reference load here
    ax.set(title=f"Load comparison (series {first_sereis})", xlabel="Time", ylabel="Load")
    ax.legend()
    plt.show()


In [None]:
# Build the metric tables for the final comparison plots.
# Layout: final_dict[metric][series] -> {node name / aggregate ("total", "e2e") / column: values}
METRIC_TOTAL_CORE_CHANGES = "total_core_changes"
METRIC_ACCURACY_CHANGES = "accuracy_changes"
METRIC_MEASURED_LATENCY = "measured_latency"
METRIC_TIMEOUT_DICS = "timeout_dics"

METRICS = [
    METRIC_TOTAL_CORE_CHANGES,
    METRIC_ACCURACY_CHANGES,
    METRIC_MEASURED_LATENCY,
    METRIC_TIMEOUT_DICS,
]

latency_metric = "p99"  # [min, max, p99]; substring used to pick latency columns

final_dict: Dict[str, Dict] = {metric: {} for metric in METRICS}


def _elementwise_sum(values_by_key: Dict[str, List]) -> List:
    """Sum the values at each position across all lists (zip truncates to the shortest)."""
    return [sum(column) for column in zip(*values_by_key.values())]


for series, series_dict in series_changes.items():
    # Per-node time series reported by the parser for this series.
    replica_changes: Dict[str, List] = {}
    core_changes: Dict[str, List] = {}
    accuracy_changes: Dict[str, List] = {}
    for node_name, node_metrics in series_dict.get("nodes", {}).items():
        replica_changes[node_name] = node_metrics.get("replicas", [])
        core_changes[node_name] = node_metrics.get("cpu", [])
        accuracy_changes[node_name] = node_metrics.get("accuracy", [])

    total_core_changes: Dict[str, List] = {}
    try:
        # Cost of a node at each step is replicas x cpu. The "total" entry, which the
        # Cost plot selects, is the sum of those per-node costs (RHS is evaluated before
        # "total" is inserted, so only node entries are summed).
        for node_name, replicas in replica_changes.items():
            total_core_changes[node_name] = [r * c for r, c in zip(replicas, core_changes[node_name])]
        total_core_changes["total"] = _elementwise_sum(total_core_changes)

        accuracy_method = accuracy_methods.get(series)
        if accuracy_method == "sum":
            accuracy_changes["e2e"] = _elementwise_sum(accuracy_changes)
        elif accuracy_method == "multiply":
            accuracy_changes["e2e"] = [
                100 * math.prod(column) for column in zip(*accuracy_changes.values())
            ]
        else:
            # The accuracy plot selects "e2e"; make a missing entry visible.
            warnings.warn(
                f"Unknown accuracy method {accuracy_method!r} for series {series}; "
                "no end-to-end ('e2e') accuracy computed."
            )
    except Exception as e:
        warnings.warn(f"Failed to compute totals for series {series}: {e}")

    final_dict[METRIC_TOTAL_CORE_CHANGES][series] = total_core_changes
    final_dict[METRIC_ACCURACY_CHANGES][series] = accuracy_changes

    # Latency and SLA-violation data come from per-second processing. A failure here only
    # skips those two metrics; cost and accuracy above are already recorded.
    try:
        timeout_per_second, per_second_results = loaders[series].per_second_result_processing()
    except Exception as e:
        warnings.warn(f"Failed to process per second results for series {series}: {e}")
        continue

    metric_columns = [col for col in per_second_results.columns if latency_metric in col]
    final_dict[METRIC_MEASURED_LATENCY][series] = per_second_results[metric_columns].to_dict(orient="list")
    final_dict[METRIC_TIMEOUT_DICS][series] = {"timeout_per_second": timeout_per_second}


In [None]:
# Only cost and end-to-end accuracy are drawn in the final figure.
METRICS_TO_PLOT = [METRIC_TOTAL_CORE_CHANGES, METRIC_ACCURACY_CHANGES]

# Regroup final_dict (metric -> series -> data) into
# load type -> metric -> method name -> data, the structure passed to the drawing helper.
final_by_load_type: Dict[str, Dict] = {
    load_kind: {plotted: {} for plotted in METRICS_TO_PLOT}
    for load_kind in (BURSTY, STEADY_LOW, STEADY_HIGH, FLUCTUATING)
}

for metric in METRICS_TO_PLOT:
    for series, data in final_dict.get(metric, {}).items():
        load_type = series_load_type.get(series)
        if load_type:
            series_name = series_names.get(series, f"Series {series}")
            final_by_load_type[load_type][metric][series_name] = data
        else:
            warnings.warn(f"No load type found for series {series}. Skipping.")


In [None]:
# Reload the drawing module so edits to it are picked up without restarting the kernel.
importlib.reload(experiments.utils.drawing)

# Per-metric plot settings: "selection" lists the keys of final_dict[metric][series] to
# plot (assumed contract of draw_temporal_final4 — TODO confirm), plus title and y label.
selected_experiments = {
    METRIC_TOTAL_CORE_CHANGES: {
        "selection": ["total"],
        "title": "Cost",
        "ylabel": "Cost\n (cores)",
    },
    METRIC_ACCURACY_CHANGES: {
        "selection": ["e2e"],
        # Raw strings: "\c" is not a valid Python escape sequence (SyntaxWarning in 3.12+).
        "title": r"$PAS \cdot{} 10^2$",
        "ylabel": r"$PAS \cdot{} 10^2$",
    },
    METRIC_MEASURED_LATENCY: {
        "selection": [f"e2e_{latency_metric}"],
        "title": "Latency",
        "ylabel": "Latency (s)",
    },
    METRIC_TIMEOUT_DICS: {
        "selection": ["timeout_per_second"],
        "title": "SLA Violations",
        "ylabel": "SLA\n Violations",
    },
}

# Colour per method name; keys are meant to match the values of series_names, which
# final_by_load_type uses as its innermost keys.
serie_color = {
    "IPA": "#d7191c",
    "FA2-low": "#a1dab4",
    "FA2-high": "#41b6c4",
    "RIM-low": "#2c7fb8",
    "RIM": "#253494",
    "Q-Learning": "#000000",
    # series_names labels the Q-learning runs "IPA-QL"; without this entry they have no colour.
    "IPA-QL": "#000000",
}


In [None]:
# Draw the plotted metrics (cost and accuracy) for every load type and save them as a PDF.
figure_path = os.path.join(FIGURES_PATH, f"metaseries-{metaseries}-{pipeline_name}.pdf")
try:
    experiments.utils.drawing.draw_temporal_final4(
        final_by_load_type,
        adaptation_interval=adaptation_intervals,
        selected_experiments=selected_experiments,
        serie_color=serie_color,
        hl_for_metric={},  # Assuming no highlights
        # Presumably the legend anchor; its height scales with the number of metric rows.
        bbox_to_anchor=(0.55, 2.5 * len(METRICS_TO_PLOT) + 1.1),
        hspace=0.3,
        save=True,
        filename=figure_path,
    )
except Exception as e:
    # Keep the notebook running, but include the full traceback: the bare message (e.g. a
    # KeyError naming only a missing key) is usually not enough to locate the failure.
    warnings.warn(f"Failed to draw final plots: {e}\n{traceback.format_exc()}")
