In [22]:
from __future__ import annotations

from pathlib import Path
from typing import List, Tuple, Optional, Dict, Any

import pandas as pd

# Directory searched for project folders: the notebook's current working directory.
ROOT = Path.cwd()
# Directory name that holds the benchmark_*/verification_* files of each run.
EVAL_DIR_NAME = "eval_results"

# Wide display so long step descriptions in the result table are not truncated early.
pd.set_option("display.max_colwidth", 300, "display.width", 140)

In [23]:
def find_eval_results_dirs(root: Path, eval_dir_name: str = "eval_results") -> List[Path]:
    """Collect every directory named ``eval_dir_name`` found beneath ``root``'s subdirectories.

    Each immediate subdirectory of ``root`` is searched recursively. A directory
    named ``eval_dir_name`` sitting directly in ``root`` is not itself returned,
    because only the contents of each subdirectory are searched.

    Args:
        root: Directory whose subdirectories are scanned.
        eval_dir_name: Exact directory name to look for.

    Returns:
        Matching directories, sorted by path.
    """
    top_level_dirs = (child for child in root.iterdir() if child.is_dir())
    matches = [
        candidate
        for top in top_level_dirs
        for candidate in top.rglob(eval_dir_name)
        # rglob can also yield regular files with this name; keep directories only.
        if candidate.is_dir() and candidate.name == eval_dir_name
    ]
    matches.sort()
    return matches


def index_pairs(eval_dir: Path) -> List[Tuple[str, Path, Path]]:
    """Pair benchmark and verification files in ``eval_dir`` by their shared suffix.

    A regular file named ``benchmark_<run_id>`` is matched with
    ``verification_<run_id>``. ``<run_id>`` is everything after the prefix,
    including any file extension. Files without a partner, and subdirectories,
    are ignored.

    Returns:
        ``(run_id, benchmark_path, verification_path)`` tuples sorted by run_id.
    """
    files = [entry for entry in eval_dir.iterdir() if entry.is_file()]

    def by_run_id(prefix: str) -> Dict[str, Path]:
        # Map the text after ``prefix`` to the file carrying it.
        return {f.name[len(prefix):]: f for f in files if f.name.startswith(prefix)}

    benchmarks = by_run_id("benchmark_")
    verifications = by_run_id("verification_")
    shared_ids = sorted(benchmarks.keys() & verifications.keys())
    return [(run_id, benchmarks[run_id], verifications[run_id]) for run_id in shared_ids]

In [24]:
def read_verification_metrics_csv(path: Path) -> pd.DataFrame:
    """
    Read verification metrics CSV and return ALL rows (one per step).

    Expected columns:
      step,initial_events,final_events,initial_entities,final_entities

    Returns a DataFrame with normalized columns:
      step, init_events, final_events, init_entities, final_entities

    - ``step`` is stripped text; rows whose step is missing or blank are dropped.
    - Metric columns absent from the file come back as all-NaN.
    - Metric values are coerced to numbers; non-numeric cells become NaN.
    - A file without a ``step`` column, or a zero-byte file, yields an empty
      frame with the normalized columns (the caller checks ``.empty``).
    """
    out_cols = ["step", "init_events", "final_events", "init_entities", "final_entities"]
    src_cols = ["step", "initial_events", "final_events", "initial_entities", "final_entities"]
    renames = {"initial_events": "init_events", "initial_entities": "init_entities"}

    try:
        df = pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header; read_csv raises instead of returning a frame.
        return pd.DataFrame(columns=out_cols)

    if "step" not in df.columns:
        return pd.DataFrame(columns=out_cols)

    # Keep only rows that have a non-blank step.
    df = df.dropna(subset=["step"]).copy()
    df["step"] = df["step"].astype(str).str.strip()
    df = df[df["step"] != ""]

    # reindex adds any missing metric column, filled with NaN.
    out = df.reindex(columns=src_cols).rename(columns=renames)

    # Convert metrics to numeric where possible; keep NaN for non-numeric.
    for c in out_cols[1:]:
        out[c] = pd.to_numeric(out[c], errors="coerce")

    return out


