# RL agent for unitary gate building

We want to try and implement a RL agent (following various algorithms) that learn how to build a target unitary gate in a spin chain with control on the single qubits. The agent builds a parametrized circuit of N layers (with N initially fixed) and with constant structure (but variable parameters).

See also: https://github.com/ManuelGuatto/RL_4_Robust_QC/blob/main/PPO_Nominal_Damp.ipynb

In [1]:
import sys
print("Python version:", sys.version)
import torch

Python version: 3.10.11 (v3.10.11:7d4cc5aa85, Apr  4 2023, 19:05:19) [Clang 13.0.0 (clang-1300.0.29.30)]


In [2]:
from functools import partial
import pennylane as qml
import matplotlib.pyplot as plt
import numpy as np
import timeit
import gymnasium as gym
import stable_baselines3
from gymnasium import spaces
import qutip as q

## Define functions:

Here I use pennylane but all could be accompliched with just numpy. I define layers X rotations, parametrized by the angles, and free evolution times, parametrized by the time. The agent will pile these up in the attempt to find the target gate which in this example is the CNOT gate and the control is on a single qubit.

In [3]:
num_wires = 2

X, Y, Z = qml.PauliX, qml.PauliY, qml.PauliZ

one = 1/2*(Z(wires= [0]) + qml.Identity(wires=[0]))
two = 1/2*(-Z(wires= [0]) + qml.Identity(wires=[0]))


Id = qml.Identity(wires=[0,1]).matrix()
#param0 = -0.158
#param1 = 0.110
#param2 = -(0.158+0.152)

ops =  [one @ (Z(1))]
ops2 = [two @ ((X(1))) ]
ops3 = [two @ ((Z(1))) ]
#ops3 = [ qml.Identity(wires = 0) @ Z(1)]

couplings = [0.1]
couplings2 = [0.2]
couplings3=[0.01]

H = qml.dot(couplings,ops)+qml.dot(couplings2,ops2)+qml.dot(couplings3,ops3)
#H = ops + ops2
#H0 = qml.dot(couplings2,ops2)
#H1 = qml.dot(couplings3,ops3)

def Ufree(T):
    evo = qml.exp(H, -1j * T)
    return evo.matrix()

def Urot(params):

    ops = qml.Rot(params[0],params[1],wires=0) @ qml.Identity(wires = 1)
    return ops.matrix()

def PulseX(params):

    ops = qml.RX(params,wires=0) @ qml.Identity(wires = 1)
    return ops.matrix()

def PulseY(params):

    ops = qml.RY(params,wires=0) @ qml.Identity(wires = 1)
    return ops.matrix()

# In here, the params[0] is the Rabi frequency times the duration time, i.e. it is directly the rotation angle. The other 2 parameters are the phases
def MWcontrol(params):
    coeff=[params[0]*np.cos(params[1]),params[0]*np.sin(params[1])]
    op = [X(0) @ qml.Identity(1),Y(0) @ qml.Identity(1)]
    Hmw = qml.dot(coeff,op)

    ops = qml.exp(Hmw, -1j)
    return ops.matrix()



#one
#two
#ops3
#q.Qobj(H.matrix())
#MW=q.Qobj(MWcontrol([np.pi/2,np.pi]))
#MW*M
#q.Qobj(H.matrix())

In [4]:
num_wires = 2

X, Y, Z = qml.PauliX, qml.PauliY, qml.PauliZ

proj_0 = 1/2*(Z(wires= [0]) + qml.Identity(wires=[0]))
proj_1 = 1/2*(-Z(wires= [0]) + qml.Identity(wires=[0]))


Id = qml.Identity(wires=[0,1]).matrix()
#param0 = -0.158
#param1 = 0.110
#param2 = -(0.158+0.152)

ops =  [proj_0 @ (Z(1)/2),proj_1 @ (Z(1)/2),proj_1 @ (X(1)/2)]

couplings = [-2*np.pi*0.158,-2*np.pi*(0.158-0.152),-2*np.pi*0.110]

H = qml.dot(couplings,ops)

def Ufree(T):
    evo = qml.exp(H, -1j * T)
    return evo.matrix()

def UpulseV2(params):
    # parameters are: [time interval duration, phase]
    
    rabi_f = 0.5*2*np.pi
    coeff=[rabi_f*np.cos(params[1]),rabi_f*np.sin(params[1])]

    op = [X(0) @ qml.Identity(1),Y(0) @ qml.Identity(1)]
    Hmw = qml.dot(coeff,op)
    
    Htot = H+Hmw
    
    deltaT = np.abs(params[0])

    evo = qml.exp(Htot, -1j * deltaT)
    
    return evo.matrix()


