# Multi‑Reasoning MCP Orchestrator (Codex + Gemini)

**Generated:** 2025-12-26

This notebook builds a complete MCP server project that:

- keeps a **warm** `codex mcp-server` subprocess alive and reuses it across tool calls
- auto‑routes tasks and selects **reasoning effort** + **verbosity**
- supports common workflows: refactors and CI fixes, audits, document analysis, file operations, ingestion/indexing, and code review/optimization
- integrates **Gemini CLI** as an optional secondary agent
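
The "warm" subprocess idea in the first bullet can be sketched with an `AsyncExitStack` that owns a long-lived resource, starts it on first use, and reuses it on every later call. This is a minimal sketch of the pattern, not the project's `codex_client.py`: `fake_backend` and `WarmResource` are hypothetical stand-ins for the Codex subprocess and its client.

```python
import asyncio
from contextlib import AsyncExitStack, asynccontextmanager

# Counts how many times the (simulated) expensive backend was started.
START_COUNT = 0


@asynccontextmanager
async def fake_backend():
    """Hypothetical stand-in for spawning a long-lived subprocess."""
    global START_COUNT
    START_COUNT += 1

    async def handle(prompt: str) -> str:
        return f"echo: {prompt}"

    try:
        yield handle
    finally:
        pass  # a real backend would terminate the subprocess here


class WarmResource:
    """Starts the backend once and reuses it for every call."""

    def __init__(self) -> None:
        self._stack = AsyncExitStack()
        self._handle = None
        self._lock = asyncio.Lock()  # serializes start-up and calls

    async def call(self, prompt: str) -> str:
        async with self._lock:
            if self._handle is None:  # first call pays the start-up cost
                self._handle = await self._stack.enter_async_context(fake_backend())
            return await self._handle(prompt)

    async def close(self) -> None:
        await self._stack.aclose()  # unwinds everything entered on the stack
        self._handle = None


async def demo() -> list[str]:
    warm = WarmResource()
    try:
        return [await warm.call("a"), await warm.call("b")]
    finally:
        await warm.close()


results = asyncio.run(demo())
```

Inside a notebook, where an event loop is already running, use `await demo()` instead of `asyncio.run(demo())`.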

---

## Prerequisites (install outside Jupyter)

### 1) Install Codex CLI

```bash
npm i -g @openai/codex
```

### 2) Install Gemini CLI

```bash
npm install -g @google/gemini-cli@latest
```

### 3) Python

- Python 3.10+
- Recommended: run this in a clean virtualenv
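
Before running the notebook, it can help to confirm that these prerequisites are visible from Python. The sketch below uses only the standard library. `check_prerequisites` is a hypothetical helper, and it assumes the CLIs are installed under the default names `codex` and `gemini` unless `CODEX_BIN` or `GEMINI_BIN` says otherwise.

```python
import os
import shutil
import sys


def check_prerequisites(min_python=(3, 10)) -> dict[str, bool]:
    """Report whether Python and the two CLIs look usable."""
    codex_bin = os.environ.get("CODEX_BIN", "codex")
    gemini_bin = os.environ.get("GEMINI_BIN", "gemini")
    return {
        "python": sys.version_info >= min_python,
        # shutil.which returns None when the executable is not on PATH
        "codex": shutil.which(codex_bin) is not None,
        "gemini": shutil.which(gemini_bin) is not None,
    }


status = check_prerequisites()
for name, ok in status.items():
    print(f"{name}: {'ok' if ok else 'MISSING'}")
```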

---

## What you will build

A project folder: `multi_reasoning_mcp/` with a Python MCP server at:

- `src/multi_reasoning_mcp/server.py`

and modules for routing, prompts, the warm Codex client, the Gemini runner, an OpenAI chat wrapper, and a repo index.
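
To give a feel for what the routing module does, here is a simplified sketch of keyword-based routing. It is not the project's `router.py`: the `Route` dataclass, the `RULES` table, and `route_by_keywords` are hypothetical. It matches whole words, so a short keyword such as `ci` does not fire inside a word like `specific`.

```python
import re
from dataclasses import dataclass


@dataclass(frozen=True)
class Route:
    backend: str = "codex"
    agent_role: str = "general"
    reasoning_effort: str = "medium"


# (keywords, route) pairs, checked in order; the first match wins.
RULES = [
    (("audit", "security", "vulnerability"), Route("codex", "auditor", "high")),
    (("ci", "build failing", "tests failing"), Route("codex", "ci_fixer", "high")),
    (("refactor", "cleanup", "modularize"), Route("codex", "refactorer", "high")),
    (("docs", "document", "summarize all"), Route("gemini", "doc_analyst", "high")),
]


def _has_keyword(text: str, keyword: str) -> bool:
    # \b anchors restrict the match to whole words or phrases.
    return re.search(rf"\b{re.escape(keyword)}\b", text) is not None


def route_by_keywords(task: str) -> Route:
    # Collapse whitespace and lowercase before matching.
    text = re.sub(r"\s+", " ", task).strip().lower()
    for keywords, route in RULES:
        if any(_has_keyword(text, k) for k in keywords):
            return route
    return Route()  # default: Codex, general role, medium effort
```

Whole-word matching has a cost: `document` no longer matches `documents`, so plurals and variants would need to be listed explicitly in a table like this.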


In [2]:
# 1) Write the project files to disk
import os, pathlib, textwrap

PROJECT_DIR = pathlib.Path("./multi_reasoning_mcp").resolve()
print("Creating project at:", PROJECT_DIR)

files = {
    "./multi_reasoning_mcp/README.md": "# Multi-Reasoning MCP Orchestrator (Codex + Gemini)\n\nThis project provides an MCP server that:\n\n- Keeps a **warm** `codex mcp-server` subprocess alive (no respawn per call).\n- Routes tasks to **Codex**, **Gemini CLI**, **OpenAI Chat**, or **both**.\n- Automatically selects **reasoning effort** and **verbosity** for GPT-5.2 style models.\n- Includes a simple SQLite FTS **repo index** for fast repo-wide search.\n\n## Run\n\n```bash\npip install -r requirements.txt\nmcp dev src/multi_reasoning_mcp/server.py\n```\n\nThen connect from:\n- Codex CLI via `codex mcp add ...`\n- Gemini CLI via `~/.gemini/settings.json`\n- MCP Inspector via `mcp dev ...` then open inspector UI\n\n## Environment Variables\n\n- `OPENAI_API_KEY` (enables OpenAI routing + analysis)\n- `CODEX_BIN` (defaults to `codex`)\n- `GEMINI_BIN` (defaults to `gemini`)\n- `OPENAI_MODEL` (defaults to `gpt-5.2`)\n- `MCP_INDEX_DB` (defaults to `.mcp_index.sqlite3`)\n",
    "./multi_reasoning_mcp/requirements.txt": "mcp[cli]>=1.10.0\nopenai>=1.0.0\npython-dotenv>=1.0.0\n",
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/__init__.py": '__all__ = ["server"]',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/server.py": '"""\nMulti-Reasoning MCP Orchestrator\n================================\n\nThis MCP server:\n- Keeps a Codex MCP server subprocess warm (long-lived) and reuses it across tool calls.\n- Can route tasks to:\n  - Codex MCP server (best for repo edits, refactors, CI fixes, code optimization)\n  - Gemini CLI (best for broad doc analysis / cross-checking / extra tooling)\n  - OpenAI Chat model (best for planning, audit reports, summarization, routing)\n\nRun:\n  mcp dev src/multi_reasoning_mcp/server.py\nor:\n  python -m multi_reasoning_mcp.server\n\nNotes:\n- This server is designed to be consumed by MCP clients (Codex CLI, Gemini CLI, MCP Inspector, etc.).\n- It spawns `codex mcp-server` ONCE at startup and keeps it alive.\n"""\n\nfrom __future__ import annotations\n\nimport os\nfrom contextlib import asynccontextmanager\nfrom collections.abc import AsyncIterator\nfrom dataclasses import asdict\n\nfrom mcp.server.fastmcp import FastMCP\n\nfrom .codex_client import CodexMCPClient\nfrom .gemini_client import GeminiCliRunner\nfrom .indexer import RepoIndex\nfrom .openai_llm import OpenAIChat\nfrom .router import route_task\nfrom .types import (\n    IndexBuildResult,\n    RouteDecision,\n    SearchResults,\n    ToolRunResult,\n)\nfrom .workflows import Orchestrator\n\n\n@asynccontextmanager\nasync def app_lifespan(server: FastMCP) -> AsyncIterator[dict]:\n    """\n    Lifespan hook: start expensive resources ONCE, reuse per tool call.\n\n    We keep Codex MCP server warm by maintaining:\n      - a single long-lived `codex mcp-server` subprocess\n      - a single MCP ClientSession to that subprocess\n    """\n    codex_bin = os.environ.get("CODEX_BIN", "codex")\n    codex_args = ["mcp-server"]\n    codex_env = None  # inherit current env\n\n    codex = CodexMCPClient(command=codex_bin, args=codex_args, env=codex_env)\n    await codex.start()\n\n    gemini = GeminiCliRunner(command=os.environ.get("GEMINI_BIN", 
"gemini"))\n\n    openai_model = os.environ.get("OPENAI_MODEL", "gpt-5.2")\n    openai_chat = OpenAIChat(model=openai_model)\n\n    index_db = os.environ.get("MCP_INDEX_DB", ".mcp_index.sqlite3")\n    repo_index = RepoIndex(db_path=index_db)\n\n    orchestrator = Orchestrator(\n        codex=codex,\n        gemini=gemini,\n        openai=openai_chat,\n        repo_index=repo_index,\n    )\n\n    try:\n        yield {\n            "codex": codex,\n            "gemini": gemini,\n            "openai": openai_chat,\n            "repo_index": repo_index,\n            "orchestrator": orchestrator,\n        }\n    finally:\n        # graceful shutdown\n        await codex.close()\n\n\nmcp = FastMCP(\n    name="MultiReasoningOrchestrator",\n    lifespan=app_lifespan,\n    # Optional: declare dependencies for `mcp dev --with ...` workflows\n    dependencies=["openai", "python-dotenv"],\n)\n\n\ndef _ctx():\n    ctx = mcp.get_context()\n    return ctx.request_context.lifespan_context\n\n\n@mcp.tool(title="Warm status", description="Check that the warm Codex subprocess is alive and list Codex MCP tools.")\nasync def warm_status() -> dict:\n    lc = _ctx()\n    codex: CodexMCPClient = lc["codex"]\n    tools = await codex.list_tools()\n    return {\n        "codex_alive": codex.is_started,\n        "codex_tools": tools,\n    }\n\n\n@mcp.tool(title="Route task", description="Classify a task and choose backend + reasoning effort. 
Uses LLM router if OPENAI_API_KEY is set, otherwise uses heuristics.")\ndef route(task: str, task_type: str | None = None) -> RouteDecision:\n    lc = _ctx()\n    openai: OpenAIChat = lc["openai"]\n    return route_task(task=task, task_type=task_type, openai=openai)\n\n\n@mcp.tool(title="Codex direct", description="Run a single Codex session using the warm Codex MCP subprocess.")\nasync def codex_direct(\n    prompt: str,\n    reasoning_effort: str = "medium",\n    verbosity: str = "medium",\n    sandbox: str = "read-only",\n    approval_policy: str = "on-failure",\n    cwd: str = ".",\n    include_plan_tool: bool = True,\n    base_instructions: str | None = None,\n) -> ToolRunResult:\n    lc = _ctx()\n    codex: CodexMCPClient = lc["codex"]\n\n    text, meta = await codex.run(\n        prompt=prompt,\n        reasoning_effort=reasoning_effort,\n        verbosity=verbosity,\n        sandbox=sandbox,\n        approval_policy=approval_policy,\n        cwd=cwd,\n        include_plan_tool=include_plan_tool,\n        base_instructions=base_instructions,\n    )\n    return ToolRunResult(\n        backend="codex",\n        agent_role="general",\n        reasoning_effort=reasoning_effort,  # type: ignore\n        verbosity=verbosity,  # type: ignore\n        output_text=text,\n        raw=meta,\n    )\n\n\n@mcp.tool(title="Gemini direct", description="Run Gemini CLI in non-interactive mode (gemini -p).")\ndef gemini_direct(\n    prompt: str,\n    model: str | None = None,\n    all_files: bool = False,\n    sandbox: bool = False,\n    cwd: str = ".",\n    timeout_sec: int = 1800,\n) -> ToolRunResult:\n    lc = _ctx()\n    gemini: GeminiCliRunner = lc["gemini"]\n\n    out = gemini.run(\n        prompt=prompt,\n        model=model,\n        all_files=all_files,\n        sandbox=sandbox,\n        cwd=cwd,\n        timeout_sec=timeout_sec,\n    )\n    return ToolRunResult(\n        backend="gemini",\n        agent_role="general",\n        reasoning_effort="medium",\n        
verbosity="medium",\n        output_text=out["stdout"].strip(),\n        raw=out,\n        warnings=[out["stderr"].strip()] if out.get("stderr") else [],\n    )\n\n\n@mcp.tool(title="Build repo index", description="Build/update a local SQLite FTS index for fast doc/code search.")\ndef build_repo_index(\n    root: str = ".",\n    include_globs: list[str] | None = None,\n    exclude_dirs: list[str] | None = None,\n    max_file_bytes: int = 2_000_000,\n    rebuild: bool = False,\n) -> IndexBuildResult:\n    lc = _ctx()\n    repo_index: RepoIndex = lc["repo_index"]\n    return repo_index.build(\n        root=root,\n        include_globs=include_globs,\n        exclude_dirs=exclude_dirs,\n        max_file_bytes=max_file_bytes,\n        rebuild=rebuild,\n    )\n\n\n@mcp.tool(title="Search repo index", description="Search the local repo index (SQLite FTS).")\ndef search_repo_index(query: str, top_k: int = 10) -> SearchResults:\n    lc = _ctx()\n    repo_index: RepoIndex = lc["repo_index"]\n    return repo_index.search(query=query, top_k=top_k)\n\n\n@mcp.tool(title="Orchestrate", description="Main entrypoint: auto-route a task and run it via Codex, Gemini, OpenAI, or both.")\nasync def orchestrate(\n    task: str,\n    task_type: str | None = None,\n    backend: str | None = None,\n    reasoning_effort: str | None = None,\n    verbosity: str | None = None,\n    dry_run: bool = False,\n) -> ToolRunResult:\n    lc = _ctx()\n    orchestrator: Orchestrator = lc["orchestrator"]\n\n    result = await orchestrator.run(\n        task=task,\n        task_type=task_type,\n        backend_override=backend,\n        reasoning_override=reasoning_effort,\n        verbosity_override=verbosity,\n        dry_run=dry_run,\n    )\n    return result\n\n\nif __name__ == "__main__":\n    # Direct execution (stdio transport)\n    mcp.run()\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/codex_client.py": 'from __future__ import annotations\n\nimport asyncio\nimport json\nimport os\nfrom contextlib import AsyncExitStack\nfrom typing import Any, Dict, List, Optional, Tuple\n\nfrom mcp.client.session import ClientSession\nfrom mcp.client.stdio import StdioServerParameters, stdio_client\n\n\ndef _content_item_to_dict(item: Any) -> dict:\n    # mcp content items are usually pydantic models; be resilient.\n    if hasattr(item, "model_dump"):\n        return item.model_dump()\n    if hasattr(item, "dict"):\n        return item.dict()\n    if isinstance(item, dict):\n        return item\n    return {"type": type(item).__name__, "value": str(item)}\n\n\ndef _extract_text_from_call_tool_result(result: Any) -> str:\n    pieces: List[str] = []\n    content = getattr(result, "content", None)\n\n    if content is None:\n        return str(result)\n\n    for item in content:\n        d = _content_item_to_dict(item)\n        if d.get("type") == "text" and "text" in d:\n            pieces.append(d["text"])\n        elif "text" in d:\n            pieces.append(str(d["text"]))\n        else:\n            # fall back to JSON\n            pieces.append(json.dumps(d, ensure_ascii=False))\n    return "\\n".join(pieces).strip()\n\n\ndef _best_effort_conversation_id(result: Any) -> Optional[str]:\n    """\n    Codex MCP tool supports continuing a session via `codex-reply` with a conversationId.\n    In practice, depending on Codex/MCP versions, the conversation ID may appear in:\n      - metadata fields\n      - streamed event payloads in result.content\n      - (sometimes) not at all\n\n    We try to locate it, but the orchestrator does not rely on it.\n    """\n    # Attempt 1: direct field\n    cid = getattr(result, "conversationId", None) or getattr(result, "conversation_id", None)\n    if isinstance(cid, str) and cid:\n        return cid\n\n    # Attempt 2: scan content for JSON that includes conversationId\n    
content = getattr(result, "content", None) or []\n    for item in content:\n        d = _content_item_to_dict(item)\n        txt = d.get("text")\n        if isinstance(txt, str):\n            # sometimes text may be JSON lines\n            for candidate in (txt,):\n                try:\n                    j = json.loads(candidate)\n                except Exception:\n                    continue\n                if isinstance(j, dict) and isinstance(j.get("conversationId"), str):\n                    return j["conversationId"]\n    return None\n\n\nclass CodexMCPClient:\n    """\n    Long-lived MCP client to a local `codex mcp-server` subprocess.\n\n    Key property: call `start()` once at server startup and reuse the same ClientSession\n    for all subsequent tool calls.\n\n    The MCP Python SDK example for stdio clients uses:\n      - StdioServerParameters\n      - stdio_client(...)\n      - ClientSession(...)\n      - session.initialize()\n    """\n\n    def __init__(\n        self,\n        command: str = "codex",\n        args: list[str] | None = None,\n        env: dict[str, str] | None = None,\n    ) -> None:\n        self.command = command\n        self.args = args or ["mcp-server"]\n        self.env = env  # if None, inherit env\n        self._stack = AsyncExitStack()\n        self._session: ClientSession | None = None\n        self._lock = asyncio.Lock()\n\n    @property\n    def is_started(self) -> bool:\n        return self._session is not None\n\n    async def start(self) -> None:\n        if self._session is not None:\n            return\n\n        params = StdioServerParameters(\n            command=self.command,\n            args=self.args,\n            env=self.env,\n        )\n        read, write = await self._stack.enter_async_context(stdio_client(params))\n        session = await self._stack.enter_async_context(ClientSession(read, write))\n        await session.initialize()\n        self._session = session\n\n    async def close(self) -> 
None:\n        await self._stack.aclose()\n        self._session = None\n\n    async def list_tools(self) -> list[str]:\n        await self.start()\n        assert self._session is not None\n        tools = await self._session.list_tools()\n        # tools.tools is a list of Tool objects; get names robustly\n        names: list[str] = []\n        for t in getattr(tools, "tools", []) or []:\n            if hasattr(t, "name"):\n                names.append(str(t.name))\n            else:\n                d = _content_item_to_dict(t)\n                if "name" in d:\n                    names.append(str(d["name"]))\n        return names\n\n    async def run(\n        self,\n        prompt: str,\n        reasoning_effort: str = "medium",\n        verbosity: str = "medium",\n        sandbox: str = "read-only",\n        approval_policy: str = "on-failure",\n        cwd: str = ".",\n        include_plan_tool: bool = True,\n        base_instructions: str | None = None,\n    ) -> tuple[str, dict]:\n        """\n        Calls the Codex MCP tool named `codex` using the warm session.\n\n        Tool schema documented by OpenAI includes keys like:\n          - prompt\n          - approval-policy\n          - base-instructions\n          - config\n          - include-plan-tool\n          - sandbox\n          - cwd\n        """\n        await self.start()\n        assert self._session is not None\n\n        args: Dict[str, Any] = {\n            "prompt": prompt,\n            "sandbox": sandbox,\n            "cwd": cwd,\n            "approval-policy": approval_policy,\n            "include-plan-tool": include_plan_tool,\n            "config": {\n                # let the user\'s Codex config.toml choose the base model;\n                # we ONLY override reasoning/verbosity here by default.\n                "model_reasoning_effort": reasoning_effort,\n                "model_verbosity": verbosity,\n            },\n        }\n        if base_instructions:\n            
args["base-instructions"] = base_instructions\n\n        async with self._lock:\n            result = await self._session.call_tool("codex", args)\n\n        text = _extract_text_from_call_tool_result(result)\n        cid = _best_effort_conversation_id(result)\n        meta = {\n            "conversation_id": cid,\n            "raw": _content_item_to_dict(getattr(result, "model_dump", lambda: result)()),\n        }\n        return text, meta\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/gemini_client.py": 'from __future__ import annotations\n\nimport subprocess\nfrom typing import Any, Optional\n\n\nclass GeminiCliRunner:\n    """\n    Lightweight wrapper around Gemini CLI in non-interactive mode.\n\n    Gemini CLI supports:\n      - `gemini -p "prompt"` (non-interactive / scripting)\n      - piping stdin: `echo "..." | gemini`\n      - `--model <model_name>`\n      - `--all-files` to include all project files as context\n      - `--sandbox` to run tools in a sandbox\n    """\n\n    def __init__(self, command: str = "gemini") -> None:\n        self.command = command\n\n    def run(\n        self,\n        prompt: str,\n        model: Optional[str] = None,\n        all_files: bool = False,\n        sandbox: bool = False,\n        cwd: str = ".",\n        timeout_sec: int = 1800,\n    ) -> dict[str, Any]:\n        cmd = [self.command, "--prompt", prompt]\n        if model:\n            cmd += ["--model", model]\n        if all_files:\n            cmd += ["--all-files"]\n        if sandbox:\n            cmd += ["--sandbox"]\n\n        proc = subprocess.run(\n            cmd,\n            cwd=cwd,\n            capture_output=True,\n            text=True,\n            timeout=timeout_sec,\n        )\n        return {\n            "cmd": cmd,\n            "cwd": cwd,\n            "returncode": proc.returncode,\n            "stdout": proc.stdout,\n            "stderr": proc.stderr,\n        }\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/openai_llm.py": 'from __future__ import annotations\n\nimport json\nimport os\nfrom typing import Any, Optional\n\ntry:\n    from openai import OpenAI\nexcept Exception as e:  # pragma: no cover\n    OpenAI = None  # type: ignore\n\n\nclass OpenAIChat:\n    """\n    Tiny wrapper around OpenAI Chat Completions.\n\n    Why we keep this tiny:\n    - Your MCP server should remain stable even if OpenAI SDK evolves.\n    - If OPENAI_API_KEY is missing, we simply disable LLM routing and fall back to heuristics.\n    """\n\n    def __init__(self, model: str = "gpt-5.2") -> None:\n        self.model = model\n        self.enabled = bool(os.environ.get("OPENAI_API_KEY")) and OpenAI is not None\n        self._client = OpenAI() if self.enabled else None\n\n    def complete(\n        self,\n        developer: str,\n        user: str,\n        reasoning_effort: str = "medium",\n        verbosity: str = "medium",\n        response_format: Optional[dict[str, Any]] = None,\n    ) -> str:\n        if not self.enabled or self._client is None:\n            raise RuntimeError("OpenAIChat is disabled (missing OPENAI_API_KEY or openai package).")\n\n        kwargs: dict[str, Any] = {}\n        if response_format is not None:\n            kwargs["response_format"] = response_format\n\n        resp = self._client.chat.completions.create(\n            model=self.model,\n            messages=[\n                {"role": "developer", "content": developer},\n                {"role": "user", "content": user},\n            ],\n            reasoning_effort=reasoning_effort,\n            verbosity=verbosity,\n            **kwargs,\n        )\n        return (resp.choices[0].message.content or "").strip()\n\n    def complete_json(\n        self,\n        developer: str,\n        user: str,\n        reasoning_effort: str = "low",\n        verbosity: str = "low",\n    ) -> dict[str, Any]:\n        """\n        Uses JSON mode to reliably return 
parseable JSON.\n        """\n        txt = self.complete(\n            developer=developer,\n            user=user,\n            reasoning_effort=reasoning_effort,\n            verbosity=verbosity,\n            response_format={"type": "json_object"},\n        )\n        return json.loads(txt)\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/prompts.py": 'from __future__ import annotations\n\nROLE_INSTRUCTIONS: dict[str, str] = {\n    # Most common tasks\n    "refactorer": """You are RefactorerAgent.\n\nGoals:\n- Perform safe refactors without changing external behavior unless explicitly requested.\n- Keep changes incremental and easy to review.\n- Update tests and CI configs as needed.\n- Prefer small, obvious improvements over clever rewrites.\n\nOutput format (always):\n1) Plan (3-8 bullets)\n2) Changes made (files + what changed)\n3) Commands to run (exact)\n4) Risks / follow-ups\n""",\n    "ci_fixer": """You are CIFixerAgent.\n\nGoals:\n- Make CI green with the smallest safe change.\n- Identify root cause and explain it briefly.\n- Prefer fixing tests/tooling over masking failures.\n\nOutput format:\n1) Diagnosis\n2) Minimal fix\n3) Validation steps (commands)\n4) If CI is flaky: stabilization suggestions\n""",\n    "auditor": """You are CodeAuditAgent (read-only by default).\n\nGoals:\n- Identify correctness, security, and maintainability issues.\n- For each finding: severity, evidence (file path + snippet), and recommended fix.\n- If asked for a patch, propose a diff (do NOT apply changes unless instructed).\n\nOutput format:\n- Executive summary\n- Findings table (severity, area, file, recommendation)\n- Top 3 quick wins\n- Deeper refactor suggestions (optional)\n""",\n    "doc_analyst": """You are DocumentAnalystAgent.\n\nGoals:\n- Analyze all relevant documents as a whole: themes, trends, contradictions, gaps.\n- Cite evidence by file path and (if possible) headings/quotes.\n- Provide an actionable synthesis (not just summaries).\n\nOutput format:\n- Executive summary\n- Key themes (with evidence)\n- Trends over time (if dates exist)\n- Contradictions / missing info\n- Recommended next actions\n""",\n    "file_ops": """You are FileOpsAgent.\n\nGoals:\n- Sort/rename/move files safely.\n- Produce a deterministic rename/move plan.\n- Avoid data 
loss; preserve git history when possible.\n- If a change is large, stage it in steps.\n\nOutput format:\n1) Proposed mapping (old -> new)\n2) Rationale (rules used)\n3) Safety checks\n4) Commands to apply + rollback plan\n""",\n    "data_engineer": """You are DataEngineerAgent.\n\nGoals:\n- Build ingestion/analysis/storage/indexing pipelines that are reliable and observable.\n- Prefer explicit schemas, validation, and idempotent jobs.\n- Be clear about assumptions and scaling limits.\n\nOutput format:\n- Requirements & assumptions\n- Proposed architecture\n- Implementation steps\n- Data model / schema\n- Test & monitoring plan\n""",\n    "researcher": """You are ResearchIngestAgent.\n\nGoals:\n- Gather info, summarize, and turn it into ingestible structured data.\n- Be explicit about sources, recency, and confidence.\n- If the environment lacks web access, propose offline ingestion steps.\n\nOutput format:\n- Source list\n- Key extracted facts\n- Normalized schema\n- Ingestion plan\n""",\n    "reviewer": """You are CodeReviewAgent.\n\nGoals:\n- Review changes for correctness, readability, tests, security, and performance.\n- Provide specific actionable comments and suggested diffs/snippets.\n\nOutput format:\n- Summary verdict (approve / request changes / nit)\n- High priority issues\n- Medium/low priority suggestions\n- Test coverage notes\n""",\n    "optimizer": """You are PerformanceOptimizerAgent.\n\nGoals:\n- Identify bottlenecks and suggest measurable improvements.\n- Prefer profiling/measurement before risky micro-optimizations.\n\nOutput format:\n- Baseline assumptions\n- Likely bottlenecks\n- Proposed optimizations (ranked by ROI)\n- Benchmark plan\n""",\n    "general": """You are a general purpose engineering agent.\n\nOutput format:\n- Plan\n- Execution\n- Verification\n- Next steps\n""",\n}\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/router.py": 'from __future__ import annotations\n\nimport re\nfrom typing import Optional\n\nfrom .openai_llm import OpenAIChat\nfrom .types import RouteDecision\n\n\ndef _normalize(s: str) -> str:\n    return re.sub(r"\\s+", " ", s or "").strip().lower()\n\n\ndef heuristic_route(task: str, task_type: str | None = None) -> RouteDecision:\n    t = _normalize(task)\n    tt = _normalize(task_type or "")\n\n    # defaults\n    backend = "codex"\n    agent_role = "general"\n    reasoning_effort = "medium"\n    verbosity = "medium"\n    codex_sandbox = "read-only"\n    codex_approval_policy = "on-failure"\n    gemini_model = None\n    notes = "heuristic"\n\n    def set_(**kw):\n        nonlocal backend, agent_role, reasoning_effort, verbosity, codex_sandbox, codex_approval_policy, gemini_model, notes\n        for k, v in kw.items():\n            if v is not None:\n                locals()[k]  # noop for lint\n        backend = kw.get("backend", backend)\n        agent_role = kw.get("agent_role", agent_role)\n        reasoning_effort = kw.get("reasoning_effort", reasoning_effort)\n        verbosity = kw.get("verbosity", verbosity)\n        codex_sandbox = kw.get("codex_sandbox", codex_sandbox)\n        codex_approval_policy = kw.get("codex_approval_policy", codex_approval_policy)\n        gemini_model = kw.get("gemini_model", gemini_model)\n        notes = kw.get("notes", notes)\n\n    # If explicit task_type provided\n    if tt:\n        if "refactor" in tt:\n            set_(agent_role="refactorer", reasoning_effort="high", codex_sandbox="workspace-write")\n        elif "ci" in tt or "pipeline" in tt:\n            set_(agent_role="ci_fixer", reasoning_effort="high", codex_sandbox="workspace-write")\n        elif "audit" in tt:\n            set_(agent_role="auditor", backend="codex", reasoning_effort="high", codex_sandbox="read-only", codex_approval_policy="untrusted")\n        elif "document" in tt or "trend" in tt:\n   
         set_(agent_role="doc_analyst", backend="gemini", reasoning_effort="high")\n        elif "rename" in tt or "sort" in tt:\n            set_(agent_role="file_ops", reasoning_effort="medium", codex_sandbox="workspace-write")\n        elif "index" in tt or "ingest" in tt or "data" in tt:\n            set_(agent_role="data_engineer", reasoning_effort="high", codex_sandbox="workspace-write")\n        elif "review" in tt:\n            set_(agent_role="reviewer", backend="both", reasoning_effort="high", codex_sandbox="read-only")\n        elif "optimiz" in tt:\n            set_(agent_role="optimizer", reasoning_effort="high", codex_sandbox="workspace-write")\n\n        return RouteDecision(\n            backend=backend, agent_role=agent_role, reasoning_effort=reasoning_effort, verbosity=verbosity,\n            codex_sandbox=codex_sandbox, codex_approval_policy=codex_approval_policy,\n            gemini_model=gemini_model, notes=notes\n        )\n\n    # Keyword routing\n    if any(k in t for k in ["refactor", "re-architect", "cleanup", "rename symbol", "extract", "modularize"]):\n        set_(agent_role="refactorer", reasoning_effort="high", codex_sandbox="workspace-write", notes="refactor heuristic")\n    if any(k in t for k in ["ci", "github action", "pipeline", "build failing", "tests failing", "lint failing"]):\n        set_(agent_role="ci_fixer", reasoning_effort="high", codex_sandbox="workspace-write", notes="ci heuristic")\n    if any(k in t for k in ["audit", "security", "vulnerability", "threat model", "permissions"]):\n        set_(agent_role="auditor", backend="codex", reasoning_effort="xhigh", codex_sandbox="read-only", codex_approval_policy="untrusted", notes="audit heuristic")\n    if any(k in t for k in ["docs", "document", "trend", "themes", "summarize all", "corpus"]):\n        set_(agent_role="doc_analyst", backend="gemini", reasoning_effort="high", notes="doc heuristic")\n    if any(k in t for k in ["rename files", "sort files", "move files", 
"reorganize folders"]):\n        set_(agent_role="file_ops", reasoning_effort="medium", codex_sandbox="workspace-write", notes="file ops heuristic")\n    if any(k in t for k in ["ingest", "etl", "pipeline", "index", "vector", "embedding", "warehouse"]):\n        set_(agent_role="data_engineer", reasoning_effort="high", codex_sandbox="workspace-write", notes="data heuristic")\n    if any(k in t for k in ["code review", "review this diff", "review changes"]):\n        set_(agent_role="reviewer", backend="both", reasoning_effort="high", codex_sandbox="read-only", notes="review heuristic")\n    if any(k in t for k in ["optimize", "performance", "speed up", "reduce memory"]):\n        set_(agent_role="optimizer", reasoning_effort="high", codex_sandbox="workspace-write", notes="opt heuristic")\n\n    return RouteDecision(\n        backend=backend,\n        agent_role=agent_role,\n        reasoning_effort=reasoning_effort,\n        verbosity=verbosity,\n        codex_sandbox=codex_sandbox,\n        codex_approval_policy=codex_approval_policy,\n        gemini_model=gemini_model,\n        notes=notes,\n    )\n\n\ndef llm_route(task: str, task_type: str | None, openai: OpenAIChat) -> RouteDecision:\n    dev = """You are RoutingAgent for a multi-backend engineering orchestrator.\n\nReturn ONLY a JSON object with these keys:\n- backend: one of ["codex","gemini","openai","both"]\n- agent_role: one of ["refactorer","ci_fixer","auditor","doc_analyst","file_ops","data_engineer","researcher","reviewer","optimizer","general"]\n- reasoning_effort: one of ["minimal","low","medium","high","xhigh"]\n- verbosity: one of ["low","medium","high"]\n- codex_sandbox: one of ["read-only","workspace-write","danger-full-access"]\n- codex_approval_policy: one of ["untrusted","on-failure","never"]\n- gemini_model: string or null\n- notes: short string\n\nRouting rules:\n- Use codex when code changes, refactors, CI fixes, or optimization require editing/running code.\n- Use gemini for broad doc 
analysis or when you want extra tooling/cross-check.\n- Use both for high-stakes code changes: codex implements, gemini reviews.\n- Use openai for pure analysis/planning/audit writeups without code execution.\n\nPick the LOWEST reasoning_effort that is safe; increase effort for:\n- repo-wide changes, security concerns, ambiguous requirements, architectural decisions.\n"""\n    user = f"""TASK_TYPE: {task_type or "(none)"}\n\nTASK:\n{task}\n"""\n\n    j = openai.complete_json(developer=dev, user=user, reasoning_effort="low", verbosity="low")\n    # Normalize and validate lightly with fallbacks\n    backend = j.get("backend", "codex")\n    agent_role = j.get("agent_role", "general")\n    reasoning_effort = j.get("reasoning_effort", "medium")\n    verbosity = j.get("verbosity", "medium")\n    codex_sandbox = j.get("codex_sandbox", "read-only")\n    codex_approval_policy = j.get("codex_approval_policy", "on-failure")\n    gemini_model = j.get("gemini_model", None)\n    notes = j.get("notes", "llm")\n\n    return RouteDecision(\n        backend=backend,\n        agent_role=agent_role,\n        reasoning_effort=reasoning_effort,\n        verbosity=verbosity,\n        codex_sandbox=codex_sandbox,\n        codex_approval_policy=codex_approval_policy,\n        gemini_model=gemini_model,\n        notes=notes,\n    )\n\n\ndef route_task(task: str, task_type: str | None, openai: OpenAIChat) -> RouteDecision:\n    if openai.enabled:\n        try:\n            return llm_route(task=task, task_type=task_type, openai=openai)\n        except Exception:\n            return heuristic_route(task=task, task_type=task_type)\n    return heuristic_route(task=task, task_type=task_type)\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/indexer.py": 'from __future__ import annotations\n\nimport os\nimport sqlite3\nfrom dataclasses import dataclass\nfrom pathlib import Path\nfrom typing import Iterable, Optional\n\nfrom .types import IndexBuildResult, SearchHit, SearchResults\n\n\nDEFAULT_INCLUDE_GLOBS = [\n    "**/*.md",\n    "**/*.txt",\n    "**/*.rst",\n    "**/*.py",\n    "**/*.js",\n    "**/*.ts",\n    "**/*.tsx",\n    "**/*.json",\n    "**/*.yaml",\n    "**/*.yml",\n    "**/*.toml",\n]\n\nDEFAULT_EXCLUDE_DIRS = [\n    ".git", ".venv", "venv", "node_modules", "dist", "build", ".mypy_cache", ".pytest_cache", "__pycache__",\n]\n\n\nclass RepoIndex:\n    """\n    Simple local index using SQLite FTS5.\n    - Great for repo-wide search without needing an LLM to "remember everything".\n    - Fast enough for most repos.\n    """\n\n    def __init__(self, db_path: str = ".mcp_index.sqlite3") -> None:\n        self.db_path = db_path\n        self._ensure_schema()\n\n    def _connect(self) -> sqlite3.Connection:\n        con = sqlite3.connect(self.db_path)\n        con.row_factory = sqlite3.Row\n        return con\n\n    def _ensure_schema(self) -> None:\n        con = self._connect()\n        try:\n            con.execute("PRAGMA journal_mode=WAL;")\n            con.execute("""\n                CREATE TABLE IF NOT EXISTS files (\n                    path TEXT PRIMARY KEY,\n                    mtime REAL NOT NULL,\n                    bytes INTEGER NOT NULL\n                );\n            """)\n            # FTS5 content table\n            con.execute("""\n                CREATE VIRTUAL TABLE IF NOT EXISTS fts USING fts5(\n                    path UNINDEXED,\n                    content,\n                    tokenize = \'porter\'\n                );\n            """)\n            con.commit()\n        finally:\n            con.close()\n\n    def build(\n        self,\n        root: str = ".",\n        include_globs: list[str] | None = None,\n        
exclude_dirs: list[str] | None = None,\n        max_file_bytes: int = 2_000_000,\n        rebuild: bool = False,\n    ) -> IndexBuildResult:\n        root_path = Path(root).resolve()\n        include_globs = include_globs or DEFAULT_INCLUDE_GLOBS\n        exclude_dirs = exclude_dirs or DEFAULT_EXCLUDE_DIRS\n\n        con = self._connect()\n        indexed, skipped, bytes_indexed = 0, 0, 0\n\n        try:\n            if rebuild:\n                con.execute("DELETE FROM fts;")\n                con.execute("DELETE FROM files;")\n                con.commit()\n\n            exclude_set = set(exclude_dirs)\n\n            def is_excluded(p: Path) -> bool:\n                return any(part in exclude_set for part in p.parts)\n\n            # Collect candidate files\n            candidates: set[Path] = set()\n            for g in include_globs:\n                for p in root_path.glob(g):\n                    if p.is_file() and not is_excluded(p):\n                        candidates.add(p)\n\n            for p in sorted(candidates):\n                try:\n                    stat = p.stat()\n                except OSError:\n                    skipped += 1\n                    continue\n                if stat.st_size > max_file_bytes:\n                    skipped += 1\n                    continue\n\n                rel = str(p.relative_to(root_path))\n                mtime = stat.st_mtime\n                size = stat.st_size\n\n                row = con.execute("SELECT mtime FROM files WHERE path = ?", (rel,)).fetchone()\n                if row is not None and float(row["mtime"]) >= mtime:\n                    # up-to-date\n                    continue\n\n                try:\n                    content = p.read_text(encoding="utf-8", errors="ignore")\n                except Exception:\n                    skipped += 1\n                    continue\n\n                # Upsert\n                con.execute("INSERT OR REPLACE INTO files(path, mtime, bytes) VALUES(?,?,?)", 
(rel, mtime, size))\n                con.execute("DELETE FROM fts WHERE path = ?", (rel,))\n                con.execute("INSERT INTO fts(path, content) VALUES(?,?)", (rel, content))\n                indexed += 1\n                bytes_indexed += size\n\n            con.commit()\n        finally:\n            con.close()\n\n        return IndexBuildResult(\n            root=str(root_path),\n            indexed_files=indexed,\n            skipped_files=skipped,\n            bytes_indexed=bytes_indexed,\n            db_path=self.db_path,\n        )\n\n    def search(self, query: str, top_k: int = 10) -> SearchResults:\n        con = self._connect()\n        try:\n            rows = con.execute(\n                """\n                SELECT path, bm25(fts) AS score,\n                       snippet(fts, 1, \'[\', \']\', \'…\', 12) AS snippet\n                FROM fts\n                WHERE fts MATCH ?\n                ORDER BY score\n                LIMIT ?;\n                """,\n                (query, top_k),\n            ).fetchall()\n\n            hits = [\n                SearchHit(\n                    path=r["path"],\n                    score=float(r["score"]),\n                    snippet=str(r["snippet"]),\n                )\n                for r in rows\n            ]\n            return SearchResults(query=query, hits=hits)\n        finally:\n            con.close()\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/types.py": 'from __future__ import annotations\n\nfrom dataclasses import dataclass, field\nfrom typing import Any, Literal, Optional\n\n\nReasoningEffort = Literal["minimal", "low", "medium", "high", "xhigh"]\nVerbosity = Literal["low", "medium", "high"]\nBackend = Literal["codex", "gemini", "openai", "both"]\n\nSandboxMode = Literal["read-only", "workspace-write", "danger-full-access"]\nApprovalPolicy = Literal["untrusted", "on-failure", "never"]\n\nAgentRole = Literal[\n    "refactorer",\n    "ci_fixer",\n    "auditor",\n    "doc_analyst",\n    "file_ops",\n    "data_engineer",\n    "researcher",\n    "reviewer",\n    "optimizer",\n    "general",\n]\n\n\n@dataclass\nclass RouteDecision:\n    backend: Backend\n    agent_role: AgentRole\n    reasoning_effort: ReasoningEffort\n    verbosity: Verbosity = "medium"\n    codex_sandbox: SandboxMode = "read-only"\n    codex_approval_policy: ApprovalPolicy = "on-failure"\n    gemini_model: Optional[str] = None\n    notes: str = ""\n\n\n@dataclass\nclass ToolRunResult:\n    backend: Backend\n    agent_role: AgentRole\n    reasoning_effort: ReasoningEffort\n    verbosity: Verbosity\n    output_text: str\n    raw: Any = field(default=None)\n    warnings: list[str] = field(default_factory=list)\n\n\n@dataclass\nclass IndexBuildResult:\n    root: str\n    indexed_files: int\n    skipped_files: int\n    bytes_indexed: int\n    db_path: str\n\n\n@dataclass\nclass SearchHit:\n    path: str\n    score: float\n    snippet: str\n\n\n@dataclass\nclass SearchResults:\n    query: str\n    hits: list[SearchHit]\n',
    "./multi_reasoning_mcp/src/multi_reasoning_mcp/workflows.py": 'from __future__ import annotations\n\nimport asyncio\nfrom dataclasses import asdict\nfrom typing import Optional\n\nfrom .codex_client import CodexMCPClient\nfrom .gemini_client import GeminiCliRunner\nfrom .indexer import RepoIndex\nfrom .openai_llm import OpenAIChat\nfrom .prompts import ROLE_INSTRUCTIONS\nfrom .router import route_task\nfrom .types import RouteDecision, ToolRunResult\n\n\ndef _join(*parts: str) -> str:\n    return "\\n\\n".join([p.strip() for p in parts if p and p.strip()]).strip()\n\n\nclass Orchestrator:\n    """\n    High-level workflow engine:\n    - Routes the task\n    - Selects reasoning level / sandbox / approvals\n    - Applies an agent prompt template\n    - Executes via the chosen backend(s)\n    """\n\n    def __init__(\n        self,\n        codex: CodexMCPClient,\n        gemini: GeminiCliRunner,\n        openai: OpenAIChat,\n        repo_index: RepoIndex,\n    ) -> None:\n        self.codex = codex\n        self.gemini = gemini\n        self.openai = openai\n        self.repo_index = repo_index\n\n    async def run(\n        self,\n        task: str,\n        task_type: str | None = None,\n        backend_override: str | None = None,\n        reasoning_override: str | None = None,\n        verbosity_override: str | None = None,\n        dry_run: bool = False,\n    ) -> ToolRunResult:\n        decision: RouteDecision = route_task(task=task, task_type=task_type, openai=self.openai)\n\n        if backend_override:\n            decision.backend = backend_override  # type: ignore\n        if reasoning_override:\n            decision.reasoning_effort = reasoning_override  # type: ignore\n        if verbosity_override:\n            decision.verbosity = verbosity_override  # type: ignore\n\n        role_preamble = ROLE_INSTRUCTIONS.get(decision.agent_role, ROLE_INSTRUCTIONS["general"])\n\n        # Add a small policy wrapper that standardizes outputs and reduces failure 
modes\n        policy = """Global constraints:\n- If you plan to edit files or run commands, state that in the plan.\n- Prefer small, reversible changes.\n- Never delete data unless explicitly asked.\n- Always end with a short verification checklist.\n"""\n\n        full_prompt = _join(\n            role_preamble,\n            policy,\n            f"TASK:\\n{task}",\n            "If you need clarification, make a best-effort assumption and list it explicitly.",\n        )\n\n        if dry_run:\n            return ToolRunResult(\n                backend=decision.backend,\n                agent_role=decision.agent_role,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                output_text="DRY RUN\\n\\nRoute decision:\\n" + str(asdict(decision)) + "\\n\\nPrompt:\\n" + full_prompt,\n                raw={"route": asdict(decision)},\n            )\n\n        if decision.backend == "codex":\n            text, meta = await self.codex.run(\n                prompt=full_prompt,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                sandbox=decision.codex_sandbox,\n                approval_policy=decision.codex_approval_policy,\n                cwd=".",\n                include_plan_tool=True,\n            )\n            return ToolRunResult(\n                backend="codex",\n                agent_role=decision.agent_role,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                output_text=text,\n                raw={"route": asdict(decision), "codex": meta},\n            )\n\n        if decision.backend == "gemini":\n            out = self.gemini.run(\n                prompt=full_prompt,\n                model=decision.gemini_model,\n                all_files=(decision.agent_role in ("doc_analyst", "auditor")),\n                sandbox=False,\n                cwd=".",\n  
              timeout_sec=1800,\n            )\n            return ToolRunResult(\n                backend="gemini",\n                agent_role=decision.agent_role,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                output_text=out["stdout"].strip(),\n                raw={"route": asdict(decision), "gemini": out},\n                warnings=[out["stderr"].strip()] if out.get("stderr") else [],\n            )\n\n        if decision.backend == "openai":\n            # Pure LLM response (no tool execution)\n            if not self.openai.enabled:\n                return ToolRunResult(\n                    backend="openai",\n                    agent_role=decision.agent_role,\n                    reasoning_effort=decision.reasoning_effort,\n                    verbosity=decision.verbosity,\n                    output_text="OpenAI backend requested but OPENAI_API_KEY is not set. Re-run with OPENAI_API_KEY or use codex/gemini backend.",\n                    raw={"route": asdict(decision)},\n                    warnings=["OPENAI_API_KEY missing"],\n                )\n\n            txt = self.openai.complete(\n                developer=f"You are {decision.agent_role}. 
Follow the instructions in the user message.",\n                user=full_prompt,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n            )\n            return ToolRunResult(\n                backend="openai",\n                agent_role=decision.agent_role,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                output_text=txt,\n                raw={"route": asdict(decision)},\n            )\n\n        if decision.backend == "both":\n            # Codex first (implementation), then Gemini review (cross-check).\n            # If you want a different composition, adjust here.\n            text1, meta1 = await self.codex.run(\n                prompt=full_prompt,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                sandbox=decision.codex_sandbox,\n                approval_policy=decision.codex_approval_policy,\n                cwd=".",\n                include_plan_tool=True,\n            )\n\n            review_prompt = _join(\n                ROLE_INSTRUCTIONS["reviewer"],\n                "Review the following Codex output and propose fixes or risks.",\n                "CODEX OUTPUT:",\n                text1,\n            )\n            out2 = self.gemini.run(\n                prompt=review_prompt,\n                model=decision.gemini_model,\n                all_files=False,\n                sandbox=False,\n                cwd=".",\n                timeout_sec=1800,\n            )\n\n            combined = _join(\n                "=== CODEX ===",\n                text1,\n                "=== GEMINI REVIEW ===",\n                out2["stdout"].strip(),\n            )\n\n            warnings = []\n            if out2.get("stderr"):\n                warnings.append(out2["stderr"].strip())\n\n            return ToolRunResult(\n                backend="both",\n     
           agent_role=decision.agent_role,\n                reasoning_effort=decision.reasoning_effort,\n                verbosity=decision.verbosity,\n                output_text=combined,\n                raw={"route": asdict(decision), "codex": meta1, "gemini": out2},\n                warnings=warnings,\n            )\n\n        # fallback\n        return ToolRunResult(\n            backend="openai",\n            agent_role="general",\n            reasoning_effort="medium",\n            verbosity="medium",\n            output_text=f"Unknown backend \'{decision.backend}\'.",\n            raw={"route": asdict(decision)},\n            warnings=["Unknown backend"],\n        )\n',
}

for path_str, content in files.items():
    path = pathlib.Path(path_str).resolve()
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(content, encoding="utf-8")
print("Wrote", len(files), "files.")


Creating project at: /home/victor/projects/codex-multireason-mcp/multi_reasoning_mcp
Wrote 12 files.


## Install Python dependencies

Run these commands in a terminal rather than in notebook `!` cells (each `!` line runs in its own subshell, so `cd` and `source` would not persist):

```bash
cd multi_reasoning_mcp
python -m venv .venv
source .venv/bin/activate  # (Windows: .venv\Scripts\activate)
pip install -r requirements.txt
```

If you want OpenAI-based routing/summarization, set:

```bash
export OPENAI_API_KEY='...'
```
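
The router above only attempts LLM routing when the OpenAI client is enabled, and otherwise uses `heuristic_route`. A minimal sketch for checking which path to expect before starting the server follows. It assumes that "enabled" simply means `OPENAI_API_KEY` is set and non-empty; `routing_mode` is a hypothetical helper and not part of the project.

```python
import os


def routing_mode(env=None):
    """Return "llm" when an OpenAI key is configured, else "heuristic".

    Hypothetical helper: assumes the client counts as enabled whenever
    OPENAI_API_KEY is present and non-empty.
    """
    env = os.environ if env is None else env
    # An empty or whitespace-only key is treated the same as a missing one.
    return "llm" if env.get("OPENAI_API_KEY", "").strip() else "heuristic"


print("Expected routing mode:", routing_mode())
```

If this reports `heuristic`, routing still works; it just skips the LLM call.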

Codex and Gemini each have their own auth flows; run their CLIs once interactively to authenticate.


## Run the MCP server

In a terminal:

```bash
cd multi_reasoning_mcp
mcp dev src/multi_reasoning_mcp/server.py
```

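Or directly, provided the package is importable (with the `src/` layout, `src/` must be on the import path, for example via `PYTHONPATH=src`):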

```bash
python -m multi_reasoning_mcp.server
```


## Test the server with MCP Inspector

The `mcp dev` command from the MCP Python SDK launches the server together with the MCP Inspector, so no separately running server is needed:

```bash
mcp dev src/multi_reasoning_mcp/server.py
```

The `mcp dev` command opens an inspector UI; use it to call tools like `warm_status` and `orchestrate`.


## Connect from Codex CLI (as an MCP client)

Codex stores MCP configuration in `~/.codex/config.toml` and also provides `codex mcp add ...`.

Example config snippet (edit paths as needed):

```toml
[mcp_servers.multi_reasoning]
command = "python"
args = ["-m", "multi_reasoning_mcp.server"]
cwd = "/absolute/path/to/your/repo"
```

Then, in Codex TUI, use `/mcp` to confirm it's connected.


## Connect from Gemini CLI (as an MCP client)

Gemini CLI supports MCP servers configured in `~/.gemini/settings.json`.

Example (schema may evolve — adjust to your installed Gemini CLI version):

```json
{
  "mcpServers": {
    "multi_reasoning": {
      "command": "python",
      "args": ["-m", "multi_reasoning_mcp.server"],
      "cwd": "/absolute/path/to/your/repo"
    }
  }
}
```
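
If the settings file already holds other configuration, merging the entry is safer than overwriting the file. The sketch below shows one way to do that. It assumes the file is plain JSON without comments, and `add_mcp_server` is a hypothetical helper, not something Gemini CLI provides. The demo writes to a temporary file; point it at `~/.gemini/settings.json` only after checking the output.

```python
import json
import tempfile
from pathlib import Path


def add_mcp_server(settings_path, name, command, args, cwd):
    """Merge one server entry under "mcpServers", keeping all other keys."""
    path = Path(settings_path).expanduser()
    text = path.read_text(encoding="utf-8") if path.exists() else ""
    # Treat a missing or empty file as an empty settings object.
    settings = json.loads(text) if text.strip() else {}
    servers = settings.setdefault("mcpServers", {})
    servers[name] = {"command": command, "args": list(args), "cwd": str(cwd)}
    path.parent.mkdir(parents=True, exist_ok=True)
    path.write_text(json.dumps(settings, indent=2) + "\n", encoding="utf-8")
    return settings


# Demo against a scratch file that already contains an unrelated setting.
with tempfile.TemporaryDirectory() as tmp:
    demo_path = Path(tmp) / "settings.json"
    demo_path.write_text('{"theme": "dark"}', encoding="utf-8")
    merged = add_mcp_server(
        demo_path,
        name="multi_reasoning",
        command="python",
        args=["-m", "multi_reasoning_mcp.server"],
        cwd="/absolute/path/to/your/repo",
    )
    print(json.dumps(merged, indent=2))
```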

Inside Gemini CLI, the server's tools should then be available under this name; the `/mcp` command lists configured servers (exact behavior depends on your Gemini CLI version).


## Example calls

Once connected from a client, try:

- `warm_status` → should list Codex tools `codex` and `codex-reply`
- `orchestrate` with `task_type='refactor+ci'` and a task description
- `build_repo_index` then `search_repo_index` for fast repo-wide search
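
Under the hood, an MCP client sends each of these as a JSON-RPC 2.0 `tools/call` request. The sketch below builds the request bodies for the calls listed above, to show their general shape. The argument names `task` and `query` are assumptions taken from the `Orchestrator.run` and `RepoIndex.search` signatures; the tool schemas the server actually exposes may differ, and `tool_call` is a hypothetical helper.

```python
import json
from itertools import count

_ids = count(1)  # JSON-RPC request ids; any unique value per request works


def tool_call(name, arguments=None):
    """Build an MCP `tools/call` JSON-RPC 2.0 request as a plain dict."""
    return {
        "jsonrpc": "2.0",
        "id": next(_ids),
        "method": "tools/call",
        "params": {"name": name, "arguments": arguments or {}},
    }


requests = [
    tool_call("warm_status"),
    # `task` is an assumed argument name; `task_type` comes from the list above.
    tool_call(
        "orchestrate",
        {"task_type": "refactor+ci", "task": "Rename the config loader and keep CI green"},
    ),
    tool_call("build_repo_index"),
    # `query` is an assumed argument name.
    tool_call("search_repo_index", {"query": "RouteDecision"}),
]

for req in requests:
    print(json.dumps(req))
```

In practice the client (Codex, Gemini CLI, or the Inspector) constructs these messages for you; the sketch is only meant to make the request shape concrete.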