def read_benchmark_metrics_csv(path: Path) -> pd.DataFrame:
    """
    Read benchmark metrics CSV and return ALL rows (one per step).

    Expected columns (at minimum):
      - step
      - Time (s)   (seconds)

    Returns a DataFrame with:
      step (string), query_exec_time_s (float)

    - ``step`` is stripped text; rows whose step is missing or blank are dropped
      (in every case, including when no time column is present).
    - The time column is the first of a few known header spellings; if none is
      present, ``query_exec_time_s`` is all-NA (no inference).
    - A file without a ``step`` column, or a zero-byte file, yields an empty frame.
    """
    empty = pd.DataFrame(columns=["step", "query_exec_time_s"])

    try:
        df = pd.read_csv(path)
    except pd.errors.EmptyDataError:
        # A zero-byte file has no header; read_csv raises instead of returning a frame.
        return empty

    if "step" not in df.columns:
        return empty

    # Try common variants just in case
    time_col = next(
        (c for c in ["Time (s)", "time (s)", "time_s", "time", "Time(s)"] if c in df.columns),
        None,
    )

    # Normalize steps once, before branching. Previously the no-time-column
    # path skipped dropna, so missing steps turned into the literal string "nan".
    df = df.dropna(subset=["step"]).copy()
    df["step"] = df["step"].astype(str).str.strip()
    df = df[df["step"] != ""]

    if time_col is None:
        # If the column isn't present, return empty times (no inference)
        out = df[["step"]].copy()
        out["query_exec_time_s"] = pd.NA
        return out

    out = df[["step", time_col]].copy()
    out = out.rename(columns={time_col: "query_exec_time_s"})
    out["query_exec_time_s"] = pd.to_numeric(out["query_exec_time_s"], errors="coerce")

    return out


In [25]:
def finalize_row(verif: Dict[str, Any], bench_time_s: Any) -> Dict[str, Any]:
    """
    Turn one step's verification counts plus its benchmark time into table columns.

    Args:
        verif: mapping read with ``.get`` for the keys ``step``, ``init_events``,
            ``final_events``, ``init_entities`` and ``final_entities``; absent keys
            count as None.
        bench_time_s: seconds from the benchmark CSV for the SAME step. None, NaN
            or NA all mean "no timing available".

    Returns:
        Display columns for the row. Derived values are None when an input they
        depend on is missing:
          - "#Processed Nodes" = (init_events - final_events) + (init_entities - final_entities),
            an int when the result is a whole number; None if any count is None or
            cannot be converted to float.
          - "ProcessedNodes/ms" = #Processed Nodes / Query Exec Time (ms); None when
            the time is missing or zero.
    """
    if bench_time_s is None or not pd.notna(bench_time_s):
        seconds = None
        millis = None
    else:
        seconds = float(bench_time_s)
        millis = seconds * 1000.0

    init_ev, final_ev = verif.get("init_events"), verif.get("final_events")
    init_ent, final_ent = verif.get("init_entities"), verif.get("final_entities")

    processed = None
    # Identity checks only: `== None` is not boolean-safe for values like pd.NA.
    if all(count is not None for count in (init_ev, final_ev, init_ent, final_ent)):
        try:
            delta = (float(init_ev) - float(final_ev)) + (float(init_ent) - float(final_ent))
        except Exception:
            delta = None
        if delta is not None:
            # Whole-number results are reported as int rather than float.
            processed = int(delta) if delta.is_integer() else delta

    # `millis` is falsy both when it is None and when it is 0.0 (avoids dividing by zero).
    rate = processed / millis if (processed is not None and millis) else None

    label = None if seconds is None else f"{seconds:.6g} s"

    return {
        "step": verif.get("step"),
        "#Init Events": init_ev,
        "#Final Events": final_ev,
        "#Init Entities": init_ent,
        "#Final Entities": final_ent,
        "Query Exec Time": label,
        "Query Exec Time (ms)": millis,
        "#Processed Nodes": processed,
        "ProcessedNodes/ms": rate,
    }


