<a href="https://colab.research.google.com/github/cotrader/versuch/blob/main/Agents.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
from __future__ import annotations

from dataclasses import dataclass, field
from typing import Any, Dict, List, Optional, Tuple
import time
import traceback


# -----------------------------
# Shared State / Task Artifacts
# -----------------------------
@dataclass
class AgentState:
    """Mutable record that is passed from one agent function to the next.

    Only ``goal`` is required. Every other field defaults to a fresh, empty
    container created per instance.
    """

    # Free-text description of what the run should produce.
    goal: str
    # Caller-supplied values that agents read with ``inputs.get(...)``.
    inputs: Dict[str, Any] = field(default_factory=dict)
    # Outputs written by agents: tables, charts, report text, files, etc.
    artifacts: Dict[str, Any] = field(default_factory=dict)
    # Timestamped log lines appended by ``log``.
    logs: List[str] = field(default_factory=list)
    # Confidence scores, timings, etc.
    metrics: Dict[str, Any] = field(default_factory=dict)
    # QA/Compliance flags.
    issues: List[str] = field(default_factory=list)


def log(state: AgentState, msg: str) -> None:
    """Append ``msg`` to ``state.logs`` with a timestamp prefix.

    The stored entry has the form ``[YYYY-MM-DD HH:MM:SS] msg``; the time is
    whatever ``time.strftime`` reports at the moment of the call.
    """
    now = time.strftime("%Y-%m-%d %H:%M:%S")
    entry = f"[{now}] {msg}"
    state.logs.append(entry)


# -----------------------------
# Level 2: Specialist Agents
# -----------------------------
def research_agent(state: AgentState) -> AgentState:
    """Copy caller-provided research material from ``inputs`` into the state.

    Writes ``artifacts["sources"]`` (default ``[]``),
    ``artifacts["raw_facts"]`` (default ``{}``) and
    ``metrics["research_confidence"]`` (default ``0.6``), then returns the
    same ``state`` object.
    """
    log(state, "ResearchAgent: collecting sources and raw facts.")
    provided = state.inputs
    sources = provided.get("sources", [])
    raw_facts = provided.get("raw_facts", {})
    confidence = provided.get("research_confidence", 0.6)
    state.artifacts["sources"] = sources
    state.artifacts["raw_facts"] = raw_facts
    state.metrics["research_confidence"] = confidence
    return state


def data_engineering_agent(state: AgentState) -> AgentState:
    """Choose the row table that the downstream agents will work on.

    Rows come from ``artifacts["raw_facts"]["rows"]`` when ``raw_facts`` is a
    dict and that entry is not ``None``; otherwise from ``inputs["rows"]``
    (default ``[]``). The result is stored in ``artifacts["table_rows"]``.

    ``metrics["data_quality"]`` is ``0.7`` when the rows are truthy and
    ``0.3`` otherwise; in the latter case a ``DATA_EMPTY`` issue is recorded.
    """
    log(state, "DataEngineeringAgent: normalizing raw facts into structured rows.")
    raw_facts = state.artifacts.get("raw_facts", {})

    table = None
    if isinstance(raw_facts, dict):
        table = raw_facts.get("rows")
    if table is None:
        # Nothing usable in the research output: fall back to direct inputs.
        table = state.inputs.get("rows", [])

    state.artifacts["table_rows"] = table
    if table:
        state.metrics["data_quality"] = 0.7
    else:
        state.metrics["data_quality"] = 0.3
        state.issues.append("DATA_EMPTY: No rows produced by DataEngineeringAgent.")
    return state


def quant_signal_agent(state: AgentState) -> AgentState:
    """Derive an ``upside_pct`` signal for every row in ``table_rows``.

    Each input row is copied with ``dict(row)``. When both ``price`` and
    ``target`` are ``int``/``float`` and ``price`` is non-zero, the copy gets
    ``upside_pct = (target - price) / price * 100``. Rows that do not qualify
    are passed through without the key.

    Writes ``artifacts["signal_rows"]`` and ``metrics["signal_confidence"]``
    (``0.65`` when any row exists, else ``0.2``).
    """
    log(state, "QuantSignalAgent: computing signals and metrics.")
    numeric = (int, float)
    signal_rows = []
    for source_row in state.artifacts.get("table_rows", []):
        row = dict(source_row)
        price = row.get("price")
        target = row.get("target")
        both_numeric = isinstance(price, numeric) and isinstance(target, numeric)
        # The zero check must come last so it is only evaluated for numbers.
        if both_numeric and price != 0:
            row["upside_pct"] = (target - price) / price * 100.0
        signal_rows.append(row)

    state.artifacts["signal_rows"] = signal_rows
    state.metrics["signal_confidence"] = 0.65 if signal_rows else 0.2
    return state


