In [194]:
from typing import List, Sequence, Tuple
import itertools
import functools
import numpy as np

In [195]:
import leaky
from leaky import LeakyPauliChannel
import stim

In [196]:
import leaky

In [197]:
from qutip import *
from qutip import to_choi

## Functions

In [19]:
LeakageStatus = Tuple[int, ...]
ProjectStatus = Tuple[Tuple[int], ...]

PAULIS = np.array(
    [
        np.array([[1, 0], [0, 1]], dtype=complex),
        np.array([[0, 1], [1, 0]], dtype=complex),
        np.array([[0, -1j], [1j, 0]], dtype=complex),
        np.array([[1, 0], [0, -1]], dtype=complex),
    ]
)
TWO_QUBITS_PAULIS = np.array(
    [np.kron(p1, p2) for p1, p2 in itertools.product(PAULIS, repeat=2)]
)

In [20]:
def kraus_to_leaky_pauli_channel(
    kraus_operators: Sequence[np.ndarray],
    num_qubits: int,
    num_level: int,
    safety_check: bool = True,
) -> LeakyPauliChannel:

    if num_qubits not in [1, 2]:
        raise ValueError("Only 1 or 2 qubits operators are supported.")
    channel = LeakyPauliChannel(is_single_qubit_channel=num_qubits == 1)
    
    all_status = list(itertools.product(range(num_level - 1), repeat=num_qubits))
    
    for kraus in kraus_operators:
        for initial_status, final_status in itertools.product(all_status, repeat=2):
            num_u = sum(
                s0 == 0 and s1 > 0 for s0, s1 in zip(initial_status, final_status)
            )
            q_in_r = [
                i
                for i, (s0, s1) in enumerate(zip(initial_status, final_status))
                if s0 == 0 and s1 == 0
            ]
            num_r = len(q_in_r)

            prefactor: float = 1.0 / 2**num_u
            projectors = _scatter_status(initial_status, final_status)
            
            for initial_projector, final_projector in projectors:
                projected_kraus = _project_kraus_with_initial_final(
                    kraus, num_qubits, num_level, initial_projector, final_projector)
                probability: float
                pauli_channel: List[Tuple[int, float]] = []
                
                if num_r == 0:
                    assert projected_kraus.shape == (1, 1)
                    probability = (
                        prefactor * np.abs(projected_kraus).astype(float) ** 2
                    ).item()
                    pauli_channel.append((0, probability))
                else:
                    dim = 2**num_r
                    assert projected_kraus.shape == (dim, dim)
                    for i, paulis in enumerate(itertools.product(PAULIS, repeat=num_r)):
                        probability = (
                            prefactor
                            * np.abs(
                                np.trace(
                                    projected_kraus @ functools.reduce(np.kron, paulis)
                                )
                                / dim
                            )
                            ** 2
                        )
                        idx = sum(
                            [
                                ((i >> (2 * (num_r - j - 1))) & 0b11)
                                << (2 * (num_qubits - q - 1))
                                for j, q in enumerate(q_in_r)
                            ] )
                        pauli_channel.append((idx, probability))

                    probability = sum([p for _, p in pauli_channel])
                if probability < 1e-9:
                    continue
                for idx, p in pauli_channel:
                    if p < 1e-9:
                        continue
                    channel.add_transition(
                        leakage_status_tuple_to_int(initial_status),
                        leakage_status_tuple_to_int(final_status),
                        idx,
                        p,
                    )
    if safety_check:
        channel.safety_check()
    return channel

In [21]:
# 输出 LeakageStatus 的整数表示
def leakage_status_tuple_to_int(status: LeakageStatus) -> int:
    """Convert a leakage status tuple to an integer representation.

    Args:
        status: A tuple of leakage status. Currently, only support up to two
            qubits.

    Returns:
        An integer representation of the leakage status.
    """
    return sum([s << (4 * (len(status) - i - 1)) for i, s in enumerate(status)])

