# Recurrent PPO for quantum control, Model Free
In this simulation we are going to test the PPO alorithm in a simulation of a quantum environment

## Import dependencies

In [None]:
import gym, scipy.linalg, tensorboard
import qiskit
import qiskit.quantum_info as qi
from qiskit.quantum_info.states import DensityMatrix
from qiskit.quantum_info import state_fidelity
import math
import numpy as np
from gym.spaces.box import Box
import matplotlib.pyplot as plt
import seaborn as sns; sns.set_theme()
import pandas as pd
import os
import optuna
import plotly.express as px

## Define the enviroment 

In this environment we will return as state the control and the observation. Due to this the control will start at time 1 and not at time 0. In this environment we keep as reward the fidelity.

In [None]:
class Quantum_dynamics(gym.Env):

    MAX_STEPS = 500
    FIDELITY_THRESHOLD = 0.95

    def __init__(self, env_config):

        #Define the action and observation spaces
        self.action_space = Box(-1,1,shape=(1,), dtype=np.float64)
        self.observation_space = Box(low=-1, high=1, shape=(3, 3), dtype=np.float64)

        #Define the epsilon variable for the measurement 
        self.eps = env_config["eps"]
        ##Define the probability of the noise
        self.gamma = env_config["alpha"]
        gamma_1 = 0
        gamma_2 = self.gamma/2
        gamma_3 = self.gamma/2
        #Define the target state
        self.rho_target = env_config["target"]

        #Define the measurement operators
        self.meas_operators = {
                                "M0": np.matrix([[np.sqrt(1-2*(self.eps)), 0, 0], [0, np.sqrt(self.eps), 0], [0, 0, np.sqrt(self.eps)]]),
                                "M1": np.matrix([[np.sqrt(self.eps), 0, 0], [0, np.sqrt(1-2*(self.eps)), 0], [0, 0, np.sqrt(self.eps)]]),
                                "M2": np.matrix([[np.sqrt(self.eps), 0, 0], [0, np.sqrt(self.eps), 0], [0, 0, np.sqrt(1-2*(self.eps))]]),
                              }
        self.meas_outcomes = np.array([0,1,2])
        
        self.projective_oper = {
                                "M0": np.matrix([[1, 0, 0], [0, 0, 0], [0, 0, 0]]),
                                "M1": np.matrix([[0, 0, 0], [0, 1, 0], [0, 0, 0]]),
                                "M2": np.matrix([[0, 0, 0], [0, 0, 0], [0, 0, 1]]),
                              }
        
        #Define flip channel kraus representation 
        self.K = {"k0": np.matrix([[1,0,0], [0, np.sqrt(1-gamma_1), 0], [0, 0, np.sqrt(1-gamma_1-gamma_2)]]),
                  "k01": np.matrix([[0,np.sqrt(gamma_1),0], [0, 0, 0], [0, 0, 0]]),
                  "k03": np.matrix([[0,0,0], [0, 0, 0], [0, 0, gamma_3]]),
                  "k12": np.matrix([[0, 0, 0], [0, 0, np.sqrt(gamma_2)], [0, 0, 0]])}
        
        #Hamiltonian components
        self.a = np.matrix([[0,1,0], [0,0,1], [0,0,0]])
        
    def reset(self):
        self.rho = qi.random_density_matrix(3)
        self.fidelity = 0
        self.count = 0
        self.reward = 0
        self.done = False
        self.info = {}
        
        #Compute the measurement
        outcome = self.measurement()

        # Initialization of the state as vector with the 1st entry --> control, 2nd entry -->outcome
        self.state = np.array([0])
        self.state = np.append(self.state, outcome)
        return self.state

    def step(self, action):
        if action[1] < 0:
            self.done = 0
        else:
            self.done = 1
        if self.done:
            self.beta = action[0]
            outcome = self.measurement()
            if outcome == 2:
                self.reward = 1
            else:
                self.reward = -1
                
            #Compute the reward
            self.fidelity = self.compute_fidelity()
            
            self.info = {"Fidelity": self.fidelity}
            
            #Build the state of the agent
            self.state[0] = self.beta
            self.state[1] = outcome
            
            return [self.state, self.reward, True, self.info]
            
        elif (self.count == self.MAX_STEPS):
            self.done = True;
            self.reward = -1
            
            #Compute the reward
            self.fidelity = self.compute_fidelity()
            
            self.info = {"Fidelity": self.fidelity}
            #Build the state of the agent
            return [self.state, self.reward, self.done, self.info]
        else:
            #print(action)
            #assert self.action_space.contains(action)
            self.count += 1

    
        #Compute the effect of the noise
        rho = self.rho
        rho_prime = np.zeros((3,3)).astype(complex)
        for k_i in self.K.values():
            rho_prime += k_i*rho*k_i.getH()
            
        self.rho = rho_prime
        self.rho = DensityMatrix(self.rho) 
        
        #Define the effect of the unitary operator
        self.beta = action[0]
        a_dagger = np.transpose(np.conjugate(self.a))
        self.control_hamiltonian = (1j)*np.matrix(self.beta*a_dagger - np.conj(self.beta)*self.a)
        self.u = np.matrix(scipy.linalg.expm((-1j)*self.control_hamiltonian))
        op = qi.Operator(self.u)

        # rho prime --> rho after the impulse control
        self.rho = self.rho.evolve(op)

        #Compute the measurement
        outcome = self.measurement()
        
        #Assign the reward
        self.reward = 0
        
        self.info = {"Fidelity": 0}
        
        #Build the state of the agent
        self.state[0] = self.beta
        self.state[1] = outcome
        return [self.state, self.reward, self.done, self.info]
    
    def measurement(self):

        #Initialize the probabilities
        self.outcomes_probabilities = np.array([1/3, 1/3, 1/3])

        if self.done:
            #Compute the probabilities for each m outcome
            for m, M_m in enumerate(self.projective_oper.values()):
                M_m_dagger = np.transpose(np.conjugate(M_m))
                self.outcomes_probabilities[m] = np.trace(M_m_dagger*M_m*np.matrix(self.rho))
            add_prob = 1-sum(self.outcomes_probabilities)
            self.outcomes_probabilities[1] += add_prob

            #Do the measure
            try:
                out = np.random.choice(self.meas_outcomes, 1, p=self.outcomes_probabilities)
            except ValueError:
                print("Probability error " + str(self.rho))
            
        else:
            #Compute the probabilities for each m outcome
            for m, M_m in enumerate(self.meas_operators.values()):
                M_m_dagger = np.transpose(np.conjugate(M_m))
                self.outcomes_probabilities[m] = np.trace(M_m_dagger*M_m*np.matrix(self.rho))
            add_prob = 1-sum(self.outcomes_probabilities)
            self.outcomes_probabilities[1] += add_prob

            #Do the measure
            try:
                out = np.random.choice(self.meas_outcomes, 1, p=self.outcomes_probabilities)
            except ValueError:
                print("Probability error " + str(self.rho))

            key_list=list(self.meas_operators.keys())
            key = key_list[out[0]]
            M_m = self.meas_operators[key]

            #Effect of the measurement on the state
            self.rho = np.matrix((M_m*np.matrix(self.rho)*M_m.getH())/np.trace(M_m*np.matrix(self.rho)*M_m.getH()))
        
        return out

    def compute_fidelity(self):
        rho = DensityMatrix(self.rho)
        target = DensityMatrix(self.rho_target)
        fidelity = state_fidelity(rho, target)
        
        if fidelity > self.FIDELITY_THRESHOLD:
            self.done = True

        return fidelity

