In [8]:
import os
import json
import glob
import csv
import re
import pandas as pd
from collections import defaultdict

###############################################################################
# 1. FILENAME PATTERN & PROBLEM MAP
###############################################################################

FILENAME_REGEX = re.compile(r'^(.*?)_(.*?)_(.*?)_run(\d+)_output\.json$')

PROBLEM_NAME_MAP = {
    "cities": "OptimalNetworkDesign",
    "directedSocialNetwork": "DirectedSocialNetwork",
    "geneAssociation": "GeneDiseaseAssociation",
    "quantum": "QuantumCircuit",
    "timeDependentDeliveryNetwork": "TimeDependentDelivery"
}

INPUT_GLOB = "./**/*.json"
PER_RUN_CSV = "summaries.csv"
AGGREGATED_CSV = "aggregated_topline.csv"

###############################################################################
# 2. SPLIT-ON-PERIOD HELPER
###############################################################################

def split_by_period_count(msg: str) -> int:
    """
    Splits a message string on '.' and counts the non-empty segments.
    This is how we detect multiple distinct error statements in a single constraint message.
    """
    parts = [p.strip() for p in msg.split('.') if p.strip()]
    return len(parts)

###############################################################################
# 3. GROUP MAPPING
###############################################################################
# Includes constraints from all problems, including "cities" (OptimalNetworkDesign).
# We map each known constraint key to one of the three groups.

CONSTRAINT_GROUPS = {
    # Structural group
    "connectivity": "Structural",
    "road_capacity": "Structural",
    "redundancy": "Structural",
    "max_edges_constraint": "Structural",
    "valid_no_self_loops": "Structural",
    "valid_edge_structure": "Structural",
    "valid_acyclic": "Structural",
    "bipartite_constraint": "Structural",
    "duplicate_associations": "Structural",
    "valid_dag": "Structural",
    "layered_operations": "Structural",
    "degree_constraints" : "Structural",

    # Logical group
    "cost_optimization": "Logical",
    "population_accessibility": "Logical",
    "strategic_road_placement": "Logical",
    "valid_celebrity_outgoing": "Logical",
    "valid_regular_to_expert": "Logical",
    "time_window_compliance": "Logical",
    "storage_capacity_compliance": "Logical",
    "vehicle_capacity_compliance": "Logical",
    "valid_gate_precedences": "Logical",
    "valid_cnot_adjacency": "Logical",
    "valid_swap_constraints": "Logical",
    "valid_associations": "Logical",
    
    # Attribute group
    "output_structure": "Attribute",
    "valid_user_attributes": "Attribute",
    "valid_total_users": "Attribute",
    "valid_categories": "Attribute",
    "valid_trust_scores": "Attribute",
    "defined_counts": "Attribute",
    "valid_gate_types": "Attribute",
    "valid_qubits": "Attribute",
}

###############################################################################
# 4. COUNTING ERRORS & PASSED CONSTRAINTS
###############################################################################

def count_constraints_errors(fullOutput: dict) -> int:
    """
    Count the total number of errors by scanning each constraint (top-level & nested).
    If the constraint is "passed": false, we split its 'message' on '.' to count sub-errors.
    If no message, count it as 1 error.

    We do NOT look at fullOutput["errors"].
    """
    total_errors = 0

    # Check top-level constraints
    for key, val in fullOutput.items():
        if key in ["errors", "overall_passed"]:
            continue
        if isinstance(val, dict) and "passed" in val:
            if not val["passed"]:
                msg = val.get("message", "")
                count = split_by_period_count(msg) if msg else 1
                total_errors += count
        elif isinstance(val, bool):
            if val is False:
                total_errors += 1

    # Check nested constraints under "constraints"
    nested = fullOutput.get("constraints", {})
    if isinstance(nested, dict):
        for key, val in nested.items():
            if isinstance(val, dict) and "passed" in val:
                if not val["passed"]:
                    msg = val.get("message", "")
                    count = split_by_period_count(msg) if msg else 1
                    total_errors += count
            elif isinstance(val, bool):
                if val is False:
                    total_errors += 1

    return total_errors

