In [None]:
%matplotlib inline


Rainbow DQN PyTorch Implementation
=====================================
**Author**: `Andres Quintela`

Based on the PyTorch DQN tutorial

Arcade Learning Environment on Cartpole

November 2018

In [None]:
import gym
import math
import random
import numpy as np
import matplotlib
import matplotlib.pyplot as plt
from collections import namedtuple
from itertools import count
from PIL import Image

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
import torchvision.transforms as T


# Build the CartPole-v0 environment and keep the raw, unwrapped instance.
# NOTE(review): `.unwrapped` presumably drops gym's wrappers (including any
# built-in episode step limit) — the main loop applies its own `max_t` cap; confirm.
env = gym.make('CartPole-v0').unwrapped

# set up matplotlib
# `is_ipython` is True when the active matplotlib backend name contains 'inline',
# i.e. when figures are rendered inside the notebook.
is_ipython = 'inline' in matplotlib.get_backend()
if is_ipython:
    from IPython import display

# Turn on matplotlib interactive mode.
plt.ion()

# if gpu is to be used
# Use CUDA when torch reports it as available, otherwise fall back to the CPU.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

Experience replay
-------------

In [None]:
# One stored step of experience; fields are filled positionally by ReplayMemory.push().
Transition = namedtuple(
    'Transition',
    ('state', 'action', 'next_state', 'reward'),
)


class ReplayMemory(object):
    """Bounded store of `Transition` records.

    The store grows until it holds `capacity` entries; after that each new
    entry overwrites the slot at `position`, which cycles through the list.
    """

    def __init__(self, capacity):
        """Create an empty store that will hold at most `capacity` transitions."""
        self.memory = []
        self.position = 0
        self.capacity = capacity

    def push(self, *args):
        """Save one transition; `args` are the Transition fields in order."""
        still_growing = len(self.memory) < self.capacity
        if still_growing:
            # Reserve the slot that is written just below.
            self.memory += [None]
        self.memory[self.position] = Transition(*args)
        # Advance the write cursor, wrapping around at `capacity`.
        self.position = (self.position + 1) % self.capacity

    def sample(self, batch_size):
        """Return `batch_size` distinct stored transitions chosen at random."""
        return random.sample(self.memory, batch_size)

    def __len__(self):
        """Number of transitions currently stored."""
        return len(self.memory)

Neural Network
-------------

In [None]:
class DQN(nn.Module):
    """Small fully connected Q-network.

    Two hidden linear layers of 16 units each with ReLU activations, followed
    by a linear layer that emits one value per action.
    """

    def __init__(self, input_size, action_size):
        """Build the layers.

        Args:
            input_size: number of features in one input row.
            action_size: number of outputs (one per action).
        """
        super().__init__()

        self.hidden1 = nn.Linear(input_size, 16)
        self.hidden2 = nn.Linear(16, 16)
        self.output = nn.Linear(16, action_size)

    def forward(self, x):
        """Return the network output for a batch `x` (first dimension is the batch)."""
        first = F.relu(self.hidden1(x))
        second = F.relu(self.hidden2(first))
        # Collapse everything after the batch dimension before the final layer.
        flattened = second.view(second.size(0), -1)
        return self.output(flattened)

Training
--------

In [None]:
# Number of transitions sampled from replay memory per optimization step.
BATCH_SIZE = 128
# Discount applied to the bootstrapped next-state value in optimize_model().
GAMMA = 0.999
# Epsilon-greedy schedule used by select_action(): epsilon decays exponentially
# from EPS_START towards EPS_END, with EPS_DECAY as the decay scale in steps.
EPS_START = 0.9
EPS_END = 0.05
EPS_DECAY = 200
# The target network is synchronised with the policy network every
# TARGET_UPDATE episodes (see the main loop).
TARGET_UPDATE = 10



# Network dimensions are read from the environment: the length of one
# observation and the number of discrete actions.
input_size=len(env.reset())
action_size=env.action_space.n



# Two networks with the same architecture: `policy_net` is the one being
# optimized, `target_net` starts as an exact copy and is kept in eval mode.
policy_net = DQN(input_size,action_size).to(device)
target_net = DQN(input_size,action_size).to(device)
target_net.load_state_dict(policy_net.state_dict())
target_net.eval()

