In [1]:
import gymnasium as gym
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import random
import numpy as np
from model import DQN
from replay_memory import ReplayMemory, Transition
from collections import deque, namedtuple
import matplotlib.pyplot as plt
import plotly.express as px
import pandas as pd
import plotly.express as px
from itertools import count

In [2]:
# Seed every random number generator this notebook draws from, so that
# Restart Kernel -> Run All reproduces the same training run.
SEED = 42
random.seed(SEED)        # presumably used by ReplayMemory.sample -- verify in replay_memory.py
np.random.seed(SEED)     # np.random.rand() decides explore vs. exploit in select_action
torch.manual_seed(SEED)  # torch RNG (network weight initialisation happens in a later cell)

env = gym.make('CartPole-v1')
# The action space has its own RNG; it drives env.action_space.sample()
# during the exploration branch of select_action.
env.action_space.seed(SEED)
# Seeding the first reset seeds the environment's RNG; later reset() calls
# without a seed continue from that generator.
observation, info = env.reset(seed=SEED)
print(observation)

[ 0.01351848  0.0061649  -0.02056811 -0.01578395]


In [3]:
# Choose the compute device in order of preference: CUDA GPU, then Apple's
# Metal backend (MPS), then the CPU. The MPS check only runs when CUDA is
# unavailable.
if torch.cuda.is_available():
    device = torch.device("cuda")
elif torch.backends.mps.is_available():
    device = torch.device("mps")
else:
    device = torch.device("cpu")
device

In [4]:
# Network input width: size of the first axis of the observation space
# (the observation printed above has 4 components).
n_observation = env.observation_space.shape[0]
# Network output width: one Q-value per discrete action.
n_action = env.action_space.n

In [5]:
# policy_net is the network being trained; target_net supplies the bootstrap
# targets in optimize_model and is only ever updated by copying/blending
# weights from policy_net.
policy_net = DQN(n_observation, n_action).to(device)
target_net = DQN(n_observation, n_action).to(device)
# Start both networks from identical weights.
target_net.load_state_dict(policy_net.state_dict())
# target_net is never optimised directly, so keep it in evaluation mode.
target_net.eval()
# Replay buffer; the argument is reported as `memory.capacity` by training_loop.
memory = ReplayMemory(10000)

In [6]:
# Epsilon-greedy schedule used by select_action:
#   eps = EPS_END + (EPS_START - EPS_END) * exp(-steps_done / EPS_DECAY)
EPS_START = 0.9  # exploration probability at step 0
EPS_END = 0.05  # value the exploration probability decays towards
EPS_DECAY = 1000  # decay constant, measured in select_action calls
BATCH_SIZE = 128  # transitions sampled from replay memory per optimisation step
GAMMA = 0.99  # discount factor applied to next-state values in the Bellman target
LR = 1e-4  # learning rate passed to the Adam optimizer below
TARGET_UPDATE = 10  # NOTE(review): only printed by training_loop here; the target net is updated via TAU
TAU = 0.005  # soft-update rate: target = TAU * policy + (1 - TAU) * target
steps_done = 0  # global count of select_action calls; drives the epsilon decay

# Only policy_net's parameters are optimised.
optimizer = optim.Adam(policy_net.parameters(), lr=LR, amsgrad=True)
num_episodes = 350  # number of training episodes passed to training_loop

In [7]:
def select_action(state):
    """Pick an action for ``state`` with an epsilon-greedy policy.

    The exploration probability decays exponentially from ``EPS_START``
    towards ``EPS_END`` as the global ``steps_done`` counter grows; every call
    increments that counter by one.

    Args:
        state: observation tensor fed to ``policy_net``. It needs a leading
            batch axis because the greedy branch reduces over dimension 1
            (training_loop builds it with ``unsqueeze(0)``).

    Returns:
        A ``(1, 1)`` tensor holding the chosen action index: the arg-max of
        the policy network's output when exploiting, or a random sample from
        the environment's action space (dtype ``torch.long``, on ``device``)
        when exploring.
    """
    global steps_done

    # Exploration probability for this step; shrinks as steps_done increases.
    decay_factor = np.exp(-1. * steps_done / EPS_DECAY)
    explore_probability = EPS_END + (EPS_START - EPS_END) * decay_factor
    steps_done += 1

    if np.random.rand() > explore_probability:
        # Exploit: take the action with the highest predicted Q-value.
        with torch.no_grad():
            q_values = policy_net(state)
            return q_values.max(1).indices.view(1, 1)

    # Explore: uniformly random action from the environment.
    random_action = env.action_space.sample()
    return torch.tensor([[random_action]], device=device, dtype=torch.long)