def risk_manager_agent(state: AgentState) -> AgentState:
    """Cap each row's upside and attach a per-asset weight limit.

    Reads ``artifacts["signal_rows"]`` and writes ``artifacts["risk_rows"]``.
    Every output row is a shallow copy of the input row with:

    * ``upside_pct_capped`` -- ``upside_pct`` clamped to ``[-90.0, 200.0]``;
      added only when ``upside_pct`` is present and is an ``int``/``float``.
    * ``max_weight`` -- always ``0.10``.

    Also sets ``metrics["risk_ok"]`` to ``True`` and returns the same
    ``state`` object.
    """
    log(state, "RiskManagerAgent: applying constraints and sizing.")
    risk_rows = []
    for source_row in state.artifacts.get("signal_rows", []):
        # Copy each row (as quant_signal_agent does) so the risk fields are
        # not written back into ``signal_rows``; storing the very same list
        # and dicts under both keys would make the two artifacts aliases.
        row = dict(source_row)
        upside = row.get("upside_pct")
        if isinstance(upside, (int, float)):
            row["upside_pct_capped"] = max(min(upside, 200.0), -90.0)
        row["max_weight"] = 0.10
        risk_rows.append(row)

    state.artifacts["risk_rows"] = risk_rows
    state.metrics["risk_ok"] = True
    return state


def execution_strategy_agent(state: AgentState) -> AgentState:
    """Pick the top-ranked rows and give each an equal, capped weight.

    Rows from ``artifacts["risk_rows"]`` are sorted in descending order by
    ``upside_pct_capped`` (falling back to ``upside_pct``, then to ``-1e9`` so
    rows with neither key sort last). ``inputs["top_n"]`` (default ``5``)
    limits how many rows are kept; ``None`` or a non-positive value keeps all
    of them.

    Each selected asset receives ``min(1 / n, row["max_weight"])`` where
    ``max_weight`` defaults to ``1.0``, so the weights can sum to less than 1
    when the cap binds. The weight key is the first truthy value of
    ``symbol``, ``ticker`` or ``name``, else ``asset_<k>``.

    Writes ``artifacts["weights"]`` and ``artifacts["selected_assets"]``.
    When nothing is selected, both are reset to empty and an ``EXEC_EMPTY``
    issue is recorded.
    """
    log(state, "ExecutionStrategyAgent: generating portfolio weights.")
    rows = state.artifacts.get("risk_rows", [])
    ranked = sorted(
        rows,
        key=lambda row: row.get("upside_pct_capped", row.get("upside_pct", -1e9)),
        reverse=True,
    )

    top_n = state.inputs.get("top_n", 5)
    # Guard against an explicit ``top_n=None`` ("no limit"); comparing None
    # with 0 would otherwise raise TypeError.
    selected = ranked[:top_n] if top_n is not None and top_n > 0 else ranked

    if not selected:
        state.issues.append("EXEC_EMPTY: No rows available for allocation.")
        state.artifacts["weights"] = {}
        # Reset alongside ``weights`` so a reused state cannot keep a stale
        # selection that no longer matches the (empty) weights.
        state.artifacts["selected_assets"] = []
        return state

    equal_share = 1.0 / len(selected)
    weights = {}
    for row in selected:
        label = (
            row.get("symbol")
            or row.get("ticker")
            or row.get("name")
            or f"asset_{len(weights)+1}"
        )
        weights[label] = min(equal_share, row.get("max_weight", 1.0))

    state.artifacts["weights"] = weights
    state.artifacts["selected_assets"] = selected
    return state


def coding_agent(state: AgentState) -> AgentState:
    """Store a fixed code plan (module and test file names) in the state.

    Writes ``artifacts["code_plan"]`` with the keys ``"modules"`` and
    ``"tests"`` and returns the same ``state`` object.
    """
    log(state, "CodingAgent: creating code plan / modules.")
    module_files = ["data.py", "signals.py", "risk.py", "report.py"]
    test_files = ["test_signals.py", "test_risk.py"]
    state.artifacts["code_plan"] = {"modules": module_files, "tests": test_files}
    return state


