### GYM CartPole
- Rewards
    - +1 for every incremental timestep
    - The episode terminates if the pole falls over or the cart moves more than 2.4 units away from the center
- Performance (value)
    - Higher value when an episode runs longer, since it accumulates a larger return
- Input
    - State features: (position, velocity, angle, angular_velocity)
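
To make the link between episode length and return concrete, here is a minimal sketch. It assumes a discount factor of 0.99 (the gamma value set later in this notebook); discounted_return is a hypothetical helper, not part of Gym.

```python
def discounted_return(n_steps, gamma=0.99):
    """Discounted return of an episode that lasts n_steps with reward +1 per step."""
    return sum(gamma ** t for t in range(n_steps))

# a longer episode accumulates a larger return
short_episode = discounted_return(10)
long_episode = discounted_return(200)
print(short_episode, long_episode)
```

Because every reward is +1, the return depends only on how long the pole stays up, which is why episode duration is a reasonable proxy for performance.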

### Useful Syntax
- namedtuple: access tuple elements by field name instead of index; for example, transition.state returns the stored state (fields are read-only)
- deque: efficient append and pop at both ends; with maxlen it acts as a fixed-size buffer (useful for replay memory in RL, where older experiences are dropped as new ones are added, keeping the memory size manageable)
- random.sample(population, k): returns a list of k elements drawn at random from population, without replacement
- mask: A tensor of Boolean values (True or False) where each position indicates whether the corresponding element in another tensor should be selected or ignored
- torch.cat(): concatenates a sequence of tensors into a single tensor along an existing dimension (dim=0 by default, which is the batch dimension here)
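
The standard-library pieces above can be tried out in isolation. This is a small sketch with made-up integer states, not the notebook's real tensors; the buffer size of 3 is chosen only to make the dropping behaviour visible.

```python
import random
from collections import namedtuple, deque

Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

# a deque with maxlen drops the oldest item once the buffer is full
buffer = deque([], maxlen=3)
for step in range(5):
    buffer.append(Transition(state=step, action=0, next_state=step + 1, reward=1.0))

# fields are read by name; the transitions for steps 0 and 1 were dropped
oldest = buffer[0]
print(oldest.state)

# random.sample draws without replacement and returns a list
sampled = random.sample(buffer, 2)

# zip(*...) transposes a list of Transitions into one Transition of tuples
batch = Transition(*zip(*sampled))
print(batch.reward)
```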

### Ideas
#### Replay Memory
During each step of an episode, an agent generates a tuple of experience, often represented as $(\text{state}, \text{action}, \text{next\_state}, \text{reward})$. Replay memory stores these experiences in a buffer, allowing the agent to access and learn from past actions, even if they no longer represent the current state.
#### Neural Net
Minimize the temporal-difference error:
$$\delta = Q(s,a)-\left(r+\gamma\max_{a'}Q(s',a')\right)$$
Use the Huber loss, which acts like the mean squared error when the error is small but like the mean absolute error when the error is large, making it more robust to outliers:
$$L(\delta) = \begin{cases}
\frac{1}{2}\delta^2 & \text{ for } |\delta|\leq 1 \\
|\delta|-\frac{1}{2} & \text{ otherwise }
\end{cases}$$
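
As a quick check of the piecewise definition, here is a plain-Python sketch for a single error value. huber_loss is a hypothetical helper written for illustration; in the notebook itself the loss is computed with nn.SmoothL1Loss, which with its default settings should behave the same way and additionally averages over the batch.

```python
def huber_loss(delta):
    """Huber loss with threshold 1: quadratic for small errors, linear for large ones."""
    abs_delta = abs(delta)
    if abs_delta <= 1.0:
        return 0.5 * delta ** 2
    return abs_delta - 0.5

# small errors are squared, large errors grow only linearly
for delta in (0.5, -0.5, 3.0):
    print(delta, huber_loss(delta))
```

The two branches agree at |delta| = 1, where both give 0.5, so the loss is continuous at the switch point.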
#### Policy Net
During training, the policy_net is updated after each step or batch to minimize the difference between the Q-values predicted by the network and the target Q-values.
#### Target Net
The key idea is that keeping this network slightly “behind” the policy_net reduces the risk of instability during training. With only a single network, the targets would shift every time the Q-values are updated, creating feedback loops that destabilize learning.
- Soft Update: Use a small factor, TAU, to slowly blend the weights from policy_net into target_net, making incremental updates.
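
A soft update is commonly written as target = TAU * policy + (1 - TAU) * target, applied to every weight. The sketch below assumes that form and uses plain floats in dictionaries as stand-ins for the networks' weights; soft_update is a hypothetical helper, not a PyTorch function.

```python
def soft_update(target_weights, policy_weights, tau):
    """Blend policy weights into target weights: tau * policy + (1 - tau) * target."""
    return {
        name: tau * policy_weights[name] + (1.0 - tau) * target_weights[name]
        for name in target_weights
    }

policy = {'w': 1.0, 'b': 0.0}
target = {'w': 0.0, 'b': 0.0}

# with a small tau the target moves toward the policy weights only gradually
for _ in range(100):
    target = soft_update(target, policy, tau=0.005)
print(target['w'])
```

With TAU = 0.005 the target has covered well under half of the gap after 100 updates, which is the "slightly behind" behaviour described above.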

In [1]:
import gym
import math
import random
import matplotlib.pyplot as plt
from collections import namedtuple, deque
from itertools import count

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [7]:
env = gym.make('CartPole-v1')

# if GPU, use it
device = torch.device(
    "cuda" if torch.cuda.is_available() else
    "mps" if torch.backends.mps.is_available() else
    "cpu"
)

In [None]:
Transition = namedtuple('Transition', ('state', 'action', 'next_state', 'reward'))

class ReplayMemory:
    def __init__(self, capacity):
        self.memory = deque([], maxlen=capacity)
    
    def push(self, *args):
        # save memory of a transition
        self.memory.append(Transition(*args))

    def sample(self, batch_size):
        # random sample a batch of transitions
        return random.sample(self.memory, batch_size)
    
    def __len__(self):
        return len(self.memory)

In [15]:
# network that predicts the expected return (Q-value) of each possible action given a state (described by its features)
class DQN(nn.Module):
    def __init__(self, n_observations, n_actions):
        """
        n_observations: number of features describing a state of environment
            e.g., (position, velocity, angle, angular_velocity), n_observations=4
        n_actions: (left, right), n_actions=2
        """
        super(DQN, self).__init__() # constructor of parent class
        self.layer1 = nn.Linear(n_observations, 128)
        self.layer2 = nn.Linear(128, 128)
        self.layer3 = nn.Linear(128, n_actions)

    def forward(self, x):
        x = F.relu(self.layer1(x))
        x = F.relu(self.layer2(x))
        return self.layer3(x)

In [16]:
batch_size = 128
gamma = 0.99
eps_start = 0.9 # epsilon greedy
eps_end = 0.05 # more exploitation in the end
eps_decay = 1000 # exponential decay rate of epsilon, higher means slower decay
tau = 0.005 # update rate for target network
LR = 1e-4 # learning rate
n_actions = env.action_space.n # num of actions
state, info = env.reset() 
n_observations = len(state)

In [17]:
policy_net = DQN(n_observations, n_actions).to(device)
target_net = DQN(n_observations, n_actions).to(device)
target_net.load_state_dict(policy_net.state_dict())

optimizer = optim.AdamW(policy_net.parameters(), lr=LR, amsgrad=True)
memory = ReplayMemory(10000)

In [40]:
steps_done = 0
def select_action(state): # epsilon-greedy action selection
    global steps_done
    sample = random.random() # uniform random number between 0 and 1
    eps_threshold = eps_end + (eps_start - eps_end) * math.exp(-1. * steps_done / eps_decay) # exponential decay eps value
    steps_done += 1
    if sample > eps_threshold:
        with torch.no_grad(): # no need to compute gradients
            # policy_net(state) has shape (1, n_actions)
            # .max(1) takes the max over the action dimension and returns (values, indices)
            # .indices is the action with the largest expected return (Q-value)
            return policy_net(state).max(1).indices.view(1,1)
    else:
        # random action with the same shape as above, e.g. tensor([[0]], device='mps:0')
        return torch.tensor([[env.action_space.sample()]], device=device, dtype=torch.long)
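
To see how quickly exploration fades, the threshold formula from select_action can be evaluated on its own. This sketch reuses the notebook's hyperparameter values; epsilon_at is a hypothetical helper introduced only for illustration.

```python
import math

eps_start, eps_end, eps_decay = 0.9, 0.05, 1000  # same values as in the notebook

def epsilon_at(step):
    """Probability of taking a random action after `step` calls to select_action."""
    return eps_end + (eps_start - eps_end) * math.exp(-1.0 * step / eps_decay)

# epsilon starts at eps_start and decays toward eps_end
for step in (0, 1000, 5000):
    print(step, round(epsilon_at(step), 3))
```

After eps_decay steps the gap between eps_start and eps_end has shrunk by a factor of e, so a larger eps_decay keeps the agent exploring for longer.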

In [41]:
# plotting
episode_durations = []

is_ipython = 'inline' in plt.get_backend()
if is_ipython:
    from IPython import display

plt.ion()

def plot_durations(show_result=False):
    plt.figure(1)
    durations_t = torch.tensor(episode_durations, dtype=torch.float)
    if show_result:
        plt.title('Result')
    else:
        plt.clf()
        plt.title('Training...')
    plt.xlabel('Episode')
    plt.ylabel('Duration')
    plt.plot(durations_t.numpy())
    # Take 100 episode averages and plot them too
    if len(durations_t) >= 100:
        means = durations_t.unfold(0, 100, 1).mean(1).view(-1)
        means = torch.cat((torch.zeros(99), means))
        plt.plot(means.numpy())

    plt.pause(0.001)  # pause a bit so that plots are updated
    if is_ipython:
        if not show_result:
            display.display(plt.gcf())
            display.clear_output(wait=True)
        else:
            display.display(plt.gcf())

In [None]:
def optimize_model():
    """
    performs a single step of the optimization
    first samples a batch, compute Q(st, at) and V(st+1)=max_aQ(st+1, a)
    then compute the loss and backpropagate
    use target network to compute V(st+1) for stability
    target network updated at every step with a soft update with tau
    """
    if len(memory) < batch_size:
        return # not enough samples to train
    
    transitions = memory.sample(batch_size)
    # transpose the batch, converting batch-array of transitions to transition of batch-arrays
    batch = Transition(*zip(*transitions))

    # Boolean mask marking transitions whose next state is not terminal (next_state is not None)
    # True positions are selected later; False positions (terminal transitions) are skipped
    non_final_mask = torch.tensor(tuple(map(lambda s:s is not None, batch.next_state)), device=device, dtype=torch.bool)
    # concatenate all non-terminal next states into one tensor
    non_final_next_states = torch.cat([s for s in batch.next_state if s is not None])
    # concatenate state, action, reward tensors in batch to a single tensor
    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # computes Q(s_t, a)
    # policy_net generates value for all actions for each state
    # .gather(1, action_batch) selects the Q-value for each specific action taken 
    # (as specified in action_batch) in each state from the output of policy_net(state_batch)
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # computes V(s_t+1)=max_aQ(st+1, a)
    # will eventually hold the Q-values of the next_state for each transition in the batch
    # terminal states have zero by default
    next_state_values = torch.zeros(batch_size, device=device)
    with torch.no_grad(): # no need for gradient calculation, only need to calculate target values
        # target_net(non_final_next_states): passes the non-terminal next states through
        #     the target network to get Q-values of shape (n_non_final, n_actions)
        # .max(1).values: maximum Q-value over the action dimension (dim 1 of the network output)
        # next_state_values[non_final_mask]: assigns these values only to the positions
        #     where non_final_mask is True; terminal positions stay at zero
        next_state_values[non_final_mask] = target_net(non_final_next_states).max(1).values
    expected_state_action_values = (next_state_values * gamma) + reward_batch

    # compute Huber loss
    criterion = nn.SmoothL1Loss()
    # state_action_values has shape (batch_size, 1), while expected_state_action_values has shape (batch_size,)
    # unsqueeze(1) adds a singleton dimension to the targets so the two shapes match
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

