This notebook contains the policy-gradient part of the RL assignment: the agent and the REINFORCE training algorithm.

#### Imports : 

In [1]:
import os
import sys
import gymnasium as gym
import time
import IPython
from tqdm import tqdm
import numpy as np
import random
from collections import defaultdict
import pickle as pkl
import matplotlib.pyplot as plt
from matplotlib.patches import Patch

import seaborn as sns

import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.distributions import Categorical

from collections import deque

# Select the compute device once for the whole notebook: CUDA when PyTorch
# reports it as available, otherwise the CPU. The chosen device is printed
# so the cell output records which one is in use.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

  from .autonotebook import tqdm as notebook_tqdm


cpu


#### Agent : 

In [None]:
# The Policy Gradient agent class
class PolicyGradientAgent(nn.Module):
    """Stochastic policy implemented as a two-layer fully connected network.

    The network maps an observation to a probability distribution over the
    actions (one hidden ReLU layer followed by a softmax output layer).

    Args:
        s_size: Number of input features (size of one observation).
        a_size: Number of actions (size of the output distribution).
        h_size: Number of units in the hidden layer.
    """

    def __init__(self, s_size, a_size, h_size):
        super().__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        """Return the action probabilities for a batch of observations.

        Args:
            x: Float tensor of shape (batch, s_size).

        Returns:
            Tensor of shape (batch, a_size) whose rows sum to 1.
        """
        hidden = F.relu(self.fc1(x))
        return F.softmax(self.fc2(hidden), dim=1)

    def _observation_to_batch(self, observation):
        """Convert one raw observation to a float batch of size 1."""
        # Use the device the parameters actually live on instead of a
        # module-level global, so the agent works wherever it was moved.
        param_device = next(self.parameters()).device
        batch = torch.from_numpy(np.array(observation)).float().unsqueeze(0)
        return batch.to(param_device)

    def policy(self, observation):
        """Sample an action stochastically for the given observation.

        Args:
            observation: A single observation (any array-like of numbers).

        Returns:
            A tuple ``(action, log_prob)`` where ``action`` is a Python int
            and ``log_prob`` is a CPU tensor of shape (1,) that is still
            attached to the autograd graph (needed to compute the loss).
        """
        # Compute the probabilities of the actions for this state
        probs = self(self._observation_to_batch(observation)).cpu()
        # Sample an action from the categorical distribution of the output
        distribution = Categorical(probs)
        action = distribution.sample()
        return action.item(), distribution.log_prob(action)

    def greedy_policy(self, observation):
        """Return the index of the most probable action for the observation."""
        # No gradient is needed for acting greedily, so skip graph building.
        with torch.no_grad():
            probs = self(self._observation_to_batch(observation)).cpu()
        return np.argmax(probs.numpy())

    def save(self, path):
        """Save the agent policy (its state dict) to ``path``."""
        torch.save(self.state_dict(), path)

    def load(self, path):
        """Load a saved policy (a state dict) from ``path``.

        The tensors are mapped onto the device the agent currently lives on,
        so a checkpoint written on a GPU can be restored on a CPU-only host.
        """
        # NOTE: torch.load unpickles the file; only load trusted checkpoints.
        param_device = next(self.parameters()).device
        self.load_state_dict(torch.load(path, map_location=param_device))


#### REINFORCE algorithm :

