**dqn_cartpole_pytorch.ipynb**

Wonhee Lee

2024 MAY 05 (SUN)

reference:

https://pytorch.org/tutorials/intermediate/reinforcement_q_learning.html

In [1]:
import gymnasium as gym

In [2]:
import torch

In [3]:
# run on the GPU when CUDA is available, otherwise fall back to the CPU
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print(device)

cuda


In [4]:
import numpy as np

In [5]:
from collections import namedtuple, deque

# environment setup

In [6]:
# single CartPole-v1 instance; it is used both for random action sampling
# in select_action and for the rollouts in the training loop
env = gym.make("CartPole-v1")

# replay memory setup

In [7]:
import random

In [8]:
# One replay-buffer record. `next_state` is stored as None when the episode
# terminated on that step; optimize_model uses that to mask out final states.
Transition = namedtuple("Transition", ("state", "action", "reward", "next_state"))

In [9]:
class Replay_Memory:
    """Bounded FIFO store of `Transition` records with uniform sampling.

    Once `capacity` records are held, appending another one silently drops
    the oldest.
    """

    def __init__(self, capacity):
        # a deque with maxlen evicts from the left when it is full
        self.memory = deque(maxlen=capacity)

    def push(self, *args):
        """Build a Transition from the given fields and store it."""
        transition = Transition(*args)
        self.memory.append(transition)

    def sample(self, batch_size):
        """Return `batch_size` distinct stored records chosen at random."""
        stored = self.memory
        return random.sample(stored, batch_size)

    def __len__(self):
        """Number of records currently stored."""
        stored = self.memory
        return len(stored)

In [10]:
# bounded buffer: once 10000 transitions are stored, the oldest are dropped
replay_memory = Replay_Memory(10000)

# Q-network

In [11]:
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

In [12]:
class DQN(nn.Module):
    """Fully connected Q-network: observation vector -> one Q-value per action.

    Args:
        num_observations: size of the input observation vector.
        num_actions: number of discrete actions (size of the output).
    """

    def __init__(self, num_observations, num_actions):
        super().__init__()

        # NOTE: nn.ReLU's only argument is the boolean `inplace` flag, not a
        # slope, so it is constructed without arguments here. The layer
        # indices (0, 2, 4) and therefore the state_dict keys are unchanged.
        self.layers = nn.Sequential(
            nn.Linear(num_observations, 128),
            nn.ReLU(),
            nn.Linear(128, 128),
            nn.ReLU(),
            nn.Linear(128, num_actions),
        )

    def forward(self, x):
        """Return the Q-values for `x`; the last dimension has size num_actions."""
        return self.layers(x)

# training

## hyperparameters

In [13]:
# replay sampling and return discounting
replay_memory_batch_size = 128  # transitions sampled per optimisation step
gamma = 0.99  # discount applied to the next-state value

# epsilon-greedy exploration: epsilon decays exponentially from
# epsilon_initial towards epsilon_final, with epsilon_decay_rate acting as
# the time constant measured in action-selection steps
epsilon_initial = 0.9
epsilon_final = 0.05
epsilon_decay_rate = 1000

# optimisation
learning_rate = 1e-4  # step size handed to the optimizer
tau = 0.005  # blend factor for the target-network soft update

## network setup

In [14]:
# reset once up front to obtain a sample observation; its length is the
# network's input size
observation, observation_info = env.reset()
num_observations = len(observation)

# the size of the discrete action space is the network's output size
num_actions = env.action_space.n

In [15]:
print(num_actions, num_observations)

2 4


In [16]:
# policy_net is the network being trained; target_net supplies the
# next-state values in optimize_model and is only moved towards policy_net
# gradually by the soft update in the training loop
policy_net = DQN(num_observations, num_actions).to(device)
target_net = DQN(num_observations, num_actions).to(device)

# start both networks from identical weights
target_net.load_state_dict(policy_net.state_dict())

<All keys matched successfully>

In [17]:
# only policy_net's parameters are handed to the optimizer
optimizer = optim.AdamW(policy_net.parameters(), lr=learning_rate, amsgrad=True)

In [31]:
steps_done = 0