## Import the PPO

In [None]:
from sb3_contrib import RecurrentPPO
from datetime import datetime

## Start train

In [None]:
def evaluate_policy(model, env, episodes = 10):
    agent = model
    fidelity = np.zeros(episodes)
    rewards = np.zeros(episodes)
    length = np.zeros(episodes)
    for ep in range(episodes):
        fidelity_ep = 0
        length_ep = 0
        obs = env.reset()
        done = False
        lstm_states = None
        num_envs = 1
        episode_starts = np.ones((num_envs,), dtype=bool)
        while not done:
            length_ep +=1
            action, lstm_states = agent.predict(obs, state = lstm_states, episode_start=episode_starts, deterministic=True)
            obs, reward, done, info = env.step(action)
            episode_starts = done
            if done:
                fidelity_ep = info["Fidelity"]
                fidelity[ep] = fidelity_ep
                length[ep] = length_ep
                rewards[ep] = reward
    fidelity_mean = np.mean(fidelity)
    len_mean = np.mean(length)
    reward_mean = np.mean(rewards)
    return [fidelity_mean, len_mean, reward_mean, fidelity, length, rewards]

In [None]:
# Select the epsilon for which we have to train a model
eps = np.array([0.1, 0.15, 0.175, 0.2, 0.25, 0.3])
#Target state
rho_target = np.matrix([[0, 0, 0],[0,0,0], [0,0,1]])
#Training noise
alpha_train = 0

#Define total training timesteps
training_steps = 400_000

