# Pendulum - DDPG (Actor Critic)

In [None]:
import random
from collections import deque

import cirq
import gym
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import tensorflow_quantum as tfq

# Set random seeds for reproducibility.
# Replay batches are drawn with random.sample, so the stdlib generator has to
# be seeded alongside NumPy (exploration noise) and TensorFlow.
random.seed(0)
np.random.seed(0)
tf.random.set_seed(0)

In [None]:
def build_quantum_circuit(qubits, n_layers):
    """Build a layered rotation-plus-entanglement circuit.

    Each layer visits the qubits in order; for every qubit it applies
    rx, ry and rz rotations by pi/2 and then a CNOT from that qubit to
    the next one, wrapping around from the last qubit to the first.

    All rotation angles are fixed constants: the circuit contains no
    symbolic (trainable) parameters.

    Args:
        qubits: Indexable sequence of cirq qubits.
        n_layers: Number of times the per-qubit pattern is repeated.

    Returns:
        The assembled cirq.Circuit.
    """
    quarter_turn = np.pi / 2
    ansatz = cirq.Circuit()
    for _layer in range(n_layers):
        n_wires = len(qubits)
        for idx, control in enumerate(qubits):
            for rotation in (cirq.rx, cirq.ry, cirq.rz):
                ansatz += rotation(quarter_turn)(control)
            # Ring entanglement: the last qubit targets the first one
            ansatz += cirq.CNOT(control, qubits[(idx + 1) % n_wires])
    return ansatz

# Define observables (for Actor and Critic)
# One row of qubits; both networks measure the Z0*Z1 and Z2*Z3 products.
n_qubits, n_layers = 4, 5
qubits = cirq.GridQubit.rect(1, n_qubits)
ops_actor = list(map(cirq.Z, qubits))
ops_critic = list(map(cirq.Z, qubits))
observables_actor = [ops_actor[k] * ops_actor[k + 1] for k in (0, 2)]
observables_critic = [ops_critic[k] * ops_critic[k + 1] for k in (0, 2)]


In [None]:
def build_actor_model(qubits, n_layers, observables):
    """Build the actor: a Keras model whose output is a PQC layer.

    Args:
        qubits: Qubits the circuit acts on; their count sets the input width.
        n_layers: Number of circuit layers passed to build_quantum_circuit.
        observables: Operators handed to the PQC layer.

    Returns:
        A tf.keras.Model mapping a float32 input of width len(qubits) to
        the PQC layer output.
    """
    # NOTE(review): the PQC layer is fed a float input here; confirm this
    # matches the input type tfq.layers.PQC expects.
    state_input = tf.keras.Input(
        shape=(len(qubits),), dtype=tf.dtypes.float32, name='input'
    )
    pqc_layer = tfq.layers.PQC(build_quantum_circuit(qubits, n_layers), observables)
    pqc_output = pqc_layer(state_input)
    return tf.keras.Model(inputs=[state_input], outputs=pqc_output)

def build_critic_model(qubits, n_layers, observables):
    """Build the critic: a PQC layer followed by a single-unit Dense head.

    Args:
        qubits: Qubits the circuit acts on; their count sets the input width.
        n_layers: Number of circuit layers passed to build_quantum_circuit.
        observables: Operators handed to the PQC layer.

    Returns:
        A tf.keras.Model with one float32 input of width len(qubits) and a
        single scalar output per sample.
    """
    # NOTE(review): the PQC layer is fed a float input here; confirm this
    # matches the input type tfq.layers.PQC expects.
    features = tf.keras.Input(
        shape=(len(qubits),), dtype=tf.dtypes.float32, name='input'
    )
    pqc_layer = tfq.layers.PQC(build_quantum_circuit(qubits, n_layers), observables)
    pqc_output = pqc_layer(features)
    # Dense(1) collapses the per-observable values into one value per sample
    q_value = tf.keras.layers.Dense(1)(pqc_output)
    return tf.keras.Model(inputs=[features], outputs=q_value)

# Online (trained) networks
actor_model = build_actor_model(
    qubits=qubits, n_layers=n_layers, observables=observables_actor
)
critic_model = build_critic_model(
    qubits=qubits, n_layers=n_layers, observables=observables_critic
)


