In [3]:
# ----------------------------- imports -----------------------------
import itertools
import random
import pickle
import h5py
import scipy
from scipy.linalg import sqrtm
from os import listdir
import random
import pandas as pd
from itertools import product

import numpy as np
import matplotlib.pyplot as plt
import cvxpy as cp

import torch
import jax
import jax.numpy as jnp

import qutip as qt
from qutip import (
    Qobj, basis, qeye, sigmax, sigmay, sigmaz,
    tensor, ket2dm, fidelity, rand_ket
)
import qutip.qip.operations.gates as qugate
from qutip.qip.operations.gates import rx, ry
from qutip.random_objects import rand_super_bcsz, rand_kraus_map, rand_unitary


class QPT:
    def __init__(
        self,
        N,
        measure_data,
        random_channel=None,
        rotation_order="q0slow_q1fast",
        U_tar=None,
    ):
        """
        N=2 CZ-QPT recommended setting:
          input states: |0>,|1>,|+x>,|+y>
          measurement rotations: [I, Ry(-pi/2), Rx(+pi/2)]
          observables ordering: for each rotation (3^N), for each Z-outcome projector (2^N)

        measure_data layout supported:
          - shape (n_inputs*n_rots, n_outcomes) e.g. (144,4) for N=2
            scanning order: input-major then rotation
            columns: outcomes (00,01,10,11) by default; can reorder via outcome_perm.
        """
        self.N = N
        self.measure_data = np.asarray(measure_data, dtype=float)
        self.random_channel = random_channel
        self.rotation_order = rotation_order
        self.U_tar = U_tar
        self.dim = [[2] * N] * N if N > 1 else [[2], [2]]

        # Pauli basis tensor products: {I,X,Y,Z}^{⊗N}
        pauli = [qeye(2), sigmax(), sigmay(), sigmaz()]
        self.pauli_sys = [tensor(*op) for op in product(pauli, repeat=N)]
        zero, one = basis(2, 0), basis(2, 1)
        # ------------------- input states & rotations -------------------
        # if notes is None:
        #     stat_rot = [
        #         qeye(2),
        #         qugate.rx(-np.pi / 2),
        #         qugate.ry(-np.pi / 2),
        #         qugate.ry(np.pi),
        #     ]
        #     inputst_1q = [ro * zero for ro in stat_rot]
        #     rotation_1q = [qeye(2), qugate.rx(np.pi / 2), qugate.ry(np.pi / 2)]
        # else:
        plus = (zero + one).unit()
        plusy = (zero + 1j * one).unit()
        inputst_1q = [zero, one, plus, plusy]
        rotation_1q = [qeye(2), qugate.ry(-np.pi / 2), qugate.rx(np.pi / 2)]

        self.psi_in_idea = [tensor(*psi) for psi in product(inputst_1q, repeat=N)]
        self.rho_in_idea = [ket2dm(psi) for psi in self.psi_in_idea]

        # tensor-product rotations (3^N)
        if rotation_order == "q0slow_q1fast":
            self.U_rotation = [tensor(*rot) for rot in product(rotation_1q, repeat=N)]
        elif rotation_order == "q1slow_q0fast":
            if N != 2:
                raise ValueError("rotation_order='q1slow_q0fast' currently implemented for N=2.")
            U_rotation = []
            for r1 in rotation_1q:
                for r0 in rotation_1q:
                    U_rotation.append(tensor(r0, r1))
            self.U_rotation = U_rotation
        else:
            raise ValueError("rotation_order must be 'q0slow_q1fast' or 'q1slow_q0fast'.")

        # ------------------------- Z projectors -------------------------
        e_sys = [
            tensor(*state)
            for state in product([basis(2, i) for i in range(2)], repeat=N)
        ]
        self.proj_qst = [ket2dm(e) for e in e_sys]

        # observables (rotation-major then outcome)
        self.observables = [
            self.U_rotation[m].dag() * self.proj_qst[n] * self.U_rotation[m]
            for m in range(len(self.U_rotation))
            for n in range(len(self.proj_qst))
        ]

        # ------------------- optional ideal output -------------------
        self.rho_out_idea = None
        if random_channel is not None:
            try:
                Ks = random_channel
                self.rho_out_idea = [
                    Qobj(
                        np.sum([K @ rho.full() @ K.conj().T for K in Ks], axis=0),
                        dims=self.dim,
                    )
                    for rho in self.rho_in_idea
                ]
            except Exception:
                self.rho_out_idea = None

    # ------------------------- chi estimation -------------------------
    def get_chi_LS_X(self, rho_in_list, proj_list, normalized_basis=True, return_raw=True):
        """
        Direct CPTP chi estimation via convex LS (cvxpy), in chi representation.
        """
        N = self.N
        dim_chi = 4 ** N
        d = 2 ** N

        E = [P / np.sqrt(d) for P in self.pauli_sys] if normalized_basis else self.pauli_sys
        md = np.asarray(self.measure_data, dtype=float)

        # if md.ndim == 2 and md.shape[1] == (2 ** N):
        #     md = md[:, self.outcome_perm]

        n_inputs = len(rho_in_list)
        n_proj = len(proj_list)

        if md.shape == (n_inputs, n_proj):
            Ob = md.reshape(-1)
        else:
            n_outcomes = 2 ** N
            n_rots = 3 ** N
            expected = (n_inputs * n_rots, n_outcomes)

            if md.shape != expected:
                raise ValueError(
                    f"measure_data shape {md.shape} not understood. "
                    f"Expected either {(n_inputs, n_proj)} or {expected}."
                )

            md3 = md.reshape(n_inputs, n_rots, n_outcomes)
            md2 = md3.reshape(n_inputs, n_rots * n_outcomes)

            if md2.shape[1] != n_proj:
                raise ValueError(
                    f"proj_list length {n_proj} doesn't match inferred "
                    f"(n_rots*n_outcomes)={n_rots*n_outcomes}."
                )
            Ob = md2.reshape(-1)

        coeff = np.empty((n_inputs * n_proj, dim_chi * dim_chi), dtype=complex)
        row = 0
        for i in range(n_inputs):
            for j in range(n_proj):
                col = 0
                for m in range(dim_chi):
                    for n in range(dim_chi):
                        coeff[row, col] = (E[m] * rho_in_list[i] * E[n].dag() * proj_list[j]).tr()
                        col += 1
                row += 1

        X = cp.Variable((dim_chi, dim_chi), hermitian=True)
        x_vec = cp.reshape(X, (dim_chi * dim_chi,), order="C")
        obj = cp.Minimize(cp.norm2(coeff @ x_vec - Ob))

        constraints = [X >> 0]

        I_d = np.eye(d, dtype=complex)
        EnEm = [[(E[n].dag() * E[m]).full() for n in range(dim_chi)] for m in range(dim_chi)]

        for a in range(d):
            for b in range(d):
                expr = 0
                for m in range(dim_chi):
                    for n in range(dim_chi):
                        expr += X[m, n] * EnEm[m][n][a, b]
                constraints.append(cp.real(expr) == np.real(I_d[a, b]))
                constraints.append(cp.imag(expr) == np.imag(I_d[a, b]))

        prob = cp.Problem(obj, constraints)
        prob.solve(solver=cp.SCS, eps=1e-8, warm_start=True)

        if X.value is None:
            raise RuntimeError(f"Optimization failed. status={prob.status}")

        chi_norm = X.value
        return (chi_norm / d) if (normalized_basis and return_raw) else chi_norm

    # ---------------------- output-state estimation ----------------------
    def get_noisy_rho(self, chi_matrix, normalized_basis=True, chi_is_raw=True):
        """
        Given a fixed chi_matrix (estimated CPTP chi), estimate output states rho_out.
        """
        N = self.N
        d = 2 ** N
        dim_chi = 4 ** N

        E = [P / np.sqrt(d) for P in self.pauli_sys] if normalized_basis else self.pauli_sys

        chi_matrix = np.asarray(chi_matrix, dtype=complex)
        if chi_matrix.shape != (dim_chi, dim_chi):
            raise ValueError(f"chi_matrix shape {chi_matrix.shape} != {(dim_chi, dim_chi)}")

        chi_use = d * chi_matrix if (normalized_basis and chi_is_raw) else chi_matrix
        chi_vec = chi_use.reshape(-1, order="C")

        observables = self.observables
        OmatT = np.vstack([ob.full().T.reshape(-1, order="C") for ob in observables])

        rho_out_list = []
        for i in range(len(self.rho_in_idea)):
            A_i = np.empty((len(observables), dim_chi * dim_chi), dtype=complex)
            rho = self.rho_in_idea[i]

            for j, M in enumerate(observables):
                col = 0
                for m in range(dim_chi):
                    for n in range(dim_chi):
                        A_i[j, col] = (E[m] * rho * E[n].dag() * M).tr()
                        col += 1

            p_target = (A_i @ chi_vec).real

            X = cp.Variable((d, d), hermitian=True)
            x_vec = cp.reshape(X, (d * d,), order="C")
            p_rho = cp.real(OmatT @ x_vec)

            prob = cp.Problem(
                cp.Minimize(cp.norm2(p_rho - p_target)),
                [X >> 0, cp.trace(X) == 1],
            )
            prob.solve(solver=cp.SCS, eps=1e-6, warm_start=True)

            if X.value is None:
                raise RuntimeError(f"State estimation failed at input index {i}. status={prob.status}")

            rho_out_list.append(Qobj(X.value, dims=self.dim))

        return rho_out_list

    # ---------------------- noisy POVM estimation ----------------------
    def get_noisy_proj(self, chi_matrix):
        """
        For a given chi matrix, return revised POVM projectors while assuming the projector is perfect.
        """
        N = self.N
        dim_chi = 2 ** (2 * N)

        proj = self.observables
        rho_in_idea = self.rho_in_idea

        proj_out_list = []
        for j in range(3 ** N):  # 3^N groups of POVMs
            coeff_with_spam = np.empty((2 ** N, len(rho_in_idea), dim_chi ** 2), dtype=complex)

            h = 0
            for jj in range(j * 2 ** N, (j + 1) * 2 ** N):
                row = 0
                for i in range(len(rho_in_idea)):
                    col = 0
                    for m in range(dim_chi):
                        for n in range(dim_chi):
                            coeff_with_spam[h, row, col] = (
                                self.pauli_sys[m]
                                * rho_in_idea[i]
                                * self.pauli_sys[n].dag()
                                * proj[jj]
                            ).tr()
                            col += 1
                    row += 1
                h += 1

            d, n = 2 ** N, 2 ** N
            E = [cp.Variable((d, d), complex=True) for _ in range(n)]

            constraints = [Ei >> 0 for Ei in E]
            constraints.append(sum(E) == np.eye(d))

            chi_vec = np.reshape(chi_matrix, (dim_chi ** 2,))
            rho_list = [rho.full().T for rho in rho_in_idea]
            rho_vec = np.reshape(rho_list, (len(rho_in_idea), dim_chi))

            loss = sum(
                cp.norm(
                    coeff_with_spam[i, :, :] @ chi_vec
                    - cp.reshape(E[i], ((2 ** N) ** 2,), order="F") @ rho_vec.T,
                    2,
                )
                for i in range(n)
            )

            prob = cp.Problem(cp.Minimize(loss), constraints)
            prob.solve(solver=cp.SCS, eps=1e-6, warm_start=True)

            for M in [Ei.value for Ei in E]:
                proj_out_list.append(Qobj(M.conj(), dims=self.dim))

        return proj_out_list

    # ----------------------- pure-state QST (torch) -----------------------
    def get_noisy_psi(
        self,
        measure_data,
        steps=2000,
        lr=5e-2,
        dtype=torch.complex128,
        device="cpu",
        eps=1e-12,
    ):
        assert measure_data.shape == (9, 4)

        observables_36 = self.observables

        p_tar = np.asarray(measure_data, dtype=np.float64)
        p_tar = np.clip(p_tar, 0.0, None)
        p_tar = p_tar / (p_tar.sum(axis=1, keepdims=True) + 1e-15)
        p_tar = torch.tensor(p_tar.reshape(-1), dtype=torch.float64, device=device)

        obs_list = []
        for obs in observables_36:
            obs_np = obs.full() if hasattr(obs, "full") else np.asarray(obs)
            obs_list.append(torch.tensor(obs_np, dtype=dtype, device=device))
        O = torch.stack(obs_list, dim=0)

        dim = 4
        z = torch.randn(dim, 1, device=device, dtype=dtype) + 1j * torch.randn(dim, 1, device=device, dtype=dtype)
        z = torch.nn.Parameter(z)

        opt = torch.optim.Adam([z], lr=lr)

        def psi_from_z(z_):
            return z_ / (torch.linalg.norm(z_) + 1e-15)

        def predict_probs(psi):
            rho = psi @ psi.conj().T
            p = torch.einsum("ij,nji->n", rho, O).real
            return torch.clamp(p, 0.0, 1.0)

        def loss_fn():
            psi = psi_from_z(z)
            p = predict_probs(psi)
            return -(p_tar * torch.log(p + eps)).sum()

        best, best_z = float("inf"), None
        for _ in range(steps):
            opt.zero_grad(set_to_none=True)
            loss = loss_fn()
            loss.backward()
            torch.nn.utils.clip_grad_norm_([z], 1.0)
            opt.step()

            lv = float(loss.detach().cpu())
            if lv < best:
                best, best_z = lv, z.detach().clone()
            if best < 1e-10:
                break

        with torch.no_grad():
            psi = psi_from_z(best_z if best_z is not None else z).detach()
            phase = torch.angle(psi[0, 0] + 1e-15)
            psi = psi * torch.exp(-1j * phase)

        return psi.detach().cpu().numpy()

    # ---------------------- unitary learning (jax) ----------------------
    def get_UL_GD(self, Utar=None, measure_data=None, psi_in_list=None):
        """
        Utar: target gate
        measure_data: whole dataset of QPT
        psi_in_list: revised psi (from identity QST of an identity QPT)
        """
        N = self.N
        if psi_in_list is None:
            psi_in_list = [psi.full() for psi in self.psi_in_idea]

        measure_data = self.measure_data if measure_data is None else measure_data
        observables = self.observables

        def stiefel_update(params, grads, step_size):
            U = jnp.hstack([grads, params])
            V = jnp.hstack([params, -grads])
            prod_term = V.T.conj() @ U
            invterm = jnp.eye(prod_term.shape[0]) + (step_size / 2.0) * prod_term
            A = step_size * (U @ jnp.linalg.inv(invterm))
            B = V.T.conj() @ params
            return params - A @ B

        def L2_norm(params, measure_data_, psi_list):
            i = 0
            B_pred_list, B_tar_list = [], []

            for psi in psi_list:
                B_tar = jnp.array(
                    measure_data_[i * 3 ** N : (i + 1) * 3 ** N, : 2 ** N].flatten()
                )
                B_tar_list.append(B_tar)

                psi_pred = params @ psi
                rho = psi_pred @ psi_pred.conj().T
                B_pred = jnp.array([jnp.trace(rho @ obs.full()) for obs in observables])
                B_pred_list.append(B_pred)

                i += 1

            B_pred_list = jnp.array(B_pred_list)
            B_tar_list = jnp.array(B_tar_list)
            return jnp.linalg.norm(B_pred_list - B_tar_list) / (12 ** N)

        def qst_lossfun(params, measure_data_, psi_list):
            return L2_norm(params, measure_data_, psi_list)

        def get_gatefidelity(U1, U2):
            dim = np.shape(U1)[0]
            F_ave = (dim * (np.abs(np.trace(U1 @ U2.conj().T) ** 2) / (dim) ** 2) + 1) / (dim + 1)
            return np.real(F_ave)

        def get_opt_U(measure_data_, psi_list):
            params = rand_unitary(2 ** N, density=0.01).full()
            loss_hist_qst = []

            lr = 0.05
            alpha = 1.1
            prev_loss = float("inf")

            for _ in range(100):
                grads = jax.grad(qst_lossfun)(params, measure_data_, psi_list)
                grads = jnp.conj(grads)
                grads = grads / jnp.linalg.norm(grads)

                params = stiefel_update(params, grads, lr)
                current_loss = qst_lossfun(params, measure_data_, psi_list)

                if current_loss >= prev_loss:
                    lr *= 0.1
                    if lr < 1e-7:
                        break
                else:
                    lr = min(alpha * lr, 0.2)

                prev_loss = current_loss

                if not np.isfinite(np.sum(params @ params.conj().T)):
                    break
                if current_loss < 1e-7:
                    break

                loss_hist_qst.append(qst_lossfun(params, measure_data_, psi_list))

            return loss_hist_qst, params

        losslist, U_pre = get_opt_U(measure_data, psi_in_list)
        
        F = get_gatefidelity(Utar, U_pre)

        return U_pre, F, losslist