In [8]:
def optimize_model(loss_func):
    """Run one gradient step on ``policy_net`` from a replay-memory minibatch.

    Does nothing until the replay memory holds at least ``BATCH_SIZE``
    transitions. Otherwise it samples a batch, computes the temporal-difference
    target ``reward + GAMMA * max_a Q_target(next_state, a)`` (with a next-state
    value of 0 for terminal transitions), and minimises the given loss between
    that target and ``Q_policy(state, action)``.

    Args:
        loss_func: a loss *class* (or zero-argument factory) such as
            ``nn.SmoothL1Loss``; it is instantiated on every call.

    Returns:
        None. ``policy_net`` and ``optimizer`` are updated in place.
    """
    if len(memory) < BATCH_SIZE:
        # Not enough experience collected yet.
        return

    transitions = memory.sample(BATCH_SIZE)
    # Transpose a list of Transitions into one Transition of tuples.
    batch = Transition(*zip(*transitions))

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Terminal transitions are stored with next_state=None (see training_loop);
    # the mask marks the transitions that do have a successor state.
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  device=device, dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]

    # Q(s, a) predicted by the policy network for the actions actually taken.
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Next-state values stay 0 for terminal transitions.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    # torch.cat raises on an empty list, so skip the target-network pass when
    # every sampled transition is terminal.
    if non_final_next_states:
        with torch.no_grad():
            # The target network provides the bootstrap value max_a Q(s', a).
            next_state_values[non_final_mask] = (
                target_net(torch.cat(non_final_next_states)).max(1).values
            )

    # Bellman target.
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch

    criterion = loss_func()
    # unsqueeze(1) lines the targets up with the (batch, 1) gathered Q-values.
    loss = criterion(state_action_values, expected_state_action_values.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()

    # Gradient-norm clipping guards against exploding gradients.
    torch.nn.utils.clip_grad_norm_(policy_net.parameters(), 100)

    optimizer.step()

In [9]:
def custom_reward(state, pole_weight=0.5, cart_weight=0.1):
    """Shaped per-step reward for CartPole-v1.

    By default the environment rewards 1 for each step taken. When using that
    default reward, the agent picked up the habit of racing to the right edge
    of the screen, which ended training sessions
    (see models/swing_to_the_right.pth). This shaped reward starts from 1 and
    subtracts a penalty that grows linearly with the pole's absolute angle and
    with the cart's absolute distance from the origin:

        reward = 1 - (pole_weight * |theta| + cart_weight * |x|)

    No separate threshold penalty (e.g. at 10 degrees) is applied; the pole
    term is purely linear in ``|theta|``.

    Args:
        state: sequence of four values ``(x, x_dot, theta, theta_dot)``;
            only ``x`` and ``theta`` are used.
        pole_weight: multiplier for the pole-angle penalty (default 0.5).
        cart_weight: multiplier for the cart-position penalty (default 0.1).

    Returns:
        The shaped reward as a scalar; 1 when both ``x`` and ``theta`` are 0.
    """
    x, _x_dot, theta, _theta_dot = state

    pole_penalty = abs(theta)
    cart_penalty = abs(x)
    reward = 1 - (pole_penalty * pole_weight + cart_penalty * cart_weight)

    return reward

In [10]:
def training_loop(verbose=True, num_episodes=100, loss_function=nn.SmoothL1Loss):
    """Train ``policy_net`` on ``env`` and return the length of every episode.

    For each episode the agent acts epsilon-greedily (``select_action``),
    stores each transition in the replay ``memory`` using the shaped reward
    from ``custom_reward`` (the environment's own reward is ignored), performs
    one ``optimize_model`` step, and then soft-updates ``target_net`` towards
    ``policy_net`` with rate ``TAU``. After the last episode the policy
    network's weights are written to ``model.pth``.

    Args:
        verbose: when True, print the hyperparameters up front and a progress
            line at every tenth of the run.
        num_episodes: number of episodes to train for.
        loss_function: loss class forwarded to ``optimize_model``.

    Returns:
        list[int]: number of environment steps taken in each episode, in
        episode order.
    """
    if verbose:
        print("Training model with following parameters:")
        print(f"Batch size: {BATCH_SIZE}")
        print(f"Gamma: {GAMMA}")
        print(f"Learning rate: {LR}")
        print(f"Target update: {TARGET_UPDATE}")
        print(f"Replay Memory capacity: {memory.capacity}")
        print(f"Number of episodes: {num_episodes}")
        print(f"EPS_Decay: {EPS_DECAY}")
        print(f"Running on device: {device}")

    scores = []
    for i_episode in range(1, num_episodes+1):
        if verbose and i_episode % (num_episodes / 10) == 0:
            print(f"{i_episode*100 / num_episodes}% complete")
        state, _ = env.reset()
        # Add a batch axis so the state can be fed straight into the network.
        state = torch.tensor(state, dtype=torch.float32, device=device).unsqueeze(0)

        # `t` is the zero-based index of the current step in this episode, so
        # the episode length at termination is t + 1. (The loop previously
        # kept a separate counter and read an undefined `t` here.)
        for t in count():
            action = select_action(state)
            next_state, _, terminated, truncated, _ = env.step(action.item())
            done = terminated or truncated

            # Replace the environment reward with the shaped reward, computed
            # on the raw observation before it is converted or dropped.
            reward = custom_reward(next_state)

            if terminated:
                # Terminal transitions carry no successor state; optimize_model
                # treats None as "next-state value is 0". A truncated episode
                # keeps its successor so its value is still bootstrapped.
                next_state = None
            else:
                next_state = torch.tensor(next_state, dtype=torch.float32, device=device).unsqueeze(0)

            reward = torch.tensor([reward], dtype=torch.float32, device=device)

            memory.push(state, action, next_state, reward)

            state = next_state

            optimize_model(loss_function)

            # Soft update of the target network:
            #   target = TAU * policy + (1 - TAU) * target
            # Originally hard updates were used, but soft updates performed
            # much better.
            target_net_state_dict = target_net.state_dict()
            policy_net_state_dict = policy_net.state_dict()
            for key in policy_net_state_dict:
                target_net_state_dict[key] = policy_net_state_dict[key]*TAU + target_net_state_dict[key]*(1-TAU)
            target_net.load_state_dict(target_net_state_dict)

            if done:
                scores.append(t + 1)
                break

    torch.save(policy_net.state_dict(), "model.pth")
    print("Model Saved")
    return scores
        

In [11]:
# Run the full training session with Smooth L1 loss. training_loop returns its
# list of per-episode scores and saves the trained weights to model.pth.
scores = training_loop(num_episodes=num_episodes, loss_function=nn.SmoothL1Loss)

Training model with following parameters:
Batch size: 128
Gamma: 0.99
Learning rate: 0.0001
Target update: 10
Replay Memory capacity: 10000
Number of episodes: 350
EPS_Decay: 1000
Running on device: mps
10.0% complete
20.0% complete
30.0% complete
40.0% complete
50.0% complete
60.0% complete
70.0% complete
80.0% complete
90.0% complete
100.0% complete
Model Saved


In [12]:
# One row per episode: the score plus the row's zero-based position as "Episode".
df = pd.DataFrame(scores, columns=["Score"]).assign(
    Episode=lambda frame: frame.index
)

# Scatter of score against episode with an ordinary-least-squares trendline.
fig = px.scatter(
    df,
    x="Episode",
    y="Score",
    trendline='ols',
    title="Score per Episode",
)
fig.show()