In [1]:
import os 
import numpy as np
import tensorflow as tf
import tensorflow.keras as keras
import matplotlib.pyplot as plt
import gymnasium as gym

In [2]:
class ActorCritic(keras.Model):
    
    """The neural network with two heads for the acrobot agent

    Args:
        keras (_type_): _description_
    """
    
    def __init__(self, input_dim, n_actions:int, 
                 n1:int = 512, n2:int = 512, 
                 chkpt_dir='models/acrobot', name='acrobot_actor_critic'):
        """_summary_

        Args:
            input_dim (_type_): The dimension of the feature
            n1 (int): The number of neurons of the the first layer
            n2 (int): The number of neurons of the the second layer
            n_actions (int): The number of possible actions
            chkpt_dir (str, optional): Directory path. Defaults to 'models/acrobot'.
            name (str, optional): Model's name. Defaults to 'acrobot_actor_critic'.
        """
        
        super().__init__()
        
        self.model_name = name
        self.chkpt_dir = chkpt_dir
        self.chkpt_file = os.path.join(chkpt_dir, name + '.weights.h5')
        
        self.layer1 = keras.layers.Dense(n1, activation='elu', input_shape=input_dim)
        self.layer2 = keras.layers.Dense(n2, activation='elu')
        
        self.actions = keras.layers.Dense(n_actions, activation=None)
        self.value = keras.layers.Dense(1, activation=None)
        
    
    def call(self, x): 
        """_summary_

        Args:
            x (tf.Tensor): The input value

        Returns:
            tuple:
                - The actions logit's probs
                - The valuation of the actions 
        """
        
        x = self.layer1(x)
        x = self.layer2(x)
        
        return self.actions(x), self.value(x)    

