## V1

初步实现我our decoder，精度和UF相当，但低于远低于BP+OSD

In [8]:
from ldpc.codes import rep_code, ring_code, hamming_code
from bposd.hgp import hgp
import time

construct surface code based on rep_code

In [9]:
h = rep_code(3)
surface_code = hgp(h1=h, h2=h, compute_distance=True)
# surface_code.test()

print(f"surface_code.hz = {surface_code.hz}")
print(f"surface_code.hx = {surface_code.hx}")
print(f"surface_code lz = {surface_code.lz}")
print(f"surface_code lx = {surface_code.lx}")

surface_code.hz = [[1 1 0 0 0 0 0 0 0 1 0 0 0]
 [0 1 1 0 0 0 0 0 0 0 1 0 0]
 [0 0 0 1 1 0 0 0 0 1 0 1 0]
 [0 0 0 0 1 1 0 0 0 0 1 0 1]
 [0 0 0 0 0 0 1 1 0 0 0 1 0]
 [0 0 0 0 0 0 0 1 1 0 0 0 1]]
surface_code.hx = [[1 0 0 1 0 0 0 0 0 1 0 0 0]
 [0 1 0 0 1 0 0 0 0 1 1 0 0]
 [0 0 1 0 0 1 0 0 0 0 1 0 0]
 [0 0 0 1 0 0 1 0 0 0 0 1 0]
 [0 0 0 0 1 0 0 1 0 0 0 1 1]
 [0 0 0 0 0 1 0 0 1 0 0 0 1]]
surface_code lz = [[1 0 0 1 0 0 1 0 0 0 0 0 0]]
surface_code lx = [[1 1 1 0 0 0 0 0 0 0 0 0 0]]


initialize decoder
+ BP+OSD
+ UFDecoder

In [10]:
import numpy as np
from ldpc import bposd_decoder
from mqt.qecc import *  # UFDecoder

p = 0.05  # 错误率

# BP+OSD
bposd_decoder = bposd_decoder(
    surface_code.hz,
    error_rate=p,
    channel_probs=[None],
    max_iter=surface_code.N,
    bp_method="ms",
    ms_scaling_factor=0,
    osd_method="osd_0",
    osd_order=7,
)

# UFDecoder
code = Code(surface_code.hx, surface_code.hz)
uf_decoder = UFHeuristic()
uf_decoder.set_code(code)

decode and calculate success rate
+ BP+OSD
+ UFDecoder (mqt-qecc)
+ Our method

In [11]:
# 上述surface_code.hz对应的正确的B
B = np.array(
    [
        [1, 0, 0, 1, 0, 0, 0],
        [1, 0, 0, 0, 1, 0, 0],
        [0, 1, 0, 1, 0, 1, 0],
        [0, 1, 0, 0, 1, 0, 1],
        [0, 0, 1, 0, 0, 1, 0],
        [0, 0, 1, 0, 0, 0, 1],
    ]
)
# 错误的B
# B = np.array(
#     [
#         [-1, 0, 0, 1, -1, 0, 0],
#         [1, 0, 0, 0, 1, 0, 0],
#         [0, -1, 0, 1, -1, 1, -1],
#         [0, 1, 0, 0, 1, 0, 1],
#         [0, 0, -1, 0, 0, 1, -1],
#         [0, 0, 1, 0, 0, 0, 1],
#     ]
# )

H_X = surface_code.hz

weights = [
    np.log((1 - p) / p) for i in range(H_X.shape[1])
]  # 初始每个qubit的对数似然比

W_f = weights[: H_X.shape[0]]
W_g = weights[H_X.shape[0] :]

W_f_B = np.dot(W_f, B)  # W_f * B
W_g_B_W_f = W_f_B + W_g  # W_f * B + W_g
# print(f"W_g_B_W_f = {W_g_B_W_f}")