def debugger_self_healing_agent(state: AgentState) -> AgentState:
    """Map every recorded issue to a suggested fix.

    When ``state.issues`` is non-empty, ``artifacts["debug_fixes"]`` is set to
    a list with one suggestion per issue, in issue order. Issues are matched
    by prefix (``DATA_EMPTY``, ``EXEC_EMPTY``); anything else gets a generic
    suggestion. With no issues the artifact is left untouched.
    """
    log(state, "DebuggerAgent: checking issues and proposing fixes.")
    if not state.issues:
        return state

    # Checked in order; the first matching prefix wins.
    known_fixes = (
        ("DATA_EMPTY", "Add data source fallback or require user-provided rows."),
        ("EXEC_EMPTY", "Ensure DataEngineeringAgent outputs at least one asset row."),
    )
    generic_fix = "Add guardrails and retries for upstream agent."

    suggestions = []
    for issue in state.issues:
        matched = next(
            (fix for prefix, fix in known_fixes if issue.startswith(prefix)),
            generic_fix,
        )
        suggestions.append(matched)

    state.artifacts["debug_fixes"] = suggestions
    return state


def visualization_agent(state: AgentState) -> AgentState:
    """Build two bar-chart specifications and store them in the state.

    ``artifacts["charts"]`` becomes a list of two dicts with the keys
    ``type``, ``title``, ``x`` and ``y``:

    1. "Upside % by Asset" -- one bar per row of ``artifacts["risk_rows"]``,
       labelled by the first truthy value of ``symbol``, ``ticker`` or
       ``name`` (``None`` if the row has none), with ``upside_pct_capped``
       as the value.
    2. "Portfolio Weights" -- keys and values of ``artifacts["weights"]``.
    """
    log(state, "VisualizationAgent: creating chart specifications.")
    rows = state.artifacts.get("risk_rows", [])
    weights = state.artifacts.get("weights", {})

    # Use the same symbol/ticker/name fallback as the execution and report
    # agents so rows identified only by ``ticker`` or ``name`` still get a
    # label instead of ``None``.
    labels = [r.get("symbol") or r.get("ticker") or r.get("name") for r in rows]
    upsides = [r.get("upside_pct_capped") for r in rows]

    state.artifacts["charts"] = [
        {"type": "bar", "title": "Upside % by Asset", "x": labels, "y": upsides},
        {
            "type": "bar",
            "title": "Portfolio Weights",
            "x": list(weights.keys()),
            "y": list(weights.values()),
        },
    ]
    return state


def report_agent(state: AgentState) -> AgentState:
    """Render the markdown report and store it as ``artifacts["report_md"]``.

    The report has a title, the goal, one bullet per entry of
    ``artifacts["weights"]`` (formatted as a percentage with two decimals),
    and one bullet for each of the first ten ``artifacts["selected_assets"]``
    showing ``upside_pct_capped`` (or ``upside_pct`` when the capped value is
    absent).
    """
    log(state, "ReportAgent: drafting report text.")
    weights = state.artifacts.get("weights", {})
    selected = state.artifacts.get("selected_assets", [])

    header = [
        "# Report",
        f"**Goal:** {state.goal}",
        "",
        "## Selected assets & weights",
    ]
    weight_lines = [f"- **{name}**: {share:.2%}" for name, share in weights.items()]

    metric_lines = []
    for row in selected[:10]:
        label = row.get("symbol") or row.get("ticker") or row.get("name")
        upside = row.get("upside_pct_capped", row.get("upside_pct"))
        metric_lines.append(f"- {label}: upside={upside}")

    sections = header + weight_lines + ["", "## Key metrics (top assets)"] + metric_lines
    state.artifacts["report_md"] = "\n".join(sections)
    return state


def qa_critic_agent(state: AgentState) -> AgentState:
    """Run quality checks and compute ``metrics["qa_score"]``.

    Checks performed:

    * any row in ``artifacts["risk_rows"]`` whose ``price`` is ``None`` or
      ``0`` adds a ``QA_PRICE_MISSING_OR_ZERO`` issue (at most once);
    * a missing or empty ``artifacts["report_md"]`` adds ``QA_REPORT_MISSING``.

    The score starts at ``1.0``, loses ``0.15`` for every entry in
    ``state.issues`` (including issues recorded by earlier agents) and is
    floored at ``0.0``.
    """
    log(state, "QACriticAgent: running checks.")
    risk_rows = state.artifacts.get("risk_rows", [])

    price_problem = False
    if risk_rows:
        for row in risk_rows:
            if row.get("price") in (None, 0):
                price_problem = True
                break
    if price_problem:
        state.issues.append("QA_PRICE_MISSING_OR_ZERO: Some assets have missing/zero price.")

    report_text = state.artifacts.get("report_md")
    if not report_text:
        state.issues.append("QA_REPORT_MISSING: Report not generated.")

    penalised = 1.0 - 0.15 * len(state.issues)
    state.metrics["qa_score"] = max(penalised, 0.0)
    return state


