In [1]:
import gym
import gym.spaces
import numpy as np
from scipy.interpolate import interp1d
import numpy as np
from scipy.sparse import csc_matrix, diags
from scipy.sparse.linalg import eigs
from numpy import linalg as LA
from scipy.linalg import expm
import matplotlib.pyplot as plt
from scipy.optimize import minimize
import time

In [2]:
# Number of qubits in the simulated register; read by tensor()/controlled_tensor().
qubits = 2
# Sentinel placed in the control slot of a controlled gate. It is recognised by
# object identity (``is``) in controlled_product, not by value.
SPECIAL_CONTROL_ONE = (1+0j)


def _fixed_gate(entries, scale=None):
    """Return a factory ``f(qb)`` embedding a constant 2x2 matrix on qubit ``qb``.

    A fresh array is built on every call; ``scale`` (if given) multiplies it.
    """
    def apply(qb):
        matrix = np.array(entries)
        if scale is not None:
            matrix = scale * matrix
        return tensor(matrix, qb)
    return apply


def _controlled_gate(entries):
    """Return a factory ``f(ctrl, tgt)`` for the controlled version of a 2x2 matrix."""
    def apply(ctrl, tgt):
        return controlled_tensor(np.array(entries), ctrl, tgt)
    return apply


def _rx(theta, qb):
    """Rotation about X by ``theta`` on qubit ``qb``."""
    diag = np.cos(theta/2)+0j
    off = -1j*np.sin(theta/2)
    return tensor(np.array([[diag, off], [off, diag]]), qb)


def _ry(theta, qb):
    """Rotation about Y by ``theta`` on qubit ``qb``."""
    diag = np.cos(theta/2)+0j
    sine = np.sin(theta/2)
    return tensor(np.array([[diag, -sine+0j], [sine+0j, diag]]), qb)


def _rz(phi, qb):
    """Rotation about Z by ``phi`` on qubit ``qb``."""
    cosine, sine = np.cos(phi/2), np.sin(phi/2)
    return tensor(np.array([[cosine - 1j*sine, 0j], [0j, cosine + 1j*sine]]), qb)


# Gate name -> callable returning the full-register matrix.
# Fixed single-qubit gates take (qb), rotations take (angle, qb),
# controlled gates take (ctrl, tgt).
GATES = {
    'I': _fixed_gate([[1+0j, 0j], [0j, 1+0j]]),
    'H': _fixed_gate([[1+0j, 1+0j], [1+0j, -1+0j]], scale=1/np.sqrt(2)),
    'X': _fixed_gate([[0j, 1+0j], [1+0j, 0j]]),
    'Y': _fixed_gate([[0j, -1j], [1j, 0j]]),
    'Z': _fixed_gate([[1+0j, 0j], [0j, -1+0j]]),
    'T': _fixed_gate([[1+0j, 0], [0, np.exp(1j * np.pi / 4)]]),
    'RX': _rx,
    'RY': _ry,
    'RZ': _rz,
    'CNOT': _controlled_gate([[0, 1], [1, 0]]), # issue with imaginary numbers
    'CZ': _controlled_gate([[1, 0], [0, -1]]),
}

def tensor(gate, qubit):
    """Embed a single-qubit gate into the full register.

    The register size is read from the module-level ``qubits``. Qubit 0 is the
    right-most Kronecker factor, i.e. the least-significant bit of the
    basis-state index.

    Args:
        gate: 2x2 matrix acting on one qubit.
        qubit: index of the qubit the gate acts on, ``0 <= qubit < qubits``.

    Returns:
        The ``2**qubits`` x ``2**qubits`` matrix acting as ``gate`` on ``qubit``
        and as the identity elsewhere. With a single-qubit register ``gate``
        itself is returned.

    Raises:
        ValueError: if ``qubit`` is outside ``[0, qubits)``. Negative indices are
            rejected too; previously they silently wrapped around.
    """
    if not 0 <= qubit < qubits:
        raise ValueError("Qubit out of range")

    factors = [np.eye(2) for _ in range(qubits)]
    factors[qubit] = gate

    # Fold from qubit 0 upwards so that higher qubits end up on the left.
    full = factors[0]
    for factor in factors[1:]:
        full = np.kron(factor, full)
    return full
        

