In [None]:
import gymnasium as gym
import math
import random
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

env = gym.make("CartPole-v1")

# set up matplotlib
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

# if GPU is to be used
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

print("CUDA available:", torch.cuda.is_available())
print("CUDA version:", torch.version.cuda)
print("Current device:", torch.cuda.current_device())
print("Device name:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "No GPU available")

CUDA available: True
CUDA version: 12.1
Current device: 0
Device name: NVIDIA GeForce GTX 1660 Ti


In [None]:
Transition = namedtuple('Transition',
                        ('state', 'action', 'next_state', 'reward'))


class ReplayMemory(object):

    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)

    def push(self, *args):
        """Save a transition"""
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        return random.sample(self.memory, batch_size)

    def __len__(self):
        return len(self.memory)


The Replay memory function is essential to optimize the agent by providing past transition data to ensure efficiency in the agent's learning. It uses a circular buffer to store transition objects which hold the states that the agent sees as it does its actions. Once there are enough transitions to fill a batch, the optimizer randomly samples the transitions to train the agent. This reduces the potential of the agent diverging due to correlation between transitions that are sampled sequentially.

In [None]:
class DQN(nn.Module):
    
    def __init__(self, n_observations, n_actions):
        super(DQN, self).__init__()
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    # Called with either one element to determine next action, or a batch
    # during optimization. Returns tensor([[left0exp,right0exp]...]).
    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)


The DQN function creates the structure of the DQN. It takes the agent’s current state, passes it through a layer which maps it to 128 and eventually maps it to the size of the actions that the agent can take which in this case should be two, left and right. Between each layer it uses a relu in order to prevent the layers from decomposing into a single layer.

In [None]:
# BATCH_SIZE is the number of transitions sampled from the replay buffer
# GAMMA is the discount factor as mentioned in the previous section
# EPS_START is the starting value of epsilon
# EPS_END is the final value of epsilon
# EPS_DECAY controls the rate of exponential decay of epsilon, higher means a slower decay
# TAU is the update rate of the target network
# LR is the learning rate of the ``AdamW`` optimizer
BATCH_SIZE = 128
GAMMA = 0.99
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 1000
TAU = 0.005
LR = 1e-4

# Get number of actions from gym action space
n_actions = env.action_space.n
# Get the number of state observations
state, info = env.reset()
n_observations = len(state)

policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)


steps_done = 0


def select_action(state):
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad():
            # t.max(1) will return the largest column value of each row.
            # second column on max result is index of where max element was
            # found, so we pick action with the larger expected reward.
            return policy_net(state).max(1).indices.view(1, 1)
    else:
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)


episode_durations = []


def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

This next part of the code is where they define some of their parameters of the network like their learning rate and their reward decay and some initialization like setting the replay memory capacity. It is also here that they create the policy and target network which is used to help stabilize the policy network’s learning using the q values from the target network whose q values stay stable for longer.

The select action function is where they define their epsilon greedy algorithm. It determines the next action that the agent takes which could either be from the policy that they have learned or a random action. The probablity that the agent takes a random action is decaying by their defined rate of EPS_DECAY (1000). This allows the agent explore its environment when it hasn't learned much but over time when it learns more it will take the action with the highest rewards determined by their policy.

In [None]:
def optimize_model():
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see https://stackoverflow.com/a/19343/3343043 for
    # detailed explanation). This converts batch-array of Transitions
    # to Transition of batch-arrays.
    batch = Transition(*zip(*transitions))

    # Compute a mask of non-final states and concatenate the batch elements
    # (a final state would've been the one after which simulation ended)
    non_final_mask = torch.tensor(tuple(map(lambda s: s is not None,
                                          batch.next_state)), device=device, dtype=torch.bool)
    non_final_next_states = torch.cat([s for s in batch.next_state
                                                if s is not None])
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken. These are the actions which would've been taken
    # for each batch state according to policy_net
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Compute V(s_{t+1}) for all next states.
    # Expected values of actions for non_final_next_states are computed based
    # on the "older" target_net; selecting their best reward with max(1).values
    # This is merged based on the mask, such that we'll have either the expected
    # state value or 0 in case the state was final.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    with torch.no_grad():
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    # Compute the expected Q values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    # Compute Huber loss
    criterion = nn.SmoothL1Loss()
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    # In-place gradient clipping
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)
    optimizer.step()

This portion of the code is where they do their optimization which runs once there are enough transitions in the replay memory to fill up a batch.


