In [None]:
from qiskit import QuantumCircuit
from qiskit.quantum_info import Statevector

from qiskit import QuantumCircuit

def ghz_nk_min_cuts(n: int, k: int) -> QuantumCircuit:
    """
    Create a true GHZ state on N = n*k qubits using k blocks ("partitions") of size n,
    with only (k-1) cross-block entangling gates.

    Blocks:
      block b occupies qubits [b*n .. b*n + (n-1)]

    Construction:
      1) Build GHZ_n on block 0
      2) For each block b=1..k-1:
           - Bridge: cx(0, b*n)          (one cross-block gate)
           - Grow locally inside block: cx(b*n, b*n+1), cx(b*n+1, b*n+2), ...

    This produces |GHZ_{n*k}> = (|0...0> + |1...1>)/sqrt(2) (up to a global phase).
    """
    if n < 2:
        raise ValueError("n must be >= 2")
    if k < 1:
        raise ValueError("k must be >= 1")

    N = n * k
    qc = QuantumCircuit(N)

    # --- Block 0: GHZ_n ---
    qc.h(0)
    for i in range(n - 1):
        qc.cx(i, i + 1)

    # --- Additional blocks ---
    for b in range(1, k):
        start = b * n

        # Bridge (only cross-block entangling gate for this block)
        qc.cx(0, start)

        # Grow GHZ locally within the block
        for q in range(start, start + n - 1):
            qc.cx(q, q + 1)

    return qc



# quick sanity check
n = 10
k = 2
N=n*k
qc = ghz_nk_min_cuts(n,k)

In [None]:
import sys
sys.path.append('../')
from iqm.iqm_client import CircuitCompilationOptions
from iqm.iqm_client.models import DDMode, STANDARD_DD_STRATEGY, HeraldingMode, MoveGateValidationMode, MoveGateFrameTrackingMode
from __future__ import annotations
from layout import build_initial_layout_list

import numpy as np
from typing import Dict, Optional, Sequence, Tuple, Callable, Any, Union, List
from zne_entanglement_iqm import run_on_iqm_hardware


from qiskit import QuantumCircuit
from qiskit.quantum_info import PauliList

from qiskit_addon_cutting import (
    partition_problem,
    generate_cutting_experiments,
    reconstruct_expectation_values,
)
from iqm.iqm_client import CircuitCompilationOptions
from iqm.iqm_client.models import DDMode, STANDARD_DD_STRATEGY, HeraldingMode

# Qiskit locations moved around across versions; this is a robust import pattern.
try:
    from qiskit.primitives import SamplerResult  # Sampler V1 result container
except Exception as e:
    raise ImportError("Could not import qiskit.primitives.SamplerResult. Check your Qiskit version.") from e

try:
    from qiskit.result import QuasiDistribution
except Exception:
    # fallback path in some versions
    from qiskit.result.distributions import QuasiDistribution  # type: ignore


from qiskit.result import QuasiDistribution

def _bitstring_key_to_int(key: str) -> int:
    """
    Convert various Qiskit-style count keys to an int:
      - '0101'
      - '0b0101'
      - '0xA'
      - '01 10 001'  (multiple classical registers)
      - '01_10_001'
    """
    if not isinstance(key, str):
        raise TypeError(f"Count key must be str, got {type(key)}")

    s = key.strip()

    # remove separators used by multiple cregs or formatting
    s = s.replace(" ", "").replace("_", "")

    if s.startswith("0x"):
        return int(s, 16)
    if s.startswith("0b"):
        return int(s, 2)

    # now should be plain binary
    if not set(s).issubset({"0", "1"}):

        raise ValueError(f"Unrecognized count key format: {key!r} -> {s!r}")
    return int(s, 2)


def _counts_to_quasi_dist(counts: dict) -> QuasiDistribution:
    """
    Convert counts dict (possibly with spaced bitstrings) to a QuasiDistribution with int keys.
    """
    shots = sum(counts.values())
    if shots == 0:
        return QuasiDistribution({})

    probs_int_keys = {}
    for k, v in counts.items():
        ki = _bitstring_key_to_int(k) if isinstance(k, str) else int(k)
        probs_int_keys[ki] = probs_int_keys.get(ki, 0.0) + (v / shots)

    return QuasiDistribution(probs_int_keys)



def _iqm_result_to_sampler_result(
    iqm_result,
    circuits: Sequence[QuantumCircuit],
) -> SamplerResult:
    """
    Build a SamplerResult from a backend Result by reading get_counts for each circuit.

    Important: We preserve per-circuit metadata (especially 'num_qpd_bits') if present on the circuit.
    """
    quasi_dists = []
    metadata = []

    for i, qc in enumerate(circuits):
        counts = iqm_result.get_counts(i)  # standard Qiskit Result API
        quasi_dists.append(_counts_to_quasi_dist(counts))

        md = {}
        # The cutting addon expects num_qpd_bits in metadata. We copy it from circuit.metadata if present.
        if hasattr(qc, "metadata") and isinstance(qc.metadata, dict):
            if "num_qpd_bits" in qc.metadata:
                md["num_qpd_bits"] = qc.metadata["num_qpd_bits"]
        metadata.append(md)

    return SamplerResult(quasi_dists=quasi_dists, metadata=metadata)
