# PPO

In [2]:
import numpy as np
import tensorflow as tf
import gymnasium as gym
import os
import matplotlib.pyplot as plt

# Disable eager execution to use TF1 style
tf.compat.v1.disable_eager_execution()
tf.compat.v1.reset_default_graph()


In [3]:
class ValueNetwork:
    def __init__(self, num_features, hidden_size, learning_rate=0.0001):
        self.num_features = num_features
        self.hidden_size = hidden_size
        self.graph = tf.Graph()
        with self.graph.as_default():
            self.sess = tf.compat.v1.Session()
            # Placeholders for state observations and target returns
            self.obs_ph = tf.compat.v1.placeholder(tf.float32, [None, num_features], name='obs')
            self.targets_ph = tf.compat.v1.placeholder(tf.float32, [None], name='targets')
            # Hidden layer using Keras Dense layer
            hidden = tf.compat.v1.keras.layers.Dense(hidden_size, activation=tf.nn.relu, name='hidden')(self.obs_ph)
            # Output layer: predicts a single scalar value per state
            self.value = tf.squeeze(tf.compat.v1.keras.layers.Dense(1, name='value_output')(hidden), axis=1)
            # Loss: Mean Squared Error (manually computed)
            self.loss = tf.reduce_mean(tf.square(self.targets_ph - self.value))
            # Optimizer and training op
            self.optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
            self.train_op = self.optimizer.minimize(self.loss)
            self.sess.run(tf.compat.v1.global_variables_initializer())
    
    def predict(self, states):
        return self.sess.run(self.value, feed_dict={self.obs_ph: states})
    
    def update(self, states, targets):
        loss_val, _ = self.sess.run([self.loss, self.train_op],
                                    feed_dict={self.obs_ph: states, self.targets_ph: targets})
        return loss_val


In [4]:
class PolicyNetwork:
    def __init__(self, num_features, num_actions, hidden_sizes, epsilon=0.2, learning_rate=0.0005):
        self.num_features = num_features
        self.num_actions = num_actions
        self.epsilon = epsilon  # PPO clipping parameter
        self.graph = tf.Graph()
        with self.graph.as_default():
            self.sess = tf.compat.v1.Session()
            # Placeholders for observations, actions taken, advantages, and old log probabilities
            self.obs_ph = tf.compat.v1.placeholder(tf.float32, [None, num_features], name='obs')
            self.actions_ph = tf.compat.v1.placeholder(tf.float32, [None, num_actions], name='actions')
            self.advantages_ph = tf.compat.v1.placeholder(tf.float32, [None], name='advantages')
            self.old_log_probs_ph = tf.compat.v1.placeholder(tf.float32, [None], name='old_log_probs')
            
            # Build hidden layers using Keras Dense layers
            out = self.obs_ph
            for i, size in enumerate(hidden_sizes):
                out = tf.compat.v1.keras.layers.Dense(size, activation=tf.nn.tanh, name=f'hidden_{i}')(out)
            # Output layer for the mean of the Gaussian distribution
            self.mean = tf.compat.v1.keras.layers.Dense(num_actions, activation=None, name='mean')(out)
            # Log standard deviation (trainable, state-independent)
            self.log_std = tf.compat.v1.get_variable("log_std", shape=[num_actions], initializer=tf.zeros_initializer())
            self.std = tf.exp(self.log_std)
            
            # Define a normal distribution for actions
            dist = tf.compat.v1.distributions.Normal(loc=self.mean, scale=self.std)
            # Sample actions from this distribution
            self.sampled_action = dist.sample()
            # Compute log probability of actions taken (summed over dimensions)
            self.log_prob = tf.reduce_sum(dist.log_prob(self.actions_ph), axis=1)
            
            # PPO loss: calculate the probability ratio and then clip it
            ratio = tf.exp(self.log_prob - self.old_log_probs_ph)
            surrogate1 = ratio * self.advantages_ph
            surrogate2 = tf.clip_by_value(ratio, 1 - self.epsilon, 1 + self.epsilon) * self.advantages_ph
            self.loss = -tf.reduce_mean(tf.minimum(surrogate1, surrogate2))
            
            
            # Optimizer and training operation
            self.optimizer = tf.compat.v1.train.AdamOptimizer(learning_rate=learning_rate)
            self.train_op = self.optimizer.minimize(self.loss)
            self.sess.run(tf.compat.v1.global_variables_initializer())
    

    def predict(self, states):
        # Return the mean and log_std given states
        mean, log_std = self.sess.run([self.mean, self.log_std], feed_dict={self.obs_ph: states})
        std = np.exp(log_std)
        return mean, std


    def get_action(self, state):
        # For a single state, sample an action and compute its log probability
        feed_dict = {self.obs_ph: state[np.newaxis, :]}
        action = self.sess.run(self.sampled_action, feed_dict=feed_dict)
        log_prob = self.sess.run(self.log_prob, feed_dict={self.obs_ph: state[np.newaxis, :],
                                                            self.actions_ph: action,
                                                            self.old_log_probs_ph: [0.0]})
        return action[0], log_prob[0]
    

    def update(self, states, actions, advantages, old_log_probs):
        loss_val, _ = self.sess.run([self.loss, self.train_op],
                                     feed_dict={self.obs_ph: states,
                                                self.actions_ph: actions,
                                                self.advantages_ph: advantages,
                                                self.old_log_probs_ph: old_log_probs})
        return loss_val


