In [None]:
# Import required libraries

import argparse
import gym
import matplotlib.pyplot as plt
from matplotlib import animation
from IPython.display import HTML
import numpy as np
from itertools import count
from collections import namedtuple

import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.distributions import Categorical

In [None]:
# Training configuration: RNG seed, logging cadence and discount factor.
seed = 9474
log_interval = 10  # print progress every `log_interval` episodes
gamma = 0.99       # discount applied to future rewards when computing returns

# Create the environment and seed both it and PyTorch so runs are repeatable.
env = gym.make('CartPole-v1')
env.reset(seed=seed)
torch.manual_seed(seed)

# Per-step record: log-probability of the sampled action and the baseline's value estimate.
SavedAction = namedtuple('SavedAction', ['log_prob', 'value'])

# Network sizes. The input/output widths are read from the environment instead of
# being hard-coded, so they always match the observation vector and the number of
# discrete actions (4 and 2 for CartPole-v1).
num_i_nodes = int(env.observation_space.shape[0])
num_h_nodes = 64
num_o_nodes = int(env.action_space.n)

In [None]:
class Policy(nn.Module):
    """
    Policy (actor) network: one ReLU hidden layer followed by a softmax over actions.

    Layer widths are taken from the module-level ``num_i_nodes``, ``num_h_nodes``
    and ``num_o_nodes`` at construction time. The instance also carries the
    per-episode buffers ``saved_actions`` and ``rewards``.
    """
    def __init__(self):
        super().__init__()

        # Hidden layer and the output layer that scores each action
        self.affine1 = nn.Linear(num_i_nodes, num_h_nodes)
        self.action_head = nn.Linear(num_h_nodes, num_o_nodes)

        # Per-episode buffers: filled while an episode is played, emptied after the update
        self.saved_actions = []
        self.rewards = []

        # Overwrite the default layer initialisation
        self.init_weights()

    def init_weights(self):
        """
        Kaiming-normal weights and zero biases for both linear layers.
        """
        for layer in (self.affine1, self.action_head):
            nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """
        Map a state tensor to action probabilities (softmax over the last dimension).
        """
        hidden = F.relu(self.affine1(x))
        logits = self.action_head(hidden)
        return F.softmax(logits, dim=-1)

In [None]:
# Create NN for value function
class ValueFunction(nn.Module):
    """
    State-value (baseline) network: one ReLU hidden layer and a single linear output.

    Layer widths are taken from the module-level ``num_i_nodes`` and ``num_h_nodes``
    at construction time. The instance also carries the per-episode buffer
    ``state_values``.
    """
    def __init__(self):
        super().__init__()

        # Hidden layer and the scalar value output
        self.affine1 = nn.Linear(num_i_nodes, num_h_nodes)
        self.value_head = nn.Linear(num_h_nodes, 1)

        # Per-episode buffer of value estimates: filled during play, emptied after the update
        self.state_values = []

        # Overwrite the default layer initialisation
        self.init_weights()

    def init_weights(self):
        """
        Kaiming-normal weights and zero biases for both linear layers.
        """
        for layer in (self.affine1, self.value_head):
            nn.init.kaiming_normal_(layer.weight, nonlinearity='relu')
            nn.init.constant_(layer.bias, 0)

    def forward(self, x):
        """
        Map a state tensor to its estimated value (last dimension has size 1).
        """
        hidden = F.relu(self.affine1(x))
        return self.value_head(hidden)

In [None]:
# Actor network and its optimizer.
model = Policy()
optimizer = optim.Adam(model.parameters(), lr=1e-2)

# Baseline (state-value) network and its optimizer.
value_model = ValueFunction()
value_optimizer = optim.Adam(value_model.parameters(), lr=1e-2)

In [None]:
def select_action(state):
    """
    Sample an action from the current policy for ``state``.

    Args:
        state: Observation as a NumPy array (it is converted with
            ``torch.from_numpy`` and cast to float32).

    Returns:
        A tuple ``(action, log_prob, state_value)``: the sampled action as a
        Python number, the log-probability tensor of that action, and the
        value network's estimate for ``state``.

    Side effects:
        Appends ``SavedAction(log_prob, state_value)`` to ``model.saved_actions``.
    """
    state = torch.from_numpy(state).float()
    probs = model(state)
    state_value = value_model(state)
    m = Categorical(probs)
    action = m.sample()
    # Evaluate the log-probability once and reuse it for both the buffer and the
    # return value instead of building two identical autograd nodes.
    log_prob = m.log_prob(action)
    model.saved_actions.append(SavedAction(log_prob, state_value))
    return action.item(), log_prob, state_value

