# Quantum Volume Finder

## A Deep Agent Example for Actual QV Measurement

This notebook demonstrates a multi-agent system that **finds the highest achievable Quantum Volume** for IBM Quantum backends through **actual hardware execution**.

Unlike simple analysis tools, this agent **runs experiments** and reports **actual results**.
It uses a top-down strategy: start at the highest requested depth and work down until it finds a depth that passes the QV criteria.

### What is Quantum Volume?

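A backend **achieves** Quantum Volume (QV) 2^n when:
- It runs random n-qubit, depth-n QV circuits
- The Heavy Output Probability (HOP) exceeds 2/3
- HOP = (shots resulting in heavy outputs) / (total shots), where heavy outputs are the bitstrings whose ideal probability is above the median of the circuit's ideal output distribution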
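
As a small illustration of these definitions, the sketch below computes heavy outputs and HOP for a toy 2-qubit case. Heavy outputs are taken to be the bitstrings whose ideal probability exceeds the median, which is how the helper functions later in this notebook define them. The probabilities and counts are made-up numbers, not results from any backend.

```python
from statistics import median

# Hypothetical ideal output distribution of a 2-qubit circuit (sums to 1).
ideal_probs = {"00": 0.40, "01": 0.10, "10": 0.35, "11": 0.15}

# Heavy outputs: bitstrings whose ideal probability exceeds the median.
median_prob = median(ideal_probs.values())
heavy_outputs = {bs for bs, p in ideal_probs.items() if p > median_prob}

# Hypothetical measurement counts from 1000 shots on noisy hardware.
counts = {"00": 380, "01": 130, "10": 320, "11": 170}

total_shots = sum(counts.values())
heavy_shots = sum(n for bs, n in counts.items() if bs in heavy_outputs)
hop = heavy_shots / total_shots

print(f"median={median_prob:.3f} heavy={sorted(heavy_outputs)} HOP={hop:.3f}")
print("passes" if hop > 2 / 3 else "fails")
```

Half of the bitstrings are heavy by construction, so a device that returns uniformly random outputs scores a HOP near 1/2.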

### Two Modes: Quick Test vs Full Protocol

**Single-Circuit Mode** (default): For each depth, generates one random QV circuit, runs it once on hardware, and checks if HOP > 2/3. This gives a quick signal but is **not statistically rigorous**.

**Full QV Protocol** (`NUM_CIRCUITS = 100`): Implements the standard QV protocol ([arXiv:1811.12926](https://arxiv.org/abs/1811.12926)):
1. Generates N **independent** random QV circuits per depth (each with a different seed)
2. Runs each circuit on hardware via the `run_qv_depth_trial` batch tool
3. Computes individual HOP for each circuit
4. Applies a one-sided confidence interval test: the lower bound of the 97.5% CI of the mean HOP must exceed 2/3
5. Only then is QV officially "achieved" for that depth

Set `NUM_CIRCUITS = 100` (or more) in the configuration cell to run the full protocol.
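
The confidence-interval check in step 4 can be sketched with the standard library alone. This is a simplified stand-in: it uses a normal quantile, whereas the notebook's own analysis function uses a t-distribution quantile from scipy. For around 100 circuits the two quantiles are close, but they differ more for small samples. The function name `qv_ci_test` and the HOP values are hypothetical.

```python
from math import sqrt
from statistics import NormalDist, mean, stdev


def qv_ci_test(hops: list[float], confidence: float = 0.975) -> tuple[float, float, bool]:
    """Return (mean HOP, one-sided lower bound, passed) using a normal quantile."""
    if len(hops) < 2:
        raise ValueError("need at least two HOP values to estimate a spread")
    m = mean(hops)
    sem = stdev(hops) / sqrt(len(hops))  # standard error of the mean
    z = NormalDist().inv_cdf(confidence)  # about 1.96 for confidence=0.975
    lower = m - z * sem
    return m, lower, lower > 2 / 3


# Hypothetical per-circuit HOP values, not measured data.
hops = [0.70, 0.72, 0.69, 0.71, 0.73, 0.68, 0.70, 0.72, 0.71, 0.69]
m, lower, passed = qv_ci_test(hops)
print(f"mean={m:.4f} lower={lower:.4f} passed={passed}")
```

The test is stricter than comparing the mean to 2/3: a mean just above the threshold still fails if the per-circuit HOPs are spread widely.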

### Strategy: Top-Down Search

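1. Start at `max_depth` (e.g., 5)
2. Run the QV circuit(s) for that depth on hardware
3. Calculate HOP from the measurement results
4. If HOP > 2/3 (single-circuit mode) or the CI test passes (full protocol): **SUCCESS!** QV 2^depth achieved
5. Otherwise: try depth - 1
6. Repeat until a depth passes or depth 2 has failed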

## Architecture

```
                    QUANTUM VOLUME FINDER
                      (Coordinator Agent)
                             |
          +------------------+------------------+
          |                  |                  |
          v                  v                  v
   BACKEND ANALYST    QUBIT CHAIN         QV EXPERIMENT
                      OPTIMIZER           RUNNER
          |                  |                  |
          v                  v                  v
   qiskit-ibm-        qiskit-ibm-        transpile_qv_circuit
   runtime-mcp        runtime-mcp        submit_qv_job
   (backends)         (QV qubit tools    get_job_status_tool
                       searches ALL      (local wrappers +
                       qubits)           MCP tools)
```

### Data Flow Design

Large data (QASM circuits, QPY binaries, measurement counts) **never flows through the LLM**.
Local tools call MCP tools programmatically and pass data via closures:

- `transpile_qv_circuit(depth, backend_name, optimization_level, initial_layout)`: generates the QV circuit lazily, calls the `hybrid_ai_transpile_tool` MCP tool internally, and stores the transpiled QPY
- `submit_qv_job(depth, backend_name, shots)`: reads the stored QPY and calls the `run_sampler_tool` MCP tool internally
- `calculate_hop(job_id, depth)`: calls the `get_job_results_tool` MCP tool internally and looks up the heavy outputs for that depth
- `run_qv_depth_trial(depth, backend_name, initial_layout, num_circuits, shots)`: **full-protocol batch tool** (multi-circuit mode only) that generates N circuits, then transpiles, submits, polls, computes HOPs, and runs the CI test
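
The closure pattern behind these tools can be shown in isolation. In this sketch, `make_tools`, `transpile`, and `submit` are hypothetical stand-ins, and the payload is a placeholder string rather than real QPY data. The point is only that the large payload lives in a store shared by the tools, while each call returns a small summary.

```python
from typing import Callable


def make_tools() -> tuple[Callable[[int], dict], Callable[[int], dict]]:
    """Build two tools that share a private store through a closure."""
    store: dict[int, str] = {}  # large payloads stay here, keyed by depth

    def transpile(depth: int) -> dict:
        payload = "QPY" * 10_000 * depth  # placeholder for a large binary blob
        store[depth] = payload
        # Only a small summary goes back to the caller (the LLM).
        return {"status": "success", "depth": depth, "payload_bytes": len(payload)}

    def submit(depth: int) -> dict:
        if depth not in store:
            return {"status": "error", "message": "transpile first"}
        payload = store[depth]  # read from the closure, not from the caller
        return {"status": "success", "job_id": f"fake-{depth}-{len(payload)}"}

    return transpile, submit


transpile, submit = make_tools()
print(submit(3))     # error: nothing stored yet
print(transpile(3))  # stores the payload, returns a summary
print(submit(3))     # succeeds using the stored payload
```

Because the caller only ever passes `depth`, the model's context holds a few dozen bytes per call regardless of how large the circuit data is.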

### MCP Tools Used (via wrappers or directly)
- `get_backend_properties_tool`, `find_optimal_qv_qubits_tool` (runtime MCP, direct)
- `hybrid_ai_transpile_tool` (transpiler MCP, via `transpile_qv_circuit` / `run_qv_depth_trial`)
- `run_sampler_tool` (runtime MCP, via `submit_qv_job` / `run_qv_depth_trial`)
- `get_job_status_tool` (runtime MCP, direct + via `run_qv_depth_trial`)
- `get_job_results_tool` (runtime MCP, via `calculate_hop` / `run_qv_depth_trial`)

## Setup

```bash
pip install deepagents langchain langchain-mcp-adapters python-dotenv
pip install langchain-anthropic
pip install qiskit-mcp-servers
```

In [None]:
import json
import os
import sys
from datetime import datetime
from typing import Any

from deepagents import create_deep_agent
from dotenv import load_dotenv
from langchain_anthropic import ChatAnthropic
from langchain_core.callbacks import BaseCallbackHandler
from langchain_mcp_adapters.client import MultiServerMCPClient

# Load .env (override=True so .env takes precedence over shell env vars)
load_dotenv(override=True)

print("Configuration:")
print(f"  QISKIT_IBM_TOKEN: {'Set' if os.getenv('QISKIT_IBM_TOKEN') else 'Not set'}")
print(f"  ANTHROPIC_API_KEY: {'Set' if os.getenv('ANTHROPIC_API_KEY') else 'Not set'}")


# Callback handler for agent observability
class AgentActivityHandler(BaseCallbackHandler):
    """Shows what the agent is doing during execution."""

    def __init__(self, verbose: bool = True):
        self.verbose = verbose
        self.indent_level = 0
        self.current_tool = None

    def _timestamp(self) -> str:
        return datetime.now().strftime("%H:%M:%S")

    def _print(self, msg: str, color: str = "") -> None:
        indent = "  " * self.indent_level
        if color and sys.stdout.isatty():
            colors = {
                "blue": "\033[94m",
                "green": "\033[92m",
                "yellow": "\033[93m",
                "red": "\033[91m",
                "cyan": "\033[96m",
                "magenta": "\033[95m",
                "reset": "\033[0m",
            }
            print(f"{colors.get(color, '')}{indent}{msg}{colors['reset']}", flush=True)
        else:
            print(f"{indent}{msg}", flush=True)

    def on_tool_start(self, serialized: dict | None, input_str: str, **kwargs) -> None:
        tool_name = serialized.get("name", "unknown_tool") if serialized else "unknown_tool"
        self.current_tool = tool_name
        self._print(f"\n[{self._timestamp()}] TOOL: {tool_name}", "cyan")
        self.indent_level += 1
        if self.verbose and input_str:
            input_preview = str(input_str)[:200] + ("..." if len(str(input_str)) > 200 else "")
            self._print(f"Input: {input_preview}", "blue")

    def on_tool_end(self, output: str, **kwargs) -> None:
        if self.verbose and output:
            output_preview = str(output)[:300] + ("..." if len(str(output)) > 300 else "")
            self._print(f"Output: {output_preview}", "green")
        self.indent_level = max(0, self.indent_level - 1)
        self._print(f"[{self._timestamp()}] Done: {self.current_tool}", "green")

    def on_tool_error(self, error: Exception, **kwargs) -> None:
        self.indent_level = max(0, self.indent_level - 1)
        self._print(f"[{self._timestamp()}] FAILED: {self.current_tool}: {error}", "red")

    def on_agent_action(self, action, **kwargs) -> None:
        tool = getattr(action, "tool", "unknown")
        self._print(f"\n[{self._timestamp()}] Agent calling: {tool}", "yellow")

    def on_agent_finish(self, finish, **kwargs) -> None:
        self._print(f"\n[{self._timestamp()}] Agent finished", "green")

    def on_llm_start(self, serialized: dict | None, prompts: list, **kwargs) -> None:
        if self.verbose:
            model = (
                (serialized.get("name") or serialized.get("id", ["LLM"])[-1])
                if serialized
                else "LLM"
            )
            self._print(f"[{self._timestamp()}] {model} thinking...", "blue")


callback_handler = AgentActivityHandler(verbose=True)

In [None]:
COORDINATOR_PROMPT = """
╔══════════════════════════════════════════════════════════════════════════════╗
║  MANDATORY: task() REQUIRES description PARAMETER - NEVER OMIT IT!           ║
║                                                                              ║
║  task(subagent_type="X", description="Y")  ← BOTH parameters REQUIRED        ║
╚══════════════════════════════════════════════════════════════════════════════╝

╔══════════════════════════════════════════════════════════════════════════════╗
║  YOU MUST ACTUALLY RUN EXPERIMENTS - NOT JUST ANALYZE OR DOCUMENT!           ║
║                                                                              ║
║  DO NOT write reports about "what you would do"                              ║
║  DO NOT say "ready for execution" - EXECUTE IT!                              ║
║  DO NOT create documentation files - SUBMIT JOBS!                            ║
╚══════════════════════════════════════════════════════════════════════════════╝

You are the Quantum Volume Finder. Your job is to EXECUTE experiments and report ACTUAL results.

## REQUIRED WORKFLOW (DO ALL STEPS)

1. Get backend info → **backend-analyst**
2. Find optimal qubits for the depth → **qubit-chain-optimizer**
3. Run experiment on hardware → **qv-experiment-runner** (transpiles, submits, polls, returns job_id)
4. Calculate HOP → call calculate_hop(job_id=<job_id>, depth=N)
5. If HOP above threshold: SUCCESS. If not: try depth-1 (repeat from step 2)

## Subagents — COPY THESE FORMATS EXACTLY

### backend-analyst
```
task(subagent_type="backend-analyst", description="Get ibm_boston properties")
```

### qubit-chain-optimizer
```
task(subagent_type="qubit-chain-optimizer", description="Find 5 optimal qubits for a depth-5 QV circuit (QV-32) on ibm_boston")
```

### qv-experiment-runner
Pass depth, backend, and initial_layout (qubits). The subagent transpiles the QV circuit,
submits it to hardware, polls for completion, and returns the job_id.
```
task(subagent_type="qv-experiment-runner", description="Run QV experiment: depth=5, backend_name=ibm_boston, initial_layout=[47, 57, 66, 67, 68]")
```

## HOP Calculation

After the experiment runner returns the job_id:
1. Call calculate_hop(job_id=<job_id from runner>, depth=N)
   This fetches the job results and looks up heavy outputs automatically.
2. Check if above_threshold is true

## Critical Rules

1. DO NOT make recommendations - RUN the experiments
2. DO NOT stop after one failure - try lower depths
3. DO NOT limit qubit search - use all qubits on the backend
4. ALWAYS report actual job ID, measurement counts, and HOP
"""

COORDINATOR_MULTI_CIRCUIT_APPENDIX = """

## Multi-Circuit QV Mode (Full Protocol)

You have the run_qv_depth_trial tool for statistically rigorous QV testing.
This runs N independent random circuits per depth and performs the full statistical
confidence interval test (arXiv:1811.12926).

### Workflow Change
- Steps 1-2 are the same (backend analysis, find optimal qubits)
- Step 3: Instead of qv-experiment-runner, call:
  run_qv_depth_trial(depth=N, backend_name=<backend>, initial_layout=[q1, q2, ...], num_circuits=<N>, shots=4096)
  This tool handles everything internally: generates N random circuits, transpiles each,
  submits each to hardware, polls for completion, computes all HOPs, and runs the
  statistical CI test. It prints progress as it works.
- Step 4: Check qv_achieved in the result (replaces manual HOP check)
  - qv_achieved=true means the lower bound of the 97.5% CI exceeds 2/3 — QV is officially achieved
  - qv_achieved=false means it failed the statistical test — try depth-1
- Step 5: If not achieved, try depth-1 (find new optimal qubits first)

### DO NOT use qv-experiment-runner in multi-circuit mode — use run_qv_depth_trial instead.

### Reading Results
- qv_achieved: true/false (the CI test result)
- mean_hop: average HOP across all circuits
- ci_lower: lower bound of 97.5% confidence interval (must be > 2/3 for QV)
- num_successful: how many circuits completed successfully
- individual_hops: list of all HOP values
- message: human-readable summary
"""

BACKEND_ANALYST_PROMPT = """You are the Backend Analyst, an expert in IBM Quantum hardware.

Your role is to:
1. List all available quantum backends for the user's account
2. Get detailed properties for promising backends
3. Identify backends suitable for Quantum Volume experiments
4. Report on current queue status and availability

When analyzing backends, focus on:
- Number of qubits (need at least the target QV depth)
- Quantum volume already achieved
- Overall system status
- Queue length (prefer less busy systems)

Use the IBM Runtime MCP tools to gather this information. Report your findings
in a structured format that the coordinator can use for decision-making.
"""

QUBIT_CHAIN_PROMPT = """You are the Qubit Chain Optimizer, finding optimal qubits for QV experiments.

## Primary Tool: find_optimal_qv_qubits_tool

This tool searches the ENTIRE backend (all 127+ qubits) to find the best subgraphs.
It does NOT limit to just the first 10 qubits - it analyzes ALL qubits.

### Usage
```
find_optimal_qv_qubits_tool(
    backend_name="ibm_brisbane",
    num_qubits=5,       # QV depth
    num_results=10,     # Get 10 candidates to try
    metric="qv_optimized"
)
```

### Important Parameters
- **num_qubits**: The QV depth (e.g., 5 for QV-32)
- **num_results**: Request at least 10 results to have fallback options
- **metric**: Use "qv_optimized" for best QV performance

Return the top 10 qubit subsets with their scores. The coordinator will use
these to run QV experiments, potentially trying multiple if the first fails.
"""

QV_EXPERIMENT_RUNNER_PROMPT = """You run QV experiments on hardware.

Your task description contains: depth, backend_name, and initial_layout (qubits).

## WORKFLOW — Follow these steps IN ORDER:

### STEP 1: Transpile the QV circuit
```
transpile_qv_circuit(depth=<depth>, backend_name=<backend>, optimization_level=3, initial_layout=<qubits>)
```
This generates the QV circuit and transpiles it. The result shows transpilation metrics.

### STEP 2: Submit to hardware
```
submit_qv_job(depth=<depth>, backend_name=<backend>, shots=4096)
```
This submits the transpiled circuit. Returns a job_id.

### STEP 3: Wait for completion
Poll get_job_status_tool(job_id=<id>) repeatedly until job_status is "DONE". If job_status becomes "ERROR" or "CANCELLED", stop polling and report the failure.

### STEP 4: Report back
Return ALL of: backend, depth, qubits, job_id, shots.
The coordinator will fetch results and calculate HOP using the job_id.
"""

In [None]:
def get_mcp_config():
    """MCP config for QV experiments.

    Note: Only qiskit-ibm-runtime and qiskit-ibm-transpiler are included.
    qiskit-mcp-server is excluded because hybrid_ai_transpile_tool from
    qiskit-ibm-transpiler accepts backend_name directly (simpler for agents).
    """
    return {
        "qiskit-ibm-runtime": {
            "transport": "stdio",
            "command": "qiskit-ibm-runtime-mcp-server",
            "args": [],
            "env": {
                "QISKIT_IBM_TOKEN": os.getenv("QISKIT_IBM_TOKEN", ""),
                "QISKIT_IBM_RUNTIME_MCP_INSTANCE": os.getenv("QISKIT_IBM_RUNTIME_MCP_INSTANCE", ""),
            },
        },
        "qiskit-ibm-transpiler": {
            "transport": "stdio",
            "command": "qiskit-ibm-transpiler-mcp-server",
            "args": [],
            "env": {"QISKIT_IBM_TOKEN": os.getenv("QISKIT_IBM_TOKEN", "")},
        },
    }


def generate_qv_circuit_with_ideal_distribution(
    num_qubits: int,
    depth: int | None = None,
    seed: int | None = None,
) -> dict[str, Any]:
    """Generate a QV circuit and compute its ideal heavy output bitstrings."""
    import logging

    import numpy as np
    from qiskit import QuantumCircuit
    from qiskit.circuit.library import quantum_volume
    from qiskit.qasm3 import dumps
    from qiskit.quantum_info import Statevector

    logger = logging.getLogger(__name__)

    try:
        if num_qubits < 2:
            num_qubits = 2
        elif num_qubits > 20:
            logger.warning(
                f"QV with {num_qubits} qubits will be slow to simulate. "
                "Consider using <= 20 qubits."
            )

        if depth is None:
            depth = num_qubits
        elif depth < 1:
            depth = 1
        elif depth > num_qubits:
            depth = num_qubits

        if seed is None:
            # Cast to a plain int so the seed stays JSON-serializable in the result dict
            seed = int(np.random.randint(0, 2**31))

        qv_circuit = quantum_volume(num_qubits, depth=depth, seed=seed)
        qv_decomposed = qv_circuit.decompose()

        statevector = Statevector.from_label("0" * num_qubits)
        final_state = statevector.evolve(qv_decomposed)
        probabilities = final_state.probabilities()

        ideal_probs = {}
        for i, prob in enumerate(probabilities):
            # Format as bitstring matching Qiskit convention (qubit 0 = rightmost)
            bitstring = format(i, f"0{num_qubits}b")
            ideal_probs[bitstring] = prob

        median_prob = float(np.median(probabilities))
        heavy_outputs = [bs for bs, prob in ideal_probs.items() if prob > median_prob]

        qv_with_meas = QuantumCircuit(num_qubits, num_qubits)
        qv_with_meas.compose(qv_decomposed, inplace=True)
        qv_with_meas.measure(range(num_qubits), range(num_qubits))
        qasm3_circuit = dumps(qv_with_meas)

        result = {
            "status": "success",
            "circuit_qasm": qasm3_circuit,
            "num_qubits": num_qubits,
            "depth": depth,
            "seed": seed,
            "heavy_outputs": heavy_outputs,
            "num_heavy_outputs": len(heavy_outputs),
            "median_probability": median_prob,
            "message": f"Generated QV-{num_qubits} circuit with {len(heavy_outputs)} heavy outputs",
        }

        if num_qubits <= 6:
            result["ideal_probabilities"] = ideal_probs

        return result

    except Exception as e:
        logger.error(f"Failed to generate QV circuit: {e}")
        return {"status": "error", "message": f"Failed to generate QV circuit: {e!s}"}


def calculate_heavy_output_probability(
    counts: dict[str, int],
    heavy_outputs: list[str],
) -> dict[str, Any]:
    """Calculate the Heavy Output Probability (HOP) for QV validation."""
    import logging

    logger = logging.getLogger(__name__)

    try:
        if not counts:
            return {"status": "error", "message": "No counts provided"}

        if not heavy_outputs:
            return {"status": "error", "message": "No heavy outputs provided"}

        heavy_set = set(heavy_outputs)
        total_shots = sum(counts.values())
        heavy_counts = sum(count for bitstring, count in counts.items() if bitstring in heavy_set)

        hop = heavy_counts / total_shots if total_shots > 0 else 0.0
        threshold = 2 / 3
        above_threshold = hop > threshold

        return {
            "status": "success",
            "heavy_output_probability": hop,
            "total_shots": total_shots,
            "heavy_counts": heavy_counts,
            "num_heavy_bitstrings": len(heavy_outputs),
            "threshold": threshold,
            "above_threshold": above_threshold,
            "message": f"HOP = {hop:.4f} ({'above' if above_threshold else 'below'} threshold of {threshold:.4f})",
        }

    except Exception as e:
        logger.error(f"Failed to calculate HOP: {e}")
        return {"status": "error", "message": f"Failed to calculate HOP: {e!s}"}


def analyze_qv_experiment_results(
    hop_values: list[float],
    confidence_level: float = 0.975,
) -> dict[str, Any]:
    """Statistical analysis of multiple QV circuit runs (full protocol).

    Used by run_qv_depth_trial in multi-circuit mode. Computes mean HOP,
    t-distribution CI, and determines if QV is achieved per arXiv:1811.12926.
    """
    import logging

    import numpy as np

    logger = logging.getLogger(__name__)

    try:
        # Imported inside the try block so the ImportError handler below can report a missing scipy
        from scipy import stats

        if not hop_values:
            return {"status": "error", "message": "No HOP values provided"}

        hop_array = np.array(hop_values)
        n = len(hop_array)

        if n < 10:
            logger.warning(f"Only {n} HOP values. Recommend at least 100 for statistical significance.")

        mean_hop = float(np.mean(hop_array))
        std_hop = float(np.std(hop_array, ddof=1))
        sem = std_hop / np.sqrt(n)

        t_critical = stats.t.ppf(confidence_level, df=n - 1)
        ci_lower = mean_hop - t_critical * sem
        ci_upper = mean_hop + t_critical * sem

        threshold = 2 / 3
        qv_achieved = bool(ci_lower > threshold)
        margin = float(ci_lower - threshold)

        if qv_achieved:
            message = (
                f"QV ACHIEVED! Mean HOP = {mean_hop:.4f}, "
                f"CI lower bound = {ci_lower:.4f} > threshold {threshold:.4f}"
            )
        else:
            message = (
                f"QV NOT achieved. Mean HOP = {mean_hop:.4f}, "
                f"CI lower bound = {ci_lower:.4f} <= threshold {threshold:.4f}"
            )

        return {
            "status": "success",
            "qv_achieved": qv_achieved,
            "mean_hop": mean_hop,
            "std_hop": std_hop,
            "standard_error": sem,
            "confidence_interval": (ci_lower, ci_upper),
            "confidence_level": confidence_level,
            "num_circuits": n,
            "threshold": threshold,
            "margin": margin,
            "message": message,
        }

    except ImportError as e:
        return {"status": "error", "message": f"Missing scipy for statistical analysis: {e!s}"}
    except Exception as e:
        return {"status": "error", "message": f"Failed to analyze QV results: {e!s}"}
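
The heavy-output lookup in the helpers above only works if the ideal bitstrings and the measured count keys use the same bit order. The sketch below illustrates the convention the generator relies on, where qubit 0 is the rightmost character of the bitstring. `index_to_bitstring` and `qubit_value` are hypothetical helper names, not part of the notebook.

```python
def index_to_bitstring(index: int, num_qubits: int) -> str:
    """Format a basis-state index as a bitstring with qubit 0 rightmost."""
    return format(index, f"0{num_qubits}b")


def qubit_value(bitstring: str, qubit: int) -> int:
    """Read one qubit's value from a bitstring (qubit 0 is the last character)."""
    return int(bitstring[-1 - qubit])


bs = index_to_bitstring(6, 3)  # 6 is 0b110
print(bs, [qubit_value(bs, q) for q in range(3)])
```

If the two sides disagreed on bit order, asymmetric heavy sets would be matched against reversed keys and the computed HOP would be wrong without raising any error.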

In [None]:
import asyncio

# Shared cache for QV circuit data — populated lazily by tools
qv_data: dict[int, dict] = {}


async def create_agent(model: str = "claude-sonnet-4-20250514", num_circuits: int = 1):
    """Create the Quantum Volume Finder agent with local tools that wrap MCP calls.

    Args:
        model: Anthropic model to use
        num_circuits: Number of independent QV circuits per depth (1=quick, 100+=full protocol)
    """
    from langchain_core.tools import tool as langchain_tool

    mcp_config = get_mcp_config()
    mcp_client = MultiServerMCPClient(mcp_config)

    # Load MCP tools from each server
    server_tools = {}
    for name in mcp_config:
        try:
            tools = await mcp_client.get_tools(server_name=name)
            server_tools[name] = tools
            print(f"{name}: {len(tools)} tools")
        except Exception as e:
            print(f"{name}: FAILED - {e}")

    print(f"\nTotal MCP tools loaded: {sum(len(t) for t in server_tools.values())}")

    llm = ChatAnthropic(model=model, temperature=0, max_tokens=8192)

    # Find MCP tools for programmatic use (avoids passing large data through LLM)
    transpiler_tools = server_tools.get("qiskit-ibm-transpiler", [])
    runtime_tools = server_tools.get("qiskit-ibm-runtime", [])
    hybrid_transpile_mcp = next(
        (t for t in transpiler_tools if t.name == "hybrid_ai_transpile_tool"), None
    )
    run_sampler_mcp = next(
        (t for t in runtime_tools if t.name == "run_sampler_tool"), None
    )
    get_job_results_mcp = next(
        (t for t in runtime_tools if t.name == "get_job_results_tool"), None
    )
    get_job_status_mcp = next(
        (t for t in runtime_tools if t.name == "get_job_status_tool"), None
    )

    def _parse_mcp_result(result) -> dict:
        """Parse the result from a programmatic MCP tool call into a dict."""
        if isinstance(result, list):
            text = result[0]["text"] if result else "{}"
            return json.loads(text)
        if isinstance(result, str):
            return json.loads(result)
        return result

    def _ensure_qv_data(depth: int) -> dict | None:
        """Generate QV circuit data lazily. Returns error dict or None."""
        if depth < 2 or depth > 20:
            return {"status": "error", "message": f"Depth must be between 2 and 20, got {depth}"}
        if depth not in qv_data:
            result = generate_qv_circuit_with_ideal_distribution(depth, seed=42 + depth)
            if result["status"] != "success":
                return result
            qv_data[depth] = result
        return None

    # --- Local tools that wrap MCP calls (avoids large data through LLM) ---

    @langchain_tool
    async def transpile_qv_circuit(
        depth: int,
        backend_name: str,
        optimization_level: int = 3,
        initial_layout: list[int] | None = None,
    ) -> dict:
        """Transpile a QV circuit using the AI transpiler.

        Generates the QV circuit on demand and transpiles it via the MCP transpiler.
        The transpiled QPY is stored internally — call submit_qv_job next.

        Args:
            depth: The QV depth (e.g., 5 for QV-32, 11 for QV-2048)
            backend_name: IBM backend name (e.g., 'ibm_boston')
            optimization_level: Optimization level 1-3 (default: 3)
            initial_layout: Physical qubits to map virtual qubits to
        """
        error = _ensure_qv_data(depth)
        if error:
            return error

        if hybrid_transpile_mcp is None:
            return {"status": "error", "message": "hybrid_ai_transpile_tool not available"}

        circuit_qasm = qv_data[depth]["circuit_qasm"]

        result = await hybrid_transpile_mcp.ainvoke({
            "circuit": circuit_qasm,
            "backend_name": backend_name,
            "optimization_level": optimization_level,
            "initial_layout": initial_layout,
        })

        result = _parse_mcp_result(result)

        if result.get("status") == "success":
            qv_data[depth]["circuit_qpy"] = result["circuit_qpy"]
            return {
                "status": "success",
                "depth": depth,
                "backend_name": backend_name,
                "original_circuit": result.get("original_circuit"),
                "optimized_circuit": result.get("optimized_circuit"),
                "improvements": result.get("improvements"),
                "message": "Transpiled successfully. Call submit_qv_job next.",
            }
        return result

    @langchain_tool
    async def submit_qv_job(
        depth: int,
        backend_name: str,
        shots: int = 4096,
    ) -> dict:
        """Submit a transpiled QV circuit to hardware via the sampler.

        Uses the QPY stored by transpile_qv_circuit — must be called after it.

        Args:
            depth: The QV depth (must have been transpiled first)
            backend_name: IBM backend name
            shots: Number of measurement shots (default: 4096)
        """
        if "circuit_qpy" not in qv_data.get(depth, {}):
            return {
                "status": "error",
                "message": f"No transpiled circuit for depth {depth}. Call transpile_qv_circuit first.",
            }

        if run_sampler_mcp is None:
            return {"status": "error", "message": "run_sampler_tool not available"}

        circuit_qpy = qv_data[depth]["circuit_qpy"]

        result = await run_sampler_mcp.ainvoke({
            "circuit": circuit_qpy,
            "backend_name": backend_name,
            "shots": shots,
        })

        return _parse_mcp_result(result)

    @langchain_tool
    async def calculate_hop(job_id: str, depth: int) -> dict:
        """Calculate Heavy Output Probability (HOP) for QV validation.

        Fetches job results automatically — just pass the job_id and depth.

        Args:
            job_id: The job ID from submit_qv_job (e.g., "d668ng8qbmes739dsh90")
            depth: The QV depth — used to look up heavy outputs automatically

        Returns:
            Dictionary with heavy_output_probability, above_threshold, and message
        """
        error = _ensure_qv_data(depth)
        if error:
            return error

        if get_job_results_mcp is None:
            return {"status": "error", "message": "get_job_results_tool not available"}

        result = await get_job_results_mcp.ainvoke({"job_id": job_id})
        result = _parse_mcp_result(result)
        if result.get("status") != "success":
            return result

        counts = result["counts"]
        heavy_outputs = qv_data[depth]["heavy_outputs"]
        return calculate_heavy_output_probability(counts, heavy_outputs)

    @langchain_tool
    async def run_qv_depth_trial(
        depth: int,
        backend_name: str,
        initial_layout: list[int],
        num_circuits: int = 100,
        shots: int = 4096,
    ) -> dict:
        """Run a full QV trial: N independent circuits at one depth with statistical analysis.

        This is the batch tool for the full QV protocol. It generates N random QV circuits
        (each with a different seed), transpiles, submits, polls, computes HOPs, and runs
        the statistical confidence interval test.

        Args:
            depth: The QV depth to test (e.g., 5 for QV-32)
            backend_name: IBM backend name (e.g., 'ibm_boston')
            initial_layout: Physical qubits to map virtual qubits to
            num_circuits: Number of independent random circuits (default: 100)
            shots: Number of measurement shots per circuit (default: 4096)

        Returns:
            Dictionary with qv_achieved, mean_hop, confidence interval, and per-circuit details
        """
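        # Check all required MCP tools up front and name the missing ones in the error
        required_tools = {
            "hybrid_ai_transpile_tool": hybrid_transpile_mcp,
            "run_sampler_tool": run_sampler_mcp,
            "get_job_results_tool": get_job_results_mcp,
            "get_job_status_tool": get_job_status_mcp,
        }
        missing = [name for name, mcp_tool in required_tools.items() if mcp_tool is None]
        if missing:
            return {"status": "error", "message": f"Required MCP tools not available: {', '.join(missing)}"}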

        print(f"\n[QV Trial] Starting full QV trial: depth={depth}, {num_circuits} circuits, "
              f"backend={backend_name}, qubits={initial_layout}")

        # Phase 1: Generate N circuits with different seeds
        circuits = []
        for i in range(num_circuits):
            seed = depth * 1000 + i
            result = generate_qv_circuit_with_ideal_distribution(depth, seed=seed)
            if result["status"] != "success":
                return {"status": "error", "message": f"Failed to generate circuit {i}: {result['message']}"}
            circuits.append({
                "circuit_qasm": result["circuit_qasm"],
                "heavy_outputs": result["heavy_outputs"],
                "seed": seed,
            })
        print(f"[QV Trial] Generated {num_circuits} circuits")

        # Phase 2: Transpile each circuit via MCP (sequential — stdio transport)
        for i, circ in enumerate(circuits):
            result = await hybrid_transpile_mcp.ainvoke({
                "circuit": circ["circuit_qasm"],
                "backend_name": backend_name,
                "optimization_level": 3,
                "initial_layout": initial_layout,
            })
            result = _parse_mcp_result(result)
            if result.get("status") != "success":
                return {"status": "error", "message": f"Failed to transpile circuit {i}: {result.get('message', 'unknown error')}"}
            circ["circuit_qpy"] = result["circuit_qpy"]
            if (i + 1) % 10 == 0 or i == num_circuits - 1:
                print(f"[QV Trial] Transpiled {i + 1}/{num_circuits} circuits")

        # Phase 3: Submit each circuit via MCP (sequential)
        job_ids = []
        for i, circ in enumerate(circuits):
            result = await run_sampler_mcp.ainvoke({
                "circuit": circ["circuit_qpy"],
                "backend_name": backend_name,
                "shots": shots,
            })
            result = _parse_mcp_result(result)
            if result.get("status") != "success":
                return {"status": "error", "message": f"Failed to submit circuit {i}: {result.get('message', 'unknown error')}"}
            job_ids.append(result["job_id"])
            if (i + 1) % 10 == 0 or i == num_circuits - 1:
                print(f"[QV Trial] Submitted {i + 1}/{num_circuits} jobs")

        # Phase 4: Poll all jobs until completion
        pending = set(range(num_circuits))
        failed_jobs = {}
        while pending:
            await asyncio.sleep(10)
            still_pending = set()
            for idx in pending:
                result = await get_job_status_mcp.ainvoke({"job_id": job_ids[idx]})
                result = _parse_mcp_result(result)
                job_status = result.get("job_status", "UNKNOWN")
                if job_status == "DONE":
                    continue
                elif job_status in ("ERROR", "CANCELLED"):
                    failed_jobs[idx] = result.get("error_message", job_status)
                else:
                    # Still queued or running (or the status could not be read): poll again next round
                    still_pending.add(idx)
            pending = still_pending
            done_count = num_circuits - len(pending) - len(failed_jobs)
            print(f"[QV Trial] depth={depth}: {done_count}/{num_circuits} jobs done, "
                  f"{len(pending)} pending, {len(failed_jobs)} failed")

        # Phase 5: Get results and compute HOPs
        hop_values = []
        circuit_results = []
        for i in range(num_circuits):
            if i in failed_jobs:
                circuit_results.append({
                    "circuit_index": i, "seed": circuits[i]["seed"],
                    "job_id": job_ids[i], "status": "failed",
                    "error": failed_jobs[i],
                })
                continue

            result = await get_job_results_mcp.ainvoke({"job_id": job_ids[i]})
            result = _parse_mcp_result(result)
            if result.get("status") != "success":
                circuit_results.append({
                    "circuit_index": i, "seed": circuits[i]["seed"],
                    "job_id": job_ids[i], "status": "error",
                    "error": result.get("message", "Failed to get results"),
                })
                continue

            counts = result["counts"]
            hop_result = calculate_heavy_output_probability(counts, circuits[i]["heavy_outputs"])
            hop = hop_result.get("heavy_output_probability", 0.0)
            hop_values.append(hop)
            circuit_results.append({
                "circuit_index": i, "seed": circuits[i]["seed"],
                "job_id": job_ids[i], "status": "success",
                "hop": hop, "above_threshold": hop_result.get("above_threshold", False),
            })

        if not hop_values:
            return {"status": "error", "message": "All circuits failed — no HOP values collected"}

        # Phase 6: Statistical analysis
        analysis = analyze_qv_experiment_results(hop_values)

        print(f"[QV Trial] depth={depth}: mean_hop={analysis.get('mean_hop', 0):.4f}, "
              f"ci_lower={analysis.get('confidence_interval', (0, 0))[0]:.4f}, "
              f"qv_achieved={analysis.get('qv_achieved', False)}")

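        return {
            "status": "success",
            "depth": depth,
            "num_circuits": num_circuits,
            "num_successful": len(hop_values),
            # Count failed jobs and result-retrieval errors alike,
            # so that num_successful + num_failed == num_circuits
            "num_failed": num_circuits - len(hop_values),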
            "qv_achieved": analysis.get("qv_achieved", False),
            "mean_hop": analysis.get("mean_hop", 0.0),
            "std_hop": analysis.get("std_hop", 0.0),
            "ci_lower": analysis.get("confidence_interval", (0, 0))[0],
            "ci_upper": analysis.get("confidence_interval", (0, 0))[1],
            "confidence_level": analysis.get("confidence_level", 0.975),
            "threshold": 2 / 3,
            "individual_hops": hop_values,
            "message": analysis.get("message", ""),
        }

    # --- Subagent definitions ---

    backend_analyst = {
        "name": "backend-analyst",
        "description": "Expert in IBM Quantum backends. Use this agent to list backends, get properties, find least busy systems, and analyze hardware capabilities.",
        "system_prompt": BACKEND_ANALYST_PROMPT,
        "tools": server_tools.get("qiskit-ibm-runtime", []),
    }

    qubit_chain_optimizer = {
        "name": "qubit-chain-optimizer",
        "description": "Expert in qubit topology analysis. Use this agent to find optimal qubit subsets for QV experiments using algorithmic chain/subgraph finding tools.",
        "system_prompt": QUBIT_CHAIN_PROMPT,
        "tools": server_tools.get("qiskit-ibm-runtime", []),
    }

    # Runner: local tools + MCP get_job_status_tool for polling
    runner_mcp_tools = [t for t in runtime_tools if t.name == "get_job_status_tool"]
    qv_experiment_runner = {
        "name": "qv-experiment-runner",
        "description": "Expert in running QV experiments on hardware. Use this agent to transpile circuits, submit jobs, and retrieve results.",
        "system_prompt": QV_EXPERIMENT_RUNNER_PROMPT,
        "tools": runner_mcp_tools + [transpile_qv_circuit, submit_qv_job],
    }

    # Coordinator: runtime tools (minus execution/results) + calculate_hop
    excluded_tools = {"run_sampler_tool", "run_estimator_tool", "get_job_results_tool"}
    coordinator_tools = [
        tool for tool in server_tools.get("qiskit-ibm-runtime", [])
        if tool.name not in excluded_tools
    ]
    coordinator_tools.append(calculate_hop)

    # Multi-circuit mode: add batch tool and extended prompt
    system_prompt = COORDINATOR_PROMPT
    if num_circuits > 1:
        coordinator_tools.append(run_qv_depth_trial)
        system_prompt = COORDINATOR_PROMPT + COORDINATOR_MULTI_CIRCUIT_APPENDIX

    mode_label = f"multi-circuit ({num_circuits} circuits/depth)" if num_circuits > 1 else "single-circuit"
    print(f"\nMode: {mode_label}")
    print(f"Coordinator tools: {len(coordinator_tools)} (excluded: {sorted(excluded_tools)})")
    print(f"Runner tools: {len(runner_mcp_tools)} MCP + transpile_qv_circuit + submit_qv_job")

    agent = create_deep_agent(
        model=llm,
        tools=coordinator_tools,
        system_prompt=system_prompt,
        subagents=[backend_analyst, qubit_chain_optimizer, qv_experiment_runner],
    )

    print("Agent ready!")
    return agent


# Configuration — change NUM_CIRCUITS to 100+ for full QV protocol
NUM_CIRCUITS = 1  # 1 = quick test, 100+ = full protocol

agent = await create_agent(num_circuits=NUM_CIRCUITS)
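
The statistical check in Phase 6 of `run_qv_depth_trial` is delegated to `analyze_qv_experiment_results`. As a rough illustration of what a one-sided confidence-interval test on the mean HOP can look like, here is a minimal sketch. The function name `sketch_ci_test`, the use of the sample standard error, and the normal approximation are assumptions made for this example; the notebook's own analysis function may compute the interval differently.

```python
import math
import statistics

QV_THRESHOLD = 2 / 3


def sketch_ci_test(hop_values, confidence_level=0.975):
    """Hypothetical helper: one-sided lower-bound test on the mean HOP.

    Assumes a normal approximation with the sample standard error.
    """
    n = len(hop_values)
    if n < 2:
        raise ValueError("need at least two HOP values to estimate a standard error")

    mean_hop = statistics.fmean(hop_values)
    std_hop = statistics.stdev(hop_values)  # sample standard deviation
    z = statistics.NormalDist().inv_cdf(confidence_level)  # about 1.96 for 0.975
    ci_lower = mean_hop - z * std_hop / math.sqrt(n)

    return {
        "mean_hop": mean_hop,
        "std_hop": std_hop,
        "ci_lower": ci_lower,
        # Pass only if the lower bound, not just the mean, clears 2/3
        "qv_achieved": ci_lower > QV_THRESHOLD,
    }


# Illustrative inputs only; these are not measured results.
many_circuits = sketch_ci_test([0.70, 0.72, 0.74, 0.76] * 25)
few_circuits = sketch_ci_test([0.60, 0.80, 0.60, 0.80])
print(many_circuits["qv_achieved"], few_circuits["qv_achieved"])
```

In the second call the mean HOP is above 2/3, but with only four noisy circuits the lower bound falls below the threshold. This is why the single-circuit mode is a quick signal rather than a certification.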

In [None]:
# Configuration
BACKEND = "ibm_brisbane"  # Change to your preferred backend
MAX_DEPTH = 5  # Maximum QV depth to try (starts here, works down)

# Build the request — circuits are generated lazily when the agent requests them
backend_section = f"""
## Step 1: Backend
Use backend: **{BACKEND}**
Get its properties to confirm it's available."""

if NUM_CIRCUITS > 1:
    # Multi-circuit mode: full QV protocol
    request = f"""
# FIND THE HIGHEST ACHIEVABLE QUANTUM VOLUME (Full Protocol — {NUM_CIRCUITS} circuits/depth)

Your task: Find the highest QV this backend can achieve using the full QV protocol
with {NUM_CIRCUITS} independent random circuits per depth and statistical CI testing.

{backend_section}

## Step 2: Find Optimal Qubits
Use qubit-chain-optimizer with find_optimal_qv_qubits_tool(num_qubits=N) for the depth you're testing.
Get 10 candidate qubit subsets. The tool searches ALL qubits on the backend.

## Step 3: Run Iterative QV Experiments (TOP-DOWN)

Supported depths: 2 through {MAX_DEPTH} (QV {2**2} through QV {2**MAX_DEPTH}).

Start from depth {MAX_DEPTH} and work DOWN:

### For each depth:
1. Find optimal qubits: task(subagent_type="qubit-chain-optimizer", description="Find N optimal qubits for QV-N on {BACKEND}")
2. Run full QV trial:
   run_qv_depth_trial(depth=N, backend_name="{BACKEND}", initial_layout=[q1, q2, ...], num_circuits={NUM_CIRCUITS}, shots=4096)
   This tool runs all {NUM_CIRCUITS} circuits programmatically and returns the statistical result.
3. Check qv_achieved in the result:
   - If qv_achieved is true → SUCCESS! QV 2^N is statistically achieved
   - If qv_achieved is false → try depth N-1

### IMPORTANT:
- Use run_qv_depth_trial (NOT qv-experiment-runner) for each depth
- The tool handles transpile, submit, poll, HOP, and CI test internally
- You MUST try lower depths if higher ones fail
- STOP when you find a passing depth, or after depth 2 has been tested
"""
else:
    # Single-circuit mode: quick test
    request = f"""
# FIND THE HIGHEST ACHIEVABLE QUANTUM VOLUME

Your task: Find the highest QV this backend can achieve by running experiments.

{backend_section}

## Step 2: Find Optimal Qubits
Use qubit-chain-optimizer with find_optimal_qv_qubits_tool(num_qubits=N) for the depth you're testing.
Get 10 candidate qubit subsets. The tool searches ALL qubits on the backend.

## Step 3: Run Iterative QV Experiments (TOP-DOWN)

Supported depths: 2 through {MAX_DEPTH} (QV {2**2} through QV {2**MAX_DEPTH}).

Start from depth {MAX_DEPTH} and work DOWN:

### For each depth:
1. Find optimal qubits: task(subagent_type="qubit-chain-optimizer", description="Find N optimal qubits for QV-N on {BACKEND}")
2. Run experiment: task(subagent_type="qv-experiment-runner", description="Run QV experiment: depth=N, backend_name={BACKEND}, initial_layout=[q1, q2, ...]")
   The runner handles: transpile -> submit -> poll. Returns job_id.
3. Calculate HOP: calculate_hop(job_id=<job_id from runner>, depth=N)
4. If above_threshold is true -> SUCCESS! QV 2^N achieved
5. If above_threshold is false -> try depth N-1

### IMPORTANT:
- You MUST call qv-experiment-runner for EACH depth you test
- You MUST call calculate_hop to evaluate results
- You MUST try lower depths if higher ones fail
- STOP when you find a passing depth, or after depth 2 has been tested
"""

print(f"Starting QV finder for {BACKEND}, max depth {MAX_DEPTH}")
if NUM_CIRCUITS > 1:
    print(f"Protocol: Full QV ({NUM_CIRCUITS} circuits/depth, 97.5% CI test)")
else:
    print("Protocol: Single-circuit quick test")
print("=" * 70)

result = await agent.ainvoke(
    {"messages": [{"role": "user", "content": request}]},
    config={"callbacks": [callback_handler]},
)

print("\n" + "=" * 70)
print("QV FINDING COMPLETE")
print("=" * 70)
print(result.get("messages", [])[-1].content if result.get("messages") else "No response")
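
In this notebook the coordinator agent drives the top-down search by following the prompt above. The control flow it is asked to follow can be sketched in plain Python. The function `top_down_search` and the `depth_passes` callback are hypothetical names used only for illustration; in the real run, the pass/fail decision comes from `calculate_hop` or `run_qv_depth_trial` on hardware.

```python
from typing import Callable, List, Optional, Tuple


def top_down_search(
    max_depth: int,
    depth_passes: Callable[[int], bool],
    min_depth: int = 2,
) -> Tuple[Optional[int], List[int]]:
    """Return (first passing depth or None, list of depths tried)."""
    tried: List[int] = []
    # Walk from max_depth down to min_depth, inclusive
    for depth in range(max_depth, min_depth - 1, -1):
        tried.append(depth)
        if depth_passes(depth):
            return depth, tried
    return None, tried


# Stand-in outcomes for illustration only; these are not measured results.
fake_outcomes = {5: False, 4: False, 3: True, 2: True}
best, tried = top_down_search(5, lambda d: fake_outcomes[d])

if best is None:
    print(f"No depth passed (tried {tried})")
else:
    print(f"Highest passing depth: {best} -> QV {2 ** best} (tried {tried})")
```

Because the search stops at the first passing depth, lower depths are never run once a higher one succeeds, which keeps hardware usage down.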

In [None]:
# Interactive follow-up with activity logging
# Type 'verbose' to toggle detailed logging, 'quit' to exit
print("Commands: 'quit' to exit, 'verbose' to toggle activity logging")

while True:
    query = input("You: ").strip()
    if query.lower() in ["quit", "exit", "q"]:
        break
    if not query:
        continue
    if query.lower() == "verbose":
        callback_handler.verbose = not callback_handler.verbose
        print(f"Verbose logging is now {'ON' if callback_handler.verbose else 'OFF'}\n")
        continue

    result = await agent.ainvoke(
        {"messages": [{"role": "user", "content": query}]},
        config={"callbacks": [callback_handler]},
    )
    print(
        f"\nAssistant: {result.get('messages', [])[-1].content if result.get('messages') else 'No response'}\n"
    )
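
Both the main run and the follow-up loop read the final answer with `result.get("messages", [])[-1].content`. Depending on the model provider, a message's `content` can be a plain string or a list of content blocks. A small helper can normalize both cases. The name `last_message_text` is hypothetical, and the sketch uses duck typing with `SimpleNamespace` stand-ins rather than real message objects so that it runs on its own.

```python
from types import SimpleNamespace


def last_message_text(result, default="No response"):
    """Hypothetical helper: return the text of the final message in an agent result."""
    messages = result.get("messages") or []
    if not messages:
        return default

    content = getattr(messages[-1], "content", None)
    if isinstance(content, str):
        return content or default
    if isinstance(content, list):
        parts = []
        for block in content:
            if isinstance(block, str):
                parts.append(block)
            elif isinstance(block, dict) and block.get("type") == "text":
                parts.append(block.get("text", ""))
        return "".join(parts) or default
    return default


# Stand-in results for illustration only
plain = {"messages": [SimpleNamespace(content="QV 8 achieved")]}
blocks = {"messages": [SimpleNamespace(content=[{"type": "text", "text": "QV 8 "},
                                                {"type": "text", "text": "achieved"}])]}
print(last_message_text(plain))
print(last_message_text(blocks))
print(last_message_text({}))
```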