In [5]:
class PPOAgent:
    def __init__(self, env, num_features, num_actions, gamma=0.99, lam=0.95,
                 policy_hidden_sizes=[256, 256], value_hidden_size=256,
                 policy_lr=0.0005, value_lr=0.001, epsilon=0.2):
        self.env = env
        self.gamma = gamma
        self.lam = lam
        self.num_features = num_features
        self.num_actions = num_actions
        
        self.policy_net = PolicyNetwork(num_features, num_actions, policy_hidden_sizes,
                                        epsilon=epsilon, learning_rate=policy_lr)
        self.value_net = ValueNetwork(num_features, value_hidden_size, learning_rate=value_lr)
    
    def discount_rewards(self, rewards):
        discounted = np.zeros_like(rewards)
        running = 0
        for t in reversed(range(len(rewards))):
            running = running * self.gamma + rewards[t]
            discounted[t] = running
        return discounted

    def compute_advantages(self, rewards, values):
        advantages = np.zeros_like(rewards)
        last_advantage = 0
        for t in reversed(range(len(rewards))):
            if t < len(rewards) - 1:
                delta = rewards[t] + self.gamma * values[t+1] - values[t]
            else:
                delta = rewards[t] - values[t]
            last_advantage = delta + self.gamma * self.lam * last_advantage
            advantages[t] = last_advantage
        # Normalize advantages
        advantages = (advantages - advantages.mean()) / (advantages.std() + 1e-8)
        return advantages

    def train(self, num_episodes=5000, max_steps=1000):
        episode = 0
        while episode < num_episodes:
            states, actions, rewards, log_probs = [], [], [], []
            state, _ = self.env.reset()
            done = False
            ep_reward = 0
            for step in range(max_steps):
                action, log_prob = self.policy_net.get_action(state)
                next_state, reward, done, truncated, _ = self.env.step(action)
                states.append(state)
                actions.append(action)
                rewards.append(reward)
                log_probs.append(log_prob)
                ep_reward += reward
                state = next_state
                if done or truncated:
                    break

            states = np.array(states)
            actions = np.array(actions)
            rewards = np.array(rewards)
            log_probs = np.array(log_probs)
            
            # Get value predictions from the critic
            values = self.value_net.predict(states)
            discounted_rewards = self.discount_rewards(rewards)
            advantages = self.compute_advantages(rewards, values)
            
            v_loss = self.value_net.update(states, discounted_rewards)
            p_loss = self.policy_net.update(states, actions, advantages, log_probs)
            
            episode += 1
            print(f"Episode: {episode}, Total Reward: {ep_reward}, Value Loss: {v_loss:.4f}, Policy Loss: {p_loss:.4f}")


In [7]:
# Create the Ant environment from Gymnasium
env = gym.make("HalfCheetah-v5")  # Ant-v5 has 105 observations and 8 continuous actions
num_features = env.observation_space.shape[0]  # Should be 105 for Ant
num_actions = env.action_space.shape[0]          # Should be 8 for Ant

