In [2]:
!pip install stable_baselines3[extra] -q
!pip install pyglet==1.5.27 -q
!pip install -U bposd -q

In [1]:
# Append the common library for CPC codes
import os
import sys
# TODO: do something better here, e.g. refactor the common parts and the
# learning-mechanism-specific parts into an installable package.
# The membership check keeps this cell idempotent: re-running it does not keep
# appending duplicate entries to sys.path.
_src_dir = os.path.join(os.getcwd(), "src")
if _src_dir not in sys.path:
    sys.path.append(_src_dir)

## Set up the RL environment

In [11]:
import os
from stable_baselines3.common.env_checker import check_env
import gym
from gym import spaces
import numpy as np
import torch
import utils
from global_params import params
from scoring import score_dataset
from CPC import cpc_code


def flatten(l):
    """Flatten one level of nesting.

    Args:
        l: an iterable whose elements are themselves iterable.

    Returns:
        A new list holding the items of each element of ``l``, in order.
    """
    flat = []
    for sublist in l:
        # extend() appends every item of the sub-iterable in order.
        flat.extend(sublist)
    return flat


"""
Some quick thoughts:
-- Should we start with a specific code each time or always a new random code?
"""


class SwapLDPCEnv(gym.Env):
    """Gym environment in which each action toggles one entry of a CPC code.

    The code is held as three 0/1 matrices, ``m_b``, ``m_p`` and ``m_c``, which
    ``cpc_code.get_classical_code_cpc`` turns into a parity-check matrix. Every
    step flips a single matrix entry, rebuilds the parity-check matrix and
    scores it with ``score_dataset.run_decoder``.

    Observation (1-D ``float32``): the per-qubit error rates used for scoring
    (``n_qubits`` values) followed by the flattened parity-check matrix.

    The class follows the legacy gym API: ``reset()`` returns only the
    observation and ``step()`` returns ``(obs, reward, done, info)``.
    """
    metadata = {'render.modes': ['human']}

    def __init__(self, save_best=True, target_succ_rate=1.0,
                 start_m_b=None, start_m_p=None, start_m_c=None):
        """Build the action/observation spaces and the starting code.

        Args:
            save_best: when true, parity-check matrices that beat the best
                success rate seen so far are written under ``pcs/``.
            target_succ_rate: an episode is reported as done once the decoder
                success rate reaches this value. The default assumes the
                success rate is a fraction in [0, 1] -- TODO confirm.
            start_m_b: optional starting matrix of shape
                ``(n_check_qubits, n_data_qubits)``; all zeros when omitted.
            start_m_p: optional starting matrix of shape
                ``(n_check_qubits, n_data_qubits)``; all zeros when omitted.
            start_m_c: optional starting matrix of shape
                ``(n_check_qubits, n_check_qubits)``; all zeros when omitted.

        Raises:
            ValueError: if a supplied starting matrix has the wrong shape.
        """
        super(SwapLDPCEnv, self).__init__()

        self.n_check_qubits = params['n_check_qubits']
        self.n_data_qubits = params['n_data_qubits']
        self.n_qubits = self.n_data_qubits + self.n_check_qubits
        # Used in the saved filenames as "[n,k]".
        # NOTE(review): assumes n is the total qubit count and k the number of
        # data qubits -- TODO confirm.
        self.n = self.n_qubits
        self.k = self.n_data_qubits

        # Each action picks a matrix and one (row, column) entry to toggle.
        self.action_space = spaces.MultiDiscrete([
            3,  # which matrix to operate on: 0 -> m_b, 1 -> m_p, 2 -> m_c
            self.n_check_qubits,  # row: which parity check to operate on
            # column: which data qubit to operate on. For m_c the column is a
            # check-qubit index instead, so columns >= n_check_qubits are
            # invalid there and are answered with a low reward in step().
            self.n_data_qubits,
        ])
        self.n_steps = 0
        self.save_best = save_best
        self.target_succ_rate = target_succ_rate
        self.last_save_diff = 0
        self.best_succ = 0

        flattened_pc_size = 2 * self.n_qubits * self.n_check_qubits

        # The first n_qubits entries represent the noise distribution.
        # TODO: THIS ALLOWS US TO TRAIN FOR "ADAPTIVE NOISE!!" (i.e. lets decrease connections...)
        # The remaining entries are the flattened quantum parity check matrix.
        # NOTE(review): high=1 assumes error rates <= 1 and 0/1 parity-check
        # entries -- TODO confirm against get_classical_code_cpc.
        self.observation_space = spaces.Box(
            low=0, high=1,
            shape=(self.n_qubits + flattened_pc_size,),  # must be a tuple
            dtype=np.float32)

        self._start_m_b = self._as_start_matrix(
            start_m_b, (self.n_check_qubits, self.n_data_qubits), "start_m_b")
        self._start_m_p = self._as_start_matrix(
            start_m_p, (self.n_check_qubits, self.n_data_qubits), "start_m_p")
        self._start_m_c = self._as_start_matrix(
            start_m_c, (self.n_check_qubits, self.n_check_qubits), "start_m_c")

        self.p_fail = None
        self.code_pc = None
        self._last_obs = None
        self._restore_start_code()

    @staticmethod
    def _as_start_matrix(matrix, shape, name):
        """Return an integer copy of ``matrix`` (zeros if None), checking its shape."""
        if matrix is None:
            return np.zeros(shape, dtype=int)
        matrix = np.array(matrix, dtype=int)
        if matrix.shape != shape:
            raise ValueError(
                f"{name} must have shape {shape}, got {matrix.shape}")
        return matrix

    def _restore_start_code(self):
        """Reset m_b/m_p/m_c to copies of the starting matrices."""
        # Copies, so that toggling entries in step() never alters the start code.
        self.m_b = self._start_m_b.copy()
        self.m_p = self._start_m_p.copy()
        self.m_c = self._start_m_c.copy()

    def _sample_noise(self):
        """Draw one error rate, store it in ``self.p_fail`` and return it per qubit."""
        self.p_fail = np.random.uniform(
            low=params['constant_error_rate_lower'],
            high=params['constant_error_rate_upper'])
        return np.ones(self.n_qubits) * self.p_fail

    def _make_observation(self, p_fails, code_pc):
        """Concatenate the noise vector and the flattened parity-check matrix."""
        flattened = np.array(code_pc).astype(np.float32).flatten()
        obs = np.concatenate([np.asarray(p_fails, dtype=np.float32), flattened])
        if obs.shape != self.observation_space.shape:
            raise ValueError(
                f"Observation has shape {obs.shape}, expected "
                f"{self.observation_space.shape}")
        self._last_obs = obs
        return obs

    def _save_best(self, code_pc, succ_rate):
        """Write the new best parity-check matrix and its success rate to ``pcs/``."""
        # NOTE(review): the returned path is not used; the call is kept in case
        # the helper has side effects -- TODO confirm and remove.
        utils.get_best_scoring_model_path_rl()
        # NOTE(review): this path does not match the filenames written below;
        # presumably left over from an older naming scheme -- TODO confirm.
        old_file_path = f"pcs/pc_{self.best_succ}"
        if self.best_succ != 0 and os.path.isfile(old_file_path):
            os.remove(old_file_path)
        print("Saving new best with succ_rate", succ_rate)
        os.makedirs("pcs", exist_ok=True)
        filename_base = f"pc-[{self.n},{self.k}]-p{self.p_fail}"
        np.savetxt(f"pcs/{filename_base}-pc.txt", code_pc)
        # The context manager closes the handle; previously the file was opened
        # and then neither written to nor closed.
        with open(f"pcs/{filename_base}-data.txt", "w") as f:
            f.write(f"{succ_rate}\n")
        self.best_succ = succ_rate

    def step(self, action):
        """Toggle the matrix entry selected by ``action`` and score the new code.

        Args:
            action: ``(matrix, row, column)`` indices as described by
                ``action_space``.

        Returns:
            ``(obs, reward, done, info)``. Selecting a column of ``m_c`` that is
            not a check-qubit index leaves the code unchanged and yields a
            reward of -10.

        Raises:
            ValueError: if the matrix selector is not 0, 1 or 2.
        """
        self.n_steps += 1
        selector, row, col = (int(a) for a in action)
        if selector == 0:
            self.m_b[row, col] = 1 - self.m_b[row, col]
        elif selector == 1:
            self.m_p[row, col] = 1 - self.m_p[row, col]
        elif selector == 2:
            if col >= self.n_check_qubits:
                # Invalid entry for m_c: hand back the current observation
                # unchanged together with a very low reward.
                if self._last_obs is None:
                    current_pc = cpc_code.get_classical_code_cpc(
                        self.m_b, self.m_p, self.m_c)
                    self._make_observation(self._sample_noise(), current_pc)
                return self._last_obs.copy(), -10, False, {}
            self.m_c[row, col] = 1 - self.m_c[row, col]
        else:
            # Raising a plain string is itself a TypeError in Python 3.
            raise ValueError("Undefined selector action")

        code_pc = cpc_code.get_classical_code_cpc(self.m_b, self.m_p, self.m_c)
        self.code_pc = code_pc
        # TODO: p_fail??
        p_fails = self._sample_noise()
        succ_rate = score_dataset.run_decoder(code_pc, p_fail=self.p_fail)

        # NOTE(review): the reward for a valid action is always 0, so the agent
        # only ever sees the -10 penalty -- TODO decide on a reward signal.
        reward = 0
        if self.save_best and succ_rate > self.best_succ and self.last_save_diff > 100:
            self._save_best(code_pc, succ_rate)
            self.n_samples_no_change_max_runs_accum = 0
            self.last_save_diff = 0
        else:
            self.last_save_diff += 1

        obs = self._make_observation(p_fails, code_pc)
        done = bool(succ_rate >= self.target_succ_rate)
        return obs, reward, done, {}

    def reset(self):
        """Restore the starting code and return the initial observation."""
        self._restore_start_code()
        code_pc = cpc_code.get_classical_code_cpc(self.m_b, self.m_p, self.m_c)
        self.code_pc = code_pc
        # reward, done, info can't be included
        return self._make_observation(self._sample_noise(), code_pc)

    def render(self, mode='console'):
        """No-op: this environment has nothing to render."""
        pass

    def close(self):
        """No-op: this environment holds no resources to release."""
        pass


