# Evolution Pipeline Verification

Tests all evolution modules, then runs a live demo of the full pipeline against a real Kalshi event.

In [None]:
import sys, os
from pathlib import Path

# Locate the project root: if the kernel was started inside a directory
# named "notebooks", the root is its parent; otherwise the working
# directory itself is treated as the root.
notebook_dir = Path.cwd().resolve()
project_root = notebook_dir.parent if notebook_dir.name == 'notebooks' else notebook_dir

# Run from the root and put it first on sys.path (unless it is already
# listed) so the project's modules can be imported by the cells below.
os.chdir(project_root)
if str(project_root) not in sys.path:
    sys.path.insert(0, str(project_root))

# Echo the environment so a misconfigured kernel is obvious at a glance.
print(f"Project root: {project_root}")
print(f"Python: {sys.executable}")
print(f"sys.path[0]: {sys.path[0]}")

In [None]:
# Test basic imports first
import json
import tempfile
import shutil
from datetime import datetime, timezone
from textwrap import dedent

# Root-level modules
import schemas
import config

# Packages at root
from tools.registry import ToolRegistry, build_default_registry
from tools.base_tool import BaseTool
from agent.graph import build_agent_graph, build_evolution_graph
from api.kalshi_client import KalshiClient
from engine.tool_runner import run_tools
from engine.scorer import compute_score

# Evolution package (nested under prediction_agent/)
from prediction_agent.evolution.schemas import (
    ExecutionLogEntry, GapReport, ToolSpec, VerificationResult,
    ToolLifecycleRecord, RiskLevel, ToolStatus
)
from prediction_agent.evolution.execution_logger import log_execution
from prediction_agent.evolution.tool_gap_analyzer import analyze_gaps
from prediction_agent.evolution.tool_spec_generator import generate_tool_spec
from prediction_agent.evolution.tool_builder import build_tool
from prediction_agent.evolution.tool_verifier import verify_tool
from prediction_agent.evolution.tool_lifecycle_manager import ToolLifecycleManager

# Reaching this point means every import above resolved. Also report the
# value of config.ENABLE_EVOLUTION for reference.
print("All imports successful!")
print(f"ENABLE_EVOLUTION = {config.ENABLE_EVOLUTION}")

In [None]:
# Create a fresh scratch directory (name prefixed with "evo_verify_") and
# keep its path in TEMP_DIR; the cells below write their files under it and
# the final cell removes it.
TEMP_DIR = Path(tempfile.mkdtemp(prefix="evo_verify_"))
print(f"Temp dir: {TEMP_DIR}")

def check(label, condition):
    """Report one verification step and stop on failure.

    Prints "[PASS] <label>" when *condition* is truthy. Otherwise prints
    "[FAIL] <label>" and raises.

    Args:
        label: Human-readable name of the step being verified.
        condition: Outcome of the step; any truthy value counts as a pass.

    Raises:
        AssertionError: With *label* as its message when *condition* is falsy.
    """
    if condition:
        print(f"[PASS] {label}")
        return
    print(f"[FAIL] {label}")
    raise AssertionError(label)

## Quick Validation Tests

In [None]:
# Schema smoke test: a ToolSpec can be constructed from plain keyword values.
spec = ToolSpec(tool_name="test", description="Test", deterministic=True, risk_level="low")
check("ToolSpec creation", spec.tool_name == "test")

# Gap analyzer smoke test: write three JSONL run records, then ask for an
# analysis that requires at least five runs. The expected answer is None.
gap_file = TEMP_DIR / "gap.jsonl"
gap_file.write_text("".join(
    json.dumps({
        "run_id": f"r-{i}", "market_id": "M", "market_title": "T",
        "selected_tools": ["t1"], "tool_weights": [1.0],
        "tool_outputs": [{"tool_name": "t1", "output_vector": [0.5]}],
        "final_score": 0.5, "threshold": 0.5, "bet_triggered": True,
        "reasoning_segments": "test", "failed_tool_attempts": [],
        "total_tokens_used": 100, "timestamp": "2025-01-01T00:00:00+00:00"
    }) + "\n"
    for i in range(3)
))
result = analyze_gaps(log_path=gap_file, min_runs=5)
check("Gap analyzer: insufficient data -> None", result is None)