In [None]:
#We are creating a model for each epsilon that we want to test, the entire training is without noise
evaluate_episodes = 10
df_train = pd.DataFrame(columns = ['epsilon', 'alpha', 'reward', 'timesteps', 'fidelity'])
df_train_mean = pd.DataFrame(columns = ['epsilon', 'alpha', 'reward', 'reward_std', 'timesteps', 'timesteps_std', 'fidelity', 'fidelity_std'])
for epsilon in eps:
    env_config = {"target": rho_target,
             "eps": epsilon,
             "alpha":alpha_train}
    env = Quantum_dynamics(env_config)
    model = RecurrentPPO("MlpLstmPolicy", env, learning_rate = 3e-4, verbose = 0,
                         batch_size = 512 , n_steps=512, tensorboard_log="./QOMDP_Depolarizing")
    
    print("##### START TRAINING FOR EPSILON = " + str(epsilon)+ " #####")
    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    print("Starting_time =", current_time)
    model.learn(total_timesteps=training_steps, tb_log_name="LSTM_train_alpha_0_eps"+str(epsilon*100))
    
    #Test the agent
    mean_fidelity, mean_length, mean_reward, fidelity_per_episode, length_per_episode,reward_per_episode = evaluate_policy(model, env, episodes = evaluate_episodes)

    std_fidelity = np.std(fidelity_per_episode)
    std_reward = np.std(reward_per_episode)
    std_length = np.std(length_per_episode)


    df_session = pd.concat([pd.DataFrame([[epsilon,
                                           alpha_train,
                                           reward_per_episode[i],
                                           length_per_episode[i],
                                           fidelity_per_episode[i]]],
                                           columns = ['epsilon', 'alpha', 'reward', 'timesteps', 'fidelity']) 
                                           for i in range(evaluate_episodes)],
                                        ignore_index = True)
    df_train = pd.concat([df_train, df_session], ignore_index = True)
            
    df_session_mean = pd.concat([pd.DataFrame([[epsilon,
                                                alpha_train,
                                                mean_reward,
                                                std_reward,
                                                mean_length,
                                                std_length,
                                                mean_fidelity,
                                                std_fidelity]],
                                                columns = ['epsilon', 'alpha', 'reward', 'reward_std', 'timesteps', 'timesteps_std', 'fidelity', 'fidelity_std'])],
                                          ignore_index = True)
            
    df_train_mean = pd.concat([df_train_mean, df_session_mean], ignore_index = True)

    
    print("## EVALUATE REWARD FOR EPSILON = " + str(epsilon)+": mean fidelity: " + str(mean_fidelity) +": std fidelity: " +
         str(std_fidelity) + " #####")
    print("## EVALUATE LENGTH FOR EPSILON = " + str(epsilon)+": mean length: " + str(mean_length) +": std length: " +
         str(std_length) + " #####")
    
    # Save a trained model to a file
    print("--> Saving the model")
    model.save("LSTM_eps_"+str(epsilon*100)+"_train")
    print("--> Model saved in  a file")

    now = datetime.now()
    current_time = now.strftime("%H:%M:%S")
    print("Ending time =", current_time)

    print("##### END TRAINING #####")
    

In [None]:
df_train_mean

## Test the agent

In [None]:
def test_agent(epsilon_test, alpha_test, target = np.matrix([[0, 0, 0],[0,0,0], [0,0,1]]),  episodes = 100):
    df_test = pd.DataFrame(columns = ['epsilon', 'alpha', 'reward', 'timesteps', 'fidelity'])
    df_test_mean = pd.DataFrame(columns = ['epsilon', 'alpha', 'reward', 'reward_std', 'timesteps', 'timesteps_std', 'fidelity', 'fidelity_std'])
    evaluate_episodes = episodes
    for epsilon in epsilon_test:
        for alpha in alpha_test:
            #Configure the environement for the test
            env_config = {"target": target,
                 "eps": epsilon,
                 "alpha":alpha}
            #Build the environment
            env = Quantum_dynamics(env_config)
            #Load the trained agent
            agent = RecurrentPPO.load("LSTM_eps_"+str(epsilon*100)+"_train")
            print("##### START TEST FOR AGENT EPSILON: " + str(epsilon)+ " ALPHA: "+ str(alpha) + " #####")
            now = datetime.now()
            current_time = now.strftime("%H:%M:%S")
            print("Starting_time =", current_time)
            
            #Test the agent
            mean_fidelity, mean_length, mean_reward, fidelity_per_episode, length_per_episode, reward_per_episode = evaluate_policy(agent, env, episodes)

            std_fidelity = np.std(fidelity_per_episode)
            std_reward = np.std(reward_per_episode)
            std_length = np.std(length_per_episode)
            #percentage_solved_episodes = sum(reward > 0.95 for reward in reward_per_episode)
            #percentage_solved_episodes = percentage_solved_episodes/evaluate_episodes
            
            #Create the dataframe
            df_session = pd.concat([pd.DataFrame([[epsilon,
                                                alpha,
                                                reward_per_episode[i],
                                                length_per_episode[i],
                                                fidelity_per_episode[i]]],
                                                columns = ['epsilon', 'alpha', 'reward', 'timesteps', 'fidelity']) 
                                                for i in range(evaluate_episodes)],
                                      ignore_index = True)
            df_test = pd.concat([df_test, df_session], ignore_index = True)
            
            df_session_mean = pd.concat([pd.DataFrame([[epsilon,
                                                    alpha,
                                                    mean_reward,
                                                    std_reward,
                                                    mean_length,
                                                    std_length,
                                                    mean_fidelity,
                                                    std_fidelity]],
                                                    columns = ['epsilon', 'alpha', 'reward', 'reward_std', 'timesteps', 'timesteps_std', 'fidelity', 'fidelity_std'])],
                                          ignore_index = True)
            
            df_test_mean = pd.concat([df_test_mean, df_session_mean], ignore_index = True)

            
            print("## TEST RESULTS:  mean fidelity: " + str(mean_fidelity) +": std fidelity: " +
                 str(std_fidelity) + " #####")
            
            now = datetime.now()
            current_time = now.strftime("%H:%M:%S")
            print("Ending time =", current_time)

            print("##### END TEST #####")
            
    return df_test, df_test_mean