In [26]:
# Build one output row per (run_id, step) for every benchmark/verification file pair.
rows = []

eval_dirs = find_eval_results_dirs(ROOT, EVAL_DIR_NAME)
print(f"Found {len(eval_dirs)} '{EVAL_DIR_NAME}' directories under {ROOT}")

# Count columns produced by read_verification_metrics_csv.
VERIF_COUNT_COLS = ("init_events", "final_events", "init_entities", "final_entities")


def _nullable_int(value: Any) -> Optional[int]:
    """Return ``int(value)``, or None when the cell is missing (NaN/NA)."""
    return None if pd.isna(value) else int(value)


def _output_row(
    project: str,
    run_id: str,
    eval_dir: Path,
    bench_path: Path,
    verif_path: Path,
    verif_m: Dict[str, Any],
    bench_time_s: Any,
) -> Dict[str, Any]:
    """Assemble one table row: identifiers, finalize_row() columns, then source-file provenance."""
    return {
        "project": project,
        "run_id": run_id,
        **finalize_row(verif_m, bench_time_s=bench_time_s),
        "eval_results_dir": str(eval_dir),
        "benchmark_file": bench_path.name,
        "verification_file": verif_path.name,
    }


for eval_dir in eval_dirs:
    pairs = index_pairs(eval_dir)
    if not pairs:
        continue

    # Project = first path component below ROOT. relative_to raises ValueError
    # when eval_dir is not located under ROOT; fall back to the parent's name.
    try:
        project = eval_dir.relative_to(ROOT).parts[0]
    except ValueError:
        project = eval_dir.parent.name

    for run_id, bench_path, verif_path in pairs:
        bench_df = read_benchmark_metrics_csv(bench_path)     # many rows (one per step)
        verif_df = read_verification_metrics_csv(verif_path)  # many rows (one per step)

        # step -> benchmark time (s). If a step appears more than once, the LAST
        # occurrence wins. NOTE(review): repeated identical steps in one run
        # would therefore all be matched to that single time.
        bench_time_by_step = {
            str(st).strip(): t
            for st, t in zip(bench_df["step"], bench_df["query_exec_time_s"])
            if not pd.isna(st) and str(st).strip()
        }

        if verif_df.empty:
            # Keep a placeholder row so the run still shows up in the table.
            placeholder = dict.fromkeys(("step", *VERIF_COUNT_COLS))
            rows.append(_output_row(project, run_id, eval_dir, bench_path, verif_path,
                                    placeholder, bench_time_s=None))
            continue

        for _, v in verif_df.iterrows():
            step = v["step"]
            verif_m = {"step": step, **{col: _nullable_int(v[col]) for col in VERIF_COUNT_COLS}}
            bench_time_s = bench_time_by_step.get(str(step).strip())
            rows.append(_output_row(project, run_id, eval_dir, bench_path, verif_path,
                                    verif_m, bench_time_s))

print("rows length =", len(rows))
if rows:
    print("sample row keys =", list(rows[0].keys()))

# Columns shown in the summary table, in display order. Passing them as
# `columns=` selects these keys from the row dicts. It also still yields a
# correctly-shaped (empty) table when no runs were found; with `rows == []`,
# selecting columns from pd.DataFrame([]) raised KeyError.
TABLE_COLUMNS = [
    "project", "run_id", "step",
    "#Init Events", "#Final Events", "#Init Entities", "#Final Entities",
    "Query Exec Time", "Query Exec Time (ms)",
    "#Processed Nodes", "ProcessedNodes/ms",
]

