In [13]:
import gymnasium as gym

import torch 
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim

import numpy as np
import matplotlib.pyplot as plt
from IPython.display import clear_output
from time import sleep

from tqdm.notebook import tqdm
from collections import namedtuple
from collections import deque
import random

## The Cartpole environment

Cartpole is a classic RL control task. A cart lies on a track and has a pole attached to it which can freely rotate. At each time step, the cart may choose to move left or right. The goal is to keep the pole upright for as long as possible. 

### Termination and truncations conditions
The environment terminates if the pole angle is more than 12 degrees from vertical, or if the cart position is more than 2.4 units from the center.
The environment is truncated after 500 time steps, if the pole is still upright.

### Rewards
The agent recieves a reward of +1 at each time step, as long as the pole is upright.

### Observations
At each time point, the RL agent recieves the following observations:
1. Cart position
2. Cart velocity 
3. Pole angle from vertical
4. Pole anglular velocity 

### Action space
The agent can take one of two actions at each time step:
1. Move the cart to the left
2. Move the cart to the right

# DQN algorithm components

The DQN algorithm requires the following components, with the following roles:
1. **Value network**: A neural network that takes in states and outputs predictions for the value of each action in that state
2. **Target network**: A copy of the value network with lagged parameters that is used to compute the target values for the regression targets
3. **Replay buffer**: A buffer that stores transitions (state, action, reward, next_state, terminated) that the agent has experienced. This is used to sample mini-batches of transitions to train the value network
4. **Policies**: These use the value network to select actions in the environment. We will consider two policies:
    - **Greedy policy**: This policy selects the action with the highest value, and is used to evaluate the value network
    - **$\epsilon$-greedy policy**: This policy selects a random action with probability $\epsilon$, and the action with the highest value with probability 1-$\epsilon$. This is used to explore the environment using the training phase. 
   

## The value and target networks

We start by defining the value network architecture. This is a simple feedforward neural network (a **multi-layer perceptron**). 
The input to the network is the state of the environment, and the output is the value prediction of each action.

In [2]:
# Define the Q_net class. This inherits from the nn.Module class

    # Initialise the network using the size of the observation space and the number of actions
    def __init__(self, obs_size, n_actions):
        # Use the nn.Module's __init__ method to ensure that the parameters can be updated during training
        super().__init__()

        # Define the layers of the network
        


    # Define the forward method
    
    

## The replay buffer

To implement the replay buffer, we first define a named tuple data type to store transitions. We then define the replay buffer class, which stores transitions and can sample mini-batches of transitions. The replay buffer has the following methods:
1. **Push**: This method stores a new transition in the buffer, and removes the oldest transition if the buffer is full
2. **Sample**: This method samples a mini-batch of transitions from the buffer

In [3]:
# Define the transition named tuple
# This will be used to store the transitions (state, action, reward, next_state, terminated) in the replay buffer
Transition = namedtuple('Transition', ('state', 'action', 'reward', 'next_state', 'terminated'))

# Define the ReplayBuffer class
class ReplayBuffer:
    # Initialise the buffer with a capacity. 
    def __init__(self, capacity):
        # Use a deque object to implement the buffer with the maxlen parameter set to the capacity
        

    # Define the push method to add a transition to the buffer
    def push(self, state, action, reward, next_state, terminated):
        # Use the append method to add a transition to the buffer


    # Sample a batch of transitions for training 
    def sample(self, batch_size):
        # Use the random.sample method to sample a batch of transitions from the buffer
        

## The DQN agent
We now define the DQN agent class. This class will have:
1. **A value network**, implemented using the Q_net class
2. **A target network**, implemented using the Q_net class
3. **A replay buffer**, implemented using the ReplayBuffer class
4. **An optimiser**, which is used to train the value network

The DQN agent will have the following methods:
1. **Greedy policy**: This method selects the action with the highest value
2. **$\epsilon$-greedy policy**: This method selects a random action with probability $\epsilon$, and the action with the highest value with probability 1-$\epsilon$
3. **Sync**: This method synchronises the target network with the value network by loading the parameters of the value network into the target network