# Only the policy network's parameters are handed to the optimizer.
optimizer = optim.Adam(policy_net.parameters(),lr=0.01)
# Replay memory holding at most 10000 transitions.
memory = ReplayMemory(10000)


# Global step counter read (and advanced) by select_action() to compute epsilon.
steps_done = 0


def select_action(state):
    """Choose an action for `state` with an epsilon-greedy policy.

    Epsilon decays exponentially from EPS_START towards EPS_END as the global
    `steps_done` counter grows; every call advances that counter by one.

    Args:
        state: input passed straight to `policy_net`; the greedy branch
            reshapes the result to (1, 1), so it is expected to be a single
            state with a leading batch dimension.

    Returns:
        A (1, 1) long tensor on `device` holding the chosen action index.
    """
    global steps_done
    sample = random.random()
    eps_threshold = EPS_END + (EPS_START - EPS_END) * \
        math.exp(-1. * steps_done / EPS_DECAY)
    steps_done += 1
    if sample > eps_threshold:
        # Exploit: take the action with the highest predicted value.
        with torch.no_grad():
            return policy_net(state).max(1)[1].view(1, 1)
    # Explore: draw uniformly over every action the environment offers rather
    # than assuming there are exactly two.
    return torch.tensor([[random.randrange(action_size)]], device=device, dtype=torch.long)

Training loop
--------------

In [None]:
def optimize_model():
    """Run one Double-DQN optimization step on a random batch from `memory`.

    Does nothing until `memory` holds at least BATCH_SIZE transitions.
    Otherwise it samples a batch, builds the targets
    ``reward + GAMMA * Q_target(next_state, argmax_a Q_policy(next_state, a))``
    (with a next-state value of 0 for terminal transitions), and takes one
    optimizer step on the Huber loss between those targets and the policy
    network's value for the action actually taken.

    NOTE(review): the main loop stores n-step transitions, yet the bootstrap
    term is discounted by GAMMA once rather than GAMMA ** N_STEP, and the
    rewards are discounted with N_STEP_GAMMA — confirm the intended discounting.
    """
    if len(memory) < BATCH_SIZE:
        return
    transitions = memory.sample(BATCH_SIZE)
    # Transpose the batch (see http://stackoverflow.com/a/19343/3343043 for
    # detailed explanation).
    batch = Transition(*zip(*transitions))

    # Boolean mask marking the transitions whose next state is not terminal.
    # torch.bool is the supported mask dtype; uint8 masks are deprecated.
    non_final_mask = torch.tensor([s is not None for s in batch.next_state],
                                  device=device, dtype=torch.bool)
    non_final_next_states = [s for s in batch.next_state if s is not None]

    state_batch = torch.cat(batch.state)
    action_batch = torch.cat(batch.action)
    reward_batch = torch.cat(batch.reward)

    # Compute Q(s_t, a) - the model computes Q(s_t), then we select the
    # columns of actions taken
    state_action_values = policy_net(state_batch).gather(1, action_batch)

    # Double DQN part: the policy net picks the next action, the target net
    # evaluates it. Terminal transitions keep a next-state value of zero.
    next_state_values = torch.zeros(BATCH_SIZE, device=device)
    if non_final_next_states:
        # Guarded because torch.cat() raises on an empty list, which happens
        # when every sampled transition is terminal.
        next_states = torch.cat(non_final_next_states)
        with torch.no_grad():
            next_state_actions = policy_net(next_states).max(1)[1].unsqueeze(1)
            next_state_values[non_final_mask] = \
                target_net(next_states).gather(1, next_state_actions).squeeze(1)

    # Compute the expected Q values, shaped (BATCH_SIZE, 1) like state_action_values
    expected_state_action_values = (next_state_values * GAMMA) + reward_batch
    expected_state_action_values = expected_state_action_values.view(BATCH_SIZE, 1)

    # Compute Huber loss
    loss = F.smooth_l1_loss(state_action_values, expected_state_action_values)

    # Optimize the model
    optimizer.zero_grad()
    loss.backward()
    for param in policy_net.parameters():
        # Clamp each gradient element to [-1, 1]; skip parameters that
        # received no gradient.
        if param.grad is not None:
            param.grad.data.clamp_(-1, 1)
    optimizer.step()

In [None]:
# n-step return memory

N_STEP = 3          # length of the sliding window
N_STEP_GAMMA = 0.9  # per-step discount used by Dynamic_memory.pull_reward()