In [5]:
def suter_layer(params):
    # Here we can define a layer as an evolution without pulse control follow by a pulse control with fixed Rabi frequency
    U = UpulseV2([params[1],params[2]]) @ Ufree(params[0])
    return U

In [6]:
def layer(params):
    U = MWcontrol([params[1],params[2]]) @ Ufree(params[0])
    return U


def profit(op_mat,target_mat):
    """Compute the fidelity function for given parameters."""
    #op_mat = param_circ(Nfree,params)
    # Compute the fidelity between the target and the gate
    return np.abs(np.trace(target_mat.conj().T @ ( op_mat))) / 2**num_wires
#np.abs(np.trace(target_mat.conj().T @ op_mat)) / np.abs(np.trace(target_mat.conj().T @ target_mat)) #


#def profit2(op_mat,target_mat,alpha,duration):
    """Compute the fidelity function for given parameters."""
    #op_mat = param_circ(Nfree,params)
    #total_time = 0
    #for i in range(Nfree):
    #    total_time= total_time+params[i]
    # Compute the fidelity between the target and the gate
#    return np.abs(np.trace(target_mat.conj().T @ op_mat)) / 2**num_wires-alpha*duration 

#profit(target2,target2)

## Define the environment:

Here I use a continuos action space of shape (4,), where the first parameter is the time of free evolution while the other thee the Raboi frequency (times pulse duration) and the two phases of teh MW control.


The observtaion space could be defined as a dictionary in which to include the real and imaginary part of the gate matrix elemenets as vectors (16,). However, here I concatenate the vectors representing the real and imaginary part in a unique vector (32,). The entries of the observation space are already symmetric and normalized otherwise I would need to normalize them as for best practices. The action space is discrete since it should in principle be easier for the agent to navigate (but can be made continuos using the Box space and normalizing as for best practises).