In [None]:
# Target networks: same architecture as the online networks, initialised as
# exact copies of their weights.
actor_target = build_actor_model(
    qubits=qubits, n_layers=n_layers, observables=observables_actor
)
critic_target = build_critic_model(
    qubits=qubits, n_layers=n_layers, observables=observables_critic
)
actor_target.set_weights(actor_model.get_weights())
critic_target.set_weights(critic_model.get_weights())


In [None]:
# One Adam optimizer per network, both with the same learning rate
actor_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
critic_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
# gamma: discount factor in the critic target; tau: mixing rate of the
# target-network soft update; batch_size: transitions sampled per update.
gamma, tau, batch_size = 0.99, 0.005, 64


In [None]:
# Bounded replay buffer: once full, the oldest transitions are discarded
replay_memory = deque([], maxlen=10000)


In [None]:
# noise: standard deviation of the Gaussian exploration noise used by
# select_action. noise_clip is defined but not read by any code in this file.
noise, noise_clip = 0.1, 0.5


In [None]:
def select_action(state):
    """Return the actor's action for one state, with exploration noise.

    The state is wrapped into a batch of one, passed through actor_model,
    perturbed with zero-mean Gaussian noise (standard deviation `noise`)
    and clipped to [-1, 1].

    Args:
        state: A single environment observation.

    Returns:
        NumPy array holding the noisy, clipped action.
    """
    batched_state = tf.convert_to_tensor([state], dtype=tf.float32)
    policy_output = actor_model(batched_state).numpy()[0]
    # In-place add keeps the dtype of the network output
    policy_output += np.random.normal(0, noise, size=policy_output.shape)
    return np.clip(policy_output, -1, 1)


In [None]:
@tf.function
def update_critic(states, actions, rewards, next_states, dones):
    """Run one gradient step on the critic toward a bootstrapped target.

    The target is ``rewards + gamma * Q_target(next_states, pi_target(next_states))
    * (1 - dones)`` and the loss is the mean squared error between that target
    and the online critic's estimate.

    Args:
        states: Batch of observations.
        actions: Batch of actions taken in `states`.
        rewards: Batch of rewards; any shape that flattens to one per sample.
        next_states: Batch of successor observations.
        dones: Batch of episode-termination flags (bool or numeric).
    """
    # The critic ends in Dense(1), so its output is (batch, 1). Flat (batch,)
    # rewards/dones would broadcast the target to (batch, batch); force them
    # into float32 column vectors so every row combines with its own Q-value.
    rewards = tf.reshape(tf.cast(rewards, tf.float32), (-1, 1))
    dones = tf.reshape(tf.cast(dones, tf.float32), (-1, 1))

    # Target networks only supply a fixed regression target, so they are run
    # in inference mode and outside the gradient tape.
    # NOTE(review): the critics are called with a two-element list, while
    # build_critic_model declares a single input; confirm the intended
    # state/action input layout.
    next_actions = actor_target(next_states, training=False)
    next_Q = critic_target([next_states, next_actions], training=False)
    target_Q = rewards + gamma * next_Q * (1.0 - dones)

    with tf.GradientTape() as tape:
        Q = critic_model([states, actions], training=True)
        critic_loss = tf.keras.losses.MeanSquaredError()(target_Q, Q)
    critic_grads = tape.gradient(critic_loss, critic_model.trainable_variables)
    critic_optimizer.apply_gradients(zip(critic_grads, critic_model.trainable_variables))
    
@tf.function
def update_actor(states):
    """Run one gradient step on the actor to increase the critic's value.

    The loss is the negated mean of the critic's output for the actions the
    actor proposes in `states`; only the actor's variables are updated.

    Args:
        states: Batch of observations.
    """
    with tf.GradientTape() as tape:
        proposed_actions = actor_model(states, training=True)
        q_estimate = critic_model([states, proposed_actions], training=True)
        actor_loss = -tf.reduce_mean(q_estimate)
    policy_vars = actor_model.trainable_variables
    policy_grads = tape.gradient(actor_loss, policy_vars)
    actor_optimizer.apply_gradients(zip(policy_grads, policy_vars))