class Dynamic_memory(object):
    """Fixed-length sliding window over the most recently pushed items.

    The window starts out filled with `size` zeros. Every push appends the new
    item at the end and drops the oldest one, so index 0 is always the oldest
    entry and index ``size - 1`` the newest.
    """

    def __init__(self, size):
        """Create a window of length `size`, pre-filled with zeros."""
        self.size = size
        self.memory = [0] * size
        self.R = 0         # last value returned by pull_reward()
        self.position = 0  # unused; kept so existing attribute access still works

    def push(self, arg):
        """Append `arg` as the newest entry and drop the oldest if over `size`."""
        self.memory.append(arg)
        if len(self.memory) > self.size:
            self.memory.pop(0)

    def pull_reward(self):
        """Return the discounted sum of the window, oldest entry first.

        Computes ``sum(memory[i] * N_STEP_GAMMA ** i)``. The sum is rebuilt
        from scratch on every call, so repeated calls do not accumulate and
        previously returned values are never modified in place.
        """
        total = 0
        for i, reward in enumerate(self.memory):
            total = total + reward * N_STEP_GAMMA ** i
        self.R = total
        return total

    def pull(self):
        """Return the oldest entry in the window."""
        return self.memory[0]

    def pull_n(self):
        """Return the newest entry in the window."""
        return self.memory[self.size - 1]

    def __len__(self):
        """Number of entries currently held."""
        return len(self.memory)

Main Loop
----------

In [None]:
num_episodes = 1000
max_t = 200  # maximum timesteps per episode

# Total reward collected in each episode, filled in as episodes finish.
episode_reward = [0] * num_episodes
i_episode_reward = 0

# Restart the exploration schedule. select_action() is the only place that
# advances this counter, so epsilon decays by exactly one step per action.
steps_done = 0


def _as_state_tensor(observation):
    """Convert a raw observation into a float tensor on `device` with a leading batch dimension."""
    return torch.tensor(observation, dtype=torch.float, device=device).unsqueeze(0)


for i_episode in range(num_episodes):
    # Initialize the environment and state
    state = _as_state_tensor(env.reset())
    print('episode',i_episode)

    # Fresh n-step windows for every episode, so a stored transition never
    # pairs a state from one episode with a next state from the following one.
    reward_memory = Dynamic_memory(N_STEP)
    state_memory = Dynamic_memory(N_STEP)
    action_memory = Dynamic_memory(N_STEP)
    next_state_memory = Dynamic_memory(N_STEP)

    i_episode_reward = 0

    for t in count():
        # Select and perform an action
        action = select_action(state)
        observation, reward, done, _ = env.step(action.item())

        # accumulated reward for each episode
        i_episode_reward += reward

        reward = torch.tensor([reward], device=device)
        # A terminal step has no successor state.
        next_state = None if done else _as_state_tensor(observation)

        # save transitions in dynamic lists
        reward_memory.push(reward)
        state_memory.push(state)
        action_memory.push(action)
        next_state_memory.push(next_state)

        # Once N_STEP real steps are in the windows, store the transition
        # (state(t), action(t), next_state(t+n_step), sum(R)) with n-step modifs.
        # NOTE(review): the last N_STEP - 1 states of an episode are never
        # stored as the start of a transition — confirm whether they should
        # be flushed with truncated returns.
        if t + 1 >= N_STEP:
            memory.push(state_memory.pull(), action_memory.pull(),
                        next_state_memory.pull_n(), reward_memory.pull_reward())

        # Move to the next state
        state = next_state

        # Perform one step of the optimization (on the policy network)
        optimize_model()

        # Stop on termination or after exactly max_t steps (t starts at 0).
        if done or t + 1 >= max_t:
            # save episode reward
            episode_reward[i_episode] = i_episode_reward
            i_episode_reward = 0
            break

    # Update the target network
    if i_episode % TARGET_UPDATE == 0:
        target_net.load_state_dict(policy_net.state_dict())

print('Complete')

In [None]:
# Plot the total reward collected in each episode. `i_episode` is the index of
# the last episode the training loop reached, so the slice must run to
# i_episode + 1 to include it.
fig, ax = plt.subplots()
ax.plot(episode_reward[:i_episode + 1])
ax.set_xlabel('Episode')
ax.set_ylabel('Total reward')
ax.set_title('Reward per episode')
plt.show()