<a href="https://colab.research.google.com/github/elangbijak4/Swarm-Verifiers-and-Swarm-Problem-Solvers-over-a-Mathematical-Only-Embedding-Space/blob/main/PoC_SWARM_PROVER.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
# Demo Colab-style script: a minimal simulation of Swarm Solvers + Swarm Verifiers
# This code demonstrates the main ideas from the paper in a tiny, runnable environment:
# - Mathematical-only "vocabulary" for proof steps
# - M solver agents proposing proof-step sequences
# - N verifier agents giving scores and localized defect annotations
# - Aggregation (consensus) and simple solver revision loop
# The goal: illustrate dynamics (proposal -> critique -> revision -> consensus)
# This is intentionally small, deterministic-ish, and annotated for clarity.

import random
import numpy as np
import pandas as pd
from pprint import pprint

random.seed(42)
np.random.seed(42)

# -----------------------
# 1) Mathematical-only vocabulary (toy)
# -----------------------
VOCAB = [
    "assume_even_a",          # a = 2*k1
    "assume_even_b",          # b = 2*k2
    "sum_expr",               # a + b -> representation
    "rewrite_distrib",        # 2*k1 + 2*k2 -> 2*(k1+k2)
    "conclude_even",          # exists k: a+b = 2*k
    "bad_factor",             # an incorrect step producing a wrong factor
    "extra_obvious_step",     # redundant but valid
]

# Small helper: represent a proof as a list of step tokens
def pretty_proof(p):
    return " -> ".join(p)

# -----------------------
# 2) Solver agents (generators)
# Each solver has a simple heuristic policy: choose a sequence of steps.
# Some solvers are "careful", some "fast but error-prone", introducing heterogeneity.
# -----------------------
class SolverAgent:
    def __init__(self, name, style="careful"):
        self.name = name
        self.style = style
        self.self_score = None  # solver's self-evaluation (0..1)

    def propose(self, problem):
        # For this demo problem (even+even), we craft a few patterns
        if self.style == "careful":
            proof = ["assume_even_a", "assume_even_b", "sum_expr", "rewrite_distrib", "conclude_even"]
            self.self_score = 1.0
        elif self.style == "fast":
            # sometimes makes a bad factorization
            if random.random() < 0.5:
                proof = ["assume_even_a", "assume_even_b", "sum_expr", "bad_factor", "conclude_even"]
                self.self_score = 0.5
            else:
                proof = ["assume_even_a", "assume_even_b", "sum_expr", "rewrite_distrib", "conclude_even"]
                self.self_score = 0.8
        elif self.style == "minimal":
            # minimal outline, may lack rewrite
            proof = ["assume_even_a", "assume_even_b", "sum_expr", "conclude_even"]
            self.self_score = 0.6
        else:
            proof = ["assume_even_a", "assume_even_b", "sum_expr", "rewrite_distrib", "conclude_even"]
            self.self_score = 0.9
        return proof

    def revise(self, proof, defect_suggestions):
        # Apply simple revisions based on defect suggestions (list of (step_idx, suggestion_token))
        revised = proof.copy()
        for idx, suggestion in defect_suggestions:
            if 0 <= idx < len(revised):
                revised[idx] = suggestion
        # Simple heuristic: if missing rewrite_distrib and suggestion says to add, append it before conclude_even
        if "rewrite_distrib" not in revised and "conclude_even" in revised:
            ci = revised.index("conclude_even")
            revised.insert(ci, "rewrite_distrib")
        # update self-score heuristically
        self.self_score = min(1.0, self.self_score + 0.2)
        return revised