df = pd.DataFrame(rows, columns=TABLE_COLUMNS)

df


Found 3 'eval_results' directories under /Users/sara/Documents/Repo/aggregation_lib/datasets
rows length = 27
sample row keys = ['project', 'run_id', 'step', '#Init Events', '#Final Events', '#Init Entities', '#Final Entities', 'Query Exec Time', 'Query Exec Time (ms)', '#Processed Nodes', 'ProcessedNodes/ms', 'eval_results_dir', 'benchmark_file', 'verification_file']


Unnamed: 0,project,run_id,step,#Init Events,#Final Events,#Init Entities,#Final Entities,Query Exec Time,Query Exec Time (ms),#Processed Nodes,ProcessedNodes/ms
0,bpic17,2026-01-23_09-27-28.csv,"AggrStep(aggr_type='ENTITIES', ent_type='Application', group_by=['Type'], where=None, attr_aggrs=[])",1202267,1202267,223661,192152,0.551806 s,551.805973,31509,57.101593
1,bpic17,2026-01-23_09-27-28.csv,"AggrStep(aggr_type='ENTITIES', ent_type='Offer', group_by=['Type'], where=None, attr_aggrs=[])",1202267,1202267,192152,149157,0.297179 s,297.178984,42995,144.677122
2,bpic17,2026-01-23_09-27-28.csv,"AggrStep(aggr_type='ENTITIES', ent_type='Workflow', group_by=['Type'], where=None, attr_aggrs=[])",1202267,1202267,149157,117648,0.20986 s,209.860086,31509,150.142891
3,bpic17,2026-01-23_09-27-28.csv,"AggrStep(aggr_type='ENTITIES', ent_type='Resource', group_by=['Type'], where=None, attr_aggrs=[])",1202267,1202267,117648,117499,0.0101104 s,10.110378,149,14.737332
4,bpic17,2026-01-23_09-27-28.csv,"AggrStep(aggr_type='EVENTS', ent_type=None, group_by=['activity', 'lifecycle'], where=None, attr_aggrs=[])",1202267,0,117499,117499,16.8073 s,16807.284117,1202267,71.532497
5,bpic17,2026-01-23_09-27-28.csv,FINALIZATION,0,0,117499,0,1.54987 s,1549.867868,117499,75.812269
6,bpic17,2026-01-23_09-27-28.csv,TOTAL,1202267,0,223661,0,28.5948 s,28594.761848,1425928,49.866756
7,bpic17,2026-01-23_09-27-28.csv,RELATIONSHIPS,0,0,0,0,13.1154 s,13115.446806,0,0.0
8,order-management,2026-01-25_13-19-00.csv,"AggrStep(aggr_type='ENTITIES', ent_type='customers', group_by=['id'], where=None, attr_aggrs=[])",21008,21008,10840,10825,0.0245039 s,24.503946,15,0.612146
9,order-management,2026-01-25_13-19-00.csv,"AggrStep(aggr_type='ENTITIES', ent_type='orders', group_by=['Type'], where='price >= 1000', attr_aggrs=[AttrAggr(name='price', function=<AggregationFunction.AVG: 'AVG'>)])",21008,21008,10825,9363,0.078505 s,78.505039,1462,18.623008


In [27]:
# Leave out the TOTAL and RELATIONSHIPS rows so the average covers only the remaining step rows.
excluded_steps = ['RELATIONSHIPS', 'TOTAL']
keep_mask = ~df['step'].isin(excluded_steps)
filtered_df = df.loc[keep_mask]

# Unweighted mean of the per-row ProcessedNodes/ms values; .mean() skips missing (NaN) entries.
mean_nodes_per_ms = filtered_df['ProcessedNodes/ms'].mean()
print(f"\nSummary processed nodes per millisecond:\n  {mean_nodes_per_ms}")


Summary processed nodes per millisecond:
  38.87128050807754