def _l2p(leakage_status: LeakageStatus) -> ProjectStatus:
    return tuple((0, 1) if s == 0 else (s + 1,) for s in leakage_status)

#输出 ProjectStatus 的整数表示
def _get_projector_slice(
    num_level: int,
    project_status: ProjectStatus,
) -> List[int]:
    """Get slice into the matrix for the subspace projection defined by project_status."""
    num_qubits = len(project_status)
    status = project_status[0]
    if num_qubits == 1:
        return list(status)
    tail_slice = _get_projector_slice(num_level, project_status[1:])
    return [x + s * num_level ** (num_qubits - 1) for s in status for x in tail_slice]

#提取 kraus operator 的子空间部分
def _project_kraus_with_initial_final(
    kraus: np.ndarray,
    num_qubits: int,
    num_level: int,
    initial_project_status: ProjectStatus,
    final_project_status: ProjectStatus,
) -> np.ndarray:
    assert kraus.shape[0] == num_level**num_qubits
    initial_slice = _get_projector_slice(num_level, initial_project_status)
    final_slice = _get_projector_slice(num_level, final_project_status)
    return kraus[final_slice, :][:, initial_slice]

In [22]:
# _get_projector_slice( 4, ((0,1), (3,)) )

In [23]:
# 输入初始和最终的 LeakageStatus，输出的是 ProjectStatus，将 U 和 D 的 basis states (0和1) 分开考虑。
def _scatter_status(
    initial: LeakageStatus, final: LeakageStatus
) -> List[Tuple[ProjectStatus, ProjectStatus]]:
    initial_project_status = _l2p(initial)
    final_project_status = _l2p(final)

    up_indices = [
        i
        for i, (start, end) in enumerate(zip(initial, final))
        if start == 0 and end > 0
    ]
    down_indices = [
        i
        for i, (start, end) in enumerate(zip(initial, final))
        if start > 0 and end == 0
    ]

    initial_combinations = list(
        itertools.product(
            *[
                [status] if index not in up_indices else [(0,), (1,)]
                for index, status in enumerate(initial_project_status)
            ]
        )
    )
    
    final_combinations = list(
        itertools.product(
            *[
                [status] if index not in down_indices else [(0,), (1,)]
                for index, status in enumerate(final_project_status)
            ]
        )
    )
    
    return list(itertools.product(initial_combinations, final_combinations))

In [24]:
# _scatter_status([0,2], [1,2])

## Utils_test

In [None]:
import pytest

In [None]:
from leaky.utils import (
    _get_projector_slice,
    decompose_kraus_operators_to_leaky_pauli_channel,
    leakage_status_tuple_to_int,
    PAULIS,
    TWO_QUBITS_PAULIS,
)

In [None]:
def test_get_projector_slice():
    assert _get_projector_slice(2, ((0,),)) == [0]
    assert _get_projector_slice(2, ((0, 1),)) == [0, 1]
    assert _get_projector_slice(3, ((2,),)) == [2]
    assert _get_projector_slice(3, ((0, 1), (2,))) == [2, 5]
    assert _get_projector_slice(3, ((2,), (2,))) == [8]
    assert _get_projector_slice(4, ((0, 1), (0, 1))) == [0, 1, 4, 5]
    assert _get_projector_slice(4, ((0, 1), (2,))) == [2, 6]

def test_leakage_status_tuple_to_int():
    assert leakage_status_tuple_to_int((0,)) == 0
    assert leakage_status_tuple_to_int((1,)) == 1
    assert leakage_status_tuple_to_int((2,)) == 2
    assert leakage_status_tuple_to_int((0, 1)) == 0x01
    assert leakage_status_tuple_to_int((1, 0)) == 0x10
    assert leakage_status_tuple_to_int((1, 1)) == 0x11
    assert leakage_status_tuple_to_int((2, 1)) == 0x21

In [None]:
# test_single_qubit_depolarize_decompose()

In [None]:
P = 1e-2
RNG = np.random.default_rng()

