Vanilla Policy Gradient Implementation for CartPole-v1 (**Shadowing sabrinahirani's Jupyter Book**)

In [None]:
import random
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns
import torch
import torch.nn as nn
import torch.nn.functional as F
from typing import *

from torch.distributions import Categorical #unlike Normal, it has discrete probability distribution--best for action--rather than regression tasks/uncertainties

import gym # initiallizing env & agent

# NOTE(review): this training loop duplicates the dedicated training cell
# further down and uses Agent, which is only defined later in the file. Run
# the Network and Agent cells before executing it.

# Build the CartPole-v1 environment and a fresh policy-gradient agent.
environment = gym.make("CartPole-v1")

obs_space = environment.observation_space.shape[0]
action_space = environment.action_space.n

print(f"=== CartPole-v1 Environment ===")
print(f" Observation Space:", obs_space)
print(f" Action Space:", action_space)

agent = Agent(obs_space, action_space)

n_average = 50  # number of most recent episodes averaged when reporting loss
threshold_loss = -15  # training stops once the average loss falls to this value

history = []  # loss of every finished episode, in order
n_trajectories = 800  # index of the last training episode
T = 1000  # hard cap on the number of steps in a single episode

for i in range(n_trajectories + 1):

    # Start a new trajectory. gym >= 0.26 returns (observation, info) from
    # reset(); older releases return the observation alone.
    reset_result = environment.reset()
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    rewards = []  # per-step rewards, needed for the discounted returns
    log_probs = []  # per-step log-probabilities of the sampled actions
    entropies = []  # per-step policy entropies for the entropy bonus

    for _step in range(T):

        # select action
        action_selected, log_prob, entropy = agent.action(state)

        # get feedback from environment based on the action selected;
        # gym >= 0.26 splits the old `done` flag into terminated/truncated
        step_result = environment.step(action_selected)
        if len(step_result) == 5:
            state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            state, reward, done, _ = step_result
        rewards.append(reward)
        log_probs.append(log_prob)
        entropies.append(entropy)

        # the episode is over (pole fell, cart left the track, or time limit)
        if done:
            break

    # update policy with latest trajectory
    loss = agent.learn(rewards, log_probs, entropies)
    reward = sum(rewards)  # episode return; the last step reward alone says nothing
    history.append(loss)

    if i % 50 == 0:
        # average over exactly the last n_average episodes (fewer early on)
        avg_loss = np.mean(history[-n_average:])
        print(f"Episode: {i:4} | Average Loss: {avg_loss:3.4f} | Current Loss: {loss:3.4f} | Current Reward {reward}")

        if avg_loss <= threshold_loss:

            print(f'Reached loss threshold in {i} episodes')
            break

In [12]:
class Network(nn.Module):
    """Policy network: one hidden ReLU layer followed by a softmax over actions.

    Args:
        n_obs_dim: Number of features in one observation.
        n_action_dim: Number of discrete actions (size of the output).
        n_hidden_dim: Width of the hidden layer.
    """

    def __init__(self, n_obs_dim: int, n_action_dim: int, n_hidden_dim: int = 128):
        super().__init__()  # let nn.Module register the layers created below
        self.fc1 = nn.Linear(n_obs_dim, n_hidden_dim)  # observation -> hidden features
        self.fc2 = nn.Linear(n_hidden_dim, n_action_dim)  # hidden features -> action scores

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return action probabilities for ``x``.

        ``x`` may be a single observation of shape ``(n_obs_dim,)`` or a batch
        of shape ``(batch, n_obs_dim)``; the output has the same leading
        dimensions and sums to 1 along its last axis.

        sources:
        https://medium.com/aimonks/a-comprehensive-guide-to-activation-functions-in-deep-learning-ff794f87c184
        https://lightning.ai/courses/deep-learning-fundamentals/unit-6-overview-essential-deep-learning-tips-tricks/unit-6.4-choosing-activation-functions/
        """
        x = F.relu(self.fc1(x))  # non-linearity for the hidden layer
        # Normalize over the last (action) axis so batched and unbatched
        # inputs are both handled; for 2-D input this equals dim=1.
        return F.softmax(self.fc2(x), dim=-1)

Source: https://spinningup.openai.com/en/latest/algorithms/vpg.html#key-equations

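The policy's parameters are updated to maximize expected rewards by estimating the policy gradient, i.e. how sensitive the policy's performance is to changes in its parameters. This update uses the concept of the advantage function, which helps determine whether a particular action was better or worse than expected, guiding the agent to improve its strategy. In the implementation below, the advantage is approximated by the discounted return-to-go of each step, normalized across the trajectory, rather than by a learned value baseline.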

In [None]:
class Agent:
    """Vanilla policy-gradient agent with an entropy bonus.

    The agent owns a softmax policy network and an AdamW optimizer.
    ``action`` samples from the current policy and ``learn`` performs one
    gradient step from a finished trajectory.
    """

    def __init__(self, obs_space: int, action_space: int):
        """Create the policy network and its optimizer.

        Args:
            obs_space: Number of features in one observation.
            action_space: Number of discrete actions.
        """
        self.alpha = 1e-3  # learning rate
        self.gamma = 0.99  # discount factor; close to 1 favours long-term reward
        self.eps = np.finfo(np.float32).eps.item()  # keeps the normalization division away from zero
        self.gradient_clipping = 0.5  # maximum gradient norm allowed per update
        self.entropy_regularization = 0.5  # weight of the entropy bonus (exploration vs. exploitation)

        self.policy = Network(obs_space, action_space)
        self.optimizer = torch.optim.AdamW(self.policy.parameters(), lr=self.alpha)

    def action(self, state: np.ndarray) -> Tuple[int, torch.Tensor, torch.Tensor]:
        """Sample an action for ``state`` from the current policy.

        Args:
            state: A single observation as a NumPy array.

        Returns:
            A tuple ``(action, log_prob, entropy)``: the sampled action index,
            the log-probability of that action, and the entropy of the action
            distribution. Both tensors have shape ``(1,)`` and stay attached
            to the autograd graph so ``learn`` can backpropagate through them.
        """
        # Add a leading batch axis of size 1 before feeding the policy.
        state = torch.from_numpy(state).float().unsqueeze(0)
        m = Categorical(self.policy(state))  # discrete distribution over actions
        action_selected = m.sample()
        return action_selected.item(), m.log_prob(action_selected), m.entropy()

    def learn(self, rewards: List[float], log_probs: List[torch.Tensor], entropies: List[torch.Tensor]) -> float:
        """Run one policy-gradient update from a finished trajectory.

        Args:
            rewards: Reward received at every step, in time order.
            log_probs: Log-probability tensors returned by ``action``.
            entropies: Entropy tensors returned by ``action``.

        Returns:
            The scalar loss value that was minimized.

        Raises:
            ValueError: If the trajectory is empty.
        """
        if len(rewards) == 0:
            raise ValueError("cannot learn from an empty trajectory")

        # Discounted return-to-go: accumulate backwards through time, then
        # flip once into time order (avoids repeated front insertion).
        ret = 0.0
        returns = []
        for r in reversed(rewards):
            ret = r + self.gamma * ret
            returns.append(ret)
        returns.reverse()

        returns = torch.tensor(returns, dtype=torch.float32)
        # Standardize to reduce gradient variance. The sample standard
        # deviation of a single value is NaN, so a one-step trajectory keeps
        # its raw return.
        if returns.numel() > 1:
            returns = (returns - returns.mean()) / (returns.std() + self.eps)

        # Policy-gradient loss minus the weighted entropy bonus, summed over steps.
        step_log_probs = torch.cat(log_probs)
        step_entropies = torch.cat(entropies)
        loss = (-(step_log_probs * returns) - self.entropy_regularization * step_entropies).sum()

        # gradient descent: backpropagation, gradient clipping and optimizer step
        self.optimizer.zero_grad()
        loss.backward()
        nn.utils.clip_grad_norm_(self.policy.parameters(), self.gradient_clipping)
        self.optimizer.step()

        return loss.item()

In [None]:
# Initialize the CartPole-v1 environment and the policy-gradient agent, then
# train one episode at a time until the average loss reaches the threshold or
# the episode budget is exhausted.

environment = gym.make("CartPole-v1")

obs_space = environment.observation_space.shape[0]
action_space = environment.action_space.n

print(f"=== CartPole-v1 Environment ===")
print(f" Observation Space:", obs_space)
print(f" Action Space:", action_space)

agent = Agent(obs_space, action_space)

n_average = 50  # number of most recent episodes averaged when reporting loss
threshold_loss = -15  # training stops once the average loss falls to this value

history = []  # loss of every finished episode, in order
n_trajectories = 800  # index of the last training episode
T = 1000  # hard cap on the number of steps in a single episode

for i in range(n_trajectories + 1):

    # Start a new trajectory. gym >= 0.26 returns (observation, info) from
    # reset(); older releases return the observation alone.
    reset_result = environment.reset()
    state = reset_result[0] if isinstance(reset_result, tuple) else reset_result

    rewards = []  # per-step rewards, needed for the discounted returns
    log_probs = []  # per-step log-probabilities of the sampled actions
    entropies = []  # per-step policy entropies for the entropy bonus

    for _step in range(T):

        # select action
        action_selected, log_prob, entropy = agent.action(state)

        # get feedback from environment based on the action selected;
        # gym >= 0.26 splits the old `done` flag into terminated/truncated
        step_result = environment.step(action_selected)
        if len(step_result) == 5:
            state, reward, terminated, truncated, _ = step_result
            done = terminated or truncated
        else:
            state, reward, done, _ = step_result
        rewards.append(reward)
        log_probs.append(log_prob)
        entropies.append(entropy)

        # the episode is over (pole fell, cart left the track, or time limit)
        if done:
            break

    # update policy with latest trajectory
    loss = agent.learn(rewards, log_probs, entropies)
    reward = sum(rewards)  # episode return; the last step reward alone says nothing
    history.append(loss)

    if i % 50 == 0:
        # average over exactly the last n_average episodes (fewer early on)
        avg_loss = np.mean(history[-n_average:])
        print(f"Episode: {i:4} | Average Loss: {avg_loss:3.4f} | Current Loss: {loss:3.4f} | Current Reward {reward}")

        if avg_loss <= threshold_loss:

            print(f'Reached loss threshold in {i} episodes')
            break