def count_grouped_errors(fullOutput: dict) -> dict:
    """
    For each constraint in fullOutput and fullOutput["constraints"] that fails,
    find which group it belongs to (if any), and count sub-errors by splitting on '.'.
    If no message, count as 1. Return a dict: { "Structural": X, "Logical": Y, "Attribute": Z }
    """
    groups = {"Structural": 0, "Logical": 0, "Attribute": 0}

    # Top-level
    for key, val in fullOutput.items():
        if key in ["errors", "overall_passed"]:
            continue
        group = CONSTRAINT_GROUPS.get(key)
        if group:
            if isinstance(val, dict) and "passed" in val:
                if not val["passed"]:
                    msg = val.get("message", "")
                    count = split_by_period_count(msg) if msg else 1
                    groups[group] += count
            elif isinstance(val, bool):
                if val is False:
                    groups[group] += 1

    # Nested constraints
    nested = fullOutput.get("constraints", {})
    if isinstance(nested, dict):
        for key, val in nested.items():
            group = CONSTRAINT_GROUPS.get(key)
            if group:
                if isinstance(val, dict) and "passed" in val:
                    if not val["passed"]:
                        msg = val.get("message", "")
                        count = split_by_period_count(msg) if msg else 1
                        groups[group] += count
                elif isinstance(val, bool):
                    if val is False:
                        groups[group] += 1
    return groups

def count_grouped_passed(fullOutput: dict) -> dict:
    """
    Similar to count_grouped_errors, but we count how many constraints in each group are PASSED.
    Return a dict: { "Structural": X, "Logical": Y, "Attribute": Z } indicating how many passed.
    """
    passed_counts = {"Structural": 0, "Logical": 0, "Attribute": 0}

    # Top-level
    for key, val in fullOutput.items():
        if key in ["errors", "overall_passed"]:
            continue
        group = CONSTRAINT_GROUPS.get(key)
        if group:
            if isinstance(val, dict) and "passed" in val:
                if val["passed"]:
                    passed_counts[group] += 1
            elif isinstance(val, bool):
                if val is True:
                    passed_counts[group] += 1

    # Nested constraints
    nested = fullOutput.get("constraints", {})
    if isinstance(nested, dict):
        for key, val in nested.items():
            group = CONSTRAINT_GROUPS.get(key)
            if group:
                if isinstance(val, dict) and "passed" in val:
                    if val["passed"]:
                        passed_counts[group] += 1
                elif isinstance(val, bool):
                    if val is True:
                        passed_counts[group] += 1

    return passed_counts

###############################################################################
# 5. PER-RUN PARSING
###############################################################################

def parse_json_file(filepath: str) -> dict | None:
    """
    Parse a single JSON file => produce a run-level summary row with:
      - overall_passed
      - constraints_passed, constraints_total
      - error_count
      - structural_errors, logical_errors, attribute_errors
      - structural_constraints_passed, logical_constraints_passed, attribute_constraints_passed
    We ignore fullOutput["errors"] array for counting errors.
    """
    filename = os.path.basename(filepath)
    m = FILENAME_REGEX.match(filename)
    if not m:
        return None
    problem_raw = m.group(1)
    model_name = m.group(2)
    prompt_type = m.group(3)
    run_number = int(m.group(4))
    if not (1 <= run_number <= 5):
        return None

    problem_name = PROBLEM_NAME_MAP.get(problem_raw, problem_raw)

    with open(filepath, "r", encoding="utf-8") as f:
        data = json.load(f)

    overall_passed = False
    fo = data.get("fullOutput", {})
    if isinstance(data.get("result"), bool):
        overall_passed = data["result"]
    if isinstance(fo.get("overall_passed"), bool):
        overall_passed = fo["overall_passed"]

    # Count how many constraints are passed/total (for info)
    constraints_passed = 0
    constraints_total = 0
    if isinstance(fo, dict):
        # top-level
        for key, val in fo.items():
            if key in ["errors", "overall_passed"]:
                continue
            if isinstance(val, dict) and "passed" in val:
                constraints_total += 1
                if val["passed"]:
                    constraints_passed += 1
            elif isinstance(val, bool):
                constraints_total += 1
                if val:
                    constraints_passed += 1
        # nested constraints
        nested = fo.get("constraints", {})
        if isinstance(nested, dict):
            for key, val in nested.items():
                if isinstance(val, dict) and "passed" in val:
                    constraints_total += 1
                    if val["passed"]:
                        constraints_passed += 1
                elif isinstance(val, bool):
                    constraints_total += 1
                    if val:
                        constraints_passed += 1

    # Overall error_count from constraints only
    error_count = count_constraints_errors(fo)

    # Grouped error counts (failing constraints)
    group_errs = count_grouped_errors(fo)

    # Grouped pass counts (passing constraints)
    group_passed = count_grouped_passed(fo)

    return {
        "filename": filename,
        "problem": problem_name,
        "model": model_name,
        "prompt_type": prompt_type,
        "run_number": run_number,
        "overall_passed": overall_passed,
        "constraints_passed": constraints_passed,
        "constraints_total": constraints_total,
        "error_count": error_count,

        # failing constraints
        "structural_errors": group_errs["Structural"],
        "logical_errors": group_errs["Logical"],
        "attribute_errors": group_errs["Attribute"],

        # passing constraints
        "structural_constraints_passed": group_passed["Structural"],
        "logical_constraints_passed": group_passed["Logical"],
        "attribute_constraints_passed": group_passed["Attribute"]
    }

