### A quantum implementation of an actor critic RL model

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow_quantum as tfq
from tensorflow import keras
import gym
import cirq
import sympy


from cirq.contrib.svg import SVGCircuit
from functools import reduce
from collections import deque, defaultdict

In [None]:
class Circuit(tf.keras.layers.Layer):
    """Implements a parameterised quantum circuit with input re-uploading as a layer."""
    def __init__(self, num_layers, name="PQC"):
        super().__init__(name=name)
        self.num_layers = num_layers

    def build(self, input_shape):
        self.num_qubits = input_shape[0][1]
        self.qubits = cirq.GridQubit.rect(1, self.num_qubits)
        
        # Symbols for PQC weights
        params = sympy.symbols(f'param(0:{3 * self.num_layers * self.num_qubits})') # three rotations per layer per qubit
        inputs = sympy.symbols(f'input(0:{self.num_layers})_(0:{self.num_qubits})')
        params = np.asarray(params).reshape((self.num_layers, self.num_qubits, 3))
        inputs = np.asarray(inputs).reshape((self.num_layers, self.num_qubits))

        # Circuit Definition
        circuit = cirq.Circuit()
        for l in range(self.num_layers):
            circuit += self.encode(self.qubits, inputs[l])
            circuit += self.entangle(self.qubits)
            circuit += self.rotate(self.qubits, params[l])

        self.rotation_weights = self.add_weight(shape=(3 * self.num_qubits * self.num_layers,),
                                     initializer='random_normal',
                                     trainable=True, name="Rotation_Weights")
        
        self.input_weights = self.add_weight(shape=(self.num_qubits * self.num_layers,),
                                     initializer='ones',
                                     trainable=True, name="Input_Weights")
        
        # Symbol order
        symbols = list(map(str, np.append(params.flatten(), inputs.flatten())))
        self.symbol_order = tf.constant([symbols.index(a) for a in sorted(symbols)])
        self.pqc = tfq.layers.ControlledPQC(circuit, [cirq.Z(q) for q in self.qubits])

    def get_config(self):
        conf = super().get_config().copy()
        conf.update({
            'num_layers': self.n_layers,
            'rotation_weights': self.rotation_weights,
            'input_weights': self.input_weights,
            'name': self.name,
        })
        return config
        
    @staticmethod
    def encode(qubits, inputs):
        """Returns a layer encoding the state"""
        return cirq.Circuit(cirq.rx(inputs[i])(q) for i, q in enumerate(qubits))

    @staticmethod
    def rotate(qubits, params):
        """Returns a layer rotating each qubit."""
        return cirq.Circuit([cirq.rx(params[i, 0])(q), cirq.ry(params[i, 1])(q), cirq.rz(params[i, 2])(q)] for i, q in enumerate(qubits))

    @staticmethod
    def entangle(qubits):
        """Returns a layer entangling the qubits with CZ gates."""
        return [cirq.CZ(q0, q1) for q0, q1 in zip(qubits, np.roll(qubits, -1, axis=0))]

    def call(self, inputs):
        # #batch_size = tf.shape(inputs[0])[0]
        # batch_size = tf.gather(tf.shape(inputs[0]), 0)
        
        # tiled_params = tf.tile([self.rotation_weights], multiples=[batch_size, 1])
        # tiled_input_weights = tf.tile([self.input_weights], multiples=[batch_size, 1])

        # tiled_inputs = tf.repeat(inputs[0], repeats=self.num_layers, axis=1)
        # # tiled_inputs = tf.tile(tiled_inputs, multiples=[batch_size, 1])
        # # tf.print(tiled_inputs.shape)


        # tiled_weighted_inputs = tf.multiply(tiled_input_weights, tiled_inputs)

        # parameters = tf.concat([tiled_params, tiled_weighted_inputs], axis=1)
        # parameters_ordered = tf.gather(
        #                 parameters,
        #                 self.symbol_order, axis=1)
        
        # circuits = tf.repeat(tfq.convert_to_tensor([cirq.Circuit()]),
        #                 repeats=batch_size)
        
        # return self.pqc([circuits, parameters_ordered])
                #batch_size = tf.shape(inputs[0])[0]
        batch_size = tf.gather(tf.shape(inputs[0]), 0)
        
        tiled_params = tf.tile([self.rotation_weights], multiples=[batch_size, 1])

        #input = tf.repeat(inputs[0], repeats=self.num_layers, axis=1)

        repeated_inputs = tf.repeat(inputs[0], repeats=self.num_layers, axis=1)
        weighted_inputs = tf.multiply(repeated_inputs, self.input_weights)
        tiled_weighted_inputs = tf.tile(weighted_inputs, multiples=[1, batch_size])

        parameters = tf.concat([tiled_params, tiled_weighted_inputs], axis=1)
        parameters_ordered = tf.gather(
                        parameters,
                        self.symbol_order, axis=1)
        
        circuits = tf.repeat(tfq.convert_to_tensor([cirq.Circuit()]),
                        repeats=batch_size)
        
        return self.pqc([circuits, parameters_ordered])

