# Exercise: Reinforcement Learning with a Policy Network

![CartPole](https://www.gymlibrary.dev/_images/cart_pole.gif)

[CartPole](https://www.gymlibrary.dev/environments/classic_control/cart_pole/) is a control problem. The goal is to balance a pendulum mounted on a cart.

It's well suited to start with as the state space has only four dimensions

- position of the cart,
- speed of the cart,
- angle of the pendulum, and
- angular velocity of the pendulum

and there are only two possible actions

- push the cart to the right,
- push the cart to the left.


The first exercise is to balance the pendulum with a simple *policy network*.

In [None]:
!pip install jdc
!pip install gymnasium[classic-control]

## Initialization

First we create an *environment*.

In [None]:
import gymnasium as gym
import random 
import torch
import numpy as np
from torch import nn
import torch.nn.functional as F
from torch.distributions import Categorical

from gymnasium import wrappers
import matplotlib.pyplot as plt
from IPython import display
from tqdm.notebook import tqdm

# for split class definitions
import jdc

## Rendering in a Jupyter notebook

Normally you need a "real" application for rendering. We make do here with `matplotlib`.

In [None]:
def render(env, img):
    """Show the environment's current frame in the notebook output area.

    Args:
        env: Environment whose `render()` result is the frame to show.
        img: Image artist (as returned by `plt.imshow`) that receives the frame.
    """
    frame = env.render()
    img.set_data(frame)
    figure = plt.gcf()
    display.display(figure)
    # Requested with wait=True so the old frame is replaced rather than blanked first.
    display.clear_output(wait=True)

## Baseline: `RandomPolicy`

The following policy simply picks an action at random at every step.

In [None]:
class RandomPolicy:
    """Baseline policy: ignores the observation and never learns."""

    def init_game(self, observation):
        """Nothing to reset at the start of an episode."""
        return None

    def update(self, *args):
        """Accept the per-step feedback and discard it."""
        return None

    def __call__(self, observation):
        """Return one of the two actions (0 or 1), chosen with `random.choice`."""
        candidates = [0, 1]
        return random.choice(candidates)
    

In [None]:
def play_game(policy, episodes=2000, do_render=False, seed=100):
    """Let `policy` play CartPole-v1, feeding it every transition.

    The loop stops when `episodes` episodes have finished, when the running
    average reward exceeds the environment's reward threshold, or when the
    user interrupts the kernel. Exactly `episodes` episodes are played; the
    previous `episode > pbar.total` check let one extra episode run.

    Args:
        policy: Object providing `__call__(observation)`, `update(...)` and
            `init_game(observation)`.
        episodes: Maximum number of episodes to play.
        do_render: When true, the environment is created with
            `render_mode="rgb_array"` and every step is drawn via `render`.
        seed: Seed for `random`, `torch` and the first `env.reset`.
    """
    random.seed(seed)
    torch.manual_seed(seed)

    make_kwargs = {"render_mode": "rgb_array"} if do_render else {}
    env = gym.make("CartPole-v1", **make_kwargs)

    # try/finally so the environment is closed even if the policy or the
    # environment raises.
    try:
        observation, info = env.reset(seed=seed)
        policy.init_game(observation)

        img = None
        if do_render:
            plt.ion()
            plt.axis('off')
            img = plt.imshow(env.render())

        # Shown as the progress bar's postfix; key order is the display order.
        status = {'steps': 0, 'episode_reward': 0, 'average_reward': 0}
        episode = 0

        with tqdm(total=episodes) as pbar:
            pbar.set_postfix(status)
            try:
                while episode < episodes:
                    action = policy(observation)
                    observation, reward, terminated, truncated, info = env.step(action)
                    status['steps'] += 1
                    status['episode_reward'] += reward
                    if do_render:
                        render(env, img)
                    policy.update(observation, reward, terminated, truncated, info, pbar)

                    # NOTE(review): `truncated` is forwarded to the policy but does
                    # not end the episode here; only `terminated` or the step cap
                    # does. Gymnasium's API asks for reset() after truncation -
                    # confirm the 1000-step cap is intended.
                    if not (terminated or status['steps'] > 1000):
                        continue

                    episode += 1
                    # Exponential moving average of the episode rewards.
                    status['average_reward'] = 0.05 * status['episode_reward'] + (1 - 0.05) * status['average_reward']
                    if status['average_reward'] > env.spec.reward_threshold:
                        print(f"Solved! Running reward is now {status['average_reward']} and "
                              f"the last episode runs to {status['steps']} time steps!")
                        break

                    pbar.set_postfix(status, refresh=episode % 10 == 0)
                    pbar.update()

                    # Start the next episode.
                    status['steps'] = 0
                    status['episode_reward'] = 0
                    observation, info = env.reset()
                    policy.init_game(observation)
            except KeyboardInterrupt:
                # Interrupting the kernel is the normal way to stop early.
                pass
    finally:
        env.close()

In [None]:
# Try the random baseline with episodes=10 and rendering enabled.
policy = RandomPolicy()
play_game(policy, episodes=10, do_render=True)

## Task 1: Policy Network

For our first model, we need a policy network that maps the four-dimensional state to probabilities for the two actions. 
The model should look like this:

1. A `Linear` layer with `hidden_size` as target dimension and `ReLU` as activation function.
2. A `Linear` layer with `n_actions` as target dimension and `Softmax` as activation function used as a `policy_head` 
   (this is a more general approach than the one I used in my presentation). 
   
We use two separate parts because we want to add another output "head" later on for an *actor critic approach*.

In [None]:
class PolicyNetwork(nn.Module):
    """Policy network for Task 1 (exercise scaffold).

    To be completed as described in the task text: a `Linear` layer of width
    `hidden_size` with `ReLU`, followed by a `Linear` policy head of width
    `n_actions` with `Softmax`.
    """

    def __init__(self, hidden_size=32, n_actions=2):
        super().__init__()
        ## YOUR CODE HERE
        
        
    def forward(self, x):
        """Return the action probabilities `probs` for input `x`."""
        ## YOUR CODE HERE
        return probs

In [None]:
from collections import namedtuple

# Per-step training record: the log-probability of the action that was taken.
SavedAction = namedtuple('SavedAction', ['log_prob'])


class SimplePolicy:
    """Trainable policy backed by a `PolicyNetwork` and an Adam optimizer.

    Args:
        gamma: Discount factor applied to future rewards.
        lr: Learning rate handed to `torch.optim.Adam`.
    """

    def __init__(self, gamma=0.99, lr=5e-3):
        # Two possible actions 0, 1
        self.ACTIONS = [0, 1]
        self.gamma = gamma
        # Number of finished episodes and running mean of their total rewards.
        self.games = 0
        self.mean_reward = None
        # Smallest float32 increment; added to a standard deviation before dividing by it.
        self.eps = np.finfo(np.float32).eps.item()
        self.net = PolicyNetwork()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=lr)
 

### Strategy

The function `__call__(self, observation)` calculates the action as follows:

- The probabilities `probs` are calculated with `PolicyNetwork` (here accessible as `self.net`),
- a suitable probability distribution is created with `torch.distributions.Categorical`, and an action is drawn from it with `sample()`,
- the logarithm of the action's probability (`m.log_prob(action)`) is stored in `self.memory` (for later training).

In [None]:
%%add_to SimplePolicy   
      
    def __call__(self, observation):
        """Choose an action for `observation` and remember its log-probability.

        Exercise scaffold: `probs`, `m` and `action` are to be filled in.
        The code below them expects `m` to offer `log_prob(action)` and
        `action` to offer `.item()`.

        Returns:
            The entry of `self.ACTIONS` selected by `action.item()`.
        """
 
        probs = ### YOUR CODE HERE
        m = ### YOUR CODE HERE
        action = ### YOUR CODE HERE
        
        # Kept for training at the end of the episode.
        self.memory.append(SavedAction(m.log_prob(action)))
        
        return self.ACTIONS[action.item()]
        
    def init_game(self, observation):
        """Clear the per-episode buffers; `observation` is not used."""
        self.memory, self.rewards = [], []
        self.total_reward = 0
        
        
    def discount_rewards(self, r):
        """Return the discounted sum of future rewards for every step of `r`.

        Entry `t` of the result is `r[t] + gamma * r[t+1] + gamma**2 * r[t+2] + ...`,
        computed back to front with `self.gamma` as the discount factor.
        """
        n_steps = len(r)
        returns = torch.zeros(n_steps)
        running = 0
        # Walk backwards so each step reuses the sum already built for its successor.
        for idx in range(n_steps - 1, -1, -1):
            running = running * self.gamma + r[idx]
            returns[idx] = running
        return returns
   

### Update of the model

Training takes place at the end of each game episode:

- First, the *discounted rewards* are calculated,
- these are standardized (shifted to zero mean and scaled to unit standard deviation),
- The loss of the policy is given as `- reward * log_prob`,
- an `optimizer.step()` is performed with the sum of the losses.

In [None]:
%%add_to SimplePolicy  
    
    def update(self, observation, reward, terminated, truncated, info, status):
        """Record one step's reward and, once the episode terminates, train.

        Exercise scaffold: the per-step policy loss and the optimization step
        are marked `YOUR CODE HERE`.

        Args:
            observation: Unused in the code shown.
            reward: Reward of the step just taken; added to the episode totals.
            terminated: When true, the end-of-episode training branch runs.
            truncated: Unused in the code shown.
            info: Unused in the code shown.
            status: Unused in the code shown (`play_game` passes its progress bar).
        """
        self.total_reward += reward
        self.rewards.append(reward)
        if terminated:
            self.games += 1
            # Exponential moving average of the episode totals, seeded by the first episode.
            if self.mean_reward is None:
                self.mean_reward = self.total_reward
            else:
                self.mean_reward = self.mean_reward * 0.95 + self.total_reward * (1.0 - 0.95)
            
            self.optimizer.zero_grad()
                
            # Compute the discounted rewards back to front (same recurrence as
            # discount_rewards), then standardize them to zero mean and unit
            # standard deviation.
            discounted = []
            R = 0
            for r in self.rewards[::-1]:
                R = r + self.gamma * R
                discounted.insert(0, R)
            discounted = torch.tensor(discounted)
            # self.eps keeps the division finite when the standard deviation is zero.
            # NOTE(review): with a single reward, std() is presumably NaN under
            # PyTorch's default estimator - confirm episodes always have several steps.
            discounted = (discounted - discounted.mean()) / (discounted.std() + self.eps)
            
            # One loss term per step, pairing each saved action with its discounted reward.
            policy_losses = []
            for mem, discounted_reward in zip(self.memory, discounted):
                policy_losses.append(### YOUR CODE HERE ###)
                
            loss = torch.stack(policy_losses).sum()
            ### YOUR CODE HERE
            
            # Write a checkpoint every 1000 finished episodes.
            if self.games % 1000 == 0:
                self.save(f"model_{self.games}.pt")
    
    
    def load(self, PATH):
        """Restore network, optimizer, episode count and mean reward from `PATH`."""
        state = torch.load(PATH)
        model_state = state['model_state_dict']
        self.net.load_state_dict(model_state)
        optimizer_state = state['optimizer_state_dict']
        self.optimizer.load_state_dict(optimizer_state)
        self.games = state['games']
        self.mean_reward = state['mean_reward']
        
    def save(self, PATH):
        """Write episode count, network state, optimizer state and mean reward to `PATH`."""
        snapshot = {
            'games': self.games,
            'model_state_dict': self.net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'mean_reward': self.mean_reward,
        }
        torch.save(snapshot, PATH)

In [None]:
# Train a fresh SimplePolicy with play_game's default settings (no rendering).
policy = SimplePolicy()
play_game(policy)

## Task 2: Actor-Critic-Model

For the actor-critic model, our network is given another output `critic`, which is to provide an estimate of the value of a state.
This creates better feedback for the policy and improves training.

The critic head also receives its input from the hidden layer. Since the feedback value can be any number, *there should be no activation function* for the critic head.

In [None]:
class ActorCriticNetwork(nn.Module):
    """Network for Task 2 with a policy output and a critic output (exercise scaffold).

    `forward` as given calls `self.policy` and `self.critic`, so `__init__`
    has to create attributes with exactly these names.
    """

    def __init__(self, hidden_size=128, n_actions=2):
        super().__init__()
        ### YOUR CODE HERE
        
        
    def forward(self, x):
        """Return the pair `(probs, value)` for input `x`."""
        ### YOUR CODE HERE
        # Both outputs are computed from the same `x` as it stands after the code above.
        probs = self.policy(x)
        value = self.critic(x)
        return probs, value

In [None]:
from collections import namedtuple

# Per-step training record: log-probability of the chosen action and the critic's value.
SavedAction = namedtuple('SavedAction', ['log_prob', 'value'])


class ACPolicy:
    """Trainable policy backed by an `ActorCriticNetwork` and an Adam optimizer.

    Args:
        gamma: Discount factor applied to future rewards.
        lr: Learning rate handed to `torch.optim.Adam`.
    """

    def __init__(self, gamma=0.99, lr=5e-3):
        # Two possible actions 0, 1
        self.ACTIONS = [0, 1]
        self.gamma = gamma
        # Number of finished episodes and running mean of their total rewards.
        self.games = 0
        self.mean_reward = None
        # Smallest float32 increment; added to a standard deviation before dividing by it.
        self.eps = np.finfo(np.float32).eps.item()
        self.net = ActorCriticNetwork()
        self.optimizer = torch.optim.Adam(self.net.parameters(), lr=lr)

### Strategy

The strategy now also calculates the value of the state using the network and saves it for training.

In [None]:
%%add_to ACPolicy  

   def __call__(self, observation):
 
        probs, value = ### YOUR CODE HERE
        m = ### YOUR CODE HERE
        action = ### YOUR CODE HERE
        
        self.memory.append(SavedAction(m.log_prob(action), value))
        
        return self.ACTIONS[action.item()]
        
    def init_game(self, observation):
        self.memory = []
        self.rewards = []
        self.total_reward = 0

### Training

The loss function now consists of two parts:

1. The `policy_loss` sums `-log_prob * advantage`, where `advantage = discounted_reward - value`. 
   This difference is also called the *temporal difference*.
2. The `value_loss` sums the difference between `value` and discounted reward. 
   Mostly `F.smooth_l1_loss` is used.

In [None]:
  %%add_to ACPolicy  
        
    def update(self, observation, reward, terminated, truncated, info, status):
        """Record one step's reward and, once the episode terminates, train.

        Exercise scaffold: the per-step policy and value losses and the
        optimization step are marked `YOUR CODE HERE`.

        Args:
            observation: Unused in the code shown.
            reward: Reward of the step just taken; added to the episode totals.
            terminated: When true, the end-of-episode training branch runs.
            truncated: Unused in the code shown.
            info: Unused in the code shown.
            status: Unused in the code shown (`play_game` passes its progress bar).
        """
        self.total_reward += reward
        self.rewards.append(reward)
        if terminated:
            self.games += 1
            # Exponential moving average of the episode totals, seeded by the first episode.
            if self.mean_reward is None:
                self.mean_reward = self.total_reward
            else:
                self.mean_reward = self.mean_reward * 0.95 + self.total_reward * (1.0 - 0.95)
                
            # Compute the discounted rewards back to front, then standardize
            # them to zero mean and unit standard deviation.
            discounted = []
            R = 0
            for r in self.rewards[::-1]:
                R = r + self.gamma * R
                discounted.insert(0, R)
            discounted = torch.tensor(discounted)
            # self.eps keeps the division finite when the standard deviation is zero.
            discounted = (discounted - discounted.mean()) / (discounted.std() + self.eps)
            
            # One policy-loss and one value-loss term per step are collected here.
            policy_losses = []
            value_losses = []
            for mem, discounted_reward in zip(self.memory, discounted):
                ### YOUR CODE HERE
               
            self.optimizer.zero_grad()
            ### YOUR CODE HERE
            
            # Write a checkpoint every 1000 finished episodes.
            if self.games % 1000 == 0:
                self.save(f"model_{self.games}.pt")
    
    
    def load(self, PATH):
        """Restore network, optimizer, episode count and mean reward from `PATH`."""
        state = torch.load(PATH)
        model_state = state['model_state_dict']
        self.net.load_state_dict(model_state)
        optimizer_state = state['optimizer_state_dict']
        self.optimizer.load_state_dict(optimizer_state)
        self.games = state['games']
        self.mean_reward = state['mean_reward']
        
    def save(self, PATH):
        """Write episode count, network state, optimizer state and mean reward to `PATH`."""
        snapshot = {
            'games': self.games,
            'model_state_dict': self.net.state_dict(),
            'optimizer_state_dict': self.optimizer.state_dict(),
            'mean_reward': self.mean_reward,
        }
        torch.save(snapshot, PATH)

In [None]:
# Train a fresh ACPolicy with play_game's default settings (no rendering).
policy = ACPolicy()
play_game(policy)

In [None]:
# Watch the current `policy` object play; play_game still calls policy.update on every step.
play_game(policy, do_render = True)