In [1]:
#!pip install gymnasium
#!pip install stable-baselines3[gymnasium]
#!pip install swig
#!pip install gymnasium[box2d]

In [2]:
import tensorflow as tf
from tensorflow.keras import layers, Model, initializers
from tensorflow.keras.initializers import Orthogonal, Constant
import gymnasium as gym
from gymnasium.wrappers import RecordVideo, RecordEpisodeStatistics
import numpy as np
import tensorflow_probability as tfp
from tensorflow.keras.optimizers import Adam







ValueError: Arg specs do not match: original=FullArgSpec(args=['input', 'dtype', 'name', 'layout'], varargs=None, varkw=None, defaults=(None, None, None), kwonlyargs=[], kwonlydefaults=None, annotations={}), new=FullArgSpec(args=['input', 'dtype', 'name'], varargs=None, varkw=None, defaults=(None, None), kwonlyargs=[], kwonlydefaults=None, annotations={}), fn=<function ones_like_v2 at 0x0000015685C60C10>

In [69]:
class PPO(Model):

    """ PPO model for continuous action space. See Sutton & Barto 13.7 Continuous Action Spaces """

    def __init__(self, env,ppo_clip_val=0.2, learning_rate=1e-3, epsilon=1e-5):
        super(PPO, self).__init__()

        self.env = env

        self.observation_shape = self.env.observation_space.shape
        self.action_shape = self.env.action_space.shape

        self.__create_shared_layers()
        self.__create_actor()
        self.__create_critic()

        self.learning_rate = learning_rate
        self.epsilon = epsilon
        self.optimizer = Adam(learning_rate=self.learning_rate, epsilon=self.epsilon)

        # PPO PARAMS
        self.ppo_clip_val = ppo_clip_val

    def __layer_init(self, units, std=np.sqrt(2), bias_const=0.0, activation=None):
        """ Initialize layers with orthogonal weights and constant biases """
        return layers.Dense(units, kernel_initializer=Orthogonal(gain=std),
                            bias_initializer=Constant(value=bias_const), activation=activation)

    def __create_shared_layers(self):
        """ Create shared layers for both actor and critic networks """
        self.shared_input = layers.Input(shape=self.observation_shape)
        x = self.__layer_init(64, activation='tanh')(self.shared_input)
        x = self.__layer_init(64, activation='tanh')(x)
        self.shared_output = x

    def __create_actor(self):
        """ Create continuous actor neural network """
        mu_output = self.__layer_init(self.action_shape[0], std=0.01, activation='tanh')(self.shared_output)
        log_std_output = layers.Dense(self.action_shape[0], activation='softplus')(self.shared_output)  # todo - IS THIS CORRECT?
        self.actor = Model(inputs=self.shared_input, outputs=[mu_output, log_std_output], name='actor')

    def __create_critic(self):
        """ Create critic neural network """
        critic_output = self.__layer_init(1, std=1.0)(self.shared_output)
        self.critic = Model(inputs=self.shared_input, outputs=critic_output, name='critic')

    def get_value(self, observation):
        """ Get value vector """
        return self.critic(observation)

    def get_policy(self, observation):
        """ Get action vector and log std """
        return self.actor(observation)

    def sample_action(self, observation):

        """ Sample action from the policy distribution """
        mu, log_std = self.get_policy(observation)
        std = tf.exp(log_std)
        action = tf.random.normal(shape=mu.shape, mean=mu, stddev=std)  # todo - IS THIS CORRECT?
        return action
    
    def clip(self, inputs):
        
        return tf.clip_by_value(inputs, 1-self.ppo_clip_val, 1+self.ppo_clip_val)

    
    
    
    
    
    
    
    

    ## ============ ** check from here ** ============
    
    # PSEUDOCODE ONLY - MISSING VARIABLES 
    def L_clip(self):

        policy_ratio = self.policy_ratio(observation, old_prob)

        clipped_ratio = self.clip(policy_ratio)

        clipped_loss = clipped_ratio * gaes

        full_loss = policy_ratio * gaes

        policy_loss = min(full_loss, clipped_loss).mean()  # todo - if minimising, make this negative


    def train_policy_network(self, observations, actions, old_action_probs, advantages):
        with tf.GradientTape() as tape:
            mu, log_std = self.actor_critic.actor(observations)
            std = tf.exp(log_std)
            dist = tfp.distributions.Normal(mu, std)
            new_action_probs = dist.prob(actions)

            ratios = new_action_probs / old_action_probs
            clipped_ratios = tf.clip_by_value(ratios, 1.0 - self.ppo_clip_val, 1.0 + self.ppo_clip_val)
            loss = -tf.reduce_mean(tf.minimum(ratios * advantages, clipped_ratios * advantages))

        grads = tape.gradient(loss, self.actor_critic.actor.trainable_variables)
        self.policy_optimizer.apply_gradients(zip(grads, self.actor_critic.actor.trainable_variables))
        return loss

    def train_value_network(self, observations, returns):
        with tf.GradientTape() as tape:
            values = tf.squeeze(self.actor_critic.critic(observations))
            loss = tf.reduce_mean(tf.square(returns - values))

        grads = tape.gradient(loss, self.actor_critic.critic.trainable_variables)
        self.value_optimizer.apply_gradients(zip(grads, self.actor_critic.critic.trainable_variables))
        return loss

    def compute_gae(self, rewards, values, dones, next_value):
        values = np.append(values, next_value)
        gaes = np.zeros_like(rewards)
        last_gae_lam = 0
        for t in reversed(range(len(rewards))):
            delta = rewards[t] + self.gamma * values[t + 1] * (1 - dones[t]) - values[t]
            gaes[t] = last_gae_lam = delta + self.gamma * self.alpha * (1 - dones[t]) * last_gae_lam
        return gaes

    def train(self, env, total_timesteps, batch_size=64):

        # Lists to store the trajectory data for a single batch
        observations = []  # To store observations (states) encountered during the episode
        actions = []  # To store actions taken by the agent
        rewards = []  # To store rewards received after taking actions
        dones = []  # To store done flags indicating whether the episode has ended
        values = []  # To store value estimates from the critic network
        action_probs = []  # To store the probabilities of the actions taken by the policy network

        observation, _ = env.reset()
        for timestep in range(total_timesteps):
            action, action_prob = self.select_action(observation)
            next_observation, reward, done, _, _ = env.step(action)

            value = self.actor_critic.get_value(observation)

            observations.append(observation)
            actions.append(action)
            rewards.append(reward)
            dones.append(done)
            values.append(value)
            action_probs.append(action_prob)

            observation = next_observation

            if done:
                observation, _ = env.reset()

            if (timestep + 1) % batch_size == 0:
                next_value = self.actor_critic.get_value(observation)
                returns = self.compute_gae(rewards, values, dones, next_value)

                observations = np.array(observations)
                actions = np.array(actions)
                action_probs = np.array(action_probs)
                returns = np.array(returns)
                advantages = returns - np.array(values)

                for _ in range(self.max_policy_updates):
                    policy_loss = self.train_policy_network(observations, actions, action_probs, advantages)
                    if policy_loss < self.target_kl:
                        break

                for _ in range(self.max_value_updates):
                    value_loss = self.train_value_network(observations, returns)

                observations = []
                actions = []
                rewards = []
                dones = []
                values = []
                action_probs = []
    
    # todo - not working on jupyter notebook
    def play(self, render=False, record=False, video_folder='videos', episode_trigger=lambda e: True, name_prefix="rl-video"):

        env = RecordEpisodeStatistics(self.env)

        if record:
            env = RecordVideo(env, video_folder)
            env.metadata['render_fps'] = 30  # Set the FPS to 30
            
        observation, _ = env.reset()
        observation = np.array([observation])
        print(observation.shape)

        done = False
        total_reward = 0

        while not done:

            if render:
                env.render()

            action = self.sample_action(observation)[0]
            observation, reward, done, _, _ = env.step(action)
            observation = np.array([observation])

            total_reward += reward

        env.close()


###### https://www.gymlibrary.dev/environments/box2d/bipedal_walker/


In [59]:
env = gym.make('BipedalWalker-v3', render_mode='rgb_array')

obs, _ = env.reset()

# e.g. test multiple observations being fed to the neural net for when batch training
obs = np.array([obs])

a1 = PPO(env)

In [60]:
# get the policy output (mu vector, log_std vector)
print(a1.get_policy(obs))

[<tf.Tensor: shape=(1, 4), dtype=float32, numpy=
array([[-0.00184107,  0.00191982,  0.00068387, -0.00229931]],
      dtype=float32)>, <tf.Tensor: shape=(1, 4), dtype=float32, numpy=array([[0.3325026 , 0.56307745, 0.5883078 , 0.65620816]], dtype=float32)>]


In [67]:
# sample an action
print(a1.sample_action(obs), np.array(a1.sample_action(obs)))

tf.Tensor([[ 2.2131956  -0.67672855 -0.10302535  5.2636023 ]], shape=(1, 4), dtype=float32) [[-1.2076478   0.6972355   0.2103979   0.65136325]]