# Instantiate the PPO agent with appropriate hyperparameters
agent = PPOAgent(env, num_features=num_features, num_actions=num_actions,
                 gamma=0.99, lam=0.95,
                 policy_hidden_sizes=[256, 256],
                 value_hidden_size=256,
                 policy_lr=0.0005, value_lr=0.001,
                 epsilon=0.2)

# Train the agent (training Ant can be computationally heavy)
agent.train(num_episodes=7500, max_steps=1000)


Instructions for updating:
The TensorFlow Distributions library has moved to TensorFlow Probability (https://github.com/tensorflow/probability). You should update all references to use `tfp.distributions` instead of `tf.distributions`.
Instructions for updating:
The TensorFlow Distributions library has moved to TensorFlow Probability (https://github.com/tensorflow/probability). You should update all references to use `tfp.distributions` instead of `tf.distributions`.


I0000 00:00:1742025534.936271  384872 mlir_graph_optimization_pass.cc:425] MLIR V1 optimization pass is not enabled


Episode: 1, Total Reward: -1141.1081082650492, Value Loss: 10786.2793, Policy Loss: -0.0000
Episode: 2, Total Reward: -982.9717672459786, Value Loss: 8537.9854, Policy Loss: -0.0000
Episode: 3, Total Reward: -991.1822057677507, Value Loss: 8575.7461, Policy Loss: 0.0000
Episode: 4, Total Reward: -898.6022318736241, Value Loss: 6951.9473, Policy Loss: -0.0000
Episode: 5, Total Reward: -1088.2012695739757, Value Loss: 10393.0615, Policy Loss: -0.0000
Episode: 6, Total Reward: -902.3540625103278, Value Loss: 6853.1030, Policy Loss: -0.0000
Episode: 7, Total Reward: -974.103979205937, Value Loss: 7735.9951, Policy Loss: 0.0000
Episode: 8, Total Reward: -1018.4517743611752, Value Loss: 8223.1504, Policy Loss: 0.0000
Episode: 9, Total Reward: -930.9428635397015, Value Loss: 6694.9648, Policy Loss: -0.0000
Episode: 10, Total Reward: -1044.482929680336, Value Loss: 9335.7500, Policy Loss: -0.0000
Episode: 11, Total Reward: -999.7458858083633, Value Loss: 7785.9790, Policy Loss: 0.0000
Episode:

In [8]:
import os
import gymnasium as gym
import imageio
import numpy as np

def create_video_manual(agent, env_id, filename, video_length, fps):
    """
    Create a video by manually capturing frames from the environment and
    saving them as a video file.
    
    Args:
        agent: The trained agent (with a policy network).
        env_id (str): Environment ID (e.g., "HalfCheetah-v5").
        filename (str): Output filename for the video (including path).
        video_length (int): Total number of steps to record.
        fps (int): Frames per second for the output video.
    """
    video_dir = './vid'
    if not os.path.exists(video_dir):
        os.makedirs(video_dir)
    
    # Create the environment with render_mode set to 'rgb_array'
    env = gym.make(env_id, render_mode='rgb_array')
    # Optionally, force the metadata to use your desired FPS:
    env.metadata['video.frames_per_second'] = fps

    frames = []
    state, _ = env.reset()

    for _ in range(video_length):
        # Capture the frame
        frame = env.render()  # This returns an RGB image
        frames.append(frame)
        
        # Get action from your agent's policy network.
        # (Make sure your agent is built with the correct observation/action sizes for HalfCheetah)
        action, _ = agent.policy_net.get_action(state)
        state, reward, done, truncated, _ = env.step(action)
        
        # If the episode terminates early, reset the environment
        if done or truncated:
            state, _ = env.reset()

    env.close()
    
    # Save the frames to a video file using imageio
    video_path = os.path.join(video_dir, filename)
    imageio.mimsave(video_path, frames, fps=fps)
    print("Video saved as:", video_path)

# Example usage:
# (Make sure your agent was created using the appropriate observation and action dimensions for HalfCheetah-v5.)
create_video_manual(agent, env_id="HalfCheetah-v5", filename="half_cheetah_agent.mp4", video_length=500, fps=30)


Video saved as: ./vid/half_cheetah_agent.mp4


In [7]:
import imageio_ffmpeg
print(imageio_ffmpeg.get_ffmpeg_version())


7.1