# Code dimensions and average row degree used to generate the random LDPC
# matrix in this cell.
# NOTE(review): GLOBAL_N, GLOBAL_K and GLOBAL_DEG_ROW are not defined in the
# visible cells -- confirm where they come from.
n, k, degRowAvg = GLOBAL_N, GLOBAL_K, GLOBAL_DEG_ROW
# TODO: this gen_random_ldpc is NO GOOD FOR NOW: it starts with a random number of edges, and because we never add or subtract edges for now,
# this poses a problem. TODO: there may be something smart to do here later with adding/subtracting edges.


def mat_to_adj_list(H):
    """Convert a matrix into a per-row adjacency list.

    Args:
        H: a 2-D array-like exposing ``.shape`` and row indexing (for example
            a NumPy array).

    Returns:
        A list with one entry per row of ``H``; each entry is the list of
        column indices whose value in that row is strictly positive.
    """
    adjacency = []
    for row_idx in range(H.shape[0]):
        row = H[row_idx]
        n_cols = row.shape[-1]
        adjacency.append([col for col in range(n_cols) if row[col] > 0])
    return adjacency

# A fundamental problem we have here is the speed at which we determine what a "good" code is.
# Maybe there is a faster way... this is where something like a literature review would be necessary.
# Alternatively, we can increase samples_per_step once the difference in performance is no longer visible.
# Another important lever here is p_fail, i.e. we may want it to be higher depending on certain factors (as codes get better
# we almost want p to be "adaptive").


# NOTE(review): H and G are not consumed by the environment constructed below;
# the line is kept in case other cells use them -- TODO confirm and remove.
H, G = gen_random_ldpc(n, k, deg_row=degRowAvg)
# SwapLDPCEnv takes no required arguments (it reads its dimensions from
# `params`). The previous call passed a parity-check matrix, an adjacency list,
# a row degree, `samples_per_step` and `p_fail`, none of which the constructor
# accepts, so it raised a TypeError.
env = SwapLDPCEnv()
check_env(env, warn=True)


ImportError: attempted relative import with no known parent package