In [1]:
import time
import numpy as np
from pymatching import Matching
import sys
from os.path import abspath, dirname
sys.path.append(abspath(dirname('test')).strip('experiments'))
from planar import Planar_rep_cir, subsamples
from codes import rep_cir
import stim

In [2]:
# MWPM decoding utilizing Pymatching

# Directly use the pymatching to decode DEM
def count_logical_errors(detector_error_model, detection_events, observable_flips):
    # Sample the circuit.
    num_shots = detection_events.shape[0]
    matcher = Matching.from_detector_error_model(detector_error_model)

    # Run the decoder.
    predictions = matcher.decode_batch(detection_events).squeeze()
    # Count the mistakes.
    num_errors = 0
    for shot in range(num_shots):
        actual_for_shot = observable_flips[shot][0]
        predicted_for_shot = predictions[shot]
        # print(actual_for_shot, predicted_for_shot)
        if not np.array_equal(actual_for_shot, predicted_for_shot):
            num_errors += 1
    return num_errors

# Use pymatching to decode DEM-code (consider the DEM as a code with code capacity noise)
def mwpm(pcm, lx, error_rates, syndrome):
    weights = np.log((1-np.array(error_rates))/np.array(error_rates))
    decoder = Matching(pcm, weights=weights)
    recover = decoder.decode(syndrome)
    L = (lx@recover.T)%2
    return L

# Simulation Rep-CLD

In [3]:
d = 3 # distance
r = 3 # rounds
error_prob = 0.05 # probability of errors generation
trial_num = 1000

#circuit
circuit = stim.Circuit.generated(code_task="repetition_code:memory",
                                        distance=d,
                                        rounds=r,
                                        after_clifford_depolarization=error_prob,
                                        before_measure_flip_probability=error_prob,
                                        after_reset_flip_probability=error_prob,
                                        )

# detector error model
dem = circuit.detector_error_model(decompose_errors=False, flatten_loops=True)
# define the DEM-code
rep = rep_cir(d, r)
rep.reorder(dem)

print('PCM(Hx) size:\n', rep.hx.shape)
print('Hz xize:\n', rep.hz.shape)
print('Commutation relation\n', rep.hx@rep.hz.T%2)

PCM(Hx) size:
 (8, 21)
Hz xize:
 (12, 21)
Commutation relation
 [[0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]
 [0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.]]


In [4]:
# define planar decoder
planar_decoder = Planar_rep_cir(rep.hx, rep.hz, rep.lx, rep.lz)
# extract priori probabilities from dem
num_ems = dem.num_errors   
er = []
for i in range(num_ems):
    er.append(dem[i].args_copy()[0])
# draw samples
sampler = circuit.compile_detector_sampler(seed=int(100*error_prob))
det, obv = sampler.sample(shots=trial_num, separate_observables=True)

# Directly use the pymatching to decode DEM
stim_result = count_logical_errors(dem, det, obv)

pym_result = 0
fail_num = 0
ts = 0
ne = 0
for trial in range(0, trial_num):
    syndrome = np.array(det[trial])
    try:
        t0 = time.perf_counter()
        recover = planar_decoder.decode(syndrome, er, rep.pebz)#
        Lr = (recover @ planar_decoder.lx.T)%2
        ts += time.perf_counter() - t0
        L = obv[trial][0]
        
        if Lr!=L:
            fail_num += 1
        
        Lmw = mwpm(rep.hx, rep.lx, er, syndrome)
        if Lmw!=L:
            pym_result += 1


    except Exception as e:
        ne += 1
        print('Error occurred in trial {}: {}'.format(trial, e))
        print('{} errors occurred'.format(ne))
        continue
    

    # if trial%100 == 0:
    #     print('trial:',trial, 'T:',ts/(trial+1-ne), 'mw:', pym_result/(trial+1-ne), 'pl:',fail_num/(trial+1-ne))
    
print('number of numerical errors', ne)
print('directly pym result:', stim_result/(trial_num-ne))
print('pym result:', pym_result/(trial_num-ne))
print('planar result:', fail_num/(trial_num-ne)) 
print('decoding time of planar:', ts/(trial_num-ne))


number of numerical errors 0
directly pym result: 0.068
pym result: 0.068
planar result: 0.064
decoding time of planar: 0.0025665260516107083


# Subsample simulaition

In [5]:
error_prob = 0.05 # probability of errors generation
trial_num = 10000

# higher distance code 
d = 5 # distance
r = 3 # rounds

circuit0 = stim.Circuit.generated(code_task="repetition_code:memory",
                                        distance=d,
                                        rounds=r,
                                        after_clifford_depolarization=error_prob,
                                        before_measure_flip_probability=error_prob,
                                        after_reset_flip_probability=error_prob,
                                        )

dem = circuit0.detector_error_model(decompose_errors=False, flatten_loops=True)
sampler = circuit0.compile_sampler(seed=int(100*error_prob)) #compile_detector_sampler(seed=int(100*error_prob))
samples = sampler.sample(shots=trial_num)
det, obv = circuit0.compile_m2d_converter().convert(measurements=samples, separate_observables=True)


In [6]:
# lower distance code 
ds = 3
subi=0

subsample = subsamples(ds, d, r, dem)
print(subsample[subi]['det'])
print(subsample[subi]['em'])
print(subsample[subi]['er'])
print(subsample[subi]['merge_e'])

