# Policy Gradient Methods

(Google Colab notebook created by me, inspired by courses on Hugging Face)


In this notebook we will implement the REINFORCE algorithm from scratch, then train and evaluate it on the Gymnasium environment “LunarLander-v2”.



### Installing and importing libraries

In [None]:
!sudo apt-get update
!apt install swig cmake
!pip3 install gymnasium[box2d] swig


In [None]:
from collections import deque
from IPython.display import Image
import torch
torch.manual_seed(0) # set random seed
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical
import gymnasium as gym



### Visualizing 🐪

In Google Colab, rendering Gym environments with graphical animations can be tricky because of the limitations of the Colab environment. Instead, we save the simulation as an .mp4 file that can be downloaded.

In [None]:
from gymnasium.wrappers import RecordVideo

## The environment 🎮

The goal is to guide the lander (rocket) to a soft landing on the designated landing pad while managing limited fuel. The agent receives rewards or penalties based on its actions and on how the landing goes.

At each step, the reward:

- is increased/decreased the closer/further the lander is to the landing pad.
- is increased/decreased the slower/faster the lander is moving.
- is decreased the more the lander is tilted (angle not horizontal).
- is increased by 10 points for each leg that is in contact with the ground.
- is decreased by 0.03 points each frame a side engine is firing.
- is decreased by 0.3 points each frame the main engine is firing.

The episode receives an additional reward of -100 or +100 points for crashing or landing safely, respectively.

Read more about the environment here: https://gymnasium.farama.org/environments/box2d/lunar_lander/


In [None]:
Image(url='https://gymnasium.farama.org/_images/lunar_lander.gif')


### Observation and Action Space

In [None]:
test_env = gym.make("LunarLander-v2", render_mode='rgb_array') # Create our environment called LunarLander-v2


observation_space = test_env.observation_space
action_space = test_env.action_space # Four discrete actions;  0 = do nothing, 1 = fire left orientation engine, 2 = fire main engine, 3 = fire right orientation engine

observation, info = test_env.reset()

print(f'Observation space: {observation_space.shape[0]}, action space: {action_space.n}')
print(f'Observation {observation}') # 8-dimensional vector describing the state

print("\nRandom sampling of action: ", action_space.sample())

Observation space: 8, action space: 4
Observation [-0.00411797  1.4216837  -0.41714048  0.47836992  0.00477868  0.09448856
  0.          0.        ]

Random sampling of action:  0
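
To make the 8 numbers easier to read, the observation can be unpacked into named fields. The sketch below assumes the field order given in the Gymnasium Lunar Lander documentation (position, velocity, angle, angular velocity, leg contacts). The `LanderState` name is hypothetical, and the values are copied from the output above.

```python
from collections import namedtuple

# Hypothetical helper type; the field order is assumed from the Gymnasium docs.
LanderState = namedtuple(
    "LanderState",
    ["x", "y", "vx", "vy", "angle", "angular_velocity",
     "left_leg_contact", "right_leg_contact"],
)

# Observation copied from the printed output above.
observation = [-0.00411797, 1.4216837, -0.41714048, 0.47836992,
               0.00477868, 0.09448856, 0.0, 0.0]

state = LanderState(*observation)
print(f"y position: {state.y:.2f}, horizontal velocity: {state.vx:.2f}")
print("legs touching ground:", bool(state.left_leg_contact), bool(state.right_leg_contact))
```

At the start of an episode the lander is high above the pad and neither leg touches the ground, which is why the last two entries are 0.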


### Test of random action and video
After running the cell below, you should be able to download a video of the episode from the folder "lunar_videos/random/".

In [None]:
test_env = RecordVideo(test_env, './lunar_videos/random/') # Wrap test_env for video replay

# Then we reset this environment
observation, info = test_env.reset()
for _ in range(1000):
    action = test_env.action_space.sample()   # Take a random action

    observation, reward, terminated, truncated, info = test_env.step(action) # Perform the action in the environment

    # Stop when the episode is terminated (the lander landed or crashed) or truncated (timeout)
    if terminated or truncated:
        print("reset")
        break
test_env.close()

Moviepy - Building video /content/lunar_videos/random/rl-video-episode-0.mp4.
Moviepy - Writing video /content/lunar_videos/random/rl-video-episode-0.mp4



                                                              

Moviepy - Done !
Moviepy - video ready /content/lunar_videos/random/rl-video-episode-0.mp4
reset




# Reinforce Implementation

In [None]:
env = gym.make("LunarLander-v2", render_mode='rgb_array')
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(device)

cuda:0


## Policy

Given the state (an 8-dimensional vector), we want a neural network that outputs a probability distribution over the 4 possible actions.

In [None]:
class Policy(nn.Module):
  def __init__(self, state_size, action_size, hidden_size):
    super().__init__()
    # SOLUTION
    self.f1 = nn.Linear(state_size, hidden_size)
    self.f2 = nn.Linear(hidden_size, action_size)

  def forward(self, x):
    # SOLUTION
    x = F.relu(self.f1(x))
    # softmax over the last (action) dimension, e.g. logits [ 0.1385, -0.0157,  0.1634, -0.2826]; works for one state or a batch
    x = F.softmax(self.f2(x), dim=-1)
    return x

  def select_action(self, state):
    state = torch.from_numpy(state).float().to(device)
    probs = self.forward(state)
    m = Categorical(probs) # differentiable distribution; its log_prob is what REINFORCE needs
    action = m.sample() # sample an action from the distribution
    return action.item(), m.log_prob(action) # action and its log probability, used for the gradient step later



### Test the policy network with a forward pass

In [None]:
state_size = env.observation_space.shape[0]
action_size = env.action_space.n
observation, info = env.reset(seed=3)
policy = Policy(state_size, action_size, hidden_size=16).to(device)

print(observation)
policy.select_action(observation)

[-1.1074066e-03  1.4124206e+00 -1.1218278e-01  6.6686034e-02
  1.2900012e-03  2.5411118e-02  0.0000000e+00  0.0000000e+00]
self tensor([ 0.1385, -0.0157,  0.1634, -0.2826], device='cuda:0',
       grad_fn=<ViewBackward0>)


(3, tensor(-1.6848, device='cuda:0', grad_fn=<SqueezeBackward1>))
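
As a sanity check, the log probability in the output above can be reproduced by hand. The sketch below assumes that the four numbers in the printed tensor are the raw outputs of the second linear layer (before the softmax). It uses only the standard library, not PyTorch.

```python
import math

# Values copied from the printed tensor above (assumed to be pre-softmax logits).
logits = [0.1385, -0.0157, 0.1634, -0.2826]

# Softmax: exponentiate, then normalize so the values sum to 1.
exps = [math.exp(z) for z in logits]
total = sum(exps)
probs = [e / total for e in exps]

action = 3  # the action sampled in the output above
log_prob = math.log(probs[action])

print([round(p, 4) for p in probs])
print(round(log_prob, 4))
```

Up to rounding of the printed logits, the result should agree with the -1.6848 returned by `select_action`.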

## Reinforce Algorithm




<figure>
<center>
<img src='https://i.stack.imgur.com/D0K5F.png' />
<figcaption>Pseudocode of Reinforce</figcaption></center>
</figure>


Short recap of the Reinforce algorithm: *do more of the good actions and fewer of the bad ones.*

This is done by multiplying the gradient of the log probability of selecting action **a_t** in state **s_t** by the sum of discounted rewards (G_t) from time **t** onward (line 7 of the pseudocode).
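
Written out in the notation used by the code comments further down (an assumed notation, so the symbols may differ slightly from the pseudocode figure), the return and the loss minimized for one episode of length T are:

```latex
G_t = \sum_{k=t}^{T-1} \gamma^{\,k-t}\, r_{k+1},
\qquad
L(\theta) = -\sum_{t=0}^{T-1} G_t \,\log \pi_\theta(a_t \mid s_t)
```

The minus sign is there because the optimizer minimizes: a gradient descent step on L is a gradient ascent step on the weighted log probabilities.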


Intuitively, if the action leads to a positive discounted cumulative reward, this multiplication makes us take a larger step toward increasing the probability of that action. If the return is negative, the step goes the other way and the action becomes less likely.
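
This intuition can be checked numerically on a toy case. The sketch below is a simplification, not the notebook's training code: it updates four logits directly instead of network weights, and the names `softmax` and `reinforce_step` are hypothetical. It relies on the fact that, for a softmax policy, the gradient of log π(a) with respect to logit i is 1 if i = a, else 0, minus π_i.

```python
import math

def softmax(logits):
    exps = [math.exp(z) for z in logits]
    total = sum(exps)
    return [e / total for e in exps]

def reinforce_step(logits, action, G, lr=0.1):
    """One gradient-ascent step on G * log pi(action), taken directly on the logits."""
    probs = softmax(logits)
    # d log pi(action) / d logit_i = 1[i == action] - probs[i]
    return [z + lr * G * ((1.0 if i == action else 0.0) - p)
            for i, (z, p) in enumerate(zip(logits, probs))]

logits = [0.0, 0.0, 0.0, 0.0]   # uniform policy over 4 actions
action = 2                      # pretend this action was sampled

p_before = softmax(logits)[action]
p_good = softmax(reinforce_step(logits, action, G=+5.0))[action]
p_bad = softmax(reinforce_step(logits, action, G=-5.0))[action]

print(p_before, p_good, p_bad)
```

With a positive return the probability of the sampled action rises above its initial 0.25, and with a negative return it drops below it.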

Tip: We only want to "reinforce" an action with the rewards accumulated after taking *that* action.
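
The tip above is what the backward recursion G_t = r_(t+1) + gamma*G_(t+1) achieves. A minimal standalone sketch follows; the function name `discounted_returns` and the toy rewards are made up for illustration.

```python
def discounted_returns(rewards, gamma):
    """Return [G_0, ..., G_{T-1}] with G_t = r_{t+1} + gamma * G_{t+1}."""
    returns = []
    G = 0.0
    for r in reversed(rewards):   # walk backwards from the last step
        G = r + gamma * G
        returns.append(G)
    returns.reverse()             # put G_0 first again
    return returns

rewards = [1.0, 0.0, 2.0]
print(discounted_returns(rewards, gamma=0.5))  # [1.5, 1.0, 2.0]
```

The last action is credited only with its own reward (2.0), while the first action is credited with everything that followed, discounted by how far in the future it happened.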

In [None]:
def reinforce(policy, optimizer, n_training_episodes, max_t, gamma, print_every=100):
  """
  Parameters:
    policy (Policy): Our policy model
    optimizer (torch.optim): To perform backpropagation and update step
    n_training_episodes (int): Number of episodes during training
    max_t (int): Max amount of steps in one episode
    gamma (float): Discount factor for future rewards
    print_every (int): Print the score every print_every episodes

  """

  scores = []
  for episode in range(0, n_training_episodes):
    rewards = []
    log_probs = []

    observation, _ = env.reset()

    # Line 4
    for i in range(max_t): # until max timestep, or until the episode is terminated (landed/crashed) or truncated
      action, log_prob = policy.select_action(observation) # SOLUTION select an action from the policy
      observation, reward, terminated, truncated, info = env.step(action) # take the action
      rewards.append(reward) # store the reward
      log_probs.append(log_prob) # store the log probability of the chosen action

      if terminated or truncated:
        break

    scores.append(sum(rewards))
    discounted_rewards = deque(maxlen=max_t) # deque with max length (can be appended to at both ends)
    discounted_reward = 0
    # Line 5
    for i in range(len(rewards)):
      discounted_reward = rewards[-i-1] + gamma*discounted_reward # SOLUTION G_t = r_(t+1) + gamma*G_(t+1)
      discounted_rewards.appendleft(discounted_reward) # SOLUTION append left: [G_T] -> [G_(T-1), G_T]

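    # Line 7
    objective = []
    for log_prob, discounted_reward in zip(log_probs, discounted_rewards):
      objective.append(-log_prob * discounted_reward) # negated, because the optimizer minimizes
    # torch.stack keeps the computation graph; torch.tensor would copy the values and detach them
    loss = torch.stack(objective).sum()

    optimizer.zero_grad() # clear gradients left over from the previous episode
    loss.backward()
    optimizer.step()
    del rewards
    del log_probs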

    if episode % print_every == 0:
      print(f'Episode {episode}\t Score: {scores[episode]}')

  #torch.save(policy.state_dict(), './models/reinforce_model.pt')

  return scores

For tips, see the official PyTorch example: https://github.com/pytorch/examples/blob/main/reinforcement_learning/reinforce.py

In [None]:
policy = Policy(state_size, action_size, hidden_size=16).to(device)
n_training_episodes = 1
max_t = 1000
gamma = 0.9
learning_rate = 1e-2
optimizer = optim.Adam(policy.parameters(), lr=learning_rate)

In [None]:
scores = reinforce(policy, optimizer, n_training_episodes, max_t, gamma)

# Get a video file of the trained policy 📷

In [None]:
env_eval = gym.make("LunarLander-v2", render_mode='rgb_array')
env_eval = RecordVideo(env_eval, './lunar_videos/reinforce/')

observation, _ = env_eval.reset()
for i in range(1000):
    action, _ = policy.select_action(observation) # the log probability is not needed for evaluation
    observation, reward, terminated, truncated, info = env_eval.step(action)
    env_eval.render()
    if terminated or truncated:
        break
env_eval.close()
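
A single video says little about how good the policy is on average. Below is a minimal sketch of a numeric evaluation loop. `evaluate` and `StubEnv` are hypothetical names, and `StubEnv` only mimics the `reset`/`step` signatures of a Gymnasium environment so that the block runs without Box2D.

```python
import statistics

def evaluate(env, select_action, n_episodes=10, max_t=1000):
    """Run n_episodes and return (mean, standard deviation) of the episode scores."""
    scores = []
    for _ in range(n_episodes):
        observation, _ = env.reset()
        score = 0.0
        for _ in range(max_t):
            action = select_action(observation)
            observation, reward, terminated, truncated, _ = env.step(action)
            score += reward
            if terminated or truncated:
                break
        scores.append(score)
    return statistics.mean(scores), statistics.pstdev(scores)

class StubEnv:
    """Hypothetical stand-in: each episode lasts 5 steps and pays 1.0 per step."""
    def reset(self):
        self.t = 0
        return [0.0] * 8, {}

    def step(self, action):
        self.t += 1
        terminated = self.t >= 5
        return [0.0] * 8, 1.0, terminated, False, {}

mean_score, std_score = evaluate(StubEnv(), select_action=lambda obs: 0, n_episodes=3)
print(mean_score, std_score)
```

In this notebook one would presumably pass a real LunarLander environment and `lambda obs: policy.select_action(obs)[0]` in place of the stub and the constant action.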