In [None]:
# Train on Pendulum: act, store the transition, then (once the buffer holds
# more than one batch) learn from a random replay batch and nudge the target
# networks toward the online networks.
env = gym.make("Pendulum-v0")
episode_reward_history = []

for episode in range(500):
    state = env.reset()
    episode_reward = 0
    for step in range(500):
        action = select_action(state)
        next_state, reward, done, _ = env.step(action)
        replay_memory.append((state, action, reward, next_state, done))
        if len(replay_memory) > batch_size:
            # Unzip the sampled transitions field by field. Wrapping the raw
            # tuples in one np.array mixes arrays, floats and bools into a
            # ragged object array that cannot be fed to the networks.
            transitions = random.sample(replay_memory, batch_size)
            states, actions, rewards, next_states, dones = (
                np.asarray(field, dtype=np.float32) for field in zip(*transitions)
            )
            # Column vectors line up row by row with the critic's Dense(1) output
            rewards = rewards.reshape(-1, 1)
            dones = dones.reshape(-1, 1)

            update_critic(states, actions, rewards, next_states, dones)
            update_actor(states)

            # Soft (Polyak) update. Each network is paired with its own target
            # so every weight tensor is blended; the critic holds more tensors
            # than the actor (its Dense head), so a shared index is not valid.
            for online_net, target_net in (
                (actor_model, actor_target),
                (critic_model, critic_target),
            ):
                target_net.set_weights([
                    tau * online_w + (1 - tau) * target_w
                    for online_w, target_w in zip(
                        online_net.get_weights(), target_net.get_weights()
                    )
                ])
        state = next_state
        episode_reward += reward
        if done:
            break
    episode_reward_history.append(episode_reward)
    print(f"Episode: {episode}, Reward: {episode_reward}")

# Plotting: total reward collected in each training episode
plt.plot(episode_reward_history)
plt.title('DDPG Training Performance')
plt.ylabel('Total Reward')
plt.xlabel('Episode')
plt.show()


# Mountain Car - DDPG (Actor Critic)

In [None]:
import gym
import numpy as np
import tensorflow as tf
import tensorflow_quantum as tfq
import cirq
from collections import deque
import random
import matplotlib.pyplot as plt

# Set random seeds for reproducibility.
# Replay batches are drawn with random.sample, so the stdlib generator has to
# be seeded alongside NumPy (exploration noise) and TensorFlow.
random.seed(0)
np.random.seed(0)
tf.random.set_seed(0)

# Part 1: Define Quantum Circuit and Observables

def build_quantum_circuit(qubits, n_layers):
    """Build a layered rotation-plus-entanglement circuit.

    Each layer visits the qubits in order; for every qubit it applies
    rx, ry and rz rotations by pi/2 and then a CNOT from that qubit to
    the next one, wrapping around from the last qubit to the first.

    All rotation angles are fixed constants: the circuit contains no
    symbolic (trainable) parameters.

    Args:
        qubits: Indexable sequence of cirq qubits.
        n_layers: Number of times the per-qubit pattern is repeated.

    Returns:
        The assembled cirq.Circuit.
    """
    quarter_turn = np.pi / 2
    ansatz = cirq.Circuit()
    for _layer in range(n_layers):
        n_wires = len(qubits)
        for idx, control in enumerate(qubits):
            for rotation in (cirq.rx, cirq.ry, cirq.rz):
                ansatz += rotation(quarter_turn)(control)
            # Ring entanglement: the last qubit targets the first one
            ansatz += cirq.CNOT(control, qubits[(idx + 1) % n_wires])
    return ansatz

# One row of qubits; both networks measure the Z0*Z1 and Z1*Z2 products.
n_qubits, n_layers = 3, 3
qubits = cirq.GridQubit.rect(1, n_qubits)
ops_actor = list(map(cirq.Z, qubits))
ops_critic = list(map(cirq.Z, qubits))
observables_actor = [ops_actor[k] * ops_actor[k + 1] for k in (0, 1)]
observables_critic = [ops_critic[k] * ops_critic[k + 1] for k in (0, 1)]

# Part 2: Define Actor and Critic Models