# Verifier smoke test: write a minimal BaseTool subclass to a file under
# TEMP_DIR and confirm verify_tool() reports it as passed against `spec`.
# The source literal below is what gets verified, so it must stay exactly
# as written; .strip() only removes the blank first line left by the
# triple-quote layout.
safe_code = dedent('''
from tools.base_tool import BaseTool
from schemas import EventInput, ToolOutput

class TestTool(BaseTool):
    @property
    def name(self): return "test_tool"
    @property
    def description(self): return "Test"
    def run(self, event, **kwargs):
        return ToolOutput(tool_name=self.name, output_vector=[0.5], metadata={})
''').strip()

safe_file = TEMP_DIR / "test_tool.py"
safe_file.write_text(safe_code)
verification = verify_tool(safe_file, spec)
check("Verifier: safe tool passes", verification.passed)

# Registry smoke test: the default registry must contain at least one tool.
registry = build_default_registry()
check("Registry: has core tools", len(registry) > 0)

# Graph smoke test: neither builder may hand back None.
agent_graph = build_agent_graph()
evo_graph = build_evolution_graph()
check("Graphs compile", not (agent_graph is None or evo_graph is None))

# Only reached when every check() above passed, since check() raises on failure.
print("\nAll validation tests passed!")

## Live Demo: Real Event + Full Pipeline

In [None]:
import logging
logging.basicConfig(level=logging.INFO, format="%(levelname)s: %(message)s")

# Fetch up to ten active markets and list the first three titles
# (truncated to 50 characters) so the selection below is visible.
client = KalshiClient()
markets = client.get_active_markets(limit=10)
print(f"\nFetched {len(markets)} markets:")
for i, m in enumerate(markets[:3]):
    print(f"  [{i}] {m.get('title', 'Unknown')[:50]}")

# Stop with a readable message rather than an IndexError from markets[0]
# when the API returned nothing.
if not markets:
    raise RuntimeError("No active markets returned; cannot run the live demo")

# The demo always uses the first market in the response.
chosen = markets[0]

# The timestamp may arrive as an ISO-8601 string or as an already-parsed
# object. datetime.fromisoformat() only accepts a trailing "Z" from
# Python 3.11 onwards, so rewrite it as an explicit UTC offset first.
raw_timestamp = chosen["timestamp"]
if isinstance(raw_timestamp, str):
    if raw_timestamp.endswith("Z"):
        raw_timestamp = raw_timestamp[:-1] + "+00:00"
    event_timestamp = datetime.fromisoformat(raw_timestamp)
else:
    event_timestamp = raw_timestamp

# Build the event; event_id falls back to market_id, and title / price fall
# back to "Unknown" / 0.0 when the market dict lacks those keys.
event = schemas.EventInput(
    event_id=chosen.get("event_id", chosen["market_id"]),
    market_id=chosen["market_id"],
    market_title=chosen.get("title", "Unknown"),
    current_price=chosen.get("last_price", 0.0),
    timestamp=event_timestamp,
)
print(f"\nSelected: {event.market_title}")
print(f"Price: {event.current_price:.2f}")

In [None]:
# Run the agent graph on the selected event. The initial state carries the
# serialized event, the registry's tool listing, and empty "formula_spec" /
# "error" slots.
registry = build_default_registry()
agent_graph = build_agent_graph()

agent_result = agent_graph.invoke({
    "event_input": event.model_dump(mode="json"),
    "tools_list": registry.list_tools(),
    "formula_spec": None,
    "error": None,
})

# If the graph finished without a formula, unpacking it into FormulaSpec
# would fail with an opaque KeyError/TypeError. Raise a clear error instead
# and include whatever the state's "error" slot holds.
formula_payload = agent_result.get("formula_spec")
if formula_payload is None:
    raise RuntimeError(
        f"Agent produced no formula spec (error: {agent_result.get('error')!r})"
    )

formula = schemas.FormulaSpec(**formula_payload)
print(f"\nAgent selected {len(formula.selections)} tools")
print(f"Threshold: {formula.threshold:.4f}")

In [None]:
# Execute tools and score: run_tools() is given the event, the agent's
# formula, and the registry; compute_score() is then given those outputs
# together with the same formula. The result exposes final_score and
# bet_triggered, which are printed below.
tool_outputs = run_tools(event, formula, registry)
score = compute_score(tool_outputs, formula)
print(f"\nScore: {score.final_score:.4f}")
print(f"Bet triggered: {score.bet_triggered}")

In [None]:
# Log the live run to a JSONL file under TEMP_DIR, then append seeded runs
# so the gap analyzer has more than a single entry to work with.
live_log = TEMP_DIR / "live_log.jsonl"
live_result = {
    "run_id": f"live-{datetime.now(timezone.utc).strftime('%Y%m%d%H%M%S')}",
    "event": event.model_dump(mode="json"),
    "formula": formula.model_dump(mode="json"),
    "score": score.model_dump(mode="json"),
}
log_execution(live_result, log_path=live_log)