def controlled_tensor(gate, control, target):
    """Embed a controlled single-qubit gate into the full register.

    The register size is read from the module-level ``qubits``; qubit 0 is the
    least-significant bit of the basis-state index. The control slot holds a
    marker matrix whose ``SPECIAL_CONTROL_ONE`` entry makes the target act as
    the identity when the control is |0>, and as ``gate`` when it is |1>.

    Args:
        gate: 2x2 matrix applied to ``target`` when ``control`` is |1>.
        control: index of the control qubit.
        target: index of the target qubit.

    Returns:
        The ``2**qubits`` x ``2**qubits`` matrix as an ndarray. It is real when
        every imaginary part is negligible (as for CNOT/CZ) and complex
        otherwise; imaginary parts are no longer discarded.

    Raises:
        ValueError: if the register has fewer than two qubits, if either index
            is outside ``[0, qubits)``, or if ``control == target``.
    """
    if qubits < 2:
        raise ValueError("Must have at least two qubits to perform a controlled gate")
    if not (0 <= control < qubits and 0 <= target < qubits):
        raise ValueError("Target or control qubit out of range")
    if control == target:
        # Otherwise the control marker would overwrite the gate in the same slot.
        raise ValueError("Control and target must be different qubits")

    factors = [np.eye(2) for _ in range(qubits)]
    factors[target] = gate
    factors[control] = [[SPECIAL_CONTROL_ONE, 0], [0, 1]]

    full = kronecker_product(factors[1], factors[0])
    for factor in factors[2:]:
        full = kronecker_product(factor, full)
    # The marker is complex, so the raw array is always complex-typed; hand back
    # a real array whenever nothing is lost by doing so.
    return np.real_if_close(np.array(full))

def kronecker_product(m1, m2):
    """Kronecker product of two nested sequences, aware of the control marker.

    Entry ``[(i1, i2)][(j1, j2)]`` of the result combines ``m1[i1][j1]`` with
    ``m2[i2][j2]`` through ``controlled_product``. Rows are indexed by the row
    indices of the inputs (the previous version indexed them by the column
    indices and therefore returned the transpose, which only matched for
    symmetric inputs).

    Returns:
        A list of lists with ``len(m1) * len(m2)`` rows.
    """
    rows1, cols1 = len(m1), len(m1[0])
    rows2, cols2 = len(m2), len(m2[0])
    return [[
        controlled_product(m1[i1][j1], m2[i2][j2], i1, i2, j1, j2)
        for j1 in range(cols1) for j2 in range(cols2)]
        for i1 in range(rows1) for i2 in range(rows2)]

def controlled_product(v1, v2, i1, i2, j1, j2):
    """Multiply two matrix entries, treating ``SPECIAL_CONTROL_ONE`` specially.

    If either entry *is* the marker object (identity test, not equality), the
    other factor is replaced by the identity: the result is the marker on that
    factor's diagonal (row index equals column index) and 0 elsewhere.
    Otherwise the plain product ``v1 * v2`` is returned.
    """
    if v1 is SPECIAL_CONTROL_ONE:
        return SPECIAL_CONTROL_ONE if i2==j2 else 0
    if v2 is SPECIAL_CONTROL_ONE:
        return SPECIAL_CONTROL_ONE if i1==j1 else 0
    return v1*v2


In [4]:
# Sanity check: CNOT with control qubit 0 and target qubit 1. In the displayed
# matrix basis indices 1 and 3 are exchanged while 0 and 2 are left alone.
GATES['CNOT'](0,1)

array([[1., 0., 0., 0.],
       [0., 0., 0., 1.],
       [0., 0., 1., 0.],
       [0., 1., 0., 0.]])