In [4]:
# Define the DQN_agent class

    def __init__(self, obs_size, n_actions, buffer_capacity=______, gamma=____, epsilon=____, lr=____, batch_size=__):
        # Save the hyperparameters (buffer_capacity, gamma, epsilon, lr, batch_size)
        self.buffer_capacity = buffer_capacity
        self.gamma = gamma
        self.epsilon = epsilon
        self.lr = lr
        self.batch_size = batch_size
        
        # Save the number of actions and the observation size
        self.n_actions = n_actions
        self.obs_size = obs_size

        # Define the value network for the agent. 

        # Define the target network for the agent.
        
        # Sync the networks
        
        # Define the replay buffer
        
        # Define the optimiser
        


    # Define the sync method to sync the target network with the value network
    def sync(self):
        # Load the state dict of the value network into the target network


    # Define the greedy policy
    def greedy_policy(self, state):
        # Enter no-gradient mode (since we are not training the network when we sample actions)
        with torch.no_grad():
            # Convert the state to a tensor
            state = torch.tensor(state, dtype=torch.float32)
            # Compute the Q values for the state using the value network
            
            # Find the action with the highest Q value, converting it to a integer
            max_action = q_values.argmax().item()
            # Return the action
            return max_action
            

    # Define the epsilon greedy policy 
    def epsilon_greedy_policy(self, state):
        # Sample a random number uniformly between 0 and 1 using numpy's random method
        rand_num = np.random.random()

        # If the random number is less than the exploration rate, choose a random action
        if rand_num < self.epsilon:
            # Choose a random action using the numpy randint method 
            
            # Return the action
            return action
        
        # Otherwise, choose the action with the highest Q value
        else:
            # Use the greedy policy to choose the action
            
            # Return the action
            return action


## Interacting with the environment
During training, the DQN agent must interact with the environment in order to collect transitions to be added to the replay buffer. We define an **interact** function that takes in the environment, the DQN agent, and the number of time steps to interact for. This function will use the $\epsilon$-greedy policy to select actions, and will store the transitions in the replay buffer. 

In [5]:
# Define the interact method
# This method takes in an agent, an environment, and the number of steps to interact for.
def interact(agent, env, n_steps):
    # Loop over the steps
        
        # Get the current state of the environment
        
        # Choose an action using the epsilon greedy policy
        
        # Take a step in the environment using the action. 
        # This returns the next state, reward, terminated, truncated, and info variables
        
        # Push the transition to the replay buffer

        # Reset the environment if the episode is terminated or truncated
        
            

## Training the DQN agent

We define a **update weights** function that updates the weights of the value network for a DQN agent. This function will:
1. Sample a mini-batch of transitions from the replay buffer
2. Extracts the states, actions, rewards, next_states, and terminated flags from the transitions
3. Uses the target network, rewards, and terminated flags to compute the target values for the value network 
4. Computes the loss between the value network predictions and the target values
5. Runs backpropagation and updates the weights of the value network using that loss