def test_single_qubit_depolarize_decompose():
    channel_probs_list = []
    for _ in range(10):
        probs = RNG.random(4)
        probs /= sum(probs)
        channel_probs_list.append(probs)
    for channel_probs in channel_probs_list:
        depolarize_channel = [
            np.sqrt(p) * pauli for p, pauli in zip(channel_probs, PAULIS)
        ]
        channel = decompose_kraus_operators_to_leaky_pauli_channel(
            depolarize_channel, 1, 2 )
        for i, p in enumerate(channel_probs):
            assert channel.get_prob_from_to(0, 0, i) == pytest.approx(p)
        print(channel)

In [None]:
def test_phase_damping_channel_decompose():
    k = 0.2
    kraus_operators = [
        np.array(
            [
                [1, 0],
                [0, np.sqrt(1 - k)],
            ],
            dtype=complex,
        ),
        np.array(
            [
                [0, 0],
                [0, np.sqrt(k)],
            ],
            dtype=complex,
        ),
    ]
    channel = decompose_kraus_operators_to_leaky_pauli_channel(kraus_operators, 1, 2)
    channel.get_prob_from_to(0, 0, 0) == pytest.approx((1 + np.sqrt(1 - k)) / 2)
    channel.get_prob_from_to(0, 0, 3) == pytest.approx((1 - np.sqrt(1 - k)) / 2)
    # print(channel)

In [None]:
# test_phase_damping_channel_decompose()

In [None]:
def test_two_qubit_depolarize_decompose():
    channel_probs_list = []
    for _ in range(10):
        probs = RNG.random(16)
        probs /= sum(probs)
        channel_probs_list.append(probs)
    for channel_probs in channel_probs_list:
        depolarize_channel = [
            np.sqrt(p) * pauli for p, pauli in zip(channel_probs, TWO_QUBITS_PAULIS)
        ]
        channel = decompose_kraus_operators_to_leaky_pauli_channel(
            depolarize_channel, 2, 2
        )

        for i, p in enumerate(channel_probs):
            assert channel.get_prob_from_to(0x00, 0x00, i) == pytest.approx(p)

In [None]:
THETA = np.pi / 6
U = np.array(
    [
        [1, 0, 0, 0],
        [0, np.cos(THETA), np.sin(THETA), 0],
        [0, -np.sin(THETA), np.cos(THETA), 0],
        [0, 0, 0, 1],
    ],
    dtype=complex,
)

In [None]:
def test_single_qubit_4level_unitary_decompose():
    channel = decompose_kraus_operators_to_leaky_pauli_channel([U], 1, 4)
    equality_check = {
        ((0,), (0,), 0): np.cos(THETA / 2) ** 4,
        ((0,), (0,), 3): np.sin(THETA / 2) ** 4,
        ((0,), (1,), 0): np.sin(THETA) ** 2 / 2,
        ((1,), (0,), 0): np.sin(THETA) ** 2,
        ((1,), (1,), 0): np.cos(THETA) ** 2,
        ((2,), (2,), 0): 1,
    }
    for initial, final in itertools.product(range(3), repeat=2):
        for pauli_idx in [0, 1, 2, 3]:
            expected_prob = equality_check.get(((initial,), (final,), pauli_idx), 0)
            computed_prob = channel.get_prob_from_to(initial, final, pauli_idx)
            assert computed_prob == pytest.approx(expected_prob)