In [3]:
class Program():
    """An ordered list of gate matrices applied to the all-zeros basis state.

    Instructions are tuples ``(name, operand)`` or ``(name, operand, operand)``
    whose name is a key of the module-level ``GATES`` table, e.g. ``('H', 0)``,
    ``('RX', theta, 0)`` or ``('CNOT', 0, 1)``.
    """

    def __init__(self, qubits):
        self._qubits = qubits
        # Full-register matrices, in application order.
        self._instructions = []
        # The original instruction tuples, kept for __str__.
        self._inststr = []
        # First basis vector of the 2**qubits-dimensional space.
        self._init = np.eye(2**self._qubits)[0]

    def inst(self, *instructions) -> "Program":
        """Append one or more instructions and return ``self`` for chaining.

        Lists are flattened recursively; tuples are looked up in ``GATES``.

        Raises:
            ValueError: if a tuple has fewer than two or more than three elements.
            TypeError: if an instruction is neither a list nor a tuple. Such
                arguments used to be dropped silently, which hid mistakes like
                ``p.inst('H', 0)``.
            KeyError: if the gate name is not in ``GATES``.
        """
        for instruction in instructions:
            if isinstance(instruction, list):
                self.inst(*instruction)
            elif isinstance(instruction, tuple):
                if len(instruction) < 2:
                    raise ValueError("tuple should have at least two elements")
                if len(instruction) > 3:
                    # Extra operands used to be ignored without notice.
                    raise ValueError("tuple should have at most three elements")
                name, *operands = instruction
                self._instructions.append(GATES[name](*operands))
                self._inststr.append(instruction)
            else:
                raise TypeError(
                    "instruction must be a tuple or a list of tuples, got "
                    + type(instruction).__name__)

        return self

    def simulate(self) -> np.ndarray:
        """Apply every stored gate, in order, to the initial state and return the result."""
        state = self._init
        for gate in self._instructions:
            state = gate @ state
        return state

    def __len__(self) -> int:
        """Number of gate matrices stored in the program."""
        return len(self._instructions)

    def __str__(self) -> str:
        """
        A string representation of the matrix program
        """
        return "\n".join([' '.join([str(i) for i in tup]) for tup in self._inststr])

    # def dagger(self):
    #     pass
    # 
    # def pop(self) -> np.array:
    #     res = self._instructions.pop()
    #     return res

In [4]:
# Candidate action sets for the agent. Each entry is an instruction tuple
# understood by Program.inst.
num_angles = 15
qubits = 2
angles = np.linspace(0.0, 2*np.pi, num_angles)

# Discretised RX/RZ rotations on every qubit plus both CNOT orientations.
# Kept under its own name as an alternative action set; it is NOT the one in use
# (it used to be assigned to `gates` and then immediately overwritten).
rotation_gates = [('RX', theta, q) for theta in angles for q in range(qubits)]
rotation_gates += [('RZ', theta, q) for theta in angles for q in range(qubits)]
# rotation_gates += [('H', q) for q in range(qubits)]
rotation_gates += [('CNOT', 0, 1), ('CNOT', 1, 0)] #, ('CZ', 1, 2), ('CZ', 2, 1), ('CZ', 0, 2), ('CZ', 2, 0)]

# The action set actually used by the environment: discrete H / T / CNOT gates.
gates = [('H', 0), ('H', 1), ('T', 0), ('T', 1), ('CNOT', 0, 1), ('CNOT', 1, 0)]
# for g in gates:
#     p = Program(qubits)
#     p.inst(g)
#     wfn = p.simulate()
#     if np.allclose(wfn, np.sqrt(np.array([0.5, 0.5])), atol=1e-2):
#         print("Found |+> state!!")
#         print(p)
#         print("*" * 30)

In [42]:
def bell_state(qb):
    """Return the wavefunction after RY(pi/2) on qubit 0 and a chain of CNOTs.

    CNOT(i, i+1) is applied for ``i = 0 .. qb-2``, so for ``qb == 2`` a single
    CNOT(0, 1) follows the rotation.

    Args:
        qb: number of qubits passed to ``Program``.

    Returns:
        The state vector produced by ``Program.simulate``.
    """
    p = Program(qb)
    p.inst(('RY', np.pi/2, 0))
    for i in range(qb-1):
        p.inst(('CNOT', i, i+1))
    # The density-matrix image that used to be computed here was never
    # returned, so it has been dropped; wfn_to_dm provides that encoding.
    return p.simulate()

def random_unitary(dim):
    """Return the first column of a random unitary on ``dim`` qubits.

    Despite the name, the result is a state vector of length ``2**dim`` (the
    random unitary applied to the first basis vector), not the matrix itself.
    Random numbers come from the global ``np.random`` state.
    """
    size = 2**dim
    # follows the algorithm in https://arxiv.org/pdf/math-ph/0609050.pdf
    entries = []
    for _ in range(size ** 2):
        real_part = np.random.normal(0, 1)
        imag_part = np.random.normal(0, 1)
        entries.append(real_part + imag_part * 1j)
    gaussian = np.array(entries).reshape(size, size)

    Q, R = np.linalg.qr(gaussian)
    r_diag = np.diagonal(R)
    # Diagonal matrix of the unit-modulus phases of R's diagonal.
    phases = np.diag(r_diag) / np.absolute(r_diag)
    unitary = np.matmul(Q, phases)

    assert np.allclose(unitary.conj().T @ unitary, np.eye(size))

    first_basis_vector = np.eye(size)[0]
    return unitary @ first_basis_vector # wfn