def get_average(chi1, chi2, N):
    return (get_chiF(chi1, chi2) * (2**N) + 1) / (2**N + 1)

def get_unitaryF(U1,U2):
    return np.abs(np.trace(U1@U2.conj().T))/np.shape(U1)[0]

def get_chiF(chi1, chi2):
    sqrt_chi1 = sqrtm(chi1)
    return np.real(np.trace(sqrtm(sqrt_chi1 @ chi2 @ sqrt_chi1))**2)

def get_idea_chi_matrix(random_channel, N):
    pauli = [qeye(2), sigmax(), sigmay(), sigmaz()]
    Elist = [tensor(*op).full() for op in product(pauli, repeat=N)]
    dim = 2**(2*N)
    chi_exact = np.zeros((dim, dim), dtype=complex)
    for a in range(dim):
        Ea = Elist[a]
        for b in range(dim):
            Eb = Elist[b]
            chi_exact[a, b] = np.sum([
                np.trace(Ea @ gate) * np.trace(Eb.conj().T @ gate.conj().T) / (2**(2*N))
                for gate in random_channel
            ])
    return chi_exact

def get_QPT_anlysis(measure_data_gate, U_tar=np.array([[1,0,0,0],[0,1,0,0],[0,0,1,0],[0,0,0,-1]]), measure_data_id=None, qnum=2):
    N, measure_data, random_channel = qnum, measure_data_gate, [U_tar]
    qpt = QPT(N=N, measure_data=measure_data, random_channel=None, notes="CZ",
              outcome_perm=(0,1,2,3), rotation_order="q0slow_q1fast")
    chi_idea = get_idea_chi_matrix(random_channel, N)
    if measure_data_id is not None:
        #print("===================performing the SPAM error mitigation QPT==========================")
        qpt0 = QPT(N=N, measure_data=measure_data_id, random_channel=None, notes="CZ",
                   outcome_perm=(0,1,2,3), rotation_order="q0slow_q1fast")
        chi_pred_id = qpt0.get_chi_LS_X(qpt0.rho_in_idea, qpt0.observables, normalized_basis=True, return_raw=True)
        psi_pre_list = [qpt0.get_noisy_psi(measure_data_id[i*9:(i+1)*9, :]) for i in range(4**N)]
        #print("fidelity of revised pure states:",[np.abs(psi.conj().T @ psi1.full())**2 for psi, psi1 in zip(psi_pre_list, qpt0.psi_in_idea)])
        proj_noisy = qpt0.get_noisy_proj(chi_pred_id)
        rho_in_noisy = qpt0.get_noisy_rho(chi_pred_id)
        #print("fidelity of revised density matrix:",[qt.fidelity(rho, rho1) for rho, rho1 in zip(rho_in_noisy, qpt0.rho_in_idea)])
        chi_pred_EM = qpt.get_chi_LS_X(rho_in_noisy, proj_noisy)
        FEM = get_average(chi_pred_EM, chi_idea, N)
        unitary_pre, FU_EM, losslist_EM = qpt.get_UL_GD(U_tar, measure_data, psi_pre_list)
        #print("Fidelity of EM-UL,", FU_EM, "Fidelity of EM-QPT,", FEM)
    else:
        #print("===================performing the std QPT==========================")
        unitary_pre, FU, losslist = qpt.get_UL_GD(U_tar, measure_data)
        chi_pred = qpt.get_chi_LS_X(qpt.rho_in_idea, qpt.observables, normalized_basis=True, return_raw=True)
        F = get_average(chi_pred, chi_idea, N)
        #print("Fidelity of UL:", FU, "Fidelity of std-QPT,", F)
        chi_pred_EM = chi_pred

    return chi_pred_EM, unitary_pre#, chi_pred_EM, losslist

 