In [None]:
def calculate_returns(rewards, gamma):
    """
    Calculate the discounted return for every step of an episode.

    Args:
        rewards: Sequence of per-step rewards in chronological order.
        gamma: Discount factor applied to future rewards.

    Returns:
        A list with one entry per reward, where entry ``t`` is
        ``rewards[t] + gamma * rewards[t+1] + gamma**2 * rewards[t+2] + ...``.
        An empty input yields an empty list.
    """
    returns = []
    R = 0
    # Accumulate from the last step backwards, appending as we go; a single
    # reverse at the end avoids the quadratic cost of inserting at the front.
    for r in reversed(rewards):
        R = r + gamma * R
        returns.append(R)
    returns.reverse()
    return returns

In [None]:
def finish_episode():
    """
    Run one REINFORCE-with-baseline update from the buffered episode, then clear the buffers.

    Reads ``model.rewards``, ``model.saved_actions`` and
    ``value_model.state_values``; steps ``optimizer`` (policy) and
    ``value_optimizer`` (baseline); finally empties all three buffers.
    If no rewards were recorded, the buffers are cleared and no update is made.
    """
    if not model.rewards:
        # Nothing to learn from; torch.stack would fail on empty loss lists.
        del model.saved_actions[:]
        del value_model.state_values[:]
        return

    returns = torch.tensor(calculate_returns(model.rewards, gamma), dtype=torch.float32)

    policy_loss = []
    value_loss = []
    for saved_action, G, state_value in zip(model.saved_actions, returns, value_model.state_values):
        log_prob, _ = saved_action
        # The baseline is a constant for the policy gradient. Detaching keeps the
        # policy loss from pushing gradients into the value network and keeps the
        # two backward passes on disjoint graphs (otherwise the second backward
        # would traverse a graph already freed by the first).
        advantage = G - state_value.detach()
        policy_loss.append(-log_prob * advantage)
        # Give the target the same shape as the prediction to avoid silent broadcasting.
        value_loss.append(F.smooth_l1_loss(state_value, G.expand_as(state_value)))

    optimizer.zero_grad()
    value_optimizer.zero_grad()
    policy_loss = torch.stack(policy_loss).sum()
    value_loss = torch.stack(value_loss).sum()
    policy_loss.backward()
    value_loss.backward()
    optimizer.step()
    value_optimizer.step()

    # Reset the per-episode buffers for the next rollout
    del model.rewards[:]
    del model.saved_actions[:]
    del value_model.state_values[:]


In [None]:
def train(episodes):
    """
    Train the policy and baseline networks for up to ``episodes`` episodes.

    Each episode is rolled out with ``select_action``, its rewards and value
    estimates are buffered on ``model`` / ``value_model``, and
    ``finish_episode`` performs the update. Training stops early once the
    running reward exceeds ``env.spec.reward_threshold``.

    Args:
        episodes: Maximum number of episodes to run.

    Returns:
        List with the exponentially smoothed (running) reward after each
        completed episode; shorter than ``episodes`` if training stopped early.
    """
    running_reward = 10  # starting value of the moving average

    # Keep track of the rewards for plotting
    reward_history = []

    for episode in range(episodes):
        # Gym >= 0.26 / Gymnasium return (observation, info) from reset();
        # older Gym versions return the observation alone.
        reset_result = env.reset()
        state = reset_result[0] if isinstance(reset_result, tuple) else reset_result
        ep_reward = 0

        for t in range(10000):  # Don't infinite loop while learning

            # Select action
            action, _, state_value = select_action(state)

            # Take action; accept both the 5-tuple (terminated/truncated) and
            # the legacy 4-tuple (done) step results.
            step_result = env.step(action)
            if len(step_result) == 5:
                state, reward, terminated, truncated, _ = step_result
                done = terminated or truncated
            else:
                state, reward, done, _ = step_result

            # Save reward and state value
            model.rewards.append(reward)
            value_model.state_values.append(state_value)

            ep_reward += reward

            if done:
                break

        # Exponential moving average of the episode reward
        running_reward = 0.05 * ep_reward + (1 - 0.05) * running_reward
        reward_history.append(running_reward)
        finish_episode()
        if episode % log_interval == 0:
            print('Episode {}\tLast reward: {:.2f}\tRunning reward: {:.2f}'.format(episode, ep_reward, running_reward))
        if running_reward > env.spec.reward_threshold:
            print("Solved! Running reward is now {} and the last episode ({}) runs to {} time steps!".format(running_reward, episode, t))
            break

    return reward_history

