Reference: https://huggingface.co/learn/deep-rl-course/unit4/hands-on

In [None]:
# Game backend and gym bindings, installed straight from GitHub.
# NOTE(review): presumably these provide the `gym_pygame` package imported below — confirm.
!pip install -q git+https://github.com/ntasfi/PyGame-Learning-Environment.git
!pip install -q git+https://github.com/simoninithomas/gym-games

# NOTE(review): imageio-ffmpeg is presumably needed by imageio to write the .mp4 replay — confirm.
!pip install -q imageio-ffmpeg
# PyYAML is pinned to 6.0.
!pip install -q pyyaml==6.0

In [14]:
import numpy as np

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
from tqdm import tqdm
import seaborn as sns

# Gym
import gym # We are using gym instead of gymnasium for compatibility with gym-pygame
import gym_pygame

import imageio

import warnings
warnings.filterwarnings("ignore")

In [None]:
# Choose where tensors and the policy network live:
# the first CUDA GPU when PyTorch can see one, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = torch.device("cuda:0")
else:
    DEVICE = torch.device("cpu")

print(f"You are on {DEVICE}")

You are on cuda:0


### Environment - PixelCopter
Please check the [documentation](https://colab.research.google.com/corgiredirector?site=https%3A%2F%2Fpygame-learning-environment.readthedocs.io%2Fen%2Flatest%2Fuser%2Fgames%2Fpixelcopter.html) for the specifics

Observation Space (7,):
* player y position
* player velocity
* player distance to floor
* player distance to ceiling
* next block x distance to player
* next block's top y location
* next block's bottom y location

Action Space (2,):
* Accelerate up
* Idle

Reward Function:
For each vertical block it passes through, the agent gains a positive reward of +1. Each time a terminal state is reached, it receives a negative reward of -1.

In [None]:
env_id = "Pixelcopter-PLE-v0"

# Two instances of the same game, built in this order: `env` first, then `eval_env`.
env, eval_env = gym.make(env_id), gym.make(env_id)

# Length of the observation vector and number of discrete actions.
s_size = env.observation_space.shape[0]
a_size = env.action_space.n

# --- Observation space summary ---
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample())  # a randomly drawn observation

# --- Action space summary ---
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample())  # a randomly drawn action

couldn't import doomish
Couldn't import doom
_____OBSERVATION SPACE_____ 

The State Space is:  7
Sample observation [ 1.59613    -1.6824756  -0.5159761   0.59397095  1.3134164   0.47040668
  0.67635375]

 _____ACTION SPACE_____ 

The Action Space is:  2
Action Space Sample 0


In [None]:
# Define our MLP Policy

class Policy(nn.Module):
    """Stochastic MLP policy mapping a state vector to action probabilities.

    The network is ``s_size -> h_size -> 2*h_size -> a_size`` with ReLU
    activations, followed by a softmax over the action dimension.
    """

    def __init__(self, s_size, a_size, h_size):
        """
        :param s_size: Number of features in a state vector
        :param a_size: Number of discrete actions
        :param h_size: Width of the first hidden layer (the second is twice as wide)
        """
        super().__init__()
        self.fc = nn.Sequential(
            nn.Linear(s_size, h_size),
            nn.ReLU(),
            nn.Linear(h_size, h_size*2),
            nn.ReLU(),
            nn.Linear(h_size*2, a_size)
        )

    def forward(self, x):
        """Return action probabilities for ``x``.

        The softmax runs over the last dimension, so both a batch
        ``(batch, s_size)`` and a single unbatched state ``(s_size,)`` work.
        """
        return F.softmax(self.fc(x), dim=-1)

    def act(self, state):
        """Sample an action for one state.

        :param state: A single state as a NumPy array of length ``s_size``
        :return: ``(action, log_prob)`` where ``action`` is a Python int and
                 ``log_prob`` is a CPU tensor of shape ``(1,)`` that is still
                 attached to the autograd graph
        """
        # Follow the module's own device rather than a notebook-level global, so the
        # policy keeps working wherever it has been moved with ``.to(...)``.
        device = next(self.parameters()).device
        state = torch.from_numpy(state).float().unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs) # for easier sampling and log_prob calculation
        action = m.sample()
        return action.item(), m.log_prob(action)

In [None]:
# Define the REINFORCE algorithm