def pauli_basis(num_qubits: int) -> list[qt.Qobj]:
    '''Generate the n-qubit Pauli tensor basis operators.

    Args:
        num_qubits: Number of qubits defining the tensor product space.

    Returns:
    List of tensor-product Pauli operators as 'Qobj' instances.
    '''
    single_qubit_paulis = [qt.qeye(2), qt.sigmax(), qt.sigmay(), qt.sigmaz()]
    return [qt.tensor(*ops) for ops in product(single_qubit_paulis, repeat=num_qubits)]

def chi_to_choi_pauli(
    chi , num_qubits: int
)  :
    '''Use ideal input states rho_k and evolved states E_rho_k to reconstruct the process Choi matrix.

    Args:
        E_rho_k: Output states after the process acts on 'rho_k'.
        num_qubits: Number of qubits in the system.

    Returns:
    Process Choi matrix as a 'Qobj'.
    '''
    
    dim = 2**num_qubits
    d2 = dim**2
    P = pauli_basis(num_qubits)
    P_vecs = [qt.operator_to_vector(p).full() for p in P]
    choi = sum(chi[m, n] * (P_vecs[m] @ P_vecs[n].conj().T) for m in range(d2) for n in range(d2))
    choi /= dim
    return qt.Qobj(choi,dims=[[[2]*num_qubits, [2]*num_qubits], [[2]*num_qubits, [2]*num_qubits]])