In [None]:
class Scale(tf.keras.layers.Layer):
    def __init__(self, name="Scaling"):
        super().__init__(name=name)
        
    def build(self, input_shape):
        self.w = self.add_weight(shape=input_shape[0], initializer='ones',
                                 trainable=True)

    def call(self, inputs):
        return tf.multiply(inputs, self.w)

In [None]:
class ActorCritic():
  """A class encapsulating a hybrid actor critic network."""
  def __init__(self, actor_learning_rate,
               critic_learning_rate,
               discount_factor, num_states,
               num_actions, num_layers):
    #self.observables = [reduce((lambda x, y: x * y), [cirq.Z(q) for q in qubits])] # Z_0*Z_1*Z_2*Z_3
    self.gamma = discount_factor
    self.num_actions = num_actions
    self.memory = []
    self.actor, self.critic, self.model = self.create_model((actor_learning_rate, critic_learning_rate),
                                                num_states, num_actions,
                                                num_layers)

  def act(self, observation):
    #print(self.actor.predict(observation[np.newaxis, :], verbose=False).flatten())
    return np.random.choice(self.num_actions,
                            p=self.actor.predict(observation[np.newaxis, :],
                                                 verbose=False).flatten())
  def save(self, timestamp):
      self.actor.save(f"actor_{timestamp}.keras")
      
  # Could use experience replay here.
  # Use tensorflow graphs to optimise training and results
  # split into record and learn. just figure out tf graphs.
  #@tf.function
  def learn(self, observation, action, reward, new_observation, done):
    self.memory.append((observation, action, reward, new_observation, done))

    if not done:
      return

    states, actions, rewards, _, _ = zip(*self.memory)
    states = np.vstack(states)
    rewards = np.array(rewards)

    # Calculate Expected Return:
    returns = np.zeros_like(rewards)
    discounted_sum = 0

    for i in reversed(range(len(rewards))):
        returns[i] = rewards[i] + self.gamma * discounted_sum
        discounted_sum = returns[i]
    
    returns = (returns - np.mean(returns)) / np.std(returns)
    #print(returns)
      
    # Update the Critic:
    critic_rewards = self.critic.predict(states, verbose=False).flatten()
    self.critic.fit(states, returns, verbose=False, batch_size=len(self.memory))
    #print(critic_rewards)
      
    # Calculate Advantage:
    advantages = (returns - critic_rewards)

    # Update the Actor:
    actions_one_hot = np.zeros((len(actions), self.num_actions))
    actions_one_hot[np.arange(len(actions)), actions] = 1
    #target_actions_and_advantage = np.hstack([actions_one_hot, advantages[:, np.newaxis]])
    self.actor.fit(states, actions_one_hot, sample_weight=advantages, verbose=False, batch_size=len(self.memory))

    self.memory = []

  def create_model(self, alpha, num_states, num_actions, num_layers):
    """Creates the actor and critic models from the network dimensions"""
    input_tensor = tf.keras.Input(shape=num_states, dtype=tf.dtypes.float32, name='Input')
    re_uploading_pqc = Circuit(num_layers)([input_tensor])
    scaling_layer = Scale()(re_uploading_pqc)

    policy_logits = tf.keras.layers.Lambda(lambda x: x[:, :2], name="Policy-Split")(scaling_layer)  # First two for policy
    value_output = tf.keras.layers.Lambda(lambda x: x[:, 2] * x[:, 3], name="Value-Product")(scaling_layer)

    policy_output = tf.keras.layers.Softmax()(policy_logits)
    actor = tf.keras.Model(inputs=[input_tensor], outputs=policy_output)
    critic = tf.keras.Model(inputs=[input_tensor], outputs=value_output)
    model = tf.keras.Model(inputs=[input_tensor], outputs=[policy_output, value_output])

    actor.compile(optimizer=keras.optimizers.Adam(learning_rate=alpha[0]), loss="categorical_crossentropy")
    critic.compile(optimizer=keras.optimizers.Adam(learning_rate=alpha[1]), loss='huber')

    return actor, critic, model