It takes the batches which are randomly sampled and seperate them into state, action, and reward batches. They then compute the Q values and action values for the current state and action batch and store it in state_action_values.

It then finds the expected Q values using the target network and the bellman equation which they explained when going over their DQN algorithm. Then the Huber loss is calculated which is used during back propagation to mimimize their errors.

In [None]:
if torch.cuda.is_available() or torch.backends.mps.is_available():
    num_episodes = 600
else:
    num_episodes = 50

for i_episode in range(num_episodes):
    # Initialize the environment and get its state
    state, info = env.reset()
    state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)
    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # Store the transition in memory
        memory.push(state, action, next_state, reward)

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Soft update of the target network's weights
        # θ′ ← τ θ + (1 −τ )θ′
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
        target_net.load_state_dict(target_net_state_dict)

        if done:
            episode_durations.append(t + 1)
            plot_durations()
            break

print('Complete')
plot_durations(show_result=True)
plt.ioff()
plt.show()

This is where they show how the training episodes have been implementated for the network. For each new episode they find the initial state tensor and use it to determine the action that the agent would respond with using the select action function that was define before. Once the action is completed the observation is used to find the state of the agent as a result of their action and the current transition is stored in the replay memory if the action was not terminated. It then moves to the next state and optimizes the policy model. Then the target network is updated based on TAU which dictates how often it is updated allowing the Q values that the expected action values are computed from can remain semi-stable to increase stability in training, and finally the episode concludes and the next can start.

PROBLEM 2

DQN, short for Deep Q-Network, is a reinforcement learning algorithm where an agent learns to make optimal decisions based on the environment it is interacting with. An agent in this context refers to the “decision maker” that must interact with the given environment in order to achieve a goal such as keeping the car that has the goal of keeping the pole upright. The environment represents the external system where the agent interacts and provides the state (a set of variables describing the system at a given time). 

The agent then chooses an action based on the current state, with the hopes of influencing the environment toward achieving its goal. In the cart-pole problem, the actions are very simple, either moving the cart to the left or right. Each action modifies the environment and leads to the next state. Along with the next state, the agent receives a reward, which is a scalar value that provides feedback on the quality of the action. In the pole car problem for example, the agent earns a reward for every time step the pole remains upright. If the pole falls, the episode ends with a negative or zero reward, meaning failure. 

Over time, the DQN learns to associate specific states with corresponding actions that can maximize the reward in the long term. In the cart-pole problem, this results in the agent learning to balance the pole by correctly moving the cart in response to the position of the and rotation of the pole.


In [None]:
#PROBLEM 3



Problem 4 : 

Frozen Lake (Toy Text) - https://gymnasium.farama.org/environments/toy_text/frozen_lake/

    State:

        Each state is where the agent is in the 4x4 map which is described as currentrow * numberofColumns + currentCollumn

    Action:

        There are 4 valid actions to move left ,right,up, or down

    Environment: 

        the environment is a 4x4 grid. There are a total of 3 types of squares: a regular grid, a frozen lake, and a goal square with the reward changing depending on which tile the agent is interacting with.

    Reward:

        The agent gains a reward when it reaches the goal and no rewards for a regular snow grid or a frozen lake. If the agent steps on a frozen lake then the episode ends same with reaching the goal.

Car Racing (BOX2D) - https://gymnasium.farama.org/environments/box2d/car_racing/#

    State:

        The state would the current position the car is in on the 96x96 image. It would have the car's velocity, grip level for each of its tires, and its current steering angle

    Action:

        There would 5 actions: 2 for left and right steering, one for doing nothing, and two for gas and braking.

    Environment: 

        The car has two surfaces to drive over; grass and track. The track can curve and straighten and the grass would cover the rest of the 96x96 screen. The car would also have to obey the physic engine

    Reward:

        The agent would get a reward for every lap that it accomplishes while touching track tiles and is subtracted by the amount of frames it took to do it. 

Blackjack (Box2D) - https://gymnasium.farama.org/environments/toy_text/blackjack/

    State:

        It would be the player's current sum of their hand which could be two depending on whether or not they have an ace. The state would also include the value of the dealer's card.

    Action:

        There are 2 actions; stand and hit. 

    Environment: 

        The player loses whenever the sum of their cards exceed 21 or when the dealer has a higher sum without going over 21. The dealer would have one card face up and one card face down that they only reveal once the player has either stood or busted. Once they reveal then the dealer would hit until they have at least 17 and if they bust then the player wins. 

    Reward:

        The agent would get a reward every time they win and no reward each time they don't win