def build_actor_model(qubits, n_layers, observables):
    """Build the actor: a Keras model whose output is a PQC layer.

    Args:
        qubits: Qubits the circuit acts on; their count sets the input width.
        n_layers: Number of circuit layers passed to build_quantum_circuit.
        observables: Operators handed to the PQC layer.

    Returns:
        A tf.keras.Model mapping a float32 input of width len(qubits) to
        the PQC layer output.
    """
    # NOTE(review): the PQC layer is fed a float input here; confirm this
    # matches the input type tfq.layers.PQC expects.
    state_input = tf.keras.Input(
        shape=(len(qubits),), dtype=tf.dtypes.float32, name='input'
    )
    pqc_layer = tfq.layers.PQC(build_quantum_circuit(qubits, n_layers), observables)
    pqc_output = pqc_layer(state_input)
    return tf.keras.Model(inputs=[state_input], outputs=pqc_output)

def build_critic_model(qubits, n_layers, observables):
    """Build the critic: a PQC layer followed by a single-unit Dense head.

    Args:
        qubits: Qubits the circuit acts on; their count sets the input width.
        n_layers: Number of circuit layers passed to build_quantum_circuit.
        observables: Operators handed to the PQC layer.

    Returns:
        A tf.keras.Model with one float32 input of width len(qubits) and a
        single scalar output per sample.
    """
    # NOTE(review): the PQC layer is fed a float input here; confirm this
    # matches the input type tfq.layers.PQC expects.
    features = tf.keras.Input(
        shape=(len(qubits),), dtype=tf.dtypes.float32, name='input'
    )
    pqc_layer = tfq.layers.PQC(build_quantum_circuit(qubits, n_layers), observables)
    pqc_output = pqc_layer(features)
    # Dense(1) collapses the per-observable values into one value per sample
    q_value = tf.keras.layers.Dense(1)(pqc_output)
    return tf.keras.Model(inputs=[features], outputs=q_value)

# Online (trained) networks
actor_model = build_actor_model(
    qubits=qubits, n_layers=n_layers, observables=observables_actor
)
critic_model = build_critic_model(
    qubits=qubits, n_layers=n_layers, observables=observables_critic
)

# Part 3: Define Actor and Critic Target Models
# Same architecture as the online networks, initialised as exact copies.

actor_target = build_actor_model(
    qubits=qubits, n_layers=n_layers, observables=observables_actor
)
critic_target = build_critic_model(
    qubits=qubits, n_layers=n_layers, observables=observables_critic
)
actor_target.set_weights(actor_model.get_weights())
critic_target.set_weights(critic_model.get_weights())

# Part 4: Define Optimizers and Hyperparameters
# gamma: discount factor in the critic target; tau: mixing rate of the
# target-network soft update; batch_size: transitions sampled per update.

actor_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
critic_optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
gamma, tau, batch_size = 0.99, 0.005, 64

# Part 5: Define Experience Replay Memory
# Bounded buffer: once full, the oldest transitions are discarded.

replay_memory = deque([], maxlen=10000)

# Part 6: Define Exploration Noise
# noise is the standard deviation used by select_action; noise_clip is
# defined but not read by any code in this file.

noise, noise_clip = 0.1, 0.5

# Part 7: Define Action Selection Function

def select_action(state):
    """Return the actor's action for one state, with exploration noise.

    The state is wrapped into a batch of one, passed through actor_model,
    perturbed with zero-mean Gaussian noise (standard deviation `noise`)
    and clipped to [-1, 1].

    Args:
        state: A single environment observation.

    Returns:
        NumPy array holding the noisy, clipped action.
    """
    batched_state = tf.convert_to_tensor([state], dtype=tf.float32)
    policy_output = actor_model(batched_state).numpy()[0]
    # In-place add keeps the dtype of the network output
    policy_output += np.random.normal(0, noise, size=policy_output.shape)
    return np.clip(policy_output, -1, 1)

# Part 8: Define Training Functions