def goal_state(qb, n):
    """Build the target wavefunction from repeated H/T layers around one CNOT.

    The circuit is: ``n`` times (H, T) on qubit 0, ``n`` times (H, T) on
    qubit 1, CNOT(0, 1), then the same two H/T runs again.

    Args:
        qb: number of qubits passed to ``Program``.
        n: number of (H, T) repetitions in each run.

    Returns:
        The state vector produced by ``Program.simulate``.
    """
    program = Program(qb)

    def add_ht_runs():
        # n x (H, T) on qubit 0 followed by n x (H, T) on qubit 1.
        for qubit in (0, 1):
            for _ in range(n):
                program.inst(('H', qubit), ('T', qubit))

    add_ht_runs()
    program.inst(('CNOT', 0, 1))
    add_ht_runs()
    return program.simulate()

# def Loss(x, target):
#     psi_ansz = x
#     Sz_ansz = abs(np.conj(psi_ansz) @ (Sz[0]@Sz[1]) @ psi_ansz) / 2
#     Sz_ex = abs(np.conj(target) @ (Sz[0]@Sz[1]) @ target) / 2
#     return abs(Sz_ansz - Sz_ex)

def Fidelity(x, target):
    """Return the squared magnitude of the overlap between ``target`` and ``x``.

    ``target`` is complex-conjugated before the inner product is taken.
    """
    overlap = np.matmul(np.conj(target), x)
    return abs(overlap)**2

class OneQEnv(gym.Env):
    """Gym environment in which the agent appends gates to reach a goal state.

    Each action is an index into the module-level ``gates`` list; the chosen
    instruction is appended to an internal ``Program`` which is then
    re-simulated from scratch.

    Observation: the density matrix of the current wavefunction, real and
    imaginary parts stacked on the last axis and linearly rescaled from
    [-1.001, 1.001] to [0, 255]; shape ``(2**qubits, 2**qubits, 2)``.

    Reward: the fidelity with ``self.goal`` on the step where it exceeds 0.80
    (which also ends the episode), otherwise 0. The episode also ends once
    ``max_steps`` steps have been taken.

    Args:
        gamma: discount factor; stored on the instance but not otherwise used
            by this class.
        max_steps: step budget per episode.
        qubits: register size passed to ``Program``.
            NOTE(review): gate matrices are sized from the module-level
            ``qubits``, so the two presumably have to agree - confirm.
    """

    def __init__(self, gamma=0.8, max_steps=20, qubits=2):
        self.interp = interp1d([-1.001, 1.001], [0, 255])
        self.qubits = qubits
        self.goal = goal_state(self.qubits, 10000) #random_unitary(self.qubits) # bell_state(self.qubits)
        # discount factor
        self.gamma = gamma
        # identify the observation and action spaces
        self.observation_space = gym.spaces.Box(low=0, high=255, shape=(2**self.qubits, 2**self.qubits, 2), dtype=float)
        self._actions = gates
        self.action_space = gym.spaces.Discrete(len(self._actions))
        self.max_steps = max_steps
        self.info = {}
        self._start_episode()

    def _observation(self):
        """Encode the current wavefunction as a rescaled density-matrix image."""
        # |psi><psi| needs the conjugate on the second factor; np.outer does not
        # conjugate on its own.
        dm = np.outer(self._wfn, np.conj(self._wfn))
        return self.interp(np.stack([dm.real, dm.imag], axis=-1))

    def _start_episode(self):
        """Reset program, wavefunction, observation and step counter."""
        p = Program(self.qubits)
        for i in range(self.qubits):
            p.inst(('I', i))
        self._program = p
        self._wfn = self._program.simulate()
        self.state = self._observation()
        self.current_step = 0

    def step(self, action):
        """Append the gate selected by ``action`` and return ``(obs, reward, done, info)``."""
        gate = self._actions[action]
        self._program.inst(gate)
        self._wfn = self._program.simulate()
        self.state = self._observation()
        self.current_step += 1

        # detect if found terminal state
        # reward = max(0, -30*Loss(self._wfn, self.goal)+1)
        reward = Fidelity(self._wfn, self.goal)
        if reward > 0.80:
            done = True
        elif self.current_step >= self.max_steps:
            done = True
            reward = 0
        else:
            done = False
            reward = 0

        return self.state, reward, done, self.info

    def reset(self):
        """Start a new episode and return the initial observation."""
        self._start_episode()
        return self.state