def cut_and_knit_expectations_iqm(
    circuit: QuantumCircuit,
    partition_labels: str,
    pauli_strings: Sequence[str],
    *,
    # IQM execution hook:
    execute_on_iqm: Callable[..., Any],   # your run_on_iqm_hardware
    execute_kwargs: Dict[str, Any],       # server_url, quantum_computer, token, etc.
    # cutting params:
    shots: int = 4000,
    num_samples: Union[int, float] = np.inf,
    seed: Optional[int] = 7,
    # optional sanity check: verify which 2q gates are being cut
    expected_cut_pairs: Optional[Sequence[Tuple[int, int]]] = None,
) -> Dict[str, float]:
    """
    Same idea as your cut_and_knit_expectations, but executes subexperiments on IQM hardware
    via a provided function (e.g., run_on_iqm_hardware), and converts counts -> SamplerResult.

    execute_on_iqm must accept circuits=... and shots=... (your function does).
    """

    if circuit.num_qubits != len(partition_labels):
        raise ValueError(
            f"partition_labels must have length {circuit.num_qubits}, got {len(partition_labels)}."
        )

    N = circuit.num_qubits
    for p in pauli_strings:
        if len(p) != N:
            raise ValueError(f"Pauli string length must be {N}, got {len(p)} for {p!r}.")

    obs = PauliList(list(pauli_strings))

    # 1) Partition + auto-cut any cross-partition gates
    partitioned = partition_problem(
        circuit=circuit,
        partition_labels=partition_labels,
        observables=obs,
    )

    subcircuits = partitioned.subcircuits
    subobservables = partitioned.subobservables

    # Optional: verify actual cut edges induced by partition_labels
    if expected_cut_pairs is not None:
        expected = {tuple(sorted(pair)) for pair in expected_cut_pairs}
        actual = set()
        for inst, qargs, _ in circuit.data:
            if len(qargs) == 2:
                q0 = circuit.find_bit(qargs[0]).index
                q1 = circuit.find_bit(qargs[1]).index
                if partition_labels[q0] != partition_labels[q1]:
                    actual.add(tuple(sorted((q0, q1))))
        if actual != expected:
            raise ValueError(
                f"Cut pairs mismatch.\nExpected: {sorted(expected)}\nActual:   {sorted(actual)}"
            )

    # 2) Generate subexperiments + coefficients
    subexperiments, coefficients = generate_cutting_experiments(
        circuits=subcircuits,
        observables=subobservables,
        num_samples=num_samples,
    )

    # 3) Execute subexperiments on IQM, per partition label
    # reconstruct_expectation_values expects a dict[label -> SamplerResult]
    results_for_reconstruct: Dict[str, SamplerResult] = {}

    for label, expts in subexperiments.items():
        # Run a whole partition batch in one IQM job
        # Your function returns (backend, result)
        _, iqm_result = execute_on_iqm(
            circuits=list(expts),
            shots=shots,
            **execute_kwargs,
        )

        sampler_result = _iqm_result_to_sampler_result(iqm_result, list(expts))
        results_for_reconstruct[label] = sampler_result

    # 4) Reconstruct expectation values (one per Pauli string)
    reconstructed = reconstruct_expectation_values(
        results_for_reconstruct,
        coefficients,
        subobservables,
    )

    return {p: float(np.real(val)) for p, val in zip(pauli_strings, reconstructed)}


partition_labels = "A"*n +'B'*n

paulis = (
    ["X"*N] +
    ["I"*i + "ZZ" + "I"*(N-i-2) for i in range(N-1)]
)



cco = CircuitCompilationOptions(
    max_circuit_duration_over_t2=0.0,
    heralding_mode=HeraldingMode.ZEROS,
    move_gate_validation=MoveGateValidationMode.STRICT,
    move_gate_frame_tracking=MoveGateFrameTrackingMode.FULL,
    active_reset_cycles=2,
    dd_mode=DDMode.ENABLED,
    dd_strategy=STANDARD_DD_STRATEGY,
)

initial_layout = build_initial_layout_list(
    num_logical_qubits=n,
    excluded_qubits=["QB4", "QB9"]
)

execute_kwargs = dict(
    server_url="https://resonance.meetiqm.com",
    quantum_computer="emerald",
    token="rkSL242xyAzLHqJQrxHvHs2nREy4AbLE2aQcsA3unuoBnBZTQNV6spN3ItLtE2jJ",
    optimization_level=3,
    seed_transpiler=1,
    cco=cco
    # initial_layout=initial_layout
)


corrs = cut_and_knit_expectations_iqm(
    circuit=qc,
    partition_labels=partition_labels,
    pauli_strings=paulis,
    execute_on_iqm=run_on_iqm_hardware,
    execute_kwargs=execute_kwargs,
    shots=10000,
    num_samples=np.inf,  # exact coefficients
    expected_cut_pairs=[(0, b*n) for b in range(1, k)], 
)

print(corrs)


In [None]:
def ghz_fidelity_lower_bound(corrs, N):
    Sx = corrs["X"*N]
    Sz = sum(
        corrs["I"*i + "ZZ" + "I"*(N-i-2)]
        for i in range(N-1)
    ) / (N-1)
    return 0.5 * (Sx + Sz)

F_lower = ghz_fidelity_lower_bound(corrs, N)
print("F_lower =", F_lower)


In [None]:
no_cutting_circuit = ghz_nk_min_cuts(n=N, k=1)
paulis = (
    ["X"*N] +
    ["I"*i + "ZZ" + "I"*(N-i-2) for i in range(N-1)]
)

corrs = cut_and_knit_expectations_iqm(
    circuit=no_cutting_circuit,
    partition_labels=partition_labels,
    pauli_strings=paulis,
    execute_on_iqm=run_on_iqm_hardware,
    execute_kwargs=execute_kwargs,
    shots=10000,
    num_samples=np.inf,  # exact coefficients
    expected_cut_pairs=None,  # for your minimal-bridge GHZ construction
)

print(corrs)
F_lower = ghz_fidelity_lower_bound(corrs, N)
print("F_lower =", F_lower)

In [None]:
import inspect
from iqm.iqm_client import CircuitCompilationOptions

print(inspect.signature(CircuitCompilationOptions))