# -----------------------
# 3) Verifier agents
# Each verifier has a different detection behavior, producing:
#   - score in {0, 0.5, 1}
#   - defect annotations: list of (step_index, defect_type, suggested_fix)
# -----------------------
class VerifierAgent:
    def __init__(self, name, style="strict"):
        self.name = name
        self.style = style

    def evaluate(self, proof):
        # Basic checks: must have assume steps, a sum, and conclude_even.
        score = 1.0
        defects = []
        # missing assumptions
        if "assume_even_a" not in proof or "assume_even_b" not in proof:
            score = 0.0
            defects.append((0, "missing_assumption", "assume_even_a/assume_even_b"))
            return score, defects  # immediate fail

        # check rewrite presence or bad_factor
        if "bad_factor" in proof:
            # strict verifiers mark as fail, lenient may give 0.5
            if self.style == "strict":
                score = 0.0
                idx = proof.index("bad_factor")
                defects.append((idx, "incorrect_factorization", "rewrite_distrib"))
            else:
                score = 0.5
                idx = proof.index("bad_factor")
                defects.append((idx, "suspicious_factor", "rewrite_distrib"))
        elif "rewrite_distrib" not in proof:
            # missing explicit rewrite
            if self.style == "strict":
                score = 0.5
                defects.append((len(proof)-1, "missing_rewrite", "rewrite_distrib"))
            else:
                score = 1.0  # lenient accepts implicit step
        else:
            score = 1.0

        # some verifiers check for redundancy (not a defect for correctness)
        if "extra_obvious_step" in proof and self.style == "picky":
            score = max(0.5, score - 0.2)
            idx = proof.index("extra_obvious_step")
            defects.append((idx, "redundant_step", None))
        return score, defects

# -----------------------
# 4) Coordination module
# Aggregates verifier outputs and decides consensus
# -----------------------
def aggregate_verifier_outputs(verifier_results):
    # verifier_results: list of tuples (verifier_name, score, defects)
    scores = [r[1] for r in verifier_results]
    mean_score = float(np.mean(scores))
    var_score = float(np.var(scores))
    # union defects: map step_idx -> list of defect descriptions
    defect_union = {}
    for vname, score, defects in verifier_results:
        for (idx, dtype, suggestion) in defects:
            defect_union.setdefault(idx, []).append((vname, dtype, suggestion))
    return mean_score, var_score, defect_union

# Convert defect_union to suggestions for solver: choose majority suggestion for each step index if any
def make_suggestions_from_union(defect_union):
    suggestions = []
    for idx, anns in defect_union.items():
        # pick first non-None suggestion
        suggestion = None
        for vname, dtype, s in anns:
            if s is not None:
                suggestion = s
                break
        if suggestion is None:
            # fallback: if incorrect_factorization -> rewrite_distrib
            for _, dtype, _ in anns:
                if "factor" in dtype:
                    suggestion = "rewrite_distrib"
                    break
        if suggestion:
            suggestions.append((idx, suggestion))
    return suggestions