# 我们自己算出来的g
g = np.where(
    W_g_B_W_f > 0,
    0,
    np.where(W_g_B_W_f < 0, 1, np.random.randint(0, 2, size=W_g_B_W_f.shape)),
)
print(f"g = {g}")

B_g = np.dot(B, g)  # B * g
print(f"B_g = {B_g}")

g = [0 0 0 0 0 0 0]
B_g = [0 0 0 0 0 0]


In [12]:
np.random.seed(1)

num_trials = 10000

bposd_num_success = 0
uf_num_success = 0
our_num_success = 0

bposd_time = 0
uf_time = 0
our_time = 0

num_bp_fail = 0


for i in range(num_trials):

    # Generate random error
    error = np.zeros(surface_code.N).astype(int)
    for q in range(surface_code.N):
        if np.random.rand() < p:
            error[q] = 1

    # Obtain syndrome
    syndrome = surface_code.hz @ error % 2

    """Decode"""
    # 1. BP+OSD
    bp_start_time = time.time()
    bposd_decoder.decode(syndrome)

    bp_last_round_result = bposd_decoder.bp_decoding
    bp_log_prob_ratios = bposd_decoder.log_prob_ratios
    # print(f"bp_last_round_result = {bp_last_round_result}")
    # print(f"bp_log_prob_ratios = {bp_log_prob_ratios}")

    bp_end_time = time.time()

    # 2. UFDecoder
    uf_start_time = time.time()
    uf_decoder.decode(syndrome)
    uf_result = np.array(uf_decoder.result.estimate).astype(int)
    uf_end_time = time.time()

    # 3. Our Decoder
    bposd_result = bposd_decoder.osdw_decoding

    """不同方法的g"""
    bposd_g = np.array(
        [bposd_result[1], bposd_result[4], bposd_result[7]] + list(bposd_result[9:])
    )
    UF_g = np.array([uf_result[1], uf_result[4], uf_result[7]] + list(uf_result[9:]))
    True_g = np.array([error[1], error[4], error[7]] + list(error[9:]))
    random_g = np.zeros_like(g)
    for idx in range(len(random_g)):
        if np.random.rand() < p:
            random_g[idx] = 1

    """我们的g，带入bp的结果"""
    weights = bp_log_prob_ratios

    W_f = weights[: H_X.shape[0]]
    W_g = weights[H_X.shape[0] :]

    W_f_B = np.dot(W_f, B)  # W_f * B
    W_g_B_W_f = W_f_B + W_g  # W_f * B + W_g
    # print(f"W_g_B_W_f = {W_g_B_W_f}")

    # 我们自己算出来的g
    g = np.where(
        W_g_B_W_f > 0,
        0,
        np.where(W_g_B_W_f < 0, 1, np.random.randint(0, 2, size=W_g_B_W_f.shape)),
    )

    """拼接得到我们的结果"""
    our_start_time = time.time()
    f = (np.dot(B, g) + syndrome) % 2
    our_result = np.hstack((f, g))

    """对我们的解码结果进行列变换，与H的列变换保持一致，不然精度下降，比如从86.6% -> 72.02%"""
    trans_results = np.zeros_like(our_result, dtype=int)
    trans_results[0] = our_result[0]
    trans_results[1] = our_result[6]
    trans_results[2] = our_result[1]
    trans_results[3] = our_result[2]
    trans_results[4] = our_result[7]
    trans_results[5] = our_result[3]
    trans_results[6] = our_result[4]
    trans_results[7] = our_result[8]
    trans_results[8] = our_result[5]
    trans_results[9:] = our_result[9:]

    our_end_time = time.time()

    bposd_time += bp_end_time - bp_start_time
    uf_time += uf_end_time - uf_start_time
    our_time += our_end_time - our_start_time

    # assert ((surface_code.hz @ trans_results) % 2 == syndrome).all(), (
    #     trans_results,
    #     error,
    #     bposd_result,
    # )

    # He = np.dot(surface_code.hz, bposd_decoder.osdw_decoding) % 2
    # print(f"He = {He}, syndrome = {syndrome}, is equal: {np.array_equal(He, syndrome)}")

    """Calculate success rate for each decoder"""
    bposd_residual_error = (bp_last_round_result + error) % 2
    bposd_flag = (surface_code.lz @ bposd_residual_error % 2).any()
    if bposd_flag == 0:
        bposd_num_success += 1
    else:  # BP失败
        num_bp_fail += 1
        # 尝试BP+OSD，看能否成功
        bposd_residual_error = (bposd_decoder.osdw_decoding + error) % 2
        bposd_flag = (surface_code.lz @ bposd_residual_error % 2).any()
        if bposd_flag == 0:
            print(
                f"{bposd_decoder.converge}, BP failed, OSD win. error = {error}, BP = {bp_last_round_result}, BP+OSD = {bposd_decoder.osdw_decoding}. is equal: {np.array_equal(bp_last_round_result, bposd_decoder.osdw_decoding)}"
            )

    uf_residual_error = (uf_result + error) % 2
    uf_flag = (surface_code.lz @ uf_residual_error % 2).any()
    if uf_flag == 0:
        uf_num_success += 1

    our_residual_error = (trans_results + error) % 2
    our_flag = (surface_code.lz @ our_residual_error % 2).any()
    if our_flag == 0:
        our_num_success += 1

    # # 验证一下我们的g和bposd的g
    # if our_flag == 1 and bposd_flag == 0:
    #     print(f"our g = {g}, bposd g = {bposd_g}, True g = {True_g}")

    # 验证BP和BP+OSD的区别
    # print(f"BP: {bp_last_round_result}, BP+OSD: {bposd_decoder.osdw_decoding}, is equal: {np.array_equal(bp_last_round_result, bposd_decoder.osdw_decoding)}")
    # if np.array_equal(bp_last_round_result, bposd_decoder.osdw_decoding):
    #     num_equal += 1


