In [1]:
import gym
import tqdm
import random
import numpy as np
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import namedtuple, deque

In [2]:
# One environment step as stored in the replay memory:
# (state, action, reward, next_state, done).
Transition = namedtuple(
    "Transition", "state action reward next_state done"
)

class Memories(object):
    """Fixed-capacity replay buffer backed by a ``collections.deque``.

    Once ``size`` items are stored, appending another one silently discards
    the oldest item (``deque(maxlen=size)`` semantics).
    """

    def __init__(self, size):
        """Create an empty buffer that keeps at most ``size`` items."""
        self.memory = deque(maxlen=size)

    def push(self, transition):
        """Append ``transition`` to the buffer, evicting the oldest item if full."""
        self.memory.append(transition)

    def __add__(self, transition):
        """Append ``transition`` in place and return the buffer itself.

        Kept so existing ``memory + transition`` statements keep working.
        Returning ``self`` rather than ``None`` means the expression evaluates
        to the buffer instead of silently producing ``None``. Prefer ``push``
        in new code, since this operator mutates its left operand.
        """
        self.push(transition)
        return self

    def __len__(self):
        """Return the number of items currently stored."""
        return len(self.memory)

    def sample(self, batch_size):
        """Return ``batch_size`` distinct stored items chosen at random.

        Raises:
            ValueError: if ``batch_size`` is larger than the number of stored
                items (raised by ``random.sample``).
        """
        return random.sample(self.memory, batch_size)

In [3]:
class DQN(nn.Module):
    """Fully connected network: two ReLU hidden layers and a linear output layer."""

    def __init__(self, in_shape, hidden, out_shape):
        """Build the three linear layers.

        Args:
            in_shape: number of input features.
            hidden: width of both hidden layers.
            out_shape: number of output values.
        """
        super().__init__()

        # Layers are created in the order they are applied in forward().
        self.lin1 = nn.Linear(in_shape, hidden)
        self.lin2 = nn.Linear(hidden, hidden)
        self.lin3 = nn.Linear(hidden, out_shape)

    def forward(self, x):
        """Return the raw (un-activated) output of the last layer for ``x``."""
        first_hidden = F.relu(self.lin1(x))
        second_hidden = F.relu(self.lin2(first_hidden))
        return self.lin3(second_hidden)

In [4]:
# Environment instance shared by the configuration and training cells below.
env = gym.make("CartPole-v0")

In [5]:
# Number of transitions sampled per replay step; experience_replay() does
# nothing until the memory holds at least this many.
BATCH_SIZE = 128
# Discount factor applied to the next-state value in the Q-learning target.
GAMMA = 0.999
# Exploration probability used by select_action(). Despite the name, this is
# the *current* epsilon: experience_replay() multiplies it by EPSILON_DECAY
# after every optimisation step.
EPSILON_MAX = 1
# Floor that the decayed exploration probability never drops below.
EPSILON_MIN = 0.01
# Multiplicative decay applied to the exploration probability per replay step.
EPSILON_DECAY = 0.999
# Width of both hidden layers of the Q-network.
HIDDEN_SIZE = 64

# First dimension of the observation space shape; used as the network input width.
state_shape = env.observation_space.shape[0]
# Number of discrete actions; used as the network output width and as the
# range for random action sampling.
n_actions = env.action_space.n

In [6]:
# Replay buffer keeping at most the 10000 most recently added transitions.
memory = Memories(size=10000)
# Q-network: takes a state of width state_shape and outputs one value per action.
policy_net = DQN(state_shape, HIDDEN_SIZE, n_actions)
# Adam optimizer over all parameters of the Q-network.
optimizer = optim.Adam(policy_net.parameters(), lr=5e-3)

In [7]:
def select_action(state):
    """Choose an action for ``state`` with an epsilon-greedy rule.

    A uniform random number is compared against the module-level
    ``EPSILON_MAX`` (the current exploration probability). When it does not
    exceed that value a uniformly random action is returned; otherwise the
    action with the largest output of ``policy_net(state)`` is returned.

    Returns:
        A ``(1, 1)`` tensor holding the chosen action index.
    """
    explore = np.random.rand() <= EPSILON_MAX
    if explore:
        random_action = random.randrange(n_actions)
        return torch.tensor([[random_action]], dtype=torch.long)

    # Greedy branch: action selection does not need gradient tracking.
    with torch.no_grad():
        q_values = policy_net(state)
        return q_values.argmax(1).view(1, 1)