# -----------------------
# 5) Simulation: Rounds of propose -> verify -> aggregate -> revise
# -----------------------
def run_simulation(M=3, N=3, rounds=4, verbose=True):
    # create solvers with heterogeneous styles
    styles = ["careful", "fast", "minimal"]
    solvers = [SolverAgent(f"G{i+1}", style=styles[i % len(styles)]) for i in range(M)]
    # verifiers with different styles
    vstyles = ["strict", "lenient", "picky"]
    verifiers = [VerifierAgent(f"V{j+1}", style=vstyles[j % len(vstyles)]) for j in range(N)]

    problem = "even_plus_even_is_even"

    # storage for logs
    history = []

    for t in range(rounds):
        round_log = {"round": t+1, "proposals": {}, "verifier_results": {}, "aggregation": None, "suggestions": None}
        # Solvers propose
        proposals = {}
        self_scores = {}
        for solver in solvers:
            p = solver.propose(problem)
            proposals[solver.name] = p
            self_scores[solver.name] = solver.self_score
        round_log["proposals"] = proposals

        # Verifiers evaluate each proof
        verifier_results = {}
        for verifier in verifiers:
            for sname, proof in proposals.items():
                sc, defects = verifier.evaluate(proof)
                verifier_results.setdefault(sname, []).append((verifier.name, sc, defects))
                round_log["verifier_results"].setdefault(solver.name, {})
        round_log["verifier_results"] = verifier_results

        # Aggregation & suggestions, then solvers revise based on suggestions
        round_aggregations = {}
        suggestions_for_solvers = {}
        for sname, v_results in verifier_results.items():
            mean_score, var_score, defect_union = aggregate_verifier_outputs(v_results)
            sugg = make_suggestions_from_union(defect_union)
            round_aggregations[sname] = {"mean_score": mean_score, "var_score": var_score, "defects": defect_union}
            suggestions_for_solvers[sname] = sugg
        round_log["aggregation"] = round_aggregations
        round_log["suggestions"] = suggestions_for_solvers

        # If not last round, apply revisions
        if t < rounds - 1:
            for solver in solvers:
                sname = solver.name
                current_proof = proposals[sname]
                suggestions = suggestions_for_solvers.get(sname, [])
                revised = solver.revise(current_proof, suggestions)
                # set next proposal by monkey patching propose method (for demo simplicity)
                # We'll set solver.propose to return the revised proof next time
                def make_fixed_propose(p):
                    return lambda problem: p
                solver.propose = make_fixed_propose(revised)

        history.append(round_log)

        if verbose:
            print(f"\n=== Round {t+1} ===")
            print("Proposals:")
            for sname, proof in proposals.items():
                print(f"  {sname}: {pretty_proof(proof)}  (self_score={self_scores[sname]:.2f})")
            print("\nVerifier results per proposal (verifier_name: score, defects):")
            for sname, vres in verifier_results.items():
                print(f"  {sname}:")
                for vname, sc, defects in vres:
                    print(f"    {vname}: score={sc}, defects={defects}")
            print("\nAggregations & suggestions:")
            for sname, agg in round_aggregations.items():
                print(f"  {sname}: mean_score={agg['mean_score']:.2f}, var={agg['var_score']:.3f}, defects={agg['defects']}")
                print(f"    suggestions: {suggestions_for_solvers[sname]}")

    # Final summary table
    rows = []
    final_round = history[-1]
    final_proposals = final_round["proposals"]
    final_agg = final_round["aggregation"]
    for sname, proof in final_proposals.items():
        agg = final_agg[sname]
        rows.append({
            "solver": sname,
            "final_proof": pretty_proof(proof),
            "mean_score": agg["mean_score"],
            "var_score": agg["var_score"],
            "defects": "; ".join([f"{k}:{[d[1] for d in v]}" for k,v in agg["defects"].items()]) if agg["defects"] else ""
        })
    df = pd.DataFrame(rows)
    return history, df

# Run the simulation
history, summary_df = run_simulation(M=3, N=3, rounds=4, verbose=True)

!pip install ace_tools
import ace_tools as tools
try:
    # display dataframe to user via ace_tools if available in the environment
    tools.display_dataframe_to_user("Swarm Simulation Summary", summary_df)
except Exception:
    # fallback: print summary
    print("\nFinal Summary:")
    print(summary_df.to_string(index=False))

# Also show a compact history for inspection (first and last rounds)
print("\n\n--- Compact history (first and last round) ---")
print("First round proposals:")
for sname, proof in history[0]["proposals"].items():
    print(f"  {sname}: {pretty_proof(proof)}")
print("\nLast round proposals:")
for sname, proof in history[-1]["proposals"].items():
    print(f"  {sname}: {pretty_proof(proof)}")


=== Round 1 ===
Proposals:
  G1: assume_even_a -> assume_even_b -> sum_expr -> rewrite_distrib -> conclude_even  (self_score=1.00)
  G2: assume_even_a -> assume_even_b -> sum_expr -> rewrite_distrib -> conclude_even  (self_score=0.80)
  G3: assume_even_a -> assume_even_b -> sum_expr -> conclude_even  (self_score=0.60)

Verifier results per proposal (verifier_name: score, defects):
  G1:
    V1: score=1.0, defects=[]
    V2: score=1.0, defects=[]
    V3: score=1.0, defects=[]
  G2:
    V1: score=1.0, defects=[]
    V2: score=1.0, defects=[]
    V3: score=1.0, defects=[]
  G3:
    V1: score=0.5, defects=[(3, 'missing_rewrite', 'rewrite_distrib')]
    V2: score=1.0, defects=[]
    V3: score=1.0, defects=[]

Aggregations & suggestions:
  G1: mean_score=1.00, var=0.000, defects={}
    suggestions: []
  G2: mean_score=1.00, var=0.000, defects={}
    suggestions: []
  G3: mean_score=0.83, var=0.056, defects={3: [('V1', 'missing_rewrite', 'rewrite_distrib')]}
    suggestions: [(3, 'rewrite_di

ModuleNotFoundError: No module named 'ace_tools'