In [33]:
import numpy as np
from itertools import product

from qiskit import QuantumCircuit
from qiskit_aer import AerSimulator
from qiskit.quantum_info import Operator


class QPTDataFromUnitaryQiskit:
    """
    Generate QPT "measure_data" compatible with your QPT class, from an arbitrary N-qubit unitary U.

    Design matches your QPT(notes!=None) branch:
      - input states (per qubit): |0>, |1>, |+x>, |+y>
      - measurement rotations (per qubit): I, Ry(-pi/2), Rx(+pi/2)
      - output layout: (4^N * 3^N, 2^N), input-major then rotation-major

    Bit-order:
      - Qiskit count/prob keys are shown as q_{N-1}...q0 (MSB on left).
      - Your QuTiP tensor convention is q0...q_{N-1} (q0 as MSB).
      - We reorder probabilities accordingly (bit-reversal on the index).
    """

    def __init__(
        self,
        N: int,
        outcome_perm=None,
        simulator_method: str = "density_matrix",
    ):
        if N < 1:
            raise ValueError("N must be >= 1")
        self.N = N
        self.d = 2**N
        self.outcome_perm = tuple(range(self.d)) if outcome_perm is None else tuple(outcome_perm)
        if len(self.outcome_perm) != self.d:
            raise ValueError(f"outcome_perm must have length 2^N={self.d}")

        self._sim = AerSimulator(method=simulator_method)

        # QPT design (notes!=None)
        self.input_1q = ["0", "1", "+x", "+y"]
        self.rots_1q = ["I", "Ry(-pi/2)", "Rx(pi/2)"]

        self.inputs = list(product(self.input_1q, repeat=N))      # 4^N
        self.rotations = list(product(self.rots_1q, repeat=N))    # 3^N

        # Permutation: Qiskit displayed index (q_{N-1}...q0) -> QuTiP tensor index (q0...q_{N-1})
        self._qiskit_index_for_qutip_index = np.array(
            [self._bit_reverse(k, N) for k in range(self.d)], dtype=int
        )

    # ---------------- public API ----------------
    def simulate_measure_data(self, U, shots=None):
        """
        U: can be
           - np.ndarray (2^N x 2^N) unitary
           - qiskit.quantum_info.Operator
           - qiskit.QuantumCircuit (must be N qubits and unitary)

        shots: None -> exact probabilities (density_matrix simulator)
               int  -> sampling to match finite-shot experiment

        returns: measure_data with shape (4^N * 3^N, 2^N)
        """
        Uop = self._to_operator(U)
        if Uop.dim[0] != self.d:
            raise ValueError(f"Unitary dimension mismatch: expected {self.d}, got {Uop.dim[0]}")

        rows = []
        for inp in self.inputs:
            for rot in self.rotations:
                p = self._one_setting_probs(Uop, inp, rot, shots=shots)
                rows.append(p)

        return np.vstack(rows)

    # ---------------- internals: circuit + probs ----------------
    def _one_setting_probs(self, Uop: Operator, inp_labels, rot_labels, shots=None):
        qc = QuantumCircuit(self.N, self.N)

        # prepare input
        for q, lab in enumerate(inp_labels):
            self._prep_1q(qc, q, lab)

        # apply unitary
        qc.append(Uop, list(range(self.N)))

        # apply measurement rotations (before Z measurement)
        for q, rot in enumerate(rot_labels):
            self._meas_rot_1q(qc, q, rot)

        # measure in Z
        qc.measure(list(range(self.N)), list(range(self.N)))

        if shots is None:
            qc2 = qc.copy()
            qc2.save_probabilities_dict()
            result = self._sim.run(qc2).result()
            pd = result.data(0)["probabilities_dict"]

            # p_qiskit indexed by displayed bitstrings q_{N-1}...q0: '0...0'..'1...1'
            p_qiskit = np.array(
                [pd.get(format(i, f"0{self.N}b"), 0.0) for i in range(self.d)],
                dtype=float,
            )
        else:
            result = self._sim.run(qc, shots=shots).result()
            counts = result.get_counts(0)
            p_qiskit = np.array(
                [counts.get(format(i, f"0{self.N}b"), 0) / shots for i in range(self.d)],
                dtype=float,
            )

        # reorder to QuTiP tensor order q0...q_{N-1}
        p_qutip = p_qiskit[self._qiskit_index_for_qutip_index]

        # apply outcome permutation if your pipeline expects it
        p_qutip = p_qutip[list(self.outcome_perm)]
        return p_qutip

    # ---------------- helpers ----------------
    @staticmethod
    def _prep_1q(qc, qubit, label):
        if label == "0":
            return
        if label == "1":
            qc.x(qubit)
            return
        if label == "+x":
            qc.h(qubit)
            return
        if label == "+y":
            qc.h(qubit)
            qc.s(qubit)  # H then S: |0> -> |+y>
            return
        raise ValueError(f"Unknown input label: {label}")

    @staticmethod
    def _meas_rot_1q(qc, qubit, rot):
        if rot == "I":
            return
        if rot == "Ry(-pi/2)":
            qc.ry(-np.pi / 2, qubit)
            return
        if rot == "Rx(pi/2)":
            qc.rx(np.pi / 2, qubit)
            return
        raise ValueError(f"Unknown rotation: {rot}")

    def _to_operator(self, U):
        if isinstance(U, Operator):
            return U
        if isinstance(U, QuantumCircuit):
            if U.num_qubits != self.N:
                raise ValueError(f"Circuit qubits mismatch: expected N={self.N}, got {U.num_qubits}")
            return Operator(U)
        # assume numpy array
        U = np.asarray(U, dtype=complex)
        if U.shape != (self.d, self.d):
            raise ValueError(f"U must have shape ({self.d},{self.d}), got {U.shape}")
        return Operator(U)

    @staticmethod
    def _bit_reverse(x: int, nbits: int) -> int:
        y = 0
        for _ in range(nbits):
            y = (y << 1) | (x & 1)
            x >>= 1
        return y