In [None]:
# Baseline run with the networks and optimizers created above.
reward_history = train(episodes=1000)

In [None]:
# Plot the running reward recorded after each training episode

fig, ax = plt.subplots()
ax.plot(reward_history)
ax.set_title('Reward history')
ax.set_xlabel('Episode')
ax.set_ylabel('Reward')
ax.legend(['Reward'])
plt.show()

In [None]:
# Evaluate the policy using total regret

def total_regret(episodes, history=None, threshold=None):
    """
    Sum, over a reward history, of the gap between a target reward and each entry.

    Args:
        episodes: Accepted for backward compatibility; not used in the
            computation (the whole history is summed).
        history: Iterable of per-episode rewards. Defaults to the module-level
            ``reward_history``.
        threshold: Target reward each entry is compared against. Defaults to
            ``env.spec.reward_threshold``.

    Returns:
        ``sum(threshold - reward for reward in history)``; ``0`` for an empty history.
    """
    if history is None:
        history = reward_history
    if threshold is None:
        threshold = env.spec.reward_threshold

    # Accumulate under a name that does not shadow this function
    regret = 0
    for reward in history:
        regret += threshold - reward

    return regret

In [None]:
# Hyperparameter tuning: grid search over learning rate and hidden-layer width,
# ranking each configuration by its total regret.

# Grid of values to try
hyperparameters = {
    'lr': [1e-2, 1e-3, 1e-4],
    'num_h_nodes': [16, 32, 64]
}

log_interval = 100  # log less often during the long sweep
episodes = 2000
best_total_regret = float('inf')
best_hyperparameters = {}
regret_storage = []
reward_history_storage = np.zeros((len(hyperparameters['lr']), len(hyperparameters['num_h_nodes']), episodes))

# Loop through the hyperparameters
for lr_idx, lr in enumerate(hyperparameters['lr']):
    # The loop variable must be called `num_h_nodes`: Policy and ValueFunction
    # read that module-level name when they are constructed.
    for h_idx, num_h_nodes in enumerate(hyperparameters['num_h_nodes']):

        # Fresh networks and optimizers for this configuration
        model = Policy()
        value_model = ValueFunction()
        optimizer = optim.Adam(model.parameters(), lr=lr)
        value_optimizer = optim.Adam(value_model.parameters(), lr=lr)

        # Train the model
        reward_history = train(episodes)
        # If training stopped early, pad the remainder with env.spec.reward_threshold
        reward_history += [env.spec.reward_threshold] * (episodes - len(reward_history))
        reward_history_storage[lr_idx, h_idx, :] = reward_history

        # Calculate the total regret. The result is kept under its own name so the
        # `total_regret` function is still callable on the next iteration.
        regret = total_regret(episodes)

        # Store the total regret
        regret_storage.append([lr, num_h_nodes, regret])

        # Check if this is the best total regret
        if regret < best_total_regret:
            best_total_regret = regret
            best_hyperparameters = {'lr': lr, 'num_h_nodes': num_h_nodes}

In [None]:
# Report the configuration with the lowest total regret found by the grid search,
# together with that regret value.
print('Best hyperparameters:', best_hyperparameters)
print('Best total regret:', best_total_regret)

In [None]:
# Plot the stored reward curve of the best configuration from the grid search

best_lr = best_hyperparameters['lr']
best_hidden = best_hyperparameters['num_h_nodes']
best_lr_idx = hyperparameters['lr'].index(best_lr)
best_hidden_idx = hyperparameters['num_h_nodes'].index(best_hidden)

fig, ax = plt.subplots()
ax.plot(reward_history_storage[best_lr_idx, best_hidden_idx, :])
ax.set_title('Reward history for the best hyperparameters')
ax.set_xlabel('Episode')
ax.set_ylabel('Reward')
ax.legend(['lr: {}, num_h_nodes: {}'.format(best_lr, best_hidden)])
plt.show()