In [None]:
#Define the alpha for the test
alpha_test = np.array([0, 0.1, 0.2, 0.3, 0.4, 0.5, 0.6, 0.7, 0.8, 0.9, 1])

evaluate_episodes = 100

df_test, df_test_mean = test_agent(epsilon_test = eps, alpha_test = alpha_test, episodes = evaluate_episodes)

In [None]:
#Visualize the results of the test
for i, epsilon in enumerate(eps):
    i = i+1
    df_eps = df_test[(i-1)*evaluate_episodes*len(alpha_test):(i)*evaluate_episodes*len(alpha_test)]
    fig = px.box(df_eps, x="alpha", y="fidelity", points="all", title="Fidelity distribution test session with epsilon: "+
                str(epsilon))
    fig.show()

In [None]:
#Visualize the results of the test
for i, epsilon in enumerate(eps):
    i = i+1
    df_eps = df_test[(i-1)*evaluate_episodes*len(alpha_test):(i)*evaluate_episodes*len(alpha_test)]
    fig = px.box(df_eps, x="alpha", y="reward", points="all", title="Rewards distribution test session with epsilon: "+
                str(epsilon))
    fig.show()

In [None]:
#Visualize the results of the test
for i, epsilon in enumerate(eps):
    i = i+1
    df_eps = df_test[(i-1)*evaluate_episodes*len(alpha_test):(i)*evaluate_episodes*len(alpha_test)]
    fig = px.box(df_eps, x="alpha", y="timesteps", points="all", title="Timesteps distribution test session with epsilon: "+
                str(epsilon))
    fig.show()

In [None]:
pd.set_option("display.max_rows", None, "display.max_columns", None)
df_test_mean

In [None]:
fig = px.line(df_test_mean, x='alpha', y='fidelity', color='epsilon', symbol="epsilon",
             title = 'Fidelity in function of Noise')
fig.show()

fig = px.line(df_test_mean, x='alpha', y='reward', color='epsilon', symbol="epsilon",
             title = 'Reward in function of Noise')
fig.show()

fig = px.line(df_test_mean, x='alpha', y='timesteps', color='epsilon', symbol="epsilon",
             title = 'Timesteps in function of Noise')
fig.show()

In [None]:
heatmap_table = pd.pivot_table(df_test_mean, values='fidelity', index=['epsilon'],
                    columns=['alpha'])

sns.set(rc={"figure.figsize":(12, 6)}) #width=8, height=4

In [None]:
sns.heatmap(heatmap_table,  cbar_kws={'label': 'Fidelity'})

In [None]:
heatmap_table = pd.pivot_table(df_test_mean, values='reward', index=['epsilon'],
                    columns=['alpha'])

In [None]:
sns.heatmap(heatmap_table,  cbar_kws={'label': 'Reward'})

In [None]:
heatmap_table = pd.pivot_table(df_test_mean, values='timesteps', index=['epsilon'],
                    columns=['alpha'])

In [None]:
sns.heatmap(heatmap_table, cbar_kws={'label': 'Timesteps'}, cmap="Blues")

In [None]:
os.makedirs('data/df_test', exist_ok=True)  
os.makedirs('data/df_test_mean', exist_ok=True)  
df_test.to_csv('data/df_test/data.csv') 
df_test_mean.to_csv('data/df_test_mean/data.csv')