In [None]:
def reinforce(agent, env, optimizer, n_training_episodes, max_t, gamma, print_every=None, disp_tqdm=True, neg_reward=0):
    """Train a policy-gradient agent with the REINFORCE algorithm.

    One episode is played per iteration with the agent's stochastic policy;
    the discounted returns of that episode weight the log-probabilities of
    the chosen actions to form the loss, and one optimizer step is taken.

    Args:
        agent: Object exposing ``policy(state) -> (action, log_prob)`` where
            ``log_prob`` is a tensor with at least one dimension.
        env: Environment whose ``reset()`` returns a sequence with the
            observation first, and whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)`` with an
            ``info["score"]`` entry.
        optimizer: Optimizer holding the agent's parameters.
        n_training_episodes: Number of episodes to train on.
        max_t: Maximum number of steps per episode (must be at least 1).
        gamma: Discount factor applied to future rewards.
        print_every: If not None, print the average score of the last 100
            episodes and the current loss every ``print_every`` episodes.
        disp_tqdm: Whether to wrap the episode loop in a tqdm progress bar.
        neg_reward: Reward recorded for a step whose environment reward is
            falsy; every other step is recorded as a reward of 1.

    Returns:
        The list of final ``info["score"]`` values, one per episode.

    Raises:
        ValueError: If an episode collects no step (``max_t`` below 1).
    """
    scores_deque = deque(maxlen=100)
    scores = []
    eps = np.finfo(np.float32).eps.item()

    # Iterate over the episodes
    episode_ids = range(1, n_training_episodes + 1)
    iterator = tqdm(episode_ids, desc="Training") if disp_tqdm else episode_ids
    for i_episode in iterator:
        # Initialise the episode history and the environment
        saved_log_probs = []
        rewards = []
        info = {}
        state = env.reset()[0]

        # Iterate over the time steps (bounded by max_t)
        for _ in range(max_t):
            action, log_prob = agent.policy(state)
            state, reward, terminated, truncated, info = env.step(action)
            reward = 1 if reward else neg_reward

            # Save the log probability and the reward
            saved_log_probs.append(log_prob)
            rewards.append(reward)

            # An episode ends on termination OR truncation; stepping a
            # finished environment again is undefined behaviour.
            if terminated or truncated:
                break

        if not rewards:
            raise ValueError("max_t must be at least 1 to collect an episode")

        # Save the score
        scores_deque.append(info["score"])
        scores.append(info["score"])

        # Discounted return G_t = r_t + gamma * G_{t+1}, built back to front
        discounted = []
        running_return = 0.0
        for step_reward in reversed(rewards):
            running_return = step_reward + gamma * running_return
            discounted.append(running_return)
        discounted.reverse()
        returns = torch.tensor(discounted, dtype=torch.float32)

        # Standardization of the returns makes training more stable. It is
        # skipped for a one-step episode: the standard deviation of a single
        # value is NaN and would poison the loss and then the weights.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + eps)

        # Compute the loss: sum over the steps of -log_prob * return
        log_probs = torch.cat(saved_log_probs)
        policy_loss = -(log_probs * returns.to(log_probs.device)).sum()

        # Gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if print_every is not None and i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))
            print("Loss: {}".format(policy_loss.item()))

    return scores




#### Evaluation :

In [None]:

def reinforce_evaluate(agent, env, optimizer, n_training_episodes, max_t, gamma, print_every=None, disp_tqdm=True, neg_reward=0, eval_every=100, n_eval_episodes=10):
    """Train with REINFORCE while periodically evaluating the greedy policy.

    Training is identical in structure to a plain REINFORCE loop (one
    episode, one optimizer step). In addition, every ``eval_every`` episodes
    the greedy policy is evaluated with ``evaluate_score_reinforce``; if it
    wins every evaluation episode, training stops early and the monitoring
    lists are padded up to ``n_training_episodes`` entries.

    Args:
        agent: Object exposing ``policy(state) -> (action, log_prob)`` (and
            whatever ``evaluate_score_reinforce`` requires of it).
        env: Environment whose ``reset()`` returns a sequence with the
            observation first, and whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)`` with an
            ``info["score"]`` entry.
        optimizer: Optimizer holding the agent's parameters.
        n_training_episodes: Maximum number of training episodes.
        max_t: Maximum number of steps per episode (must be at least 1).
        gamma: Discount factor applied to future rewards.
        print_every: If not None, print the average score of the last 100
            episodes and the current loss every ``print_every`` episodes.
        disp_tqdm: Whether to wrap the episode loop in a tqdm progress bar.
        neg_reward: Reward recorded for a step whose environment reward is
            falsy; other rewards are kept as returned by the environment.
        eval_every: Number of training episodes between two evaluations.
        n_eval_episodes: Number of greedy episodes played per evaluation.

    Returns:
        A tuple ``(scores, episodes)``: the per-episode ``info["score"]``
        values and the matching 1-based episode numbers. After an early stop
        the remaining episodes are filled with a score of 1000.

    Raises:
        ValueError: If an episode collects no step (``max_t`` below 1).
    """
    scores_deque = deque(maxlen=100)
    scores = []
    episodes = []
    eps = np.finfo(np.float32).eps.item()

    # Iterate over the episodes
    episode_ids = range(1, n_training_episodes + 1)
    iterator = tqdm(episode_ids, desc="Training") if disp_tqdm else episode_ids
    for i_episode in iterator:
        # Initialise the episode history and the environment
        saved_log_probs = []
        rewards = []
        info = {}
        state = env.reset()[0]

        # Iterate over the time steps (bounded by max_t)
        for _ in range(max_t):
            action, log_prob = agent.policy(state)
            state, reward, terminated, truncated, info = env.step(action)
            reward = reward if reward else neg_reward

            # Save the log probability and the reward
            saved_log_probs.append(log_prob)
            rewards.append(reward)

            # An episode ends on termination OR truncation; stepping a
            # finished environment again is undefined behaviour.
            if terminated or truncated:
                break

        if not rewards:
            raise ValueError("max_t must be at least 1 to collect an episode")

        # Save the score
        scores_deque.append(info["score"])
        scores.append(info["score"])
        episodes.append(i_episode)

        # Discounted return G_t = r_t + gamma * G_{t+1}, built back to front
        discounted = []
        running_return = 0.0
        for step_reward in reversed(rewards):
            running_return = step_reward + gamma * running_return
            discounted.append(running_return)
        discounted.reverse()
        returns = torch.tensor(discounted, dtype=torch.float32)

        # Standardization of the returns makes training more stable. It is
        # skipped for a one-step episode: the standard deviation of a single
        # value is NaN and would poison the loss and then the weights.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + eps)

        # Compute the loss: sum over the steps of -log_prob * return
        log_probs = torch.cat(saved_log_probs)
        policy_loss = -(log_probs * returns.to(log_probs.device)).sum()

        # Gradient descent
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if print_every is not None and i_episode % print_every == 0:
            print("Episode {}\tAverage Score: {:.2f}".format(i_episode, np.mean(scores_deque)))
            print("Loss: {}".format(policy_loss.item()))

        # Periodically test whether the agent has converged
        if i_episode % eval_every == 0:
            # Play greedy episodes to see if the agent beats the game
            _, win = evaluate_score_reinforce(env, agent, n_episodes=n_eval_episodes, disp_tqdm=False)
            # If the agent wins every evaluation episode, stop the learning
            if win == n_eval_episodes:
                # Pad with the episodes that were NOT played, i.e.
                # i_episode + 1 .. n_training_episodes, so episode numbers
                # are neither duplicated nor cut short.
                remaining = range(i_episode + 1, n_training_episodes + 1)
                scores += [1000] * len(remaining)
                episodes += list(remaining)
                break

    return scores, episodes