In [58]:
from qiskit.quantum_info import random_unitary

N = 2
U = random_unitary(2**N).data  # numpy (4x4)

gen = QPTDataFromUnitaryQiskit(N=N)
md = gen.simulate_measure_data(U, shots=10000)  # 或 shots=None
print(md.shape)  # (4^N*3^N, 2^N) = (144,4)

random_channel=[U]
qpt0 = QPT(N, measure_data=md, random_channel=random_channel,rotation_order="q0slow_q1fast")
chi_pred = qpt0.get_chi_LS_X(qpt0.rho_in_idea, qpt0.observables) 
chi_idea = get_idea_chi_matrix(random_channel, N)
U_pred,F,loss = qpt0.get_UL_GD(Utar=U)

FU = get_unitaryF(U_pred,U)
print('the unitary fidelity:',FU)
FF_noEM = get_chiF(chi_pred, chi_idea)
print(f"Fidelity (no EM): {FF_noEM}")


(144, 4)
the unitary fidelity: 0.23639169335365295
Fidelity (no EM): 0.055801177973000826


In [50]:
U_pred

Array([[-0.13112748-0.55902445j,  0.6276813 -0.5256556j ],
       [-0.62768114-0.5256556j , -0.13112754+0.5590245j ]],      dtype=complex64)