In [57]:
class Parametric_env(gym.Env):
    """
    Custom Environment that follows gym interface.
    """

    #metadata = {"render_modes": ["console"]}

    MAX_STEPS = 3
    INFIDELITY_THRESHOLD = 0.001


    def __init__(self, env_conf):
        #super(Parametric_env, self).__init__()
        #self.render_mode = render_mode

        self.target = env_conf["Target"]#qml.Toffoli([0, 1, 2]).matrix()
        self.alpha = env_conf["Lagrange_time"]

        self.U = Id

        #Define the action and observation spaces
        #self.action_space = spaces.Box(low=0, high=np.array([0.02, 
        #                                                     2*np.pi, 2*np.pi, 2*np.pi, 2*np.pi, 
        #                                                     2*np.pi, 2*np.pi, 2*np.pi, 2*np.pi, 
        #                                                     2*np.pi]), dtype=np.float32)
        #Here we normalize the action space
        self.action_space = spaces.Box(low=-1, high= 1, shape=(3,), dtype=np.float32)
        
        #self.action_space = spaces.Dict(
        #    {
        #        "angle_Z": spaces.Discrete(5, start=-2, seed=42),
        #        "angle_X": spaces.Discrete(5, start=-2, seed=42),
        #    }
        #)
        #self.action_space = spaces.MultiDiscrete([5,5],start=[-2,-2])
        #self.action_space = spaces.MultiDiscrete([10,9,9,9])
        self.observation_space = spaces.Box(low=-1, high=1, shape=(32,), dtype=np.float64)

        #self.observation_space = spaces.Dict(
        #    {
        #        "real_part": spaces.Box(low=-1, high=1, shape=(4,), dtype=np.float64),
        #        "imaginary_part": spaces.Box(low=-1, high=1, shape=(4,), dtype=np.float64),
        #    }
        #)

    # Function that gets the state and gives the observation in the correct format
    def _get_obs(self):
        #return {"real_part": np.real(self.U), "imaginary_part": np.imag(self.U)}
        obs = np.concatenate([np.real(self.U.reshape(16,)),np.imag(self.U.reshape(16,))])
        return obs
    #def _get_obs(self):
    #    #return {"real_part": np.real(self.U), "imaginary_part": np.imag(self.U)}
    #    return {"real_part": np.real(self.U.reshape(4,)), "imaginary_part": np.imag(self.U.reshape(4,))}
    
    def reset(self,seed=None, options=None):
        super().reset(seed=seed)
        self.U = Id
        #self.rho_hat = self.rho
        #self.true_fidelity = 0
        self.fidelity = profit(self.U,self.target)#0
        self.count = 0
        self.reward = 0
        self.done = False
        self.duration = 0
        observation = self._get_obs()
        self.info = {}

        return observation,{}
    
    # Function to convert the action sampled from action_space into angles for the rotations in the layer
    def _get_angle(self,para):
        az = np.pi*para
        
        return az
    
    # Function to renormlize the times in between [0,10]
    def _get_times(self,para):
        az = 1*np.abs(para)   #TIMES##########################################
        
        return az
    # Function to renormlize the times in between [0,10]
    def _get_times2(self,para):
        az = 1.6*np.abs(para)   #TIMES##########################################
        
        return az



    def step(self, action):
        #Check the state of the episode
        if self.done:
            print("EPISODE DONE!!!")
        elif (self.count == self.MAX_STEPS):
            self.done = True;
            #truncated = self.done; 
        else:
            assert self.action_space.contains(action)
            self.count += 1
            #self.duration += 4*np.pi*np.abs(action[0])
            
            
        #Define the effect of the unitary layer
        self.params = [self._get_times(action[0]),self._get_times2(action[1]),self._get_angle(action[2])] 
        op = suter_layer(self.params)

        Upre = self.U

        self.U = op @ self.U  ###!
        #print("unitary=", self.U)

        #Compute the reward
        #fid0 = self.fidelity
        #fid = profit(self.U,self.target)
        #fid = profit2(self.U,self.target,self.alpha,self.duration)
        self.fidelity = profit(self.U,self.target)
        
        #if self.done:
        #    self.reward = self.fidelity
        #else:
        #    self.reward = 0
        #self.reward = fid#-fid0#self.fidelity
        self.info = {"Fidelity": self.fidelity,"Pre":Upre,"Next":self.U}

        if 1-self.fidelity < self.INFIDELITY_THRESHOLD:
            self.done = True

        # We give reward only at the end of the episode
        if self.done:
            self.reward =  -np.log(1-self.fidelity) #-0.05*self.count
        else:
            self.reward = 0

        #Rewavrds at each step
        #self.reward = self.fidelity - fid0
        observation = self._get_obs()

        
        #try:
        #    assert self.observation_space.contains(self.rho)
        #except AssertionError:
        #    print("INVALID STATE", self.rho)
        #terminated = self.done
        return [observation, self.reward, self.done,self.done, self.info]

   

        
    def close(self):
        pass

## Check the compatibility of the environment with stable baseline 3:

In [58]:
from stable_baselines3.common.env_checker import check_env

In [59]:
rho_target = Id
alpha = 1

In [60]:
env = Parametric_env(env_conf={"Target": rho_target,"Lagrange_time":alpha})
# If the environment don't follow the interface, an error will be thrown
check_env(env, warn=True)

## Define target, instanciate the environment, and train a chosen model:

In [61]:
target = qml.CNOT(wires=[0,1]).matrix()
target2 = (qml.Identity(0) @ qml.Hadamard(wires=[1])).matrix()
alpha = 0
target_gate = target2
target_gate
#qml.RX(params,wires=0) @ qml.Identity(wires = 1) qml.Identity(0).matrix() @

array([[ 0.70710678,  0.70710678,  0.        ,  0.        ],
       [ 0.70710678, -0.70710678,  0.        , -0.        ],
       [ 0.        ,  0.        ,  0.70710678,  0.70710678],
       [ 0.        , -0.        ,  0.70710678, -0.70710678]])

In [62]:
from stable_baselines3 import PPO, A2C, DQN, TD3, DDPG
from stable_baselines3.common.env_util import make_vec_env

# Instantiate the env
#env_conf=dict(Target=Id,Lagrange_time=alpha)

vec_env = make_vec_env(lambda:Parametric_env(env_conf={"Target": target_gate,"Lagrange_time":alpha})
, n_envs=1)

#vec_env = make_vec_env(Parametric_env, n_envs=1, env_kwargs=dict(Target=Id,Lagrange_time=alpha))

In [63]:
%%time

env = Parametric_env(env_conf={"Target": target_gate,"Lagrange_time":alpha})