def evaluate_score_reinforce(env, agent, n_episodes=100, disp_tqdm=True, win_score=1000):
    """Evaluate a policy-gradient agent with its greedy policy.

    Each episode is played until the environment reports termination or
    truncation, or until the score exceeds ``win_score``; the latter counts
    as a win and that episode is left out of the mean score.

    Args:
        env: Environment whose ``reset()`` returns a sequence with the
            observation first, and whose ``step(action)`` returns
            ``(state, reward, terminated, truncated, info)`` with an
            ``info["score"]`` entry.
        agent: Object exposing ``greedy_policy(observation) -> action``.
        n_episodes: Number of evaluation episodes to play.
        disp_tqdm: Whether to wrap the episode loop in a tqdm progress bar.
        win_score: Score above which an episode is counted as won and
            stopped.

    Returns:
        A tuple ``(mean_score, win)``: the mean final score over the
        episodes that were not won (NaN when there is no such episode) and
        the number of won episodes.
    """
    scores = []
    win = 0

    episode_range = range(n_episodes)
    iterator = tqdm(episode_range) if disp_tqdm else episode_range

    for _ in iterator:
        obs = env.reset()[0]
        done = False
        score = 0

        while not done:
            action = agent.greedy_policy(obs)
            obs, _reward, terminated, truncated, info = env.step(action)
            score = info["score"]
            # Truncation ends the episode just like termination does;
            # ignoring it would keep stepping a finished environment.
            done = terminated or truncated

            if score > win_score:
                win += 1
                done = True
                # Won episodes are marked NaN so they do not enter the mean
                score = np.nan

        scores.append(score)

    # Average only the episodes that were not won. Doing it by hand avoids
    # the RuntimeWarning np.nanmean raises when every entry is NaN (all
    # episodes won) or when no episode was played.
    lost_scores = [s for s in scores if not np.isnan(s)]
    mean_score = np.mean(lost_scores) if lost_scores else np.nan
    return mean_score, win

#### Run :

In [None]:
# Training script: builds the environment and the agent, trains the agent
# with REINFORCE, saves the learned weights and plots the per-episode scores.

# Hyperparameters
# s_size is the input size of the policy network.
# NOTE(review): presumably the environment observation has 2 components - confirm.
s_size = 2
# h_size is the width of the policy network's hidden layer.
h_size = 16

# Number of training episodes, step cap per episode, discount factor and
# logging period (in episodes) passed to reinforce() below.
n_training_episodes = 500
max_t = 3000
gamma = 0.99
print_every = 100

# Print the device
print("Device: {}".format(device))

# Initialise the environment
# NOTE(review): 'TextFlappyBird-v0' has to be registered with gymnasium before
# this call; no import that registers it is visible in this file - confirm.
env = gym.make('TextFlappyBird-v0', height=15, width=20, pipe_gap=4)

# Initialise the policy (output size taken from the environment's action
# space) and the Adam optimizer that updates its parameters
agent = PolicyGradientAgent(s_size, env.action_space.n, h_size).to(device)
optimizer = torch.optim.Adam(agent.parameters(), lr=1e-2)

# Train the policy; scores holds one final score per training episode
scores = reinforce(agent, env, optimizer, n_training_episodes, max_t, gamma, print_every)

# Save the policy (its state dict) next to the notebook
agent.save("./policy_gradient_agent.pth")

print("Training completed")

# Plot the score obtained at each training episode
plt.plot(scores)