def compliance_policy_agent(state: AgentState) -> AgentState:
    """Append the standard disclaimer to the report, if a report exists.

    When ``artifacts`` has no ``"report_md"`` key, only the log entry is
    written and the state is otherwise unchanged.
    """
    log(state, "ComplianceAgent: applying policy checks and disclaimers.")
    if "report_md" not in state.artifacts:
        return state

    # A horizontal rule followed by an italic markdown paragraph.
    footer = (
        "\n\n---\n"
        "_Disclaimer: This is informational content, not financial advice. "
        "Verify data and consider your personal risk profile before acting._"
    )
    state.artifacts["report_md"] += footer
    return state


def memory_knowledge_agent(state: AgentState) -> AgentState:
    """Append a snapshot of this run to ``artifacts["memory_notes"]``.

    The note holds the current ``time.time()`` value, the goal, and copies of
    ``state.issues`` and ``state.metrics`` so that later changes to the state
    do not alter the stored note. The list is created on first use.
    """
    log(state, "MemoryAgent: storing reusable learnings.")
    snapshot = {
        "timestamp": time.time(),
        "goal": state.goal,
        "issues": list(state.issues),
        "metrics": dict(state.metrics),
    }
    state.artifacts.setdefault("memory_notes", []).append(snapshot)
    return state


def master_orchestrator(state: AgentState) -> AgentState:
    """Run the agent pipeline in order and return the final state.

    Each agent receives the state returned by the previous one. The run
    stops early in two situations, and in both the debugger agent is invoked
    so that ``artifacts["debug_fixes"]`` reflects the recorded issues:

    * an agent raises: an ``EXCEPTION_IN_<agent>`` issue is recorded, the
      formatted traceback is appended to ``artifacts["tracebacks"]`` and the
      failure is written to the log;
    * any issue starting with ``DATA_EMPTY`` is present after an agent ran.

    Start and finish are always logged.
    """
    log(state, "Orchestrator: starting pipeline.")

    pipeline = [
        research_agent,
        data_engineering_agent,
        quant_signal_agent,
        risk_manager_agent,
        execution_strategy_agent,
        visualization_agent,
        report_agent,
        qa_critic_agent,
        compliance_policy_agent,
        debugger_self_healing_agent,
        memory_knowledge_agent,
    ]

    for agent in pipeline:
        try:
            state = agent(state)
        except Exception as exc:
            # Top-level boundary: record the failure instead of crashing the
            # run. ``state`` still refers to the pre-call object here because
            # the assignment above never completed.
            state.issues.append(f"EXCEPTION_IN_{agent.__name__}: {exc}")
            state.artifacts.setdefault("tracebacks", []).append(traceback.format_exc())
            log(state, f"Orchestrator: {agent.__name__} raised {type(exc).__name__}; stopping.")
            state = debugger_self_healing_agent(state)
            break

        if any(issue.startswith("DATA_EMPTY") for issue in state.issues):
            log(state, "Orchestrator: stopping early due to empty data.")
            # The debugger sits late in the pipeline and would be skipped by
            # this break, leaving its DATA_EMPTY suggestion unreachable; run
            # it here, mirroring the exception path.
            state = debugger_self_healing_agent(state)
            break

    log(state, "Orchestrator: pipeline finished.")
    return state


# Example usage
# Three sample assets, each with a current price and a target price.
example_rows = [
    {"symbol": "BAYER", "price": 28.5, "target": 35.0},
    {"symbol": "UMWELT", "price": 5.2, "target": 7.0},
    {"symbol": "SYO", "price": 3.4, "target": 4.1},
]

# ``top_n=2`` limits the allocation to two of the three rows.
state = AgentState(
    goal="Create an asset report with upside metrics, weights, and a markdown report.",
    inputs={"rows": example_rows, "top_n": 2, "research_confidence": 0.7},
)

out = master_orchestrator(state)

# Show the rendered report, or a placeholder if the pipeline stopped early.
print("\n--- REPORT ---\n")
print(out.artifacts.get("report_md", "No report generated."))

# List every recorded issue on its own line.
print("\n--- ISSUES ---\n")
if out.issues:
    print("\n".join(out.issues))
else:
    print("None")



--- REPORT ---

# Report
**Goal:** Create an asset report with upside metrics, weights, and a markdown report.

## Selected assets & weights
- **UMWELT**: 10.00%
- **BAYER**: 10.00%

## Key metrics (top assets)
- UMWELT: upside=34.615384615384606
- BAYER: upside=22.807017543859647

---
_Disclaimer: This is informational content, not financial advice. Verify data and consider your personal risk profile before acting._

--- ISSUES ---

None