In [55]:
np.abs(np.trace(U@U_pred.conj().T))/2

0.9999904632568359

In [4]:
import numpy as np
from itertools import product
import cvxpy as cp
import qutip as qt
from qutip import Qobj, basis, qeye, sigmax, sigmay, sigmaz, tensor, ket2dm
import qutip.qip.operations.gates as qugate

from qiskit import QuantumCircuit
from qiskit_aer import AerSimulator
from qiskit.quantum_info import Operator


# ============================================================
# 1) QPT analyzer (QuTiP)  —— 统一在这里处理 outcome_perm
# ============================================================
class QPT:
    """
    Input states:  [|0>, |1>, |+x>, |+y>]  (plus=(|0>+|1>)/sqrt2, plusy=(|0>+i|1>)/sqrt2)
    Rotations:     [I, Ry(-pi/2), Rx(+pi/2)]
    measure_data:  shape (4^N * 3^N, 2^N), input-major then rotation-major
    Columns (canonical): outcomes in QuTiP tensor order q0..q_{N-1} i.e. (0,0,...),(0,0,...,1),...,(1,1,...)
    If you want a different outcome order, pass outcome_perm and QPT will reorder internally.
    """

    def __init__(self, N, measure_data, rotation_order="q0slow_q1fast", outcome_perm=None, random_channel=None, U_tar=None):
        self.N = int(N)
        self.measure_data = np.asarray(measure_data, dtype=float)
        self.rotation_order = rotation_order
        self.random_channel = random_channel
        self.U_tar = U_tar

        self.d = 2 ** self.N
        self.dim_chi = 4 ** self.N
        self.dim = [[2] * self.N] * self.N if self.N > 1 else [[2], [2]]

        self.outcome_perm = tuple(range(self.d)) if outcome_perm is None else tuple(outcome_perm)
        if len(self.outcome_perm) != self.d:
            raise ValueError(f"outcome_perm must have length 2^N={self.d}")

        # Pauli basis {I,X,Y,Z}^{⊗N}
        pauli = [qeye(2), sigmax(), sigmay(), sigmaz()]
        self.pauli_sys = [tensor(*op) for op in product(pauli, repeat=self.N)]

        # inputs & rotations (严格匹配你给的顺序)
        zero, one = basis(2, 0), basis(2, 1)
        plus = (zero + one).unit()
        plusy = (zero + 1j * one).unit()
        inputst_1q = [zero, one, plus, plusy]
        rotation_1q = [qeye(2), qugate.ry(-np.pi / 2), qugate.rx(np.pi / 2)]

        self.psi_in_idea = [tensor(*psi) for psi in product(inputst_1q, repeat=self.N)]
        self.rho_in_idea = [ket2dm(psi) for psi in self.psi_in_idea]

        # rotations: must match simulator scanning order
        if rotation_order == "q0slow_q1fast":
            self.U_rotation = [tensor(*rot) for rot in product(rotation_1q, repeat=self.N)]
        elif rotation_order == "q1slow_q0fast":
            if self.N != 2:
                raise ValueError("rotation_order='q1slow_q0fast' implemented only for N=2.")
            self.U_rotation = [tensor(r0, r1) for r1 in rotation_1q for r0 in rotation_1q]
        else:
            raise ValueError("rotation_order must be 'q0slow_q1fast' or 'q1slow_q0fast'.")

        # Z projectors in QuTiP tensor order q0..q_{N-1}: product([0,1], repeat=N)
        e_sys = [tensor(*state) for state in product([basis(2, i) for i in range(2)], repeat=self.N)]
        self.proj_qst = [ket2dm(e) for e in e_sys]

        # observables: rotation-major then outcome
        self.observables = [
            self.U_rotation[m].dag() * self.proj_qst[n] * self.U_rotation[m]
            for m in range(len(self.U_rotation))
            for n in range(len(self.proj_qst))
        ]

    def get_chi_LS_X(self, normalized_basis=True, return_raw=True):
        """
        CPTP chi estimation via convex LS (cvxpy), chi in Pauli basis.
        """
        N, d, dim_chi = self.N, self.d, self.dim_chi
        E = [P / np.sqrt(d) for P in self.pauli_sys] if normalized_basis else self.pauli_sys

        md = np.asarray(self.measure_data, dtype=float)
        n_inputs = len(self.rho_in_idea)
        n_outcomes = d
        n_rots = 3 ** N
        expected = (n_inputs * n_rots, n_outcomes)
        if md.shape != expected:
            raise ValueError(f"measure_data shape {md.shape} not understood. Expected {expected}.")

        # outcome reorder handled here (only once)
        md = md[:, self.outcome_perm]

        # flatten into Ob consistent with observables ordering:
        # input-major then (rotation-major then outcome)
        Ob = md.reshape(n_inputs, n_rots, n_outcomes).reshape(n_inputs, n_rots * n_outcomes).reshape(-1)

        proj_list = self.observables
        n_proj = len(proj_list)

        coeff = np.empty((n_inputs * n_proj, dim_chi * dim_chi), dtype=complex)
        row = 0
        for i in range(n_inputs):
            rho = self.rho_in_idea[i]
            for j in range(n_proj):
                M = proj_list[j]
                col = 0
                for m in range(dim_chi):
                    Em = E[m]
                    for n in range(dim_chi):
                        coeff[row, col] = (Em * rho * E[n].dag() * M).tr()
                        col += 1
                row += 1

        X = cp.Variable((dim_chi, dim_chi), hermitian=True)
        x_vec = cp.reshape(X, (dim_chi * dim_chi,), order="C")
        obj = cp.Minimize(cp.norm2(coeff @ x_vec - Ob))
        constraints = [X >> 0]

        # TP constraint: sum_{mn} chi_mn E_n^† E_m = I
        I_d = np.eye(d, dtype=complex)
        EnEm = [[(E[n].dag() * E[m]).full() for n in range(dim_chi)] for m in range(dim_chi)]
        for a in range(d):
            for b in range(d):
                expr = 0
                for m in range(dim_chi):
                    for n in range(dim_chi):
                        expr += X[m, n] * EnEm[m][n][a, b]
                constraints += [cp.real(expr) == np.real(I_d[a, b]), cp.imag(expr) == np.imag(I_d[a, b])]

        prob = cp.Problem(obj, constraints)
        prob.solve(solver=cp.SCS, eps=1e-6, warm_start=True)
        if X.value is None:
            raise RuntimeError(f"Optimization failed. status={prob.status}")

        chi_norm = X.value
        return (chi_norm / d) if (normalized_basis and return_raw) else chi_norm