###############################################################################
# 6. BUILD PER-RUN SUMMARY
###############################################################################

def build_summary_csv(input_glob: str, output_csv: str):
    rows = []
    for filepath in glob.glob(input_glob, recursive=True):
        if filepath.endswith(".json"):
            row = parse_json_file(filepath)
            if row is not None:
                rows.append(row)
    # remove duplicates
    seen = set()
    unique_rows = []
    for r in rows:
        combo = (r["problem"], r["model"], r["prompt_type"], r["run_number"])
        if combo not in seen:
            seen.add(combo)
            unique_rows.append(r)
    unique_rows.sort(key=lambda r: (r["problem"], r["model"], r["prompt_type"], r["run_number"]))

    fieldnames = [
        "filename", "problem", "model", "prompt_type", "run_number",
        "overall_passed", "constraints_passed", "constraints_total", "error_count",
        "structural_errors", "logical_errors", "attribute_errors",
        "structural_constraints_passed", "logical_constraints_passed", "attribute_constraints_passed"
    ]
    with open(output_csv, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()
        writer.writerows(unique_rows)
    print(f"Wrote {len(unique_rows)} per-run summary rows to {output_csv}")

###############################################################################
# 7. AGGREGATE INTO TOP-LINE
###############################################################################

def aggregate_grouped_summary(input_csv: str, output_csv: str):
    df = pd.read_csv(input_csv)
    # group by problem, model, prompt_type
    group_cols = ["problem", "model", "prompt_type"]
    # keep only combos that have exactly 5 runs
    run_counts = df.groupby(group_cols)["run_number"].nunique().reset_index().rename(columns={"run_number": "run_count"})
    valid_groups = run_counts[run_counts["run_count"] == 5][group_cols]
    df_valid = pd.merge(df, valid_groups, on=group_cols, how="inner")

    agg = df_valid.groupby(group_cols, dropna=False).agg(
        pass_rate = ("overall_passed", "mean"),
        avg_error_count = ("error_count", "mean"),
        avg_structural_errors = ("structural_errors", "mean"),
        avg_logical_errors = ("logical_errors", "mean"),
        avg_attribute_errors = ("attribute_errors", "mean"),

        avg_structural_passed = ("structural_constraints_passed", "mean"),
        avg_logical_passed = ("logical_constraints_passed", "mean"),
        avg_attribute_passed = ("attribute_constraints_passed", "mean"),

        run_count = ("run_number", "count")
    ).reset_index()

    # convert pass_rate to percentage and round
    agg["pass_rate"] = (agg["pass_rate"] * 100).round(1)
    agg["avg_error_count"] = agg["avg_error_count"].round(2)
    agg["avg_structural_errors"] = agg["avg_structural_errors"].round(2)
    agg["avg_logical_errors"] = agg["avg_logical_errors"].round(2)
    agg["avg_attribute_errors"] = agg["avg_attribute_errors"].round(2)

    agg["avg_structural_passed"] = agg["avg_structural_passed"].round(2)
    agg["avg_logical_passed"] = agg["avg_logical_passed"].round(2)
    agg["avg_attribute_passed"] = agg["avg_attribute_passed"].round(2)

    agg.to_csv(output_csv, index=False)
    print(f"Wrote aggregated top-line summary to {output_csv}")
    print(agg.head(20))

###############################################################################
# 8. MAIN
###############################################################################

if __name__ == "__main__":
    build_summary_csv(INPUT_GLOB, PER_RUN_CSV)
    aggregate_grouped_summary(PER_RUN_CSV, AGGREGATED_CSV)


Wrote 975 per-run summary rows to summaries.csv
Wrote aggregated top-line summary to aggregated_topline.csv
                  problem           model        prompt_type  pass_rate  \
0   DirectedSocialNetwork   Claude35Haiku       directprompt        0.0   
1   DirectedSocialNetwork   Claude35Haiku  iterativefeedback        0.0   
2   DirectedSocialNetwork   Claude35Haiku   programaugmented        0.0   
3   DirectedSocialNetwork  Claude35Sonnet       directprompt      100.0   
4   DirectedSocialNetwork  Claude35Sonnet  iterativefeedback      100.0   
5   DirectedSocialNetwork  Claude35Sonnet   programaugmented       60.0   
6   DirectedSocialNetwork      DeepSeekR1       directprompt       20.0   
7   DirectedSocialNetwork      DeepSeekR1  iterativefeedback       40.0   
8   DirectedSocialNetwork      DeepSeekR1   programaugmented        0.0   
9   DirectedSocialNetwork      DeepSeekV3       directprompt        0.0   
10  DirectedSocialNetwork      DeepSeekV3  iterativefeedback       

In [6]:
import pandas as pd
import seaborn as sns
import matplotlib.pyplot as plt

# 1. Load your CSV data
# Replace "summaries.csv" with the filename that contains your constraint-level pass/fail data
df = pd.read_csv("summaries.csv")

# Example of what df might look like:
#   model        prompt        constraint                 failed
# 0 GPT4omini    direct        vehicle_capacity           1
# 1 GPT4omini    direct        connectivity               0
# 2 GPT4omini    direct        cost_optimization          0
# ... etc.

# 2. Group by (model, prompt, constraint) to compute average failures
#    If you have multiple runs per combination, you can average them:
df_grouped = df.groupby(["model", "prompt", "constraint"], as_index=False)["failed"].mean()

# Now df_grouped has columns: model, prompt, constraint, failed (float 0..1)

# 3. Create a pivoted table:
#    Rows = model × prompt (concatenate them into one label if you like)
#    Columns = constraint
#    Values = average failure rate
df_grouped["model_prompt"] = df_grouped["model"] + "_" + df_grouped["prompt"]
df_pivot = df_grouped.pivot(index="model_prompt", columns="constraint", values="failed")

# 4. Plot the heatmap
plt.figure(figsize=(12, 6))
sns.heatmap(df_pivot, annot=True, cmap="Reds", vmin=0, vmax=1,
            cbar_kws={"label": "Failure Rate (0=Perfect, 1=All Failed)"})

plt.title("Constraint Failure Heatmap")
plt.xlabel("Constraint")
plt.ylabel("Model + Prompt")
plt.tight_layout()
plt.show()


KeyError: 'prompt'

In [9]:
# Pivot data for visualization with x-axis as prompt types instead of problems
structural_data_prompt = aggregated_df.pivot_table(index="model", columns="prompt_type", values="avg_structural_passed", aggfunc=np.sum)
logical_data_prompt = aggregated_df.pivot_table(index="model", columns="prompt_type", values="avg_logical_passed", aggfunc=np.sum)
attribute_data_prompt = aggregated_df.pivot_table(index="model", columns="prompt_type", values="avg_attribute_passed", aggfunc=np.sum)

# Creating three subfigures (one for each constraint type) with prompt types on x-axis
fig, axes = plt.subplots(1, 3, figsize=(18, 6))

# Structural Constraints
sns.heatmap(structural_data_prompt, annot=True, cmap="Blues", linewidths=0.5, fmt=".1f", ax=axes[0])
axes[0].set_title("Structural Constraints Passed")
axes[0].set_xlabel("Prompt Type")
axes[0].set_ylabel("Model")

# Logical Constraints
sns.heatmap(logical_data_prompt, annot=True, cmap="Greens", linewidths=0.5, fmt=".1f", ax=axes[1])
axes[1].set_title("Logical Constraints Passed")
axes[1].set_xlabel("Prompt Type")
axes[1].set_ylabel("Model")

# Attribute Constraints
sns.heatmap(attribute_data_prompt, annot=True, cmap="Reds", linewidths=0.5, fmt=".1f", ax=axes[2])
axes[2].set_title("Attribute Constraints Passed")
axes[2].set_xlabel("Prompt Type")
axes[2].set_ylabel("Model")

plt.tight_layout()
plt.show()


NameError: name 'aggregated_df' is not defined