In [None]:
def test_two_qubit_4level_unitary_decompose():
    channel = decompose_kraus_operators_to_leaky_pauli_channel([np.kron(U, U)], 2, 4)
    equality_check = {
        (0x00, 0x00, 0): np.cos(THETA / 2) ** 8,
        (0x00, 0x00, 15): np.sin(THETA / 2) ** 8,
        (0x00, 0x00, 10): 0,
        (0x00, 0x11, 0): np.sin(THETA) ** 4 / 4,
        (0x00, 0x01, 0): np.cos(THETA / 2) ** 4 * np.sin(THETA) ** 2 / 2,
        (0x00, 0x01, 12): np.sin(THETA / 2) ** 4 * np.sin(THETA) ** 2 / 2,
        (0x01, 0x01, 0): np.cos(THETA / 2) ** 4 * np.cos(THETA) ** 2,
        (0x00, 0x20, 0): 0,
        (0x02, 0x12, 0): np.sin(THETA) ** 2 / 2,
        (0x11, 0x11, 0): np.cos(THETA) ** 4,
        (0x12, 0x12, 0): np.cos(THETA) ** 2,
        (0x20, 0x20, 0): np.cos(THETA / 2) ** 4,
        (0x21, 0x21, 0): np.cos(THETA) ** 2,
        (0x22, 0x22, 0): 1,
    }
    for check, p in equality_check.items():
        assert channel.get_prob_from_to(*check) == pytest.approx(p)

## Leaky_pybind_test

In [None]:
def test_leaky_pauli_channel():
    channel = leaky.LeakyPauliChannel(is_single_qubit_channel=True)
    channel.add_transition(0, 1, 0, 1.0)
    channel.safety_check()
    assert channel.num_transitions == 1
    with pytest.raises(
        RuntimeError,
        match="The sum of probabilities for each initial status should be 1, but get ",
    ):
        channel.add_transition(1, 2, 0, 0.5)
        channel.safety_check()
    with pytest.raises(
        RuntimeError,
        match="The attached pauli of transitions for the qubits in D/U/L should be I",
    ):
        channel.add_transition(1, 1, 1, 0.5)
        channel.safety_check()

In [None]:
# test_leaky_pauli_channel()

In [None]:
def test_simulator_noiseless_bell_states():
    s = leaky.Simulator(4)
    s.do(leaky.Instruction("R", [0, 1, 2, 3]))
    s.do(leaky.Instruction("H", [0, 2]))
    s.do(leaky.Instruction("CNOT", [0, 1, 2, 3]))
    s.do(leaky.Instruction("M", [0, 1, 2, 3]))
    record = s.current_measurement_record()
    assert record[0] ^ record[1] == 0
    assert record[2] ^ record[3] == 0

In [None]:
def test_simulator_do_noiseless_bell_circuit():
    circuit = stim.Circuit(
        """R 0 1 2 3
        H 0 2
        CNOT 0 1 2 3
        M 0 1 2 3"""    )
    s = leaky.Simulator(4)
    s.do_circuit(circuit)
    record = s.current_measurement_record()
    assert record[0] ^ record[1] == 0
    assert record[2] ^ record[3] == 0

In [None]:
def test_simulator_do_leaky_channel():
    s = leaky.Simulator(4)
    channel_2q = leaky.LeakyPauliChannel(is_single_qubit_channel=False)
    channel_2q.add_transition(0x00, 0x10, 1, 1.0)
    s.do(leaky.Instruction("CZ", [0, 1, 2, 3]))
    s.apply_2q_leaky_pauli_channel([0, 1, 2, 3], channel_2q)
    s.do(leaky.Instruction("M", [0, 1, 2, 3]))
    assert s.current_measurement_record().tolist() == [2, 1, 2, 1]

    channel_1q = leaky.LeakyPauliChannel(is_single_qubit_channel=True)
    channel_1q.add_transition(1, 0, 0, 1.0)
    s.do(leaky.Instruction("H", [0, 2]))
    s.apply_1q_leaky_pauli_channel([0, 2], channel_1q)
    s.do(leaky.Instruction("M", [0, 1, 2, 3]))
    assert s.current_measurement_record().tolist()[-4:][0] in [0, 1]
    assert s.current_measurement_record().tolist()[-4:][2] in [0, 1]

    s.clear()
    assert s.current_measurement_record().size == 0