In [6]:
from stable_baselines.deepq.policies import CnnPolicy
from stable_baselines.common.policies import ActorCriticPolicy, MlpPolicy
from stable_baselines.common.vec_env import DummyVecEnv
from stable_baselines import PPO2, DQN
from stable_baselines.common.tf_layers import conv, linear, conv_to_fc, lstm
import tensorflow as tf

The TensorFlow contrib module will not be included in TensorFlow 2.0.
For more information, please see:
  * https://github.com/tensorflow/community/blob/master/rfcs/20180907-contrib-sunset.md
  * https://github.com/tensorflow/addons
  * https://github.com/tensorflow/io (for I/O related ops)
If you depend on functionality not listed there, please file an issue.



In [9]:
def custom_cnn(scaled_images, **kwargs):
    """Feature extractor: two stride-2 2x2 convolutions followed by two dense layers.

    Layers, in order: conv 'c1' (16 filters), conv 'c2' (32 filters), flatten,
    dense 'fc1' (256 units), dense 'fc2' (128 units), each followed by ReLU.
    ``kwargs`` are forwarded to the convolution layers only.
    """
    relu = tf.nn.relu
    gain = np.sqrt(2)

    features = scaled_images
    for scope, n_filters in (('c1', 16), ('c2', 32)):
        features = relu(conv(features, scope, n_filters=n_filters, filter_size=2, stride=2,
                             init_scale=gain, **kwargs))
    # A third convolution ('c3', 64 filters, size 3, stride 1) is intentionally not used.
    features = conv_to_fc(features)
    for scope, n_hidden in (('fc1', 256), ('fc2', 128)):
        features = relu(linear(features, scope, n_hidden=n_hidden, init_scale=gain))
    return features

class CustomPolicy(ActorCriticPolicy):
    """Actor-critic policy whose policy and value heads share one CNN feature extractor.

    Only ``feature_extraction="cnn"` is implemented. ``layers``, ``net_arch``
    and ``act_fun`` are accepted but not used by this class.
    NOTE(review): the constructor signature presumably mirrors what
    stable-baselines passes to policies - confirm against the installed version.
    """

    def __init__(self, sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=False, layers=None, net_arch=None,
                 act_fun=tf.tanh, cnn_extractor=custom_cnn, feature_extraction="cnn", **kwargs):
        super(CustomPolicy, self).__init__(sess, ob_space, ac_space, n_env, n_steps, n_batch, reuse=reuse,
                                                scale=(feature_extraction == "cnn"))

        self._kwargs_check(feature_extraction, kwargs)

        with tf.variable_scope("model", reuse=reuse):
            if feature_extraction == "cnn":
                # Policy and value heads read the same latent features.
                pi_latent = vf_latent = cnn_extractor(self.processed_obs, **kwargs)
            else:
                # Without this branch the latents below would be unbound and the
                # constructor would fail with an obscure name error.
                raise NotImplementedError(
                    "CustomPolicy only supports feature_extraction='cnn', got %r" % (feature_extraction,))

            self._value_fn = linear(vf_latent, 'vf', 1)

            self._proba_distribution, self._policy, self.q_value = \
                self.pdtype.proba_distribution_from_latent(pi_latent, vf_latent, init_scale=0.01)

        self._setup_init()

    def step(self, obs, state=None, mask=None, deterministic=False):
        """Run the policy on ``obs`` and return ``(action, value, initial_state, neglogp)``.

        ``state`` and ``mask`` are accepted but ignored.
        """
        if deterministic:
            action, value, neglogp = self.sess.run([self.deterministic_action, self.value_flat, self.neglogp],
                                                   {self.obs_ph: obs})
        else:
            action, value, neglogp = self.sess.run([self.action, self.value_flat, self.neglogp],
                                                   {self.obs_ph: obs})
        return action, value, self.initial_state, neglogp

    def proba_step(self, obs, state=None, mask=None):
        """Evaluate ``self.policy_proba`` for ``obs``; ``state`` and ``mask`` are ignored."""
        return self.sess.run(self.policy_proba, {self.obs_ph: obs})

    def value(self, obs, state=None, mask=None):
        """Evaluate ``self.value_flat`` for ``obs``; ``state`` and ``mask`` are ignored."""
        return self.sess.run(self.value_flat, {self.obs_ph: obs})