def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every, train_env=None):
    """
    Train ``policy`` with the REINFORCE (Monte-Carlo policy gradient) algorithm.

    One gradient step is taken per episode, using the standardized discounted
    returns of that episode as weights on the sampled actions' log-probabilities.

    :param policy: Object exposing ``act(state) -> (action, log_prob)``
    :param optimizer: Optimizer over the policy parameters
    :param n_training_episodes: Number of episodes to train for
    :param max_t: Maximum number of steps per episode
    :param gamma: Discount factor
    :param print_every: Print the running average score every this many episodes
    :param train_env: Environment to train on; when omitted, the notebook-level
                      ``env`` is used
    :return: List with the undiscounted score (sum of rewards) of every episode
    """
    if train_env is None:
        train_env = env  # fall back to the notebook-level training environment

    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100)
    scores = []

    # Machine epsilon for float32; added to the standard deviation of the returns
    # so the standardization never divides by zero.
    eps = np.finfo(np.float32).eps.item()

    for i_episode in tqdm(range(1, n_training_episodes+1)):
        saved_log_probs = []
        rewards = []
        state = train_env.reset()

        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _ = train_env.step(action)
            rewards.append(reward)
            if done:
                break
        episode_score = sum(rewards)
        scores_deque.append(episode_score)
        scores.append(episode_score)

        # Discounted return G_t = r_t + gamma * G_{t+1}, accumulated from the last
        # step backwards so each return is computed in O(1).
        returns = []
        running_return = 0
        for reward in reversed(rewards):
            running_return = gamma*running_return + reward
            returns.append(running_return)
        returns.reverse()
        returns = torch.tensor(returns, dtype=torch.float32)

        ## standardization of the returns is employed to make training more stable
        # The sample std of a single value is NaN, which would turn the loss and then
        # the weights into NaN; a one-step episode is treated as having zero spread.
        if returns.numel() > 1:
            returns_std = returns.std()
        else:
            returns_std = torch.zeros(())
        returns = (returns - returns.mean()) / (returns_std + eps)

        policy_loss = torch.cat(
            [-log_prob * disc_return for log_prob, disc_return in zip(saved_log_probs, returns)]
        ).sum()

        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))

    return scores

def evaluate_agent(env, max_steps, n_eval_episodes, policy):
  """
  Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.

  Actions are obtained from ``policy.act``; no gradients are tracked while evaluating.

  :param env: The evaluation environment
  :param max_steps: Maximum number of steps per evaluation episode
  :param n_eval_episodes: Number of episode to evaluate the agent
  :param policy: The Reinforce agent
  :return: ``(mean_reward, std_reward)`` over the evaluation episodes
  """
  episode_rewards = []

  # Nothing is optimized here, so avoid building an autograd graph for every action.
  with torch.no_grad():
    for episode in range(n_eval_episodes):
      state = env.reset()
      total_rewards_ep = 0

      for step in range(max_steps):
        action, _ = policy.act(state)
        state, reward, done, info = env.step(action)
        total_rewards_ep += reward

        if done:
          break
      episode_rewards.append(total_rewards_ep)

  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)

  return mean_reward, std_reward

In [None]:
# All settings of the Pixelcopter run, gathered in a single dictionary.
pixelcopter_hyperparameters = {
    "h_size": 64,
    "n_training_episodes": 25000,  # NOTE(review): 50000 is presumably an alternative, longer setting — confirm
    "n_evaluation_episodes": 10,
    "max_t": 10000,
    "gamma": 0.99,
    "lr": 1e-4,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

# Seed PyTorch before the network is built.
torch.manual_seed(50)

pixelcopter_policy = Policy(
    s_size=pixelcopter_hyperparameters["state_space"],
    a_size=pixelcopter_hyperparameters["action_space"],
    h_size=pixelcopter_hyperparameters["h_size"],
).to(DEVICE)

pixelcopter_optimizer = optim.Adam(
    pixelcopter_policy.parameters(),
    lr=pixelcopter_hyperparameters["lr"],
)

In [None]:
# Run the training loop; the running average score is reported every 1000 episodes.
scores = reinforce(
    policy=pixelcopter_policy,
    optimizer=pixelcopter_optimizer,
    n_training_episodes=pixelcopter_hyperparameters["n_training_episodes"],
    max_t=pixelcopter_hyperparameters["max_t"],
    gamma=pixelcopter_hyperparameters["gamma"],
    print_every=1000,
)

In [None]:
# Evaluate on the dedicated evaluation environment (`eval_env`, created next to `env`)
# so evaluation episodes do not share an environment instance with training.
evaluate_agent(eval_env, max_steps=10000, n_eval_episodes=100, policy=pixelcopter_policy)

In [None]:
# Customize Seaborn style.
# This has to happen before the figure is created: the theme is applied through
# matplotlib's rcParams, which are read when the figure and axes are built.
sns.set(style="whitegrid")

fig, ax = plt.subplots(figsize=(10, 6))

# Plot the reward curve (one point per training episode)
ax.plot(scores, label='Reward Curve', linestyle='-')

# Add labels and a title
ax.set_xlabel('Episode')
ax.set_ylabel('Reward')
ax.set_title('Reward Curve')

# Add gridlines
ax.grid(True)

# Show the legend
ax.legend()

# Show the plot
plt.show()

### Check the rendered video

In [16]:
def record_video(env, policy, out_directory, fps=30):
  """
  Generate a replay video of the agent playing one episode.

  The episode runs until the environment reports ``done``; one frame is captured
  after the reset and one after every step.

  :param env: Environment to play in; must support ``render(mode='rgb_array')``
  :param policy: Agent exposing ``act(state) -> (action, log_prob)``
  :param out_directory: Path of the video file to write
  :param fps: how many frame per seconds
  """
  images = []
  done = False
  state = env.reset()
  images.append(env.render(mode='rgb_array'))

  # Only a rollout is needed, so do not track gradients for the sampled actions.
  with torch.no_grad():
    while not done:
      # The action comes from the policy's act() method
      action, _ = policy.act(state)
      state, reward, done, info = env.step(action) # We directly put next_state = state for recording logic
      images.append(env.render(mode='rgb_array'))

  imageio.mimsave(out_directory, [np.array(img) for img in images], fps=fps)

In [17]:
# Play one episode with the trained policy and save it as a video file.
record_video(env=env, policy=pixelcopter_policy, out_directory="replay.mp4")