In [None]:
def test_simulator_bind_leaky_channel():
    s = leaky.Simulator(4)
    channel_1q_1 = leaky.LeakyPauliChannel(is_single_qubit_channel=True)
    channel_1q_1.add_transition(1, 0, 0, 1.0)
    channel_1q_2 = leaky.LeakyPauliChannel(is_single_qubit_channel=True)
    channel_1q_2.add_transition(1, 2, 0, 1.0)
    channel_2q = leaky.LeakyPauliChannel(is_single_qubit_channel=False)
    channel_2q.add_transition(0x00, 0x10, 1, 1.0)
    s.bind_leaky_channel(leaky.Instruction("H", [0]), channel_1q_1)
    s.bind_leaky_channel(leaky.Instruction("H", [2]), channel_1q_2)
    s.bind_leaky_channel(leaky.Instruction("CNOT", [0, 1]), channel_2q)
    s.bind_leaky_channel(leaky.Instruction("CNOT", [2, 3]), channel_2q)
    
    s.do_circuit(stim.Circuit("X 0 2\nCNOT 0 1 2 3\nM 0 1 2 3"))
    
    assert len(s.bound_leaky_channels) == 4
    assert s.current_measurement_record().tolist() == [2, 0, 2, 0]
    s.do(leaky.Instruction("H", [0, 2]))
    s.do(leaky.Instruction("M", [0, 1, 2, 3]))
    assert s.current_measurement_record().tolist()[-4] in [0, 1]
    assert s.current_measurement_record().tolist()[-3:] == [0, 3, 0]

    s.clear(clear_bound_channels=True)
    assert len(s.bound_leaky_channels) == 0
    s.do_circuit(stim.Circuit("X 0 2\nCNOT 0 1 2 3\nM 0 1 2 3"))
    assert s.current_measurement_record().tolist() == [1, 1, 1, 1]

## Test

In [239]:
channel_1q = leaky.LeakyPauliChannel(is_single_qubit_channel=True)
channel_1q.add_transition(0, 1, 0, 1)

channel_1q_2 = leaky.LeakyPauliChannel(is_single_qubit_channel=True)
channel_1q_2.add_transition(0, 2, 0, 1.0)

channel_2q = leaky.LeakyPauliChannel(is_single_qubit_channel=False)
channel_2q.add_transition(0x00, 0x01, 0, 1.0)

In [240]:
s = leaky.Simulator(4)

s.bind_leaky_channel(leaky.Instruction("H", [0]), channel_1q)
# s.bind_leaky_channel(leaky.Instruction("H", [1]), channel_1q)
# s.bind_leaky_channel(leaky.Instruction("I", [1]), channel_1q_2)
# s.bind_leaky_channel(leaky.Instruction("II", [0,1]), channel_2q)

# s.do(leaky.Instruction("ISWAP", [0, 1]))
# s.do(leaky.Instruction("CX", [0,1]))
# s.apply_1q_leaky_pauli_channel([0], channel_1q)

# s.apply_2q_leaky_pauli_channel([0, 1], channel_2q)

# s.do(leaky.Instruction("M", [0, 1, 2, 3]))

s.do_circuit(stim.Circuit("H 0 1 \n H 0 1\nM 0 1 2 3") )

s.current_measurement_record().tolist()

[2, 0, 0, 0]

In [None]:
# simulator.clear(clear_bound_channels=True)
simulator = leaky.Simulator(num_qubits)

simulator.bind_leaky_channel(leaky.Instruction("H", [0,1]), channel_1q_1)

# simulator.do_circuit( circuit )
# print(simulator.current_measurement_record().tolist())
# simulator.sample_batch(circuit, shots=10)
# simulator.bound_leaky_channels

In [None]:
# circuit.diagram('timeline-svg')

In [241]:
circuit = stim.Circuit()

circuit.append("DEPOLARIZE2", [0,1], 1)
circuit.append("M", [0,1])

s = leaky.Simulator(2)
# s.apply_1q_leaky_pauli_channel([0], channel_1q)

s.do_circuit( circuit )
print(s.current_measurement_record().tolist())

[0, 1]