# ============================================================
# 2) Qiskit simulator —— 输出 canonical 列顺序(QuTiP tensor order)
# ============================================================
class QPTDataFromUnitaryQiskit:
    """
    Produce measure_data for QPT from an arbitrary N-qubit unitary U.
    Output columns are canonical QuTiP tensor order q0..q_{N-1} (q0 as MSB).
    rotation_order must match QPT.rotation_order.
    """

    def __init__(self, N, rotation_order="q0slow_q1fast", simulator_method="density_matrix"):
        self.N = int(N)
        self.d = 2 ** self.N
        self.rotation_order = rotation_order
        self.sim = AerSimulator(method=simulator_method)

        # design (strict order)
        self.input_1q = ["0", "1", "+x", "+y"]
        self.rots_1q = ["I", "Ry(-pi/2)", "Rx(pi/2)"]
        self.inputs = list(product(self.input_1q, repeat=self.N))  # 4^N

        if rotation_order == "q0slow_q1fast":
            self.rotations = list(product(self.rots_1q, repeat=self.N))  # 3^N
        elif rotation_order == "q1slow_q0fast":
            if self.N != 2:
                raise ValueError("rotation_order='q1slow_q0fast' implemented only for N=2.")
            self.rotations = [(r0, r1) for r1 in self.rots_1q for r0 in self.rots_1q]
        else:
            raise ValueError("rotation_order must be 'q0slow_q1fast' or 'q1slow_q0fast'.")

        # index map: qiskit displayed bitstring is q_{N-1}...q0, we want q0...q_{N-1}
        self._idx = np.array([self._bit_reverse(k, self.N) for k in range(self.d)], dtype=int)

    def measure_data(self, U, shots=None):
        Uop = self._to_operator(U)
        rows = [self._one_setting(Uop, inp, rot, shots) for inp in self.inputs for rot in self.rotations]
        return np.vstack(rows)

    def _one_setting(self, Uop, inp_labels, rot_labels, shots):
        qc = QuantumCircuit(self.N, self.N)

        for q, lab in enumerate(inp_labels):
            if lab == "1":
                qc.x(q)
            elif lab == "+x":
                qc.h(q)
            elif lab == "+y":
                qc.h(q); qc.s(q)

        qc.append(Uop, list(range(self.N)))

        for q, rot in enumerate(rot_labels):
            if rot == "Ry(-pi/2)":
                qc.ry(-np.pi / 2, q)
            elif rot == "Rx(pi/2)":
                qc.rx(np.pi / 2, q)

        qc.measure(range(self.N), range(self.N))

        if shots is None:
            qc2 = qc.copy()
            qc2.save_probabilities_dict()
            pd = self.sim.run(qc2).result().data(0)["probabilities_dict"]
            p_qiskit = np.array([pd.get(format(i, f"0{self.N}b"), 0.0) for i in range(self.d)], float)
        else:
            counts = self.sim.run(qc, shots=shots).result().get_counts(0)
            p_qiskit = np.array([counts.get(format(i, f"0{self.N}b"), 0) / shots for i in range(self.d)], float)

        # reorder to canonical QuTiP tensor outcome order (q0..q_{N-1})
        return p_qiskit[self._idx]

    def _to_operator(self, U):
        if isinstance(U, Operator):
            return U
        if isinstance(U, QuantumCircuit):
            if U.num_qubits != self.N:
                raise ValueError(f"Circuit qubits mismatch: expected {self.N}, got {U.num_qubits}")
            return Operator(U)
        U = np.asarray(U, dtype=complex)
        if U.shape != (self.d, self.d):
            raise ValueError(f"U must have shape ({self.d},{self.d}), got {U.shape}")
        return Operator(U)

    @staticmethod
    def _bit_reverse(x, nbits):
        y = 0
        for _ in range(nbits):
            y = (y << 1) | (x & 1)
            x >>= 1
        return y