As a reminder, the regression targets for $Q(S,A)$ are given by:
$$ R + \gamma (1-\text{terminated}) \max_{A'} Q(S',A')$$

In [6]:
# This method updates the weights of the value-network of the agent using a batch of transitions
def update_weights(agent):
    # Sample a batch of transitions from the replay buffer
    batch = agent.replay_buffer.sample(agent.batch_size)
    
    # Extract the batch of states as float32 tensors 
    states = torch.tensor([transition.state for transition in batch], dtype=torch.float32)
    # Extract the batch of actions as int64 tensors
    actions = torch.tensor([transition.action for transition in batch], dtype=torch.int64)
    # Extract the batch of rewards as float32 tensors
    rewards = torch.tensor([transition.reward for transition in batch], dtype=torch.float32)
    # Extract the batch of next states as float32 tensors
    next_states = torch.tensor([transition.next_state for transition in batch], dtype=torch.float32)
    # Extract the batch of terminated flags as bool tensors
    terminated = torch.tensor([transition.terminated for transition in batch], dtype=torch.bool)

    # Compute the next_state_values using the target network

    # Take the max over the actions (dim=1) 
    max_next_values = next_state_values.max(dim=1)[0] 
    # Zero out the max-next-values for the terminal states
    
    # Compute the target values
    

    # Compute the q_values for all actions using the value network
    
    # Compute the q_values for the actions that were taken
    q_SA = q_values[torch.arange(q_values.size(0)), actions]
    # Compute the loss using the mean squared error
    

    # Zero the gradients using the optimiser
    
    # Compute the gradients of the loss through backpropagation
    
    # Update the weights of the value network using the gradients
    

    # return the loss (as a float)
    
    

## The training loop for the agent
We now put everything together into a training loop for the DQN agent. This loop will:
1. Interact with the environment for a number of time steps
2. Update the weights of the value network
3. Synchronize the target network with the value network at regular intervals

In [7]:
# Define the train_loop method
def train_loop(agent, env, interactions_per_update, update_steps, sync_delay):
    # Initialise the environment
    

    # Initialise the list of losses
    

    # Gather initial interactions for the experience replay buffer
    

    # Loop over the update_steps
    
        # Interact with the environment


        # Update the weights, appending the loss to the list of losses
        

        # Sync the networks every sync_delay steps
        

    
    # Return the losses
    

## Some helper functions
We define some helper functions to:
1. Evaluate the agent's performance
2. Visualise the agent's performance
3. Plot the (smoothed) losses during training

In [8]:
# Define the evaluate function
def evaluate(agent, env, n_episodes):
    # Initialise the list of rewards
    rewards = []
    
    # Loop over the episodes
    for episode in tqdm(range(n_episodes)):
        # Reset the environment
        env.reset()
        # Get the initial state
        state = env.state
        # Initialise the episode reward
        episode_reward = 0
        
        # Loop over the steps
        while True:
            # Choose the action with the highest Q value
            action = agent.greedy_policy(state)
            # Take the action
            next_state, reward, terminated, truncated, info = env.step(action)
            # Update the state and reward
            state = next_state
            episode_reward += reward
            # Break if the episode has terminated
            if terminated or truncated:
                break
        
        # Append the episode reward to the list of rewards
        rewards.append(episode_reward)
    # Return the mean of the rewards
    return np.mean(rewards)

In [9]:
# Define the visualise function
# This displays the agent's behaviour in the environment for 500 steps.  
def visualise(agent, env, n_steps):
    # Reset the environment
    env.reset()

    # Initialise the list of frames   
    frames = []

    for _ in range(n_steps):
        # Render the environment and store the frame
        frames.append(env.render())

        # Take an action using the greedy policy
        state = env.state
        action = agent.greedy_policy(state)
        next_state, reward, terminated, truncated, info = env.step(action)
        if terminated or truncated:
            env.reset()

    # Display the movie
    for frame in frames:
        clear_output(wait=True)
        plt.imshow(frame)
        plt.show()
        sleep(0.003)

In [10]:
# Plot a smoothed version of the losses for the regression training
def plot_losses(losses):
    # Create a smoothed version of the losses
    smoothed_losses = np.convolve(losses, np.ones(1000)/1000, mode='valid') 
    plt.plot(smoothed_losses)
    plt.xlabel('Update step')
    plt.ylabel('Loss')
    plt.xlim(0, len(smoothed_losses))
    plt.ylim(0, 1.1*max(smoothed_losses))
    plt.show()

# Let's gooooo

We will now train our network using the DQN algorithm and visualise the agent's performance. Have fun!

In [11]:
# Create the environment, setting the render mode to 'rgb_array'

# Reset the environment

# Create the agent


In [2]:
# Evaluate the agent's performance before training


In [None]:
# Visualise the agent's behaviour before training


In [1]:
# Train the agent, saving the losses


In [3]:
# Plot the losses


In [4]:
# Evaluate the agent's performance after training


In [5]:
# Visualise the agent's behaviour after training
