We'll use the following from PyTorch:

- Neural networks (torch.nn)
This module provides tools for building neural networks. It includes a wide range of layer types, such as fully connected, convolutional, and recurrent layers, as well as activation functions and loss functions.

- Optimization (torch.optim)
This module provides optimization algorithms for training neural networks. It includes classic algorithms such as Stochastic Gradient Descent (SGD), as well as adaptive algorithms like Adam and RMSProp.

- Automatic differentiation (torch.autograd)
This module provides automatic differentiation, which is essential for training neural networks via backpropagation. It lets PyTorch automatically compute the gradients of a loss function with respect to all the parameters of the network, so that optimization algorithms can adjust the parameters to minimize the loss.

In [1]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [2]:
env = gym.make('CartPole-v1')

In [16]:
is_ipython = 'inline' in matplotlib.get_backend()

if is_ipython:
    from IPython import display

plt.ion()


In [17]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# Replay Memory

1. Replay Memory is a technique used in reinforcement learning to store and manage the experiences of an agent during training. The idea is to store the agent's experiences as a sequence of (state, action, reward, next_state) tuples, which are collected as the agent interacts with the environment. During training, these experiences are used to update the agent's policy and value function.

2. The Replay Memory allows the agent to learn from past experiences by randomly sampling a batch of experiences from the memory buffer, rather than learning only from the most recent experience. This reduces the correlation between consecutive experiences, which can improve the stability and convergence of the learning algorithm. In addition, because experiences are stored in a buffer, the agent can reuse them to update its policy and value function multiple times, which can further improve learning efficiency.

3. The Replay Memory is typically implemented as a fixed-size buffer or queue that stores the most recent experiences. When the buffer is full, new experiences overwrite the oldest experiences in the buffer. During training, a batch of experiences is randomly sampled from the buffer and used to update the agent's policy and value function. This process is repeated iteratively until the agent converges to an optimal policy.
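
The fixed-size behaviour described above can be sketched with Python's `collections.deque`. This is only a minimal illustration: the capacity of 3 and the integer "experiences" are made up for the example and are not the values used for training.

```python
import random
from collections import deque

# Hypothetical tiny buffer: capacity 3, integers stand in for experiences
buffer = deque([], maxlen=3)

for experience in range(5):
    # Once the deque is full, appending drops the oldest item automatically
    buffer.append(experience)

# Only the three most recent experiences remain: [2, 3, 4]
remaining = list(buffer)

# Draw a random mini-batch of 2 distinct experiences from the buffer
random.seed(0)  # fixed seed so the sketch is repeatable
mini_batch = random.sample(list(buffer), 2)
```

Because sampling is random rather than sequential, consecutive experiences rarely end up next to each other in a mini-batch.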



# How to use the concept of Replay Memory to implement DQN algorithm

We'll be using experience replay memory for training our DQN. It stores the transitions that the agent observes, allowing us to reuse this data later. By sampling from it randomly, the transitions that build up a batch are decorrelated. It has been shown that this greatly stabilizes and improves the DQN training procedure.

For this, we're going to need two classes:
1. Transition - a named tuple representing a single transition in our environment. It essentially maps (state, action) pairs to their (next_state, reward) result, with the state being the observation vector returned by the environment.

2. ReplayMemory - a cyclic buffer of bounded size that holds the transitions observed recently. It also implements a .sample() method for selecting a random batch of transitions for training.
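
As a rough sketch of how such named tuples are handled in a batch, the example below regroups a list of transitions into a single `Transition` whose fields each span the whole batch, the same `zip(*...)` idiom the optimization step in this notebook relies on. The numeric values are made up, and plain floats stand in for tensors.

```python
from collections import namedtuple

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

# Three made-up transitions; plain numbers stand in for tensors.
# next_state is None when the episode ended after the action.
transitions = [
    Transition(state=0.1, action=0, next_state=0.2, reward=1.0),
    Transition(state=0.2, action=1, next_state=0.3, reward=1.0),
    Transition(state=0.3, action=1, next_state=None, reward=1.0),
]

# zip(*transitions) regroups the fields column by column, so we get
# one Transition whose fields are tuples spanning the whole batch
batch = Transition(*zip(*transitions))

# batch.state  == (0.1, 0.2, 0.3)
# batch.action == (0, 1, 1)
```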


In [18]:
Transition = namedtuple('Transition',('state', 'action', 'next_state', 'reward'))

In [19]:
class ReplayMemory(object):
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args): # take states
        """
        Save a transition
        """
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)
        

# DQN Algorithm

DQN (Deep Q-Network) is a reinforcement learning algorithm that uses deep neural networks to approximate the Q-function in a Q-learning algorithm.

The DQN algorithm in the context of the CartPole environment can be summarized in the following steps:
1. Initialize the Q-network with random weights.
2. Select an action using an epsilon-greedy policy, which selects the action with the highest Q-value with probability 1-epsilon and a random action with probability epsilon.
3. Execute the action and observe the next state and reward.
4. Store the experience tuple (state, action, reward, next_state) in a replay buffer.
5. Sample a mini-batch of experiences from the replay buffer.
6. Compute the Q-target values for the mini-batch using the Bellman equation: Q_t = reward + gamma * max_a(Q(next_state, a)). For a transition that ends the episode there is no next state, so Q_t = reward.
7. Compute the Q-values for the mini-batch using the current Q-network: Q_values = Q(state, action).
8. Compute the loss between the Q-values and the Q-target values and update the network parameters using gradient descent.
9. Repeat steps 2-8 for a fixed number of episodes or until convergence.
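
Step 6 can be sketched in plain Python as follows. The helper name `q_targets` and the mini-batch values are hypothetical; the Q-values of the next state are passed in as ordinary lists instead of coming from a network.

```python
GAMMA = 0.99  # discount factor; same value as the notebook's hyperparameter

def q_targets(rewards, next_q_values, gamma=GAMMA):
    """Return one Bellman target per transition.

    next_q_values[i] is the list of Q(next_state, a) for every action a,
    or None if transition i ended the episode.
    """
    targets = []
    for reward, next_q in zip(rewards, next_q_values):
        if next_q is None:
            # Terminal transition: there is no future return to bootstrap from
            targets.append(reward)
        else:
            targets.append(reward + gamma * max(next_q))
    return targets

# Made-up mini-batch of two transitions, the second one terminal
targets = q_targets(rewards=[1.0, 1.0], next_q_values=[[0.5, 2.0], None])
```

The first target bootstraps from the best next action (value 2.0), while the terminal one is just the reward.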

The DQN algorithm uses a target network to stabilize the training process. The target network is a copy of the Q-network that is updated less frequently than the Q-network. This helps to prevent the Q-values from oscillating during training.
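
One way to update the target network more slowly, and the one the training loop in this notebook follows with its `TAU` hyperparameter, is a soft update: at every step each target weight moves a small fraction of the way toward the corresponding Q-network weight. The sketch below illustrates the idea with hypothetical one-parameter "networks" stored as dictionaries of floats.

```python
TAU = 0.005  # update rate; same value as the notebook's hyperparameter

def soft_update(target_weights, policy_weights, tau=TAU):
    """Move every target weight a fraction tau of the way toward the policy weight."""
    return {
        name: tau * policy_weights[name] + (1 - tau) * target_weights[name]
        for name in target_weights
    }

# Hypothetical one-parameter "networks"
policy = {'w': 1.0}
target = {'w': 0.0}

target = soft_update(target, policy)   # after one update
after_one = target['w']

for _ in range(999):                   # after 1000 updates in total
    target = soft_update(target, policy)
after_thousand = target['w']
```

After a single update the target has barely moved (0.005), but after a thousand updates it has almost caught up with the policy weight, so the target follows the Q-network with a delay.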

In the CartPole environment, the DQN algorithm learns to balance the pole on the cart by moving the cart left or right. The Q-network takes the state of the environment as input and outputs the Q-values for each possible action. The DQN algorithm improves its Q-value estimates by updating the Q-network parameters using gradient descent. With enough training, it can learn to balance the pole on the cart for extended periods of time.

Our model will be a feed-forward neural network that takes in the current observation of the environment. It has two outputs, representing `Q(s, left)` and `Q(s, right)` (where s is the input to the network). In effect, the network is trying to predict the expected return of taking each action given the current input.

In [20]:
class DQN(nn.Module):
    # Multi-layer perceptron with three fully connected layers
    # n_observations - size of the input (the state of the environment)
    # n_actions - number of possible actions in the environment
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Pass the input through the three layers; returns one Q-value per action
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

# Training
During training, the Q-network is updated to minimize the error between its predicted Q-values and the target Q-values given by the Bellman equation. The error is measured here with the Huber loss, a smooth alternative to the mean squared error. The target Q-values are computed with the target network, whose weights are not touched by the gradient step and only slowly track the Q-network. This helps to stabilize the training process, because the targets do not shift with every update of the Q-network.
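
The optimization code in this notebook computes the loss with `nn.SmoothL1Loss` (the Huber loss). The plain-Python sketch below compares it with the squared error for a single prediction. It assumes a threshold of 1.0, which matches PyTorch's default `beta` as far as I know, and it ignores the averaging over the batch that the PyTorch loss performs.

```python
def squared_error(prediction, target):
    return (prediction - target) ** 2

def huber(prediction, target, beta=1.0):
    """Smooth L1 / Huber loss for a single value, with threshold beta (assumed 1.0)."""
    error = abs(prediction - target)
    if error < beta:
        return 0.5 * error ** 2 / beta   # quadratic near zero
    return error - 0.5 * beta            # linear for large errors

small = (squared_error(0.5, 0.0), huber(0.5, 0.0))    # (0.25, 0.125)
large = (squared_error(10.0, 0.0), huber(10.0, 0.0))  # (100.0, 9.5)
```

For the large error the squared loss is more than ten times the Huber loss, which is why a few badly estimated Q-values disturb training less with the Huber loss.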

`select_action` - selects an action according to an epsilon-greedy policy. Put simply, we sometimes use our model to choose the action, and sometimes sample one uniformly at random. The probability of choosing a random action starts at `EPS_START` and decays exponentially towards `EPS_END`. `EPS_DECAY` controls the rate of the decay.
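
To get a feel for the schedule, the sketch below evaluates the decaying epsilon at a few step counts and applies it to a plain list of Q-values. The helper names `epsilon_at` and `epsilon_greedy` are made up for this illustration; the constants are the notebook's hyperparameter values.

```python
import math
import random

EPS_START, EPS_END, EPS_DECAY = 0.9, 0.05, 1000  # the notebook's values

def epsilon_at(step):
    """Probability of taking a random action after `step` actions."""
    return EPS_END + (EPS_START - EPS_END) * math.exp(-step / EPS_DECAY)

def epsilon_greedy(q_values, step, rng=random):
    """Pick a random action with probability epsilon, else the greedy one."""
    if rng.random() > epsilon_at(step):
        return max(range(len(q_values)), key=lambda a: q_values[a])
    return rng.randrange(len(q_values))

schedule = [round(epsilon_at(s), 3) for s in (0, 1000, 5000)]
# schedule == [0.9, 0.363, 0.056]
```

After `EPS_DECAY` steps the gap between the current epsilon and `EPS_END` has shrunk by a factor of e, and after a few thousand steps the agent acts greedily about 95% of the time.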

`plot_durations` - a helper for plotting the durations of episodes, along with an average over the last 100 episodes (the measure used in the official evaluations). The plot will appear underneath the cell containing the main training loop and will update after every episode.
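
The moving average itself can be sketched without PyTorch. The function below is a hypothetical stand-in for the tensor operations used in the plotting helper: it averages every run of `window` consecutive durations and pads the front with zeros so that the curve lines up with the episode axis.

```python
def moving_average(durations, window=100):
    """Mean of each run of `window` consecutive values, zero-padded at the start."""
    if len(durations) < window:
        return []
    means = [
        sum(durations[i:i + window]) / window
        for i in range(len(durations) - window + 1)
    ]
    # Pad with window - 1 zeros so the result has one entry per episode
    return [0.0] * (window - 1) + means

# Tiny made-up example with a window of 3 instead of 100
averaged = moving_average([10, 20, 30, 40, 50], window=3)
# averaged == [0.0, 0.0, 20.0, 30.0, 40.0]
```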

In [21]:
# BATCH_SIZE is the number of transitions sampled from the replay buffer
BATCH_SIZE = 128
# GAMMA is the discount factor
GAMMA = 0.99
# EPS_START is the starting value of epsilon
EPS_START = 0.9
# EPS_END is the final value of epsilon
EPS_END = 0.05
# EPS_DECAY controls the rate of exponential decay of epsilon; higher means slower decay
EPS_DECAY = 1000
# TAU is the update rate of the target network
TAU = 0.005
# LR is the learning rate of the AdamW optimizer
LR = 1e-4

# Adam Optimizer

Adam (Adaptive Moment Estimation) is a popular optimization algorithm that is commonly used in deep learning. It is an extension of stochastic gradient descent (SGD), which is the most basic optimization algorithm used to train neural networks. The main idea behind Adam is to combine the advantages of two other optimization techniques, AdaGrad and RMSProp.

In the DQN algorithm, we use the optimizer to update the weights of our neural network based on the gradients of the loss function with respect to the parameters. Specifically, we use the AdamW optimizer, a variant of Adam with decoupled weight decay regularization. Instead of adding a penalty term to the loss function, AdamW shrinks each weight directly at every update step by an amount proportional to its magnitude. This discourages large weights, which helps prevent overfitting and encourages the network to learn simpler and more generalizable representations.
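
To make the update rule concrete, here is a minimal sketch of one AdamW step for a single scalar weight. It is not PyTorch's implementation; the function name `adamw_step` is made up, and the default values for `beta1`, `beta2`, `eps` and `weight_decay` are assumed to match PyTorch's documented defaults.

```python
import math

def adamw_step(w, grad, m, v, t, lr=1e-4, beta1=0.9, beta2=0.999,
               eps=1e-8, weight_decay=0.01):
    """One AdamW update for a single scalar weight (t counts steps from 1)."""
    # Decoupled weight decay: shrink the weight directly, independent of the gradient
    w = w * (1 - lr * weight_decay)
    # Exponential moving averages of the gradient and the squared gradient
    m = beta1 * m + (1 - beta1) * grad
    v = beta2 * v + (1 - beta2) * grad ** 2
    # Bias correction for the zero-initialized averages
    m_hat = m / (1 - beta1 ** t)
    v_hat = v / (1 - beta2 ** t)
    # Adam step, scaled by the running gradient magnitude
    w = w - lr * m_hat / (math.sqrt(v_hat) + eps)
    return w, m, v

# With a zero gradient only the decay acts, so the weight shrinks slightly
w_decayed, _, _ = adamw_step(w=1.0, grad=0.0, m=0.0, v=0.0, t=1)
# With a gradient and no decay, the first step moves the weight by roughly lr
w_stepped, _, _ = adamw_step(w=1.0, grad=0.5, m=0.0, v=0.0, t=1, weight_decay=0.0)
```

Note that the size of the first step is close to `lr` regardless of the gradient's magnitude, because Adam divides the gradient average by the square root of the squared-gradient average.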

The learning rate (LR) is a hyperparameter that controls the step size taken during optimization. It is an important parameter to tune: a high learning rate can cause the optimizer to overshoot the optimal weights and diverge, while a low learning rate can result in slow convergence and getting stuck in local minima. In the DQN algorithm, we set the learning rate to 1e-4.
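
The effect of the step size is easy to see on a toy problem. The sketch below runs plain gradient descent (not Adam) on f(w) = w², a made-up objective chosen only because its minimum at w = 0 is known, with three illustrative learning rates.

```python
def gradient_descent(lr, steps=20, w=1.0):
    """Minimize f(w) = w**2 (gradient 2*w) with a fixed learning rate."""
    for _ in range(steps):
        w = w - lr * 2 * w
    return w

too_high = gradient_descent(lr=1.1)    # each step multiplies w by -1.2: diverges
too_low = gradient_descent(lr=0.001)   # each step multiplies w by 0.998: barely moves
moderate = gradient_descent(lr=0.3)    # each step multiplies w by 0.4: converges quickly
```

The concrete thresholds depend on the problem, so the values here say nothing about which learning rate suits the DQN; they only show the overshoot and slow-convergence behaviour described above.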

In summary, the AdamW optimizer is a widely used optimization algorithm in deep learning, and it is used in the DQN algorithm to update the weights of the neural network based on the gradients of the loss function with respect to the parameters, while also incorporating weight decay regularization.

In [22]:
# Get number of actions from gym action space
n_actions = env.action_space.n

# Reset the environment to get an initial state
state, info = env.reset()

# Number of features in the state observation
n_observations = len(state)

# target net is initialized with same weight as `policy_net`
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

# Optimizer - AdamW used to optimize the weights
optimizer = optim.AdamW(policy_net.parameters(), lr=LR)

# It will store the agent's experiences, which will be used for training.
memory = ReplayMemory(10000)

# keep track of number of steps taken by the agent
steps_done = 0

# Takes the current state and returns an action chosen by the epsilon-greedy policy
def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * math.exp(-1 * steps_done / EPS_DECAY)
    steps_done += 1

    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) returns the largest column value of each row.
            # The second element of the max result is the index where the max element was found,
            # so we pick the action with the larger expected reward.
            return policy_net(state).max(1)[1].view(1, 1)
    else:
        # sample() must be called so that an integer action, not the method object, is stored
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)

# Keeps track of the duration of each episode
episode_durations = []

# plot_durations - visualize the training progress of the DQN
def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())

    # Show the 100-episode moving average of the duration
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

In [23]:
def optimize_model():
    # Check that we have enough samples for a mini-batch
    if len(memory) < BATCH_SIZE:
        return

    # Sample a mini-batch of transitions (state, action, next_state, reward) from the replay memory
    transitions = memory.sample(BATCH_SIZE)
    # Convert a batch-array of Transitions into a Transition of batch-arrays
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state is one after which the episode ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                            batch.next_state)), device=device, dtype=torch.bool)

    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Q(s_t, a) from the policy network for the actions that were actually taken
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # max_a Q(s_{t+1}, a) from the target network; stays 0 for final states
    next_state_values = torch.zeros(BATCH_SIZE, device=device)

    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1)[0]

    # Expected Q-values for each transition, using the target network
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute the Huber loss, which behaves like the mean squared error for small
    # errors but is less sensitive to outliers
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()

    # In-place gradient clipping: clamp each gradient value to [-100, 100]
    # to guard against exploding gradients
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()
    

In [24]:
if torch.cuda.is_available():
    num_episodes = 600
else:
    num_episodes = 500

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one optimization step on the policy network
        optimize_model()

        # Soft update of the target network's weights:
        # target = TAU * policy + (1 - TAU) * target
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()

        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key] * TAU + target_net_state_dict[key] * (1 - TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break


print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

 
