If you run in colab Stuff

In [1]:
%%capture
!apt install python-opengl
!apt install ffmpeg
!apt install xvfb
!pip install pyvirtualdisplay
!pip install pyglet==1.5.1

In [None]:
# Virtual display
from pyvirtualdisplay import Display

virtual_display = Display(visible=0, size=(1400, 900))
virtual_display.start()

In [None]:
virtual_display.is_alive()

In [None]:
!pip install -r https://raw.githubusercontent.com/huggingface/deep-rl-class/main/notebooks/unit4/requirements-unit4.txt

In [2]:
!pip list

Package                 Version
----------------------- ------------
asttokens               3.0.0
cloudpickle             3.1.1
colorama                0.4.6
comm                    0.2.2
contourpy               1.3.2
cycler                  0.12.1
debugpy                 1.8.14
decorator               5.2.1
executing               2.2.0
filelock                3.13.1
fonttools               4.58.4
fsspec                  2024.6.1
gym                     0.26.2
gym-notices             0.0.8
ipykernel               6.29.5
ipython                 9.4.0
ipython_pygments_lexers 1.1.1
jedi                    0.19.2
Jinja2                  3.1.4
jupyter_client          8.6.3
jupyter_core            5.8.1
kiwisolver              1.4.8
MarkupSafe              2.1.5
matplotlib              3.10.3
matplotlib-inline       0.1.7
mpmath                  1.3.0
nest-asyncio            1.6.0
networkx                3.3
numpy                   2.3.1
packaging               25.0
parso                  

Import libraries

In [1]:
import numpy as np

from collections import deque

import matplotlib.pyplot as plt
%matplotlib inline

# PyTorch
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

# Gym
import gym
# import gym_pygame

from ncps.torch import LTC, CfC



In [2]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


Loading the environment

In [3]:
env_id = "CartPole-v1"
# Create the env
env = gym.make(env_id)

# Create the evaluation env
eval_env = gym.make(env_id)

# Get the state space and action space
s_size = env.observation_space.shape[0]
a_size = env.action_space.n

In [4]:
print("_____OBSERVATION SPACE_____ \n")
print("The State Space is: ", s_size)
print("Sample observation", env.observation_space.sample()) # Get a random observation

_____OBSERVATION SPACE_____ 

The State Space is:  4
Sample observation [ 3.5395584e+00  1.0673485e+38 -1.0412774e-01 -3.3007748e+38]


In [5]:
print("\n _____ACTION SPACE_____ \n")
print("The Action Space is: ", a_size)
print("Action Space Sample", env.action_space.sample()) # Take a random action


 _____ACTION SPACE_____ 

The Action Space is:  2
Action Space Sample 0


MLP policy

In [6]:
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.fc1 = nn.Linear(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)

    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.Tensor(state).unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

LTCN policy

In [55]:
class Policy(nn.Module):
    def __init__(self, s_size, a_size, h_size):
        super(Policy, self).__init__()
        self.ltc = CfC(s_size, h_size)
        self.fc2 = nn.Linear(h_size, a_size)
        

    def forward(self, x):
        out, _ = self.ltc(x)
        x = F.relu(out)
        x = self.fc2(x)
        return F.softmax(x, dim=1)

    def act(self, state):
        state = torch.Tensor(state).unsqueeze(0).to(device)
        probs = self.forward(state).cpu()
        m = Categorical(probs)
        action = m.sample()
        return action.item(), m.log_prob(action)

Debug the policies

In [7]:
state= env.reset()
state

(array([-0.01257587, -0.03917114,  0.03150761,  0.01526853], dtype=float32),
 {})

In [9]:
debug_policy = Policy(s_size, a_size, 64).to(device)
state, _ = env.reset()
action, log_prob = debug_policy.act(state)
print(env.step(action))

(array([ 0.01545322,  0.1572115 ,  0.02805962, -0.27274933], dtype=float32), 1.0, False, False, {})


Reinforce algorithm
Psudocode
reinforce
    define the policy $pi_\theta$
    while done:
        Generate an episode following $pi_\theta$
        for t from T-1 to 0:
            calculate G_t
        end for
        calculate J_\theta
        optimise pi_\theta using delta J_\theta
end reinforce