In [43]:
# Create one environment instance and hand it to PPO2 through DummyVecEnv.
# The lambda returns this same `env` object, which a later cell steps directly.
env = OneQEnv()
env_vec = DummyVecEnv([lambda: env])

# https://medium.com/aureliantactics/understanding-ppo-plots-in-tensorboard-cbc3199b9ba2
# NOTE(review): PPO2 is given gamma=0.9 while OneQEnv defaults to gamma=0.8. The
# environment only stores its value, so the PPO2 setting is presumably the one
# that takes effect - confirm.
model = PPO2(CustomPolicy, env_vec, verbose=0, gamma=0.9) #, tensorboard_log="./circuit_tensorboard/")

# No random seed is set in this cell, so results may differ between runs.
model.learn(total_timesteps=100000)

# obs = env_vec.reset()
# for i in range(1000):
#     action, _states = model.predict(obs)
#     obs, rewards, dones, info = env_vec.step(action)

<stable_baselines.ppo2.ppo2.PPO2 at 0x22e4f91c208>

In [44]:
def wfn_to_dm(wfn):
    """Encode a wavefunction as a rescaled density-matrix image.

    Args:
        wfn: 1-D state vector of length ``d``.

    Returns:
        Array of shape ``(d, d, 2)``: the real and imaginary parts of
        ``|wfn><wfn|`` stacked on the last axis and linearly mapped from
        [-1.001, 1.001] to [0, 255].
    """
    # np.outer does not conjugate; the density matrix needs conj on the bra side.
    dm = np.outer(wfn, np.conj(wfn))
    state = np.stack([dm.real, dm.imag], axis=-1)
    interp = interp1d([-1.001, 1.001], [0, 255])
    return interp(state)

In [45]:
# program = Program(I(0)).inst(I(1))
# init_wfn = wfn_sim.wavefunction(program)
# dm = np.outer(init_wfn, init_wfn)
# state = wfn_to_dm(init_wfn)
# print(state)
# optimal_action, next_state = model.predict(state)
# prog = Program(gates[optimal_action])
# wfn = wfn_sim.wavefunction(prog)
# print(wfn)
# print(prog)

In [46]:
# Roll out the trained model for one episode and print the gates it picks.
done = False
# The observation returned by reset() is discarded; the first observation is
# rebuilt below from an identity-only two-qubit program instead.
env.reset()
# `prog` mirrors every gate sent to the environment so that the final
# wavefunction can be printed at the end.
prog = Program(2)
prog.inst(('I', 0)).inst(('I', 1))
wfn = prog.simulate()
obs = wfn_to_dm(wfn)

while not done:
    # Ask the model for an action index given the current observation.
    optimal_action, _ = model.predict(obs)
    print(' '.join([str(i) for i in gates[optimal_action]]))
    prog.inst(gates[optimal_action])
    obs, rewards, done, info = env.step(optimal_action)
    # print(rewards,done)
    
wfn = prog.simulate()
print(f"Wavefunction: {wfn}")
# Arguments are passed in the opposite order to Fidelity(x, target); the value
# is the same either way because it is the squared magnitude of the overlap.
print(Fidelity(env.goal, wfn))
# print(Loss(wfn, goal_state()))
# Sz_ansz = abs(np.conj(wfn) @ (Sz[0]@Sz[1]) @ wfn) / 2
# Sz_ex = abs(np.conj(goal_state()) @ (Sz[0]@Sz[1]) @ goal_state()) / 2
# print(abs(Sz_ansz - Sz_ex), Sz_ansz, Sz_ex)
# print(f"Density Matrix: {wfn_to_dm(wfn)}")

H 0
H 1
Wavefunction: [0.5+0.j 0.5+0.j 0.5+0.j 0.5+0.j]
0.8495185168393018


In [83]:
# Standalone check: RY(pi/2) on qubit 0 followed by CNOT(0, 1) on two qubits.
# The printed state has equal amplitudes on the first and last basis states.
_program = Program(2)
_program.inst(('RY', np.pi/2, 0)).inst(('CNOT', 0, 1))
_wfn = _program.simulate()
#         self.state = self._wfn.amplitudes
print(_wfn)
# dm = np.outer(_wfn, _wfn)
# state = np.moveaxis(np.stack([dm.real, dm.imag], axis=0), 0, 2)

[0.70710678+0.j 0.        +0.j 0.        +0.j 0.70710678+0.j]