In [8]:
def experience_replay():
    """Run one optimisation step of ``policy_net`` on a random replay batch.

    Does nothing until ``memory`` holds at least ``BATCH_SIZE`` transitions.
    Otherwise it samples ``BATCH_SIZE`` transitions, regresses the predicted
    Q(s, a) towards ``reward + GAMMA * max_a' Q(s', a')`` (the bootstrap term
    is zeroed for transitions flagged ``done``) using the smooth L1 loss,
    clips every gradient element to [-1, 1], steps ``optimizer`` and finally
    decays the module-level exploration probability ``EPSILON_MAX``, never
    letting it fall below ``EPSILON_MIN``.
    """
    global EPSILON_MAX

    if len(memory) < BATCH_SIZE:
        return

    transitions = memory.sample(BATCH_SIZE)
    # Transpose a list of Transition records into one Transition of tuples.
    batch = Transition(*zip(*transitions))

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    next_state_batch = torch.cat(batch.next_state)
    reward_batch = torch.tensor(batch.reward, dtype=torch.float32)

    # 1.0 where the episode continued after the step, 0.0 where it ended.
    not_done_mask = (~torch.tensor(batch.done, dtype=torch.bool)).to(torch.float32)

    # Q-value the network currently assigns to the action that was taken.
    predicted_Q_s_a = policy_net(state_batch).gather(1, action_batch)

    # The bootstrap target must be treated as a constant: without no_grad the
    # loss would also back-propagate through the next-state estimate and pull
    # the target towards the prediction instead of the other way round.
    with torch.no_grad():
        next_state_values = policy_net(next_state_batch).max(1)[0]
        expected_Q_s_a = reward_batch + not_done_mask * GAMMA * next_state_values

    loss = F.smooth_l1_loss(predicted_Q_s_a, expected_Q_s_a.unsqueeze(1))

    optimizer.zero_grad()
    loss.backward()
    # Element-wise gradient clipping to [-1, 1].
    nn.utils.clip_grad_value_(policy_net.parameters(), 1)
    optimizer.step()

    # Exploration decays once per optimisation step, bounded below.
    EPSILON_MAX = max(EPSILON_MAX * EPSILON_DECAY, EPSILON_MIN)

In [9]:
# Running-average episode reward at which the training loop stops early.
# NOTE(review): the recorded run below shows episode_reward=200, which is
# presumably the per-episode cap for CartPole-v0; if so, this threshold demands
# a perfect score across the whole averaging window. Confirm this is intended
# (gym's registered solve threshold is believed to be 195; verify).
reward_threshold = 200

In [None]:
n_episodes = 100000
# Upper bound on environment steps taken within a single episode.
max_steps_per_episode = 1000
# Rewards of the most recent episodes; the stop criterion averages over these.
score_card = deque(maxlen=100)


def _to_batch(observation):
    """Convert one observation to a float32 tensor with a leading batch dimension.

    Converting the array directly (instead of wrapping it in a Python list
    first) avoids the slow list-of-arrays path of ``torch.tensor``.
    """
    return torch.tensor(observation, dtype=torch.float32).unsqueeze(0)


with tqdm.trange(n_episodes) as t:
    for episode in t:
        episode_reward = 0
        state = _to_batch(env.reset())
        for step in range(max_steps_per_episode):
            action = select_action(state)
            next_state, reward, done, _ = env.step(action.item())

            next_state = _to_batch(next_state)

            # Memories.__add__ appends the transition to the replay buffer.
            memory + Transition(state, action, reward, next_state, done)

            state = next_state
            experience_replay()
            episode_reward += reward

            if done:
                break

        score_card.append(episode_reward)
        # Average over the episodes actually recorded, so the displayed value
        # is correct before the window has filled up.
        running_reward = np.mean(score_card)

        t.set_description(f'Episode {episode+1}')
        t.set_postfix(
            episode_reward=episode_reward, running_reward=running_reward
        )

        # Only stop once the average covers a full window of episodes.
        window_full = len(score_card) == score_card.maxlen
        if window_full and running_reward >= reward_threshold:
            break
print("Done")

Episode 343:   0%|          | 343/100000 [01:25<12:25:09,  2.23it/s, episode_reward=200, running_reward=178]