# Motivation

We want to increase the confidence in our pipeline run results by running the same experiment pipelines with different
seeds.

Each run yields different evaluation metrics. Consequently, we want to aggregate the evaluation metrics over runs
(e.g. by mean or median; this notebook uses the mean).

In [None]:
from pathlib import Path
from modyn.supervisor.internal.pipeline_executor.models import PipelineLogs
import pandas as pd
from analytics.app.data.transform import logs_dataframe
from analytics.app.data.transform import dfs_models_and_evals
from copy import deepcopy
from collections import defaultdict
from modyn.supervisor.internal.grpc.enums import PipelineStage
from modyn.supervisor.internal.pipeline_executor.models import SingleEvaluationInfo

# Logs of the runs to aggregate: the same pipeline executed with different seeds.
pipeline_files = list(
    map(
        Path,
        (
            "/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/pipeline_4/pipeline.log",
            "/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/pipeline_5/pipeline.log",
            "/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/pipeline_6/pipeline.log",
        ),
    )
)

# Directory the aggregated (mean over runs) log is written to.
aggregated_log_path = Path("/Users/robinholzinger/robin/dev/eth/modyn-2/.analytics.log/.data/pipeline_2")

In [None]:
# Parse every run's JSON pipeline log into a PipelineLogs model (one per file).
logs = list(
    map(
        lambda logfile: PipelineLogs.model_validate_json(logfile.read_text()),
        pipeline_files,
    )
)


In [None]:
# Sanity check: all runs must share the same pipeline configuration except for the
# training seed; otherwise averaging their metrics would be meaningless.
# Only the configs are deep-copied (not the whole logs), and the seed is overwritten
# on the copies, so the original logs stay untouched.
reference_seed = logs[0].config.pipeline.training.seed
candidates = [deepcopy(log.config) for log in logs]
for candidate in candidates:
    candidate.pipeline.training.seed = reference_seed

mismatching_runs = [idx for idx, candidate in enumerate(candidates) if candidate != candidates[0]]
assert not mismatching_runs, (
    f"Not all pipelines are the same (ignoring seed); runs differing from run 0: {mismatching_runs}"
)

In [None]:
# One logs dataframe per run, in the same order as `logs`.
dfs_logs = list(map(logs_dataframe, logs))

In [None]:
# Latest sample_time seen in any run; the same value is passed to
# dfs_models_and_evals for every run below.
max_sample_time = max(df["sample_time"].max() for df in dfs_logs)

In [None]:
# One (models, eval requests, single-eval results) triple per run; the last two
# entries may be None. The annotation previously read `list[str, tuple[...]]`,
# which is not a valid list type (list takes a single type parameter).
dfs_models_evals: list[tuple[pd.DataFrame, pd.DataFrame | None, pd.DataFrame | None]] = [
    dfs_models_and_evals(log, max_sample_time) for log in logs
]

In [None]:
# Stack the per-run frames into single frames (original indices are kept).
# Runs whose eval-request / single-eval frame is None are skipped.
df_models = pd.concat([models for models, _requests, _singles in dfs_models_evals])

df_eval_requests = pd.concat(
    [requests for _models, requests, _singles in dfs_models_evals if requests is not None]
)
df_eval_single = pd.concat(
    [singles for _models, _requests, singles in dfs_models_evals if singles is not None]
)

In [None]:
# let's find the primary keys of the models

# Peek at one evaluation key (model 2, exactmatrix, cglm test set, year 2004,
# Accuracy) across all runs; this is expected to give one row per run.
df_eval_single.loc[
    (df_eval_single["metric"] == "Accuracy")
    & (df_eval_single["id_model"] == 2)
    & (df_eval_single["dataset_id"] == "cglm_hierarchical_min25-test")
    & (df_eval_single["eval_handler"] == "exactmatrix")
    & (df_eval_single["interval_start"] == "2004-01-01")
    & (df_eval_single["interval_end"] == "2004-12-31")
]

In [None]:
# Group the single-eval results by the columns identifying one evaluation result.
# Each key is expected once per run, i.e. every group must have len(logs) rows.
groups = df_eval_single.groupby(
    ["id_model", "eval_handler", "dataset_id", "interval_start", "interval_end", "metric"]
)
assert (groups.size() == len(logs)).all(), "Wrong primary key"

# Mean of each metric value over all runs.
aggregated_metrics = groups["value"].mean().reset_index()
aggregated_metrics

In [None]:
from modyn.supervisor.internal.grpc.enums import PipelineStage
from modyn.supervisor.internal.pipeline_executor.models import SingleEvaluationInfo


# Start from a copy of the first run's log and replace every single-evaluation
# metric result with the mean over all runs from `aggregated_metrics`.
aggregated_logs = deepcopy(logs[0])
for stage_run in aggregated_logs.supervisor_logs.stage_runs:
    if stage_run.id != PipelineStage.EVALUATE_SINGLE.name:
        continue
    assert isinstance(stage_run.info, SingleEvaluationInfo)
    if not stage_run.info.results:
        continue

    eval_req = stage_run.info.eval_request

    # The request part of the key is shared by all metrics of this stage run, so
    # build that mask once. Intervals are unix seconds in the request but
    # datetimes in the dataframe, hence the conversion.
    request_mask = (
        (aggregated_metrics["id_model"] == eval_req.id_model)
        & (aggregated_metrics["eval_handler"] == eval_req.eval_handler)
        & (aggregated_metrics["dataset_id"] == eval_req.dataset_id)
        & (aggregated_metrics["interval_start"] == pd.to_datetime(eval_req.interval_start, unit="s"))
        & (aggregated_metrics["interval_end"] == pd.to_datetime(eval_req.interval_end, unit="s"))
    )

    for metric in stage_run.info.results["metrics"]:
        lookup = aggregated_metrics[request_mask & (aggregated_metrics["metric"] == metric["name"])]
        assert len(lookup) == 1, (
            f"Expected exactly one aggregated value for {metric['name']}, found {len(lookup)}"
        )
        # Take the scalar explicitly: calling float() on a one-element Series is
        # deprecated in pandas.
        metric["result"] = float(lookup["value"].iloc[0])

In [None]:
# Per-run rows for the spot-check key (model 2, exactmatrix, cglm test set,
# year 2004, Accuracy), for comparison with the aggregated value.
df_eval_single.loc[
    (df_eval_single["id_model"] == 2)
    & (df_eval_single["metric"] == "Accuracy")
    & (df_eval_single["interval_start"] == "2004-01-01")
    & (df_eval_single["interval_end"] == "2004-12-31")
    & (df_eval_single["eval_handler"] == "exactmatrix")
    & (df_eval_single["dataset_id"] == "cglm_hierarchical_min25-test")
]

In [None]:

import math

# Spot check of the written-back values: for model 2 / exactmatrix /
# cglm_hierarchical_min25-test / year 2004, the aggregated Accuracy must equal the
# hand-computed mean of the per-run values.
interval_start_ts = 1072915200
interval_end_ts = 1104451200
assert pd.to_datetime("2004-01-01").timestamp() == interval_start_ts
assert pd.to_datetime("2004-12-31").timestamp() == interval_end_ts

expected_accuracy = (0.1 - 0.05 + 0) / 3
checked_metrics = 0

for stage_run in aggregated_logs.supervisor_logs.stage_runs:
    if stage_run.id != PipelineStage.EVALUATE_SINGLE.name:
        continue
    assert isinstance(stage_run.info, SingleEvaluationInfo)
    if not stage_run.info.results:
        continue
    eval_req = stage_run.info.eval_request
    if (
        eval_req.id_model != 2
        or eval_req.eval_handler != "exactmatrix"
        or eval_req.dataset_id != "cglm_hierarchical_min25-test"
        or eval_req.interval_start != interval_start_ts
        or eval_req.interval_end != interval_end_ts
    ):
        continue

    for metric in stage_run.info.results["metrics"]:
        if metric["name"] == "Accuracy":
            print(metric)
            # The mean was computed by pandas, so compare with a tolerance
            # rather than exact float equality.
            assert math.isclose(metric["result"], expected_accuracy), metric
            checked_metrics += 1

# Without this, the check would silently pass if no stage run matched the filter.
assert checked_metrics > 0, "No matching Accuracy result found to check"

In [None]:
# aggregated_log_path lives in the same .data directory as the source pipelines'
# logs, so refuse to overwrite an existing directory (same policy as the batched
# export) and create a fresh one before writing.
if aggregated_log_path.exists():
    raise RuntimeError(f"{aggregated_log_path} already exists, we will not override data.")
aggregated_log_path.mkdir(parents=True, exist_ok=False)
aggregated_logs.materialize(aggregated_log_path, mode="final")

In [None]:
# Display the supervisor logs of the aggregated copy for a visual check.
aggregated_logs.supervisor_logs

# Batched pipeline

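Given a directory containing pipeline logs and an output path, this merges all pipelines that are identical except for the seed: runs are grouped by pipeline name, their single-evaluation metrics are averaged, and one merged log per pipeline is written to the output directory.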

In [None]:
# Input: tree searched recursively for pipeline.log files.
# Output: must not exist yet; receives one merged log directory per pipeline name.
log_dir, output = (
    Path("/Users/mboether/phd/dynamic-data/sigmod-data/cglm-landmark/data_selection_50%/logs"),
    Path("/Users/mboether/phd/dynamic-data/sigmod-data/cglm-landmark/data_selection_50%/mean_logs"),
)

In [None]:
# Collect the runs to merge. Only pipeline.log files that have a "snapshot"
# directory next to them are used; other pipeline.log files are probably
# re-executed evaluations. Sorted so the pipeline_{idx} numbering is reproducible.
logfiles = sorted(
    logfile for logfile in log_dir.glob("**/pipeline.log") if (logfile.parent / "snapshot").exists()
)

if output.exists():
    raise RuntimeError(f"{output} already exists, we will not override data.")

output.mkdir(parents=True, exist_ok=False)

# Group runs by the pipeline name stored in the ".name" file next to each log.
# Stripped so a trailing newline does not split otherwise identical names.
name_path_map = defaultdict(list)
for logfile in logfiles:
    name = (logfile.parent / ".name").read_text().strip()
    log = PipelineLogs.model_validate_json(logfile.read_text())
    name_path_map[name].append(log)

for idx, (pipeline_name, logs) in enumerate(name_path_map.items()):
    print(f"Processing {len(logs)} runs for {pipeline_name}")
    pl_output = output / f"pipeline_{idx}"
    pl_output.mkdir(parents=True, exist_ok=False)

    # Step 1: validate that all pipelines are the same except for the seed
    # (compare deep copies of the configs with the seed overwritten).
    reference_seed = logs[0].config.pipeline.training.seed
    candidates = [deepcopy(log.config) for log in logs]
    for candidate in candidates:
        candidate.pipeline.training.seed = reference_seed
    assert all(
        candidate == candidates[0] for candidate in candidates
    ), "Not all pipelines are the same (ignoring seed)"

    # Step 2: per-run dataframes, all built against the same max_sample_time.
    dfs_logs = [logs_dataframe(log) for log in logs]
    max_sample_time = max(df["sample_time"].max() for df in dfs_logs)
    dfs_models_evals: list[tuple[pd.DataFrame, pd.DataFrame | None, pd.DataFrame | None]] = [
        dfs_models_and_evals(log, max_sample_time) for log in logs
    ]
    df_eval_single = pd.concat(
        [
            single_eval_df
            for _, _, single_eval_df in dfs_models_evals
            if single_eval_df is not None
        ]
    )

    # Step 3: average every metric over the runs. Models are keyed by model_idx
    # rather than id_model (id_model presumably differs between runs).
    groups = df_eval_single.groupby(
        ["model_idx", "eval_handler", "dataset_id", "interval_start", "interval_end", "metric"]
    )
    for size in groups.size():
        assert size == len(logs), f"Wrong primary key: {size}, {len(logs)}"

    aggregated_metrics = groups.agg({"value": "mean"}).reset_index()

    # Step 4: map the first run's id_model values (used in its eval requests) to
    # model_idx. The aggregated log below is a copy of that first run.
    _, _, first_df = dfs_models_evals[0]
    assert first_df is not None, f"First run of {pipeline_name} has no single-evaluation results"
    id_model_map_df = first_df[["id_model", "model_idx"]].drop_duplicates()
    assert id_model_map_df["id_model"].is_unique, "An id_model maps to several model_idx values"
    id_model_to_idx = dict(zip(id_model_map_df["id_model"], id_model_map_df["model_idx"]))

    # Step 5: write the averaged values into a copy of the first run's log.
    aggregated_logs = deepcopy(logs[0])
    for stage_run in aggregated_logs.supervisor_logs.stage_runs:
        if stage_run.id != PipelineStage.EVALUATE_SINGLE.name:
            continue
        assert isinstance(stage_run.info, SingleEvaluationInfo)
        if not stage_run.info.results:
            continue

        eval_req = stage_run.info.eval_request

        # Look up model_idx by id_model. The previous code filtered on
        # model_idx == eval_req.id_model, which confused the two identifiers.
        model_idx = id_model_to_idx[eval_req.id_model]

        # Request part of the key, shared by all metrics of this stage run.
        # Intervals are unix seconds in the request but datetimes in the frame.
        request_mask = (
            (aggregated_metrics["model_idx"] == model_idx)
            & (aggregated_metrics["eval_handler"] == eval_req.eval_handler)
            & (aggregated_metrics["dataset_id"] == eval_req.dataset_id)
            & (aggregated_metrics["interval_start"] == pd.to_datetime(eval_req.interval_start, unit="s"))
            & (aggregated_metrics["interval_end"] == pd.to_datetime(eval_req.interval_end, unit="s"))
        )

        for metric in stage_run.info.results["metrics"]:
            lookup = aggregated_metrics[request_mask & (aggregated_metrics["metric"] == metric["name"])]
            assert len(lookup) == 1, f"Primary key not unique: {metric['name']}"
            # Take the scalar explicitly: float() on a one-element Series is
            # deprecated in pandas.
            metric["result"] = float(lookup["value"].iloc[0])

    aggregated_logs.materialize(pl_output, mode="final")

In [None]:
# Display the mean metrics; after the batched cell ran, this is the table of the
# last pipeline processed in its loop.
aggregated_metrics

In [None]:
# Debug: which id_model (in the first run) has model_idx == 1?
# The previous first expression was evaluated but never displayed (only a cell's
# last expression is shown), so it has been dropped. Selecting the id_model column
# before taking the row keeps its own dtype; taking a whole row first can upcast
# the value when the selected columns have different dtypes.
first_df.loc[first_df["model_idx"] == 1, "id_model"].iloc[0]