In [None]:
# Set the seed.
seed = 42
tf.random.set_seed(seed)
np.random.seed(seed)
#tf.debugging.disable_traceback_filtering()
env = gym.make("CartPole-v1")
#agent = ActorCritic(0.01, 0.01, 0.9, env.observation_space.shape[0], env.action_space.n, 5)
agent = ActorCritic(0.0075, 0.0075, 0.9, env.observation_space.shape[0], env.action_space.n, 5)
tf.keras.utils.plot_model(agent.model, show_shapes=True, dpi=70)

#agent = ActorCritic(1e-2, 1e-2, 0, 0.9, 4, 2, [128], 'relu')

In [None]:
#tf.keras.utils.plot_model(agent.critic, show_shapes=True, dpi=70)
#SVGCircuit()

In [None]:
# get average reward during training episodes and plot.
total_rewards = []
average_rewards = []
episode_lengths = []

max_steps = 500
max_episodes = 5000
episode_average = 100
episode_threshold = 475

state_bounds = np.array([2.4, 2.5, 0.21, 2.5])

for episode in range(max_episodes):
  observation = env.reset()
  done = False
  total_reward = 0.0
  step = 0
  while step != max_steps and not done:
    # Calculate action
    action = agent.act(observation)

    # step forward
    new_observation, reward, done, info = env.step(action)
    new_observation = new_observation / state_bounds
    angle = observation[2]
    position = observation[0]
    #reward = 0.05 * (4.8 - np.abs(position)) / 4.8 + 0.95 * (12/np.pi) * (np.pi/12 - np.abs(angle)) 
    #reward = 0.1 * ((4.8 - np.abs(position))/4.8)**2 + 0.9 * ((np.pi/12 - np.abs(angle))/(np.pi/12))**3 
    #reward = 0.05 * ((4.8 - np.abs(position))/4.8)**2 + 0.95 * ((np.pi/12 - np.abs(angle))/(np.pi/12))**2 
    reward = 0.25 * (1 - np.abs(position))**3 + 0.75 * (1 - np.abs(angle))**3
    agent.learn(observation, action, reward, new_observation, done)

    # Update:
    total_reward += reward
    observation = new_observation
    step += 1

  agent.memory = []
  total_rewards.append(total_reward)
  average_rewards.append(total_reward / step)
  episode_lengths.append(step)
  average_episode_length = np.mean(episode_lengths[-episode_average:])

  if len(episode_lengths) > episode_average and average_episode_length > episode_threshold:
    print(f"\nSolved at Episode {episode}, Average Episode Length: {average_episode_length:.1f}")
    break
  else:
    print(f"\rEpisode: {episode}, Steps: {step: >3}, Average Episode Length: {average_episode_length: >4.1f}", end='')