In [41]:
class AcrobotAgent:
    """_summary_
    """
    
    def __init__(self, env:gym.Env, gamma:float, lr:float=1e-5, delta:float=1.0) -> None:
        """_summary_

        Args:
            env (gym.Env): _description_
            gamma (float): _description_
            lr (float, optional): _description_. Defaults to 1e-5.
            delta (float, optional): _description_. Defaults to 1.0.
        """
        
        self.env = env

        self.gamma = gamma

        self.actor_critic: ActorCritic = ActorCritic(input_dim=[env.observation_space.shape[0]], 
                                                     n_actions=int(env.action_space.n))    
        
        self.actor_critic_optimizer = keras.optimizers.Adam(learning_rate=lr)
        self.actor_critic_loss = keras.losses.Huber(delta=delta)
        
        self.total_rewards = []
        
        self.buffer = {
            'states': [],
            'next_states': [],
            'actions': [],
            'dones': [],
            'values': [],
            'rewards': []
        }
        
    
    def reset_buffer(self):
        """_summary_
        """
        self.buffer = {k:[] for k in self.buffer}    
        
    
    def normalize_input(self, x:np.array):
        """_summary_

        Args:
            x (np.array): _description_

        Returns:
            _type_: _description_
        """
        
        return (x - np.mean(x)) / np.std(x)
        
        
    def discount(self, rewards:list, dones:list):
        """_summary_

        Args:
            rewards (list): _description_

        Returns:
            _type_: _description_
        """
        discounted_rewards = []
        G = 0
        for i in reversed(range(len(rewards))):
            G =  rewards[i] + self.gamma * G * (1 - int(dones[i]))
            discounted_rewards.insert(0, G)
        return tf.convert_to_tensor(discounted_rewards, dtype=tf.float32)
    
    
    def normalize_discounts(self, rewards:list, dones:list):
        """_summary_

        Args:
            total_rewards (list): _description_

        Returns:
            _type_: _description_
        """
        discounts = self.discount(rewards, dones)
        
        print(discounts.shape)
        
        mean = np.mean(discounts, axis=0)
        std = np.std(discounts, axis=0)
        
        return (discounts - mean) / std 
    
    
    def play_one_step(self, state: np.array):
        state = self.normalize_input(state)
        state_tensor = tf.convert_to_tensor(state, dtype=tf.float32)
        
        state_tensor = tf.expand_dims(state_tensor, axis=0)
        
        actions_logs, value = self.actor_critic(state_tensor)
        
        best_action = tf.random.categorical(actions_logs, num_samples=1)
        best_int_action = int(best_action.numpy()[0, 0])
        
        next_state, reward, terminated, truncated, _ = self.env.step(action=best_int_action)
        
        next_state = tf.convert_to_tensor(next_state, dtype=tf.float32)
        
        done = terminated or truncated
        
        self.buffer['states'].append(state)
        self.buffer['next_states'].append(next_state)
        self.buffer['actions'].append(best_action)
        self.buffer['dones'].append(done)
        self.buffer['values'].append(value)
        self.buffer['rewards'].append(reward)
        
        return next_state, reward, done
    
    
    def train_step(self):
        """_summary_
        """
        states = tf.convert_to_tensor(self.buffer['states'], dtype=tf.float32)
        actions = tf.convert_to_tensor(self.buffer['actions'], dtype=tf.float32)
        dones = tf.convert_to_tensor(self.buffer['dones'], dtype=tf.float32)
        rewards = tf.convert_to_tensor(self.buffer['rewards'], dtype=tf.float32)
        
        variables = self.actor_critic.trainable_variables
        
        discounted_normalize_rewards = self.normalize_discounts(rewards=rewards, dones=dones)
        
        with tf.GradientTape() as tape:
            actions_logs, values_predicted = self.actor_critic(states)
            
            advantages = tf.stop_gradient(discounted_normalize_rewards - tf.squeeze(values_predicted))
            advantages = tf.expand_dims(advantages, axis=1)
            
            actions = tf.cast(actions, dtype=tf.int32)
            # actor loss
            logs = tf.gather_nd(actions_logs, actions)
            actor_loss = -tf.reduce_sum(logs * advantages)
            
            # critic loss
            critic_loss = self.actor_critic_loss(discounted_normalize_rewards, tf.squeeze(values_predicted))
            
            total_loss = actor_loss + critic_loss
            
        grads = tape.gradient(total_loss, variables)
        self.actor_critic_optimizer.apply_gradients(zip(grads, variables))
        
    
    def play_episode(self, n_step:int):
        """_summary_

        Args:
            n_step (_type_): _description_
        """
        
        state, _ = self.env.reset()
        
        episode_reward = 0
        
        for _ in range(n_step):
            next_state, rewards, done = self.play_one_step(state)
            episode_reward += rewards
            
            if done:
                break
            
        self.total_rewards.append(episode_reward)
        print(f"Episode reward: {episode_reward}")
        
    
    def play_all_episode(self, n_episode:int, n_step:int):
        """_summary_

        Args:
            n_episode (int): _description_
            n_step (int): _description_
        """
        
        print("-" * 25, 'Training starts', 25 * '-')
        
        for episode in range(n_episode):
            self.play_episode(n_step)
            self.train_step()
            self.reset_buffer()     
        
        print("-" * 25, 'Training ends', 25 * '-')
        
        
    def evaluate(self, n_episode:int, n_step:int):
        total_rewards = []
        
        print(25 * '-', 'Evaluation start', '-' * 25)
        
        for _ in range(n_episode):
            state, _ = self.env.reset()
            episode_reward = 0
            for _ in range(n_step):
                state_normal = self.normalize_input(state)
                state_tensor = tf.convert_to_tensor(state_normal, dtype=tf.float32)
                state_tensor = tf.expand_dims(state_tensor, axis=0)
                actions_log, _ = self.actor_critic(state_tensor)
                best_action = tf.argmax(actions_log, axis=1)
                best_action = int(best_action.numpy()[0])
                next_state, reward, terminated, truncated = self.env.step(best_action)
                done = terminated or truncated
                episode_reward += reward
                
                if done:
                    break
                
                state = next_state
                
            total_rewards.append(episode_reward)
        
        print(25 * '-', 'Evaluation start', '-' * 25)    
        
        print(f"The rewards mean is: {sum(total_rewards) / n_episode}")
        
        return sum(total_rewards) / n_episode
    
    
    def plot_rewards(self):
        plt.figure(figsize=(12, 8))
        plt.plot(self.total_rewards)
        plt.title("Rewards per episode")
        plt.xlabel("Episodes")
        plt.ylabel("Rewards")
        plt.grid(True)
        plt.show()   
        
    
    def save_model(self):
        
        print("-" * 25, 'Model is saving', '-' * 25)
        
        if os.path.exists(self.actor_critic.chkpt_dir):
            self.actor_critic.save_weights(self.actor_critic.chkpt_file) 
            print("-" * 25, 'Model saved', '-' * 25)
        else:
            os.makedirs(self.actor_critic.chkpt_dir)
            self.actor_critic.save_weights(self.actor_critic.chkpt_file) 
            print("-" * 25, 'Model is saving', '-' * 25)
    
    def load_model(self):
        
        print("-" * 25, 'Model is loading', '-' * 25)
        if os.path.exists(self.actor_critic.chkpt_dir):
            self.actor_critic.load_weights(self.actor_critic.chkpt_file)
        else:
            os.makedirs(self.actor_critic.chkpt_dir)
            print("The directory doesn't exist")
            print("Directory is created")                

In [42]:
env = gym.make("Acrobot-v1")

In [43]:
def create_agent(env: gym.Env, gamma:float, delta:float) -> AcrobotAgent:
    return AcrobotAgent(env=env, gamma=gamma, delta=delta)

In [44]:
def training_loop(agent: AcrobotAgent, n_episode:int = 500, n_step:int = 200):
    agent.play_all_episode(n_episode=n_episode, n_step=n_step)
    agent.save_model()

In [45]:
def evaluate_model(agent: AcrobotAgent, n_episode:int, n_step:int):
    agent.evaluate(n_episode=n_episode, n_step=n_step)

In [46]:
agent = create_agent(env=env, gamma=0.975, delta=1.0)

In [None]:
training_loop(agent=agent, n_episode=1000, n_step=150)

------------------------- Training starts -------------------------
Episode reward: -150.0
(300,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Episode reward: -150.0
(150,)
Ep

In [None]:
agent.evaluate(n_episode=10, n_step=25)

------------------------- Evaluation start -------------------------


InvalidArgumentError: {{function_node __wrapped__Pack_N_2_device_/job:localhost/replica:0/task:0/device:CPU:0}} Shapes of all inputs must match: values[0].shape = [1,3] != values[1].shape = [1,1] [Op:Pack] name: packed