def select_action(state):
    """Pick an action for `state` with an epsilon-greedy rule.

    The exploration threshold decays exponentially with the global
    `steps_done` counter, from `epsilon_initial` towards `epsilon_final`.
    Every call increments the counter. The result is a 1x1 long tensor
    holding the action index.
    """
    global steps_done

    draw = random.random()
    decay = math.exp(-1. * steps_done / epsilon_decay_rate)
    epsilon_threshold = epsilon_final + (epsilon_initial - epsilon_final) * decay
    steps_done += 1

    if draw > epsilon_threshold:
        # exploit: greedy action under the current policy network; no
        # gradients are needed for action selection
        with torch.no_grad():
            q_values = policy_net(state)
            return q_values.max(1).indices.view(1, 1)

    # explore: uniformly random action from the environment's action space
    random_action = env.action_space.sample()
    return torch.tensor([[random_action]], device=device, dtype=torch.long)

## plot setup

In [32]:
import plotly.express as px
import plotly.io as pio
pio.renderers.default = 'iframe'

In [1]:
import dash
from dash import Dash, dcc, html

## method

In [38]:
def optimize_model():
    """Run one gradient step on `policy_net` from a sampled replay batch.

    Returns immediately while the replay memory holds fewer than
    `replay_memory_batch_size` transitions. Otherwise it samples a batch and
    regresses Q(s_t, a_t) from `policy_net` onto
    `reward + gamma * max_a Q_target(s_{t+1}, a)` using a Huber loss, then
    applies one optimizer step with gradient-value clipping. Transitions
    stored with `next_state=None` contribute a next-state value of zero.
    """
    if len(replay_memory) < replay_memory_batch_size:
        return  # not enough experience collected yet

    transitions = replay_memory.sample(replay_memory_batch_size)
    # transpose: a list of Transitions -> one Transition whose fields are tuples
    transition_batch = Transition(*zip(*transitions))

    # separate out the transitions that actually have a successor state
    non_final_next_states = [s for s in transition_batch.next_state if s is not None]
    non_final_mask = torch.tensor(
        [s is not None for s in transition_batch.next_state],
        device=device,
        dtype=torch.bool,
    )

    state_batch = torch.cat(transition_batch.state)
    action_batch = torch.cat(transition_batch.action)
    reward_batch = torch.cat(transition_batch.reward)

    # Q(s_{t}, a): Q-value of the action that was actually taken
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # V(s_{t+1}): stays zero wherever the transition was final
    next_state_values = torch.zeros(replay_memory_batch_size, device=device)
    # torch.cat rejects an empty list, so skip the target-network pass when
    # every sampled transition was final
    if non_final_next_states:
        with torch.no_grad():
            next_q_values = target_net(torch.cat(non_final_next_states))
            next_state_values[non_final_mask] = next_q_values.max(1).values

    expected_state_action_values = reward_batch + gamma*next_state_values

    # Huber loss; the targets are unsqueezed to match the (batch, 1) shape
    # produced by gather above
    loss_function = nn.SmoothL1Loss()
    loss = loss_function(state_action_values, expected_state_action_values.unsqueeze(1))

    # optimize
    optimizer.zero_grad()
    loss.backward()
    torch.nn.utils.clip_grad_value_(policy_net.parameters(), 100)  # in-place gradient clipping
    optimizer.step()

In [39]:
from itertools import count
import math

In [40]:
# train for longer when a GPU is available
num_episodes = 600 if torch.cuda.is_available() else 50

In [41]:
for episode in range(num_episodes):
    # start a fresh episode and wrap the observation as a (1, n) float tensor
    observation, observation_info = env.reset()
    state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

    for t in count():
        action = select_action(state)
        observation, reward, terminated, truncated, _ = env.step(action.item())
        reward = torch.tensor([reward], device=device)
        done = terminated or truncated

        # a terminated episode has no successor state; a merely truncated
        # one still does, so only `terminated` maps to None
        if terminated:
            next_state = None
        else:
            next_state = torch.tensor(observation, dtype=torch.float32, device=device).unsqueeze(0)

        # store transition in replay memory
        replay_memory.push(state, action, reward, next_state)

        # step forward
        state = next_state

        # optimize
        optimize_model()

        # soft update of target network weights:
        # target <- tau*policy + (1 - tau)*target
        target_net_state_dict = target_net.state_dict()
        policy_net_state_dict = policy_net.state_dict()
        for key in policy_net_state_dict:
            target_net_state_dict[key] = tau*policy_net_state_dict[key] + (1 - tau)*target_net_state_dict[key]
        # load once, after every entry has been blended, instead of
        # reloading the whole state dict for each parameter tensor
        target_net.load_state_dict(target_net_state_dict)

        if done:
            # plot
            break

print("training: complete.")

training: complete.