[0, 1, 4, 5, 8, 9, 12, 13]
[0, 1, 2, 3, 5, 4, 12, 13, 14, 15, 17, 16, 24, 25, 26, 27, 29, 28, 36, 37, 38]
[0.06216441442020722, 0.06216441442020722, 0.11659999999999988, 0.02666666666666654, 0.11659999999999988, 0.08551564565112939, 0.02666666666666654, 0.02666666666666654, 0.11659999999999988, 0.02666666666666654, 0.11659999999999988, 0.05191111111111087, 0.02666666666666654, 0.02666666666666654, 0.11659999999999988, 0.02666666666666654, 0.11659999999999988, 0.05191111111111087, 0.06216441442020721, 0.06216441442020721, 0.06216441442020721]
[[(4, 6), (1, 2), (1, 6)], [(16, 18), (5, 6), (5, 10)], [(28, 30), (9, 10), (9, 14)]]


In [7]:
# define the DEM-code and planar decoder
circuit1 = stim.Circuit.generated(code_task="repetition_code:memory", distance=ds, rounds=r,
                                    after_clifford_depolarization=error_prob,
                                    before_measure_flip_probability=error_prob,
                                    after_reset_flip_probability=error_prob,)
dems = circuit1.detector_error_model(decompose_errors=False, flatten_loops=True)
rep = rep_cir(ds, r)
rep.reorder(dems)
planar_decoder = Planar_rep_cir(rep.hx, rep.hz, rep.lx, rep.lz)


# simulation
pym_result = 0
fail_num = 0
for i in range(len(subsample)):
    detectors = subsample[i]['det']
    er = subsample[i]['er']

    dets = np.array(det)[:, detectors]
    obvs = samples[:, -d+ds-1+i]

    pym_result_sub = 0
    fail_num_sub = 0
    for trial in range(trial_num):
        syndrome = dets[trial]
        L = obvs[trial]

        t0 = time.perf_counter()
        recover = planar_decoder.decode(syndrome, er, rep.pebz)#
        Lr = (recover @ planar_decoder.lx.T)%2
        ts += time.perf_counter() - t0
        if Lr!=L:
            fail_num_sub += 1
        
        Lmw = mwpm(rep.hx, rep.lx, er, syndrome)
        if Lmw!=L:
            pym_result_sub += 1
    pym_result+=pym_result_sub
    fail_num+=fail_num_sub
    print('sub_idx:', i, 'mw:',pym_result_sub, 'pl:', fail_num_sub)
print('pym result:', pym_result/(d-ds+1)/trial_num)
print('planar result:', fail_num/(d-ds+1)/trial_num)


sub_idx: 0 mw: 757 pl: 731
sub_idx: 1 mw: 959 pl: 907
sub_idx: 2 mw: 803 pl: 771
pym result: 0.08396666666666666
planar result: 0.0803


# Decoding (Rotated) Surface Code

In [8]:
from planar import Planar, generate_error, cal_syndrome, pymatching_decoder_warpper, decoding_success
from codes import gen_surface_code, gen_rotated_surface_code
from scipy.linalg import block_diag


code_type = 'sc' # 'rsc'
d=5
trial_num = 1000
error_type = 'depolarizing'
error_rates = np.linspace(0.1, 0.2, 5)
# print(error_rates)

if code_type == 'sc':
    n, m, hx, hz, lx, lz = gen_surface_code(d)
elif code_type == 'rsc':
    n, m, hx, hz, lx, lz = gen_rotated_surface_code(d)
assert not (hx @ lz.T % 2).any()
assert not (hz @ lx.T % 2).any()
stabilizers = block_diag(*(hz, hx))


planar_decoder = Planar(hx, hz, lx, lz)
matching_decoder = pymatching_decoder_warpper(hx, hz)
decoders = [matching_decoder, planar_decoder]





mw = []
pl = []
for error_rate in error_rates:

    if error_type == 'depolarizing':
        error_prob = np.array([1-error_rate, error_rate/3, error_rate/3, error_rate/3])
    elif error_type == 'z':
        error_prob = np.array([1-error_rate, 0.0, error_rate, 0.0])
    elif error_type == 'x':
        error_prob = np.array([1-error_rate, error_rate, 0.0, 0.0])
    elif error_type == 'uncorrelated':
        error_prob = np.array([(1-error_rate)**2, error_rate*(1-error_rate), error_rate*(1-error_rate), error_rate**2])

    rng = np.random.default_rng(int(1000*error_rate))

    fail_num = [0] * len(decoders)
    ts = [0] * len(decoders)
    for decoder in decoders:
        if type(decoder) == pymatching_decoder_warpper:
            decoder._set_up_decoder(error_prob)

    for trial in range(trial_num):
        error = generate_error(n, error_prob, rng)
        syndrome = cal_syndrome(error, stabilizers)
        # print(error)
        for j, decoder in enumerate(decoders):
            t0 = time.perf_counter()
            recovery = decoder.decode(syndrome, error_prob)
            ts[j] += time.perf_counter() - t0
            success = decoding_success(lx, lz, recovery, error)
            if not success:
                fail_num[j] += 1
    mw.append(fail_num[0]/trial_num)
    pl.append(fail_num[1]/trial_num)

print('MWPM results:', mw)
print('planar results:', pl)

MWPM results: [0.107, 0.173, 0.269, 0.345, 0.395]
planar results: [0.098, 0.16, 0.239, 0.337, 0.389]