# Seed 9 borderline runs to guarantee gap detection. Each reuses the live
# event and formula; final_score cycles through threshold - 0.01,
# threshold, and threshold + 0.01.
# NOTE(review): bet_triggered alternates with i and is not derived from
# final_score vs. threshold - confirm the analyzer does not need the two
# to agree.
for i in range(9):
    seeded = {
        "run_id": f"seed-{i:03d}",
        "event": event.model_dump(mode="json"),
        "formula": {"selections": [{"tool_name": s.tool_name, "weight": s.weight} for s in formula.selections],
                    "threshold": formula.threshold, "rationale": formula.rationale},
        "score": {"final_score": formula.threshold + 0.01 * (i % 3 - 1),
                  "tool_outputs": [{"tool_name": s.tool_name, "output_vector": [0.5]} for s in formula.selections],
                  "threshold": formula.threshold, "bet_triggered": i % 2 == 0},
    }
    log_execution(seeded, log_path=live_log)

# Count non-blank lines, one per logged entry. Splitting the stripped text
# on newlines would report 1 for an empty log and would count blank lines.
with live_log.open() as log_file:
    entry_count = sum(1 for line in log_file if line.strip())
print(f"\nLogged {entry_count} entries")

In [None]:
# Analyze gaps: try a gap threshold of 0.05 first; only when that pass
# yields nothing (a falsy result) is the analysis repeated with 0.0.
detected_gap = (
    analyze_gaps(log_path=live_log, min_runs=5, gap_threshold=0.05)
    or analyze_gaps(log_path=live_log, min_runs=5, gap_threshold=0.0)
)

# Report the outcome; the cells below only do work when a gap was found.
if not detected_gap:
    print("\nNo gap detected (unexpected)")
else:
    print(f"\nGap detected: {detected_gap.problem_detected}")
    print(f"Priority: {detected_gap.priority_score:.3f}")

In [None]:
# LLM: Generate tool spec.
# Runs only when the previous cell found a gap. generate_tool_spec() is
# handed the gap report; its result is kept in `proposed_spec` for the
# build and verify cells, and its name, description, and risk level are
# printed here.
if detected_gap:
    print("\nCalling LLM to generate tool spec...")
    proposed_spec = generate_tool_spec(detected_gap)
    print(f"Proposed: {proposed_spec.tool_name}")
    print(f"Description: {proposed_spec.description}")
    print(f"Risk: {proposed_spec.risk_level.value}")

In [None]:
# LLM: Build tool code.
# Runs only when a gap was found. The generated file is written under
# TEMP_DIR/built and its path is kept in `tool_path` for the verify cell.
if detected_gap:
    build_dir = TEMP_DIR / "built"
    build_dir.mkdir(exist_ok=True)
    print(f"\nCalling LLM to generate code for '{proposed_spec.tool_name}'...")
    tool_path = build_tool(proposed_spec, output_dir=build_dir)
    print(f"Written to: {tool_path.name}")
    print(f"Size: {tool_path.stat().st_size} bytes")
    print(f"\n{'='*60}")
    print("GENERATED CODE:")
    print(f"{'='*60}")
    # Preview at most the first 500 characters, and append the ellipsis only
    # when the file is actually longer than the preview.
    generated_code = tool_path.read_text()
    preview = generated_code[:500]
    print((preview + "...") if len(generated_code) > 500 else preview)

In [None]:
# Verify tool.
# Runs only when a gap was found: verify the generated file against the
# proposed spec, list each individual check, then print the overall verdict
# and, on rejection, the reason.
if detected_gap:
    print(f"\nVerifying '{proposed_spec.tool_name}'...")
    verification = verify_tool(tool_path, proposed_spec)
    for check_name, passed in verification.checks.items():
        outcome = 'PASS' if passed else 'FAIL'
        print(f"  {check_name}: {outcome}")
    verdict = 'APPROVED' if verification.passed else 'REJECTED'
    print(f"\nResult: {verdict}")
    if not verification.passed:
        print(f"Reason: {verification.rejection_reason}")

In [None]:
# Cleanup: delete the scratch directory and everything written under it.
# ignore_errors=True makes this best-effort, so a failed removal does not
# raise here.
shutil.rmtree(TEMP_DIR, ignore_errors=True)
print(f"\nCleaned up: {TEMP_DIR}")
print("\nDONE!")