In [None]:
# ts = '2024.07.04-17.27'
# results = np.load(f'results_{ts}.npy', allow_pickle=True)
# total_rewards = results.item().get("Total Reward")
# average_rewards = results.item().get("Average Reward")
# episode_lengths = results.item().get("Episode Length")

fig, ax = plt.subplots(2, 2, figsize=(10, 10))

ax[0][0].set_xlabel('Episode')
ax[0][0].set_ylabel('Total Reward')
ax[0][0].plot(total_rewards)
#total_rewards_rolling = np.convolve(total_rewards, np.ones(len(total_rewards)//10), 'same') / (len(total_rewards)//10)
# cumsum_vec = np.cumsum(np.insert(total_rewards, 0, 0)) 
# total_rewards_rolling = (cumsum_vec[len(total_rewards)//10:] - cumsum_vec[:-len(total_rewards)//10 + 1]) / (len(total_rewards)//10)
# ax[0][0].plot(total_rewards_rolling)

ax[0][1].set_xlabel('Episode')
ax[0][1].set_ylabel('Average Reward')
ax[0][1].plot(average_rewards)
# cumsum_vec = np.cumsum(np.insert(average_rewards, 0, 0)) 
# average_rewards_rolling = (cumsum_vec[len(average_rewards)//10:] - cumsum_vec[:-len(average_rewards)//10 + 1]) / (len(average_rewards)//10)
# ax[0][1].plot(average_rewards_rolling)

ax[1][0].set_xlabel('Episode')
ax[1][0].set_ylabel('Cumulative Reward')
cumulative_rewards = np.cumsum(total_rewards)
ax[1][0].plot(cumulative_rewards)

ax[1][1].set_xlabel('Episode')
ax[1][1].set_ylabel('Episode Length')
ax[1][1].plot(episode_lengths, label='Lengths')
# cumsum_vec = np.cumsum(np.insert(episode_lengths, 0, 0)) 
# episode_lengths_rolling = (cumsum_vec[len(episode_lengths)//10:] - cumsum_vec[:-len(episode_lengths)//10 + 1]) / (len(episode_lengths)//10)
# ax[1][1].plot(episode_lengths_rolling, 'b--', label='Smoothed Lengths')
# ax[1][1].legend()

fig.tight_layout()
plt.show()

In [None]:
import time
timestamp = time.strftime("%Y.%m.%d-%H.%M")
#np.save(f'total_reward_{timestamp}.npy', total_rewards)
#np.save(f'running_reward_{timestamp}.npy', running_rewards)
#np.save(f'average_reward_{timestamp}.npy', average_rewards)
results = {"Total Reward":total_rewards,
           "Cumulative Reward": np.cumsum(total_rewards),
           "Average Reward": average_rewards,
           "Episode Length": episode_lengths}
np.save(f'qresults_{timestamp}.npy', results)
agent.save(timestamp)

In [None]:
# Play:
import matplotlib.animation as anim
import tensorflow_docs.vis.embed as embed

actor = agent.actor #keras.models.load_model(f"actor_{timestamp}.keras", compile=False)

env = gym.make('CartPole-v1')
observation = env.reset()

frames=[]
for i in range(500):
  frames.append(env.render(mode='rgb_array'))
  observation = observation / state_bounds
  action = np.random.choice(env.action_space.n,
                                p=actor.predict(observation[np.newaxis, :],
                                                 verbose=False).flatten())
  observation, _, done, _ = env.step(action)
  if done:
      break

env.close()

def init():
    global im, text
    im = ax.imshow(frames[0])
    text = ax.text(.05, .95, '1', ha='left',
                   va='top', transform=ax.transAxes) 

def animate(i):
  global im, text
  im.set_data(frames[i])
  text.set_text(str(i))

fig = plt.figure()
ax = plt.gca()
ax.axis('off')

animation = anim.FuncAnimation(fig, animate, init_func=init, frames=len(frames))
animation.save(f'animation_{timestamp}.gif', writer='ffmpeg', fps=30)
plt.close(fig)
embed.embed_file(f'animation_{timestamp}.gif')