#policy, env, 
#learning_rate=0.0003, 
#n_steps=2048, 
#batch_size=64, 
#n_epochs=10, 
#gamma=0.99, 
#gae_lambda=0.95, 
#clip_range=0.2, 
#clip_range_vf=None, 
#normalize_advantage=True, 
#ent_coef=0.0, 
#vf_coef=0.5, 
#max_grad_norm=0.5, 
#use_sde=False, 
#sde_sample_freq=-1, 
#rollout_buffer_class=None, 
#rollout_buffer_kwargs=None, 
#target_kl=None, 
#stats_window_size=100, 
#tensorboard_log=None, 
#policy_kwargs=None, 
#verbose=0, 
#seed=None, 
#device='auto',
#_init_setup_model=True)

# Best so far: model = PPO("MlpPolicy", env, gamma=0.9998,n_steps=1024,batch_size=64,clip_range=0.1, n_epochs=10,ent_coef=0.00, verbose=1).learn(4000_000)
# with parameters: t = 7# number = 5

# Train the agent
#MlpPolicy
model = PPO("MlpPolicy", env, learning_rate=0.00005,gamma=0.9996,n_steps=2048,batch_size=256,clip_range=0.2, n_epochs=10,ent_coef=0.001, verbose=1,tensorboard_log="./ppo_cartpole_tensorboard/").learn(4000_000)



Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.
Logging to ./ppo_cartpole_tensorboard/PPO_30
---------------------------------
| rollout/           |          |
|    ep_len_mean     | 3        |
|    ep_rew_mean     | 0.528    |
| time/              |          |
|    fps             | 1429     |
|    iterations      | 1        |
|    time_elapsed    | 1        |
|    total_timesteps | 2048     |
---------------------------------
-------------------------------------------
| rollout/                |               |
|    ep_len_mean          | 3             |
|    ep_rew_mean          | 0.676         |
| time/                   |               |
|    fps                  | 1368          |
|    iterations           | 2             |
|    time_elapsed         | 2             |
|    total_timesteps      | 4096          |
| train/                  |               |
|    approx_kl            | 0.00027953924 |
|    clip_fraction        | 0        

## Test the trained model:

In [56]:
# Test the trained agent
# using the vecenv
obs = vec_env.reset()
n_steps = 8
for step in range(n_steps):
    action, _ = model.predict(obs, deterministic= False)
    print(f"Step {step + 1}")
    print("Action: ", [action[0][i] for i in range(3)])
    obs, reward, done, info = vec_env.step(action)
    #print("obs=", obs, "reward=", reward, "done=", done)
    print("reward=", reward,"fidelity=",info[0]["Fidelity"], "done=", done)
    #vec_env.render()
    if done:
        # Note that the VecEnv resets automatically
        # when a done signal is encountered
        if reward > 0.99:
            print("Goal reached", "Fidelity=", info[0])
        else:
            print(" Max number of layers", "Fidelity=", info)
        break

Step 1
Action:  [0.0045866724, 0.11103537, 1.0]
reward= [0.] fidelity= 0.04699464632712025 done= [False]
Step 2
Action:  [-0.009253545, 1.0, 1.0]
reward= [0.] fidelity= 0.400783813301019 done= [False]
Step 3
Action:  [0.102987215, -1.0, 1.0]
reward= [0.] fidelity= 0.2822786015887279 done= [False]
Step 4
Action:  [0.056610852, -1.0, -1.0]
reward= [3.7939458] fidelity= 0.9774933790758147 done= [ True]
Goal reached Fidelity= {'Fidelity': 0.9774933790758147, 'Pre': array([[-0.13447503-0.25659702j, -0.00260434-0.15362235j,
         0.68060557-0.45129769j,  0.47436824-0.0234318j ],
       [ 0.00260434-0.15362235j, -0.13447503+0.25659702j,
         0.47436824+0.0234318j , -0.68060557-0.45129769j],
       [ 0.70271447-0.44047433j,  0.45176267+0.02351732j,
        -0.17485094-0.22247576j,  0.00206872-0.16570933j],
       [ 0.45176267-0.02351732j, -0.70271447-0.44047433j,
        -0.00206872-0.16570933j, -0.17485094+0.22247576j]]), 'Next': array([[ 0.01889487-0.8228448j , -0.01086648-0.56280273j

In [490]:
!tensorboard --logdir ./ppo_cartpole_tensorboard/

TensorFlow installation not found - running with reduced feature set.
Serving TensorBoard on localhost; to expose to the network, use a proxy or pass --bind_all
TensorBoard 2.17.1 at http://localhost:6006/ (Press CTRL+C to quit)
^C