@tf.function
def update_critic(states, actions, rewards, next_states, dones):
    """Run one gradient step on the critic toward a bootstrapped target.

    The target is ``rewards + gamma * Q_target(next_states, pi_target(next_states))
    * (1 - dones)`` and the loss is the mean squared error between that target
    and the online critic's estimate.

    Args:
        states: Batch of observations.
        actions: Batch of actions taken in `states`.
        rewards: Batch of rewards; any shape that flattens to one per sample.
        next_states: Batch of successor observations.
        dones: Batch of episode-termination flags (bool or numeric).
    """
    # The critic ends in Dense(1), so its output is (batch, 1). Flat (batch,)
    # rewards/dones would broadcast the target to (batch, batch); force them
    # into float32 column vectors so every row combines with its own Q-value.
    rewards = tf.reshape(tf.cast(rewards, tf.float32), (-1, 1))
    dones = tf.reshape(tf.cast(dones, tf.float32), (-1, 1))

    # Target networks only supply a fixed regression target, so they are run
    # in inference mode and outside the gradient tape.
    # NOTE(review): the critics are called with a two-element list, while
    # build_critic_model declares a single input; confirm the intended
    # state/action input layout.
    next_actions = actor_target(next_states, training=False)
    next_Q = critic_target([next_states, next_actions], training=False)
    target_Q = rewards + gamma * next_Q * (1.0 - dones)

    with tf.GradientTape() as tape:
        Q = critic_model([states, actions], training=True)
        critic_loss = tf.keras.losses.MeanSquaredError()(target_Q, Q)
    critic_grads = tape.gradient(critic_loss, critic_model.trainable_variables)
    critic_optimizer.apply_gradients(zip(critic_grads, critic_model.trainable_variables))
    
@tf.function
def update_actor(states):
    """Run one gradient step on the actor to increase the critic's value.

    The loss is the negated mean of the critic's output for the actions the
    actor proposes in `states`; only the actor's variables are updated.

    Args:
        states: Batch of observations.
    """
    with tf.GradientTape() as tape:
        proposed_actions = actor_model(states, training=True)
        q_estimate = critic_model([states, proposed_actions], training=True)
        actor_loss = -tf.reduce_mean(q_estimate)
    policy_vars = actor_model.trainable_variables
    policy_grads = tape.gradient(actor_loss, policy_vars)
    actor_optimizer.apply_gradients(zip(policy_grads, policy_vars))

# Part 9: Main Training Loop

# Train on Mountain Car: act, store the transition, then (once the buffer
# holds more than one batch) learn from a random replay batch and nudge the
# target networks toward the online networks.
# The continuous-action variant is required: select_action emits real-valued
# actions clipped to [-1, 1], which the discrete "MountainCar-v0" rejects.
env = gym.make("MountainCarContinuous-v0")
episode_reward_history = []

for episode in range(500):
    state = env.reset()
    episode_reward = 0
    for step in range(500):
        action = select_action(state)
        next_state, reward, done, _ = env.step(action)
        replay_memory.append((state, action, reward, next_state, done))
        if len(replay_memory) > batch_size:
            # Unzip the sampled transitions field by field. Wrapping the raw
            # tuples in one np.array mixes arrays, floats and bools into a
            # ragged object array that cannot be fed to the networks.
            transitions = random.sample(replay_memory, batch_size)
            states, actions, rewards, next_states, dones = (
                np.asarray(field, dtype=np.float32) for field in zip(*transitions)
            )
            # Column vectors line up row by row with the critic's Dense(1) output
            rewards = rewards.reshape(-1, 1)
            dones = dones.reshape(-1, 1)

            update_critic(states, actions, rewards, next_states, dones)
            update_actor(states)

            # Soft (Polyak) update. Each network is paired with its own target
            # so every weight tensor is blended; the critic holds more tensors
            # than the actor (its Dense head), so a shared index is not valid.
            for online_net, target_net in (
                (actor_model, actor_target),
                (critic_model, critic_target),
            ):
                target_net.set_weights([
                    tau * online_w + (1 - tau) * target_w
                    for online_w, target_w in zip(
                        online_net.get_weights(), target_net.get_weights()
                    )
                ])
        state = next_state
        episode_reward += reward
        if done:
            break
    episode_reward_history.append(episode_reward)
    print(f"Episode: {episode}, Reward: {episode_reward}")

# Part 10: Plotting

# Total reward collected in each training episode
plt.plot(episode_reward_history)
plt.title('Actor-Critic Training Performance')
plt.ylabel('Total Reward')
plt.xlabel('Episode')
plt.show()