bposd_success_rate = bposd_num_success / num_trials
uf_success_rate = uf_num_success / num_trials
our_success_rate = our_num_success / num_trials
print(f"\nTotal trials: {num_trials}")
print(f"BP Success rate: {bposd_success_rate * 100:.2f}%, total time: {bposd_time}s")
print(f"UF Success rate: {uf_success_rate * 100:.2f}%, total time: {uf_time }s")
print(f"Our Success rate: {our_success_rate * 100:.2f}%, total time: {our_time}s")

print(f"number od BP fail = {num_bp_fail}")

0, BP failed, OSD win. error = [1 0 0 0 1 0 0 0 0 0 0 0 0], BP = [0 0 0 0 1 0 0 0 0 1 0 0 0], BP+OSD = [1 0 0 0 1 0 0 0 0 0 0 0 0]. is equal: False
0, BP failed, OSD win. error = [0 0 0 1 0 0 0 0 0 0 0 0 1], BP = [0 0 0 0 1 0 0 0 0 0 0 0 1], BP+OSD = [0 0 0 1 0 0 0 0 0 0 0 0 1]. is equal: False
0, BP failed, OSD win. error = [0 0 0 1 0 0 0 0 0 0 0 0 1], BP = [0 0 0 0 1 0 0 0 0 0 0 0 1], BP+OSD = [0 0 0 1 0 0 0 0 0 0 0 0 1]. is equal: False
0, BP failed, OSD win. error = [1 0 0 0 0 0 0 0 0 1 0 0 1], BP = [0 0 0 0 1 0 0 0 0 0 0 0 1], BP+OSD = [0 0 0 1 0 0 0 0 0 0 0 0 1]. is equal: False
0, BP failed, OSD win. error = [0 0 0 1 0 0 0 1 0 0 0 0 0], BP = [0 0 0 0 0 0 0 1 0 0 0 1 0], BP+OSD = [0 0 0 1 0 0 0 1 0 0 0 0 0]. is equal: False
0, BP failed, OSD win. error = [1 0 0 0 0 0 0 0 0 0 1 0 0], BP = [0 1 0 0 0 0 0 0 0 0 1 0 0], BP+OSD = [1 0 0 0 0 0 0 0 0 0 1 0 0]. is equal: False
0, BP failed, OSD win. error = [1 0 0 0 0 0 0 0 0 0 1 0 0], BP = [0 1 0 0 0 0 0 0 0 0 1 0 0], BP+OSD = [1 0 0 0 

Perform Gaussian Elimination on parity check matrix Hz

下面的高斯消元代码结果不对，如果矩阵满秩，一定能化成[I, B]

In [13]:
import numpy as np


def gaussian_elimination(H, s):

    m, n = H.shape
    H_prime = H.copy()
    s_prime = s.copy()

    # Start Gaussian elimination
    for i in range(m):
        # Ensure the pivot element is 1
        if H_prime[i, i] == 0:
            # Swap rows if needed to make H_prime[i, i] == 1
            for j in range(i + 1, m):
                if H_prime[j, i] == 1:
                    H_prime[[i, j]] = H_prime[[j, i]]
                    s_prime[[i, j]] = s_prime[[j, i]]
                    break

        # Eliminate entries below the pivot
        for j in range(i + 1, m):
            if H_prime[j, i] == 1:
                H_prime[j] = (H_prime[j] + H_prime[i]) % 2
                s_prime[j] = (s_prime[j] + s_prime[i]) % 2

    # Perform back substitution to get the final form [I | B]
    for i in range(m - 1, -1, -1):
        for j in range(i - 1, -1, -1):
            if H_prime[j, i] == 1:
                H_prime[j] = (H_prime[j] + H_prime[i]) % 2
                s_prime[j] = (s_prime[j] + s_prime[i]) % 2

    return H_prime, s_prime


# Example usage:
# Suppose H is a 3x6 matrix and s is a 3x1 syndrome vector
H = np.array([[1, 1, 0, 1, 0, 1], [1, 0, 1, 0, 1, 1], [0, 1, 1, 1, 1, 0]], dtype=int)

s = np.array([1, 0, 1], dtype=int)

# Perform Gaussian elimination
H_prime, s_prime = gaussian_elimination(H, s)

print("Reduced matrix H':")
print(H_prime)
print("Adjusted syndrome vector s':")
print(s_prime)

"""
1 0 0 0 0 1
0 1 0 1 0 0
0 0 1 0 1 0
"""

Reduced matrix H':
[[1 0 1 0 1 1]
 [0 1 1 1 1 0]
 [0 0 0 0 0 0]]
Adjusted syndrome vector s':
[0 1 0]


'\n1 0 0 0 0 1\n0 1 0 1 0 0\n0 0 1 0 1 0\n'

验证矩阵是否满秩

In [17]:
import numpy as np


def is_full_rank(H_X):
    """
    判断校验矩阵 H_X 是否为满秩 (Full Rank)
    """
    rank = np.linalg.matrix_rank(H_X)  # 计算矩阵的秩
    print(f"rank = {rank}")
    num_rows = H_X.shape[0]  # 计算行数
    return rank == num_rows  # 满秩返回 True，否则 False


# 示例矩阵
h = ring_code(5)
surface_code = hgp(h1=h, h2=h, compute_distance=True)

H = surface_code.hz
# H_X = H

print("H 是否满秩:", is_full_rank(H))
print(H)

rank = 25
H 是否满秩: True
[[1 1 0 ... 0 0 0]
 [0 1 1 ... 0 0 0]
 [0 0 1 ... 1 0 0]
 ...
 [0 0 0 ... 1 0 0]
 [0 0 0 ... 0 1 0]
 [0 0 0 ... 0 0 1]]