In [15]:
# def get_idea_chi_matrix(random_channel, N):
#     pauli = [qeye(2), sigmax(), sigmay(), sigmaz()]
#     Elist = [tensor(*op).full() for op in product(pauli, repeat=N)]
#     dim = 2**(2*N)
#     chi_exact = np.zeros((dim, dim), dtype=complex)
#     for a in range(dim):
#         Ea = Elist[a]
#         for b in range(dim):
#             Eb = Elist[b]
#             chi_exact[a, b] = np.sum([
#                 np.trace(Ea @ gate) * np.trace(Eb.conj().T @ gate.conj().T) / (2**(2*N))
#                 for gate in random_channel
#             ])
#     return chi_exact

import numpy as np
from itertools import product
from qutip import qeye, sigmax, sigmay, sigmaz, tensor, Qobj

def get_ideal_chi_matrix_from_kraus(
    random_channel,
    N,
    normalized_basis=True,
    return_raw_like_qpt=True,
):
    """
    random_channel: list of Kraus operators {K_l}
      - each K can be np.ndarray (d,d) or qutip.Qobj
    N: number of qubits
    normalized_basis: match QPT.get_chi_LS_X(normalized_basis=...)
      - True  => E_m = P_m / sqrt(d)  (orthonormal)
      - False => E_m = P_m            (Tr(Pm^†Pn)=d δmn)
    return_raw_like_qpt:
      - True  => return chi/d when normalized_basis=True (matches your QPT.get_chi_LS_X default return_raw=True)
      - False => return the "chi_norm" used in the model equation directly
    """
    d = 2**N
    dim_chi = 4**N

    # Build Pauli tensor basis P_m in product([I,X,Y,Z], repeat=N) order (same as your QPT)
    pauli = [qeye(2), sigmax(), sigmay(), sigmaz()]
    P = [tensor(*ops).full() for ops in product(pauli, repeat=N)]

    if normalized_basis:
        E = [Pm / np.sqrt(d) for Pm in P]  # orthonormal: Tr(E_m^†E_n)=δ_mn
        norm_factor = 1.0
    else:
        E = P                               # Tr(P_m^†P_n)=d δ_mn
        norm_factor = d                     # a_m = Tr(P_m^†K)/d

    # Convert Kraus list to numpy
    Ks = []
    for K in random_channel:
        if isinstance(K, Qobj):
            K = K.full()
        K = np.asarray(K, dtype=complex)
        if K.shape != (d, d):
            raise ValueError(f"Kraus shape {K.shape} != ({d},{d}) for N={N}")
        Ks.append(K)

    # Coefficients a_{l,m} = Tr(E_m^† K_l)   (or /d if unnormalized basis)
    A = np.empty((len(Ks), dim_chi), dtype=complex)
    for l, K in enumerate(Ks):
        for m in range(dim_chi):
            A[l, m] = np.trace(E[m].conj().T @ K) / norm_factor

    # chi_{mn} = Σ_l a_{l,m} a_{l,n}^*
    chi = A.conj().T @ A

    # Match your QPT.get_chi_LS_X return convention if desired
    if normalized_basis and return_raw_like_qpt:
        return chi / d
    return chi


In [None]:
from qiskit.quantum_info import random_unitary
N = 2
U = random_unitary(2**N).data#np.array([[1,0,0,0],[0,1,0,0],[0,0,1,0],[0,0,0,-1]])# random_unitary(2**N).data  # numpy (4x4)
rotation_order = "q0slow_q1fast"   # 或 "q1slow_q0fast"(仅 N=2)
sim = QPTDataFromUnitaryQiskit(N, rotation_order=rotation_order)
md = sim.measure_data(U, shots=10000)  # shots=20000 也行

qpt0 = QPT(N, md, rotation_order=rotation_order, outcome_perm=None)
chi = qpt0.get_chi_LS_X(normalized_basis=True, return_raw=True)
chi_idea = get_ideal_chi_matrix_from_kraus(random_channel=[U], N=N)

FF_noEM = get_chiF(chi, chi_idea)   
print(f"Fidelity (no EM): {FF_noEM}")

Fidelity (no EM): 0.049194550556446254


In [17]:
chi_idea = get_ideal_chi_matrix_from_kraus(random_channel=[U], N=N)

FF_noEM = get_chiF(chi, chi_idea)
print(f"Fidelity (no EM): {FF_noEM}")

Fidelity (no EM): 0.049194550556446254
