# The main Agent Cell

In [2]:
import numpy as np
import tensorflow as tf
import gymnasium as gym
from tensorflow.keras import layers

class PPOAgent:
    def __init__(self, input_dim, action_dim, lr=0.0003, gamma=0.99, clip_epsilon=0.2):
        self.input_dim = input_dim
        self.action_dim = action_dim
        self.gamma = gamma
        self.clip_epsilon = clip_epsilon

        # Define models
        self.policy_model = self._build_policy_model()
        self.value_model = self._build_value_model()

        # Optimizers
        self.policy_optimizer = tf.keras.optimizers.Adam(learning_rate=lr)
        self.value_optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

    def _build_policy_model(self):
        """Builds the policy network."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.Dense(64, activation='relu')(inputs)
        x = layers.Dense(64, activation='relu')(x)
        outputs = layers.Dense(self.action_dim, activation='softmax')(x)
        return tf.keras.Model(inputs, outputs)

    def _build_value_model(self):
        """Builds the value network."""
        inputs = layers.Input(shape=(self.input_dim,))
        x = layers.Dense(64, activation='relu')(inputs)
        x = layers.Dense(64, activation='relu')(x)
        outputs = layers.Dense(1)(x)
        return tf.keras.Model(inputs, outputs)

    def select_action(self, state):
        """Selects an action based on the policy."""
        state = np.expand_dims(state, axis=0)
        probabilities = self.policy_model(state).numpy().flatten()
        action = np.random.choice(self.action_dim, p=probabilities)
        return action, probabilities[action]

    def train(self, states, actions, rewards, dones, old_probs):
        """Trains the PPO agent."""
        # Compute discounted rewards
        discounted_rewards = self._compute_discounted_rewards(rewards, dones)

        # Convert to tensors
        states = tf.convert_to_tensor(states, dtype=tf.float32)
        actions = tf.convert_to_tensor(actions, dtype=tf.int32)
        old_probs = tf.convert_to_tensor(old_probs, dtype=tf.float32)
        discounted_rewards = tf.convert_to_tensor(discounted_rewards, dtype=tf.float32)

        with tf.GradientTape(persistent=True) as tape:
            # Compute current probabilities and value estimates
            logits = self.policy_model(states)
            values = tf.squeeze(self.value_model(states))
            action_masks = tf.one_hot(actions, self.action_dim)
            current_probs = tf.reduce_sum(action_masks * logits, axis=1)

            # PPO Policy Loss
            ratios = current_probs / old_probs
            advantages = discounted_rewards - values
            clipped_ratios = tf.clip_by_value(ratios, 1 - self.clip_epsilon, 1 + self.clip_epsilon)
            policy_loss = -tf.reduce_mean(tf.minimum(ratios * advantages, clipped_ratios * advantages))

            # Value Loss
            value_loss = tf.reduce_mean(tf.square(discounted_rewards - values))

            # Entropy Bonus
            entropy = -tf.reduce_mean(tf.reduce_sum(logits * tf.math.log(logits + 1e-10), axis=1))

            total_loss = policy_loss + 0.5 * value_loss - 0.01 * entropy

        # Apply gradients
        policy_grads = tape.gradient(policy_loss, self.policy_model.trainable_variables)
        value_grads = tape.gradient(value_loss, self.value_model.trainable_variables)
        self.policy_optimizer.apply_gradients(zip(policy_grads, self.policy_model.trainable_variables))
        self.value_optimizer.apply_gradients(zip(value_grads, self.value_model.trainable_variables))

    def _compute_discounted_rewards(self, rewards, dones):
        """Computes discounted rewards."""
        discounted_rewards = []
        cumulative = 0
        for reward, done in zip(reversed(rewards), reversed(dones)):
            if done:
                cumulative = 0
            cumulative = reward + self.gamma * cumulative
            discounted_rewards.insert(0, cumulative)
        return np.array(discounted_rewards)

    def save(self, path):
        """Saves the models."""
        self.policy_model.save(f"{path}_policy.h5")
        self.value_model.save(f"{path}_value.h5")

    def load(self, path):
        """Loads the models."""
        self.policy_model = tf.keras.models.load_model(f"{path}_policy.h5")
        self.value_model = tf.keras.models.load_model(f"{path}_value.h5")

Episode 1/1000, Total Reward: 14.0
Episode 2/1000, Total Reward: 20.0
Episode 3/1000, Total Reward: 72.0
Episode 4/1000, Total Reward: 16.0
Episode 5/1000, Total Reward: 30.0
Episode 6/1000, Total Reward: 12.0
Episode 7/1000, Total Reward: 21.0
Episode 8/1000, Total Reward: 24.0
Episode 9/1000, Total Reward: 17.0
Episode 10/1000, Total Reward: 17.0
Episode 11/1000, Total Reward: 14.0
Episode 12/1000, Total Reward: 13.0
Episode 13/1000, Total Reward: 13.0
Episode 14/1000, Total Reward: 27.0
Episode 15/1000, Total Reward: 16.0
Episode 16/1000, Total Reward: 24.0
Episode 17/1000, Total Reward: 47.0
Episode 18/1000, Total Reward: 22.0
Episode 19/1000, Total Reward: 11.0
Episode 20/1000, Total Reward: 29.0
Episode 21/1000, Total Reward: 18.0
Episode 22/1000, Total Reward: 27.0
Episode 23/1000, Total Reward: 33.0
Episode 24/1000, Total Reward: 21.0
Episode 25/1000, Total Reward: 26.0
Episode 26/1000, Total Reward: 27.0
Episode 27/1000, Total Reward: 21.0
Episode 28/1000, Total Reward: 19.0
E

KeyboardInterrupt: 

# Initialize Gym environment and PPO agent

In [None]:
env = gym.make("CartPole-v1")
input_dim = env.observation_space.shape[0]  # State dimension
action_dim = env.action_space.n  # Number of possible actions

agent = PPOAgent(input_dim=input_dim, action_dim=action_dim)

# Training loop

In [4]:
episodes = 1000
for episode in range(episodes):
    state, _ = env.reset()  # Fix: reset() now returns a tuple (state, info)
    state = np.array(state)
    done = False
    total_reward = 0
    states, actions, rewards, dones, old_probs = [], [], [], [], []

    while not done:
        action, old_prob = agent.select_action(state)
        next_state, reward, done, _, _ = env.step(action)
        
        # Collect experience
        states.append(state)
        actions.append(action)
        rewards.append(reward)
        dones.append(done)
        old_probs.append(old_prob)

        state = next_state
        total_reward += reward

        if done:
            agent.train(states, actions, rewards, dones, old_probs)
            print(f"Episode {episode+1}/{episodes}, Total Reward: {total_reward}")

Episode 1/1000, Total Reward: 379.0
Episode 2/1000, Total Reward: 4224.0
Episode 3/1000, Total Reward: 638.0
Episode 4/1000, Total Reward: 938.0
Episode 5/1000, Total Reward: 1319.0
Episode 6/1000, Total Reward: 1144.0


KeyboardInterrupt: 

# Save the model after training

In [None]:
agent.save("ppo_cartpole_model")

# Test The Model On game

In [6]:
import gymnasium as gym
import numpy as np
import tensorflow as tf

# Test the trained PPO agent
def test_ppo_agent(agent, env_name="CartPole-v1", episodes=10):
    # Create the environment
    env = gym.make(env_name, render_mode="human")
    
    # Loop through the test episodes
    for episode in range(episodes):
        state, _ = env.reset()  # Reset the environment and get the initial state
        state = np.array(state)
        done = False
        total_reward = 0
        
        while not done:
            # Use the trained agent to select an action
            action, _ = agent.select_action(state)
            
            # Take the action in the environment
            next_state, reward, done, _, _ = env.step(action)
            
            # Update the state
            state = next_state
            total_reward += reward
            
            # Render the environment (this can be turned off if running headless)
            env.render()

            if done:
                print(f"Episode {episode+1}/{episodes}, Total Reward: {total_reward}")

    # Close the environment
    env.close()

# Assuming `agent` is the trained PPOAgent, use this to test the agent
test_ppo_agent(agent)


Episode 1/10, Total Reward: 1849.0


KeyboardInterrupt: 