In [10]:
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every):
    # Help us to calculate the score during the training
    scores_deque = deque(maxlen=100) # to print the average score every 100 episodes
    scores = []
  
    for i_episode in range(1, n_training_episodes+1):
        saved_log_probs = []
        rewards = []
        state, _ = env.reset()
        
        for t in range(max_t):
            action, log_prob = policy.act(state)
            saved_log_probs.append(log_prob)
            state, reward, done, _, _ = env.step(action)
            rewards.append(reward)
            if done:
                break
        scores_deque.append(sum(rewards))
        scores.append(sum(rewards))

        
        returns = deque(maxlen=max_t)
        n_steps = len(rewards)
  
        for t in range(n_steps)[::-1]:
            G_t = (returns[0] if len(returns)>0 else 0)
            returns.appendleft( gamma*G_t + rewards[t])

        ## standardization of the returns is employed to make training more stable
        eps = np.finfo(np.float32).eps.item()  ## eps is the smallest representable float

        # added to the standard deviation of the returns to avoid numerical instabilities
        returns = torch.tensor(returns)
        returns = (returns - returns.mean()) / (returns.std() + eps)

       
        policy_loss = []
        for log_prob, G_t in zip(saved_log_probs, returns):
            policy_loss.append(-log_prob * G_t) # negative sign because we want to maximize the expected return
        policy_loss = torch.cat(policy_loss).sum()

        
        optimizer.zero_grad()
        policy_loss.backward()
        optimizer.step()

        if i_episode % print_every == 0:
            print('Episode {}\tAverage Score: {:.2f}'.format(i_episode, np.mean(scores_deque)))

    return scores

Hyperparameters for training

In [11]:
cartpole_hyperparameters = {
    "h_size": 16,
    "n_training_episodes": 300, #1000
    "n_evaluation_episodes": 10,
    "max_t": 1000,
    "gamma": 1.0,
    "lr": 1e-2,
    "env_id": env_id,
    "state_space": s_size,
    "action_space": a_size,
}

In [12]:
# Create policy and place it to the device
cartpole_policy = Policy(cartpole_hyperparameters["state_space"], cartpole_hyperparameters["action_space"], cartpole_hyperparameters["h_size"]).to(device)
cartpole_optimizer = optim.Adam(cartpole_policy.parameters(), lr=cartpole_hyperparameters["lr"])

Training

In [60]:
scores = reinforce(cartpole_policy,
                   cartpole_optimizer,
                   cartpole_hyperparameters["n_training_episodes"],
                   cartpole_hyperparameters["max_t"],
                   cartpole_hyperparameters["gamma"],
                   100)

Episode 100	Average Score: 32.24
Episode 200	Average Score: 176.65
Episode 200	Average Score: 176.65
Episode 300	Average Score: 832.59
Episode 300	Average Score: 832.59


Evaluate the Agent

In [13]:
def evaluate_agent(env, max_steps, n_eval_episodes, policy):
  """
  Evaluate the agent for ``n_eval_episodes`` episodes and returns average reward and std of reward.
  :param env: The evaluation environment
  :param n_eval_episodes: Number of episode to evaluate the agent
  :param policy: The Reinforce agent
  """
  episode_rewards = []
  for episode in range(n_eval_episodes):
    state,_ = env.reset()
    step = 0
    done = False
    total_rewards_ep = 0

    for step in range(max_steps):
      action, _ = policy.act(state)
      new_state, reward, done, info,_ = env.step(action)
      total_rewards_ep += reward

      if done:
        break
      state = new_state
    episode_rewards.append(total_rewards_ep)
  mean_reward = np.mean(episode_rewards)
  std_reward = np.std(episode_rewards)

  return mean_reward, std_reward

In [15]:
evaluate_agent(eval_env,
               cartpole_hyperparameters["max_t"],
               cartpole_hyperparameters["n_evaluation_episodes"],
               cartpole_policy)

(np.float64(20.7), np.float64(14.457178147895943))

Make the video

In [16]:
import imageio
video_env = gym.make(env_id, render_mode='rgb_array')

In [17]:
def record_video(env, policy, out_directory, fps=30):
  """
  Generate a replay video of the agent
  :param env
  :param Qtable: Qtable of our agent
  :param out_directory
  :param fps: how many frame per seconds (with taxi-v3 and frozenlake-v1 we use 1)
  """
  images = []  
  done = False
  state, _ = env.reset()
  img = env.render()
  images.append(img)
  for _ in range(200):
    # Take the action (index) that have the maximum expected future reward given that state
    action, _ = policy.act(state)
    state, reward, done, info, _ = env.step(action) # We directly put next_state = state for recording logic
    img = env.render()
    images.append(img)
    
  imageio.mimsave(out_directory, [np.array(img) for i, img in enumerate(images)], fps=fps)

In [20]:
video_path =  "./temp_files/replay.mp4"
record_video(video_env, cartpole_policy, video_path, 30)

