A very simple example showing how to implement a deep Q-network (DQN) using PyTorch. It only requires gym and PyTorch to be installed; no external files or other libraries are needed, and everything required to run it is contained within this notebook.

-freddy chua

In [None]:
import gym
from gym import wrappers
import torch
import torch.nn as nn
import torch.nn.init
import torch.nn.functional as F
from collections import namedtuple
from torch.autograd import Variable
import random

In [None]:
# build the CartPole-v0 environment, then wrap it in gym's Monitor wrapper,
# passing 'cartpole' as the monitor's output location
# NOTE(review): force=True presumably lets the monitor overwrite earlier output in 'cartpole' - confirm against the installed gym version
env = gym.make('CartPole-v0')
env = wrappers.Monitor(env, 'cartpole', force=True)

In [None]:
# try implementing dqn

# the action reward value function can be represented by a mlp
class Mlp(nn.Module):
  """A two-layer perceptron used as the action-value function.

  The network is Linear(input_size, hidden_size) -> ReLU -> Linear(hidden_size, output_size),
  so a (batch, input_size) float tensor is mapped to a (batch, output_size) tensor
  holding one value per action.

  Args:
    input_size: number of input features.
    output_size: number of outputs.
    hidden_size: width of the single hidden layer (default 10, the value that
      used to be hard-coded).
  """
  def __init__(self, input_size, output_size, hidden_size=10):
    super(Mlp, self).__init__() # this statement is always needed

    self.fc1 = nn.Linear(input_size, hidden_size) # matrix multiplication
    self.fc2 = nn.Linear(hidden_size, output_size) # matrix multiplication

    # == parameters initialization ==
    # the initialisers with a trailing underscore work in place; the names
    # without the underscore are deprecated aliases of them
    nn.init.xavier_normal_(self.fc1.weight)
    nn.init.xavier_normal_(self.fc2.weight)

    nn.init.normal_(self.fc1.bias)
    nn.init.normal_(self.fc2.bias)
    # ===============================

  def forward(self, x):
    """Return the output of the second linear layer for the batch x."""
    x = F.relu(self.fc1(x))
    x = self.fc2(x)
    return x

  # no backward function needed, autograd derives it from forward
# end class

In [None]:
# the memory
# one recorded transition; next_state is whatever the caller stores for it
Event = namedtuple(
  'Event',
  ['state', 'action', 'next_state', 'reward'],
)

class Memory(object):
  """A bounded store of events that overwrites its oldest entries when full.

  Attributes:
    capacity: maximum number of events kept in mem.
    idx: position in mem that the next overwrite will target.
    mem: list holding the stored events.
  """
  def __init__(self, capacity):
    self.mem = []
    self.idx = 0
    self.capacity = capacity

  def add_event(self, event):
    """Store event, appending while there is room and overwriting slot idx otherwise."""
    slot = self.idx
    if len(self.mem) >= self.capacity:
      self.mem[slot] = event
    else:
      self.mem.append(event)
    # advance the write position, wrapping back to 0 at capacity
    self.idx = (slot + 1) % self.capacity

  def sample(self, batch_size):
    """Return batch_size distinct stored events chosen by random.sample."""
    chosen = random.sample(self.mem, batch_size)
    return chosen

# end class

In [None]:
# size the Q-network from the environment: the first dimension of the
# observation space shape gives the inputs, the number of actions the outputs
input_size = env.observation_space.shape[0]
output_size = env.action_space.n
print('input_size = {0}, output_size = {1}'.format(input_size, output_size))

In [None]:
# create 2 Q-networks with the same architecture:
#   eval_Q   - the network whose parameters are trained and which picks the greedy action
#   target_Q - the network used to compute the training targets; it is
#              refreshed from eval_Q every C training steps

eval_Q   = Mlp(input_size, output_size)
target_Q = Mlp(input_size, output_size)
target_Q.load_state_dict(eval_Q.state_dict()) # start target_Q as an exact copy of eval_Q's parameters

In [None]:
epsilon = 1.0 # the exploration decision parameter: probability of taking a random action, will decay over time
batch_size = 100 # number of events sampled from the replay memory per training step; training only starts once the memory holds this many events
gamma = 0.99 # the parameter for discounting future rewards
decay = 0.999 # epsilon is multiplied by this factor every time target_Q is refreshed
C = 5 # the number of training steps between refreshes of target_Q from eval_Q

In [None]:
optimizer = torch.optim.RMSprop(eval_Q.parameters()) # RMSprop with its default settings; only eval_Q's parameters are optimised
criterion = nn.MSELoss() # mean squared error between the estimated and the targeted action values

In [None]:
# Train eval_Q with deep Q-learning for 1000 episodes.
# Every environment step is stored in the replay memory; once the memory holds
# at least batch_size events, each step is followed by one gradient step on a
# random minibatch. The targets come from target_Q, which is refreshed from
# eval_Q every C training steps (epsilon is decayed at the same moment).
# The loop uses the gym API in which reset() returns the observation and
# step() returns (observation, reward, done, info).
replay_memory = Memory(10000) # create a replay memory of capacity 10000
top_score = 0
c = 0 # training steps since target_Q was last refreshed
for i in range(1000):
  current_state = env.reset() # an array of 4 values
  done = False
  episode_reward = 0
  while not done:
    if random.random() < epsilon:
      # perform random action to explore the search space
      action = env.action_space.sample()
    else:
      # choose action with highest value
      # unsqueeze adds the batch dimension the network expects
      state_tensor = torch.as_tensor(current_state, dtype=torch.float32).unsqueeze(0)
      with torch.no_grad(): # inference only, no graph is needed
        action_values = eval_Q(state_tensor) # forward
      # argmax over the action dimension gives a 1-element tensor; item() turns it into a plain int
      action = int(action_values.argmax(dim=1).item())
    # end if
    next_state, reward, done, _ = env.step(action)
    episode_reward += reward
    if done:
      # a terminal event has no next state; None marks it for the target computation below
      replay_memory.add_event(Event(current_state.copy(), action, None, reward))
    else:
      replay_memory.add_event(Event(current_state.copy(), action, next_state.copy(), reward))
    # end if
    current_state = next_state

    # train
    if len(replay_memory.mem) >= batch_size:
      # sample from replay memory
      mini_batch = replay_memory.sample(batch_size)
      mini_batch = Event(*zip(*mini_batch)) # turn a list of events into one event of tuples, for batch processing

      states = torch.stack([torch.as_tensor(s, dtype=torch.float32) for s in mini_batch.state])
      actions = torch.tensor(mini_batch.action, dtype=torch.long).unsqueeze(1)
      rewards = torch.tensor(mini_batch.reward, dtype=torch.float32).unsqueeze(1)

      # calculate the estimated value and select the one associated with the action taken
      estimated_value = eval_Q(states).gather(1, actions) # shape (batch_size, 1)

      # calculate the targeted value: reward + gamma * max_a target_Q(next_state, a),
      # where the second term stays 0 for terminal events
      mask = torch.tensor([s is not None for s in mini_batch.next_state], dtype=torch.bool)
      targetted_value = torch.zeros(batch_size, 1)
      if mask.any():
        # skipped when every sampled event is terminal, because there is then
        # no next state to evaluate and the targets are just the rewards
        next_states = torch.stack([
          torch.as_tensor(s, dtype=torch.float32) for s in mini_batch.next_state if s is not None])
        with torch.no_grad(): # targets are constants, no gradient flows into target_Q
          # keepdim=True keeps the (n, 1) shape that matches targetted_value[mask]
          target_val = target_Q(next_states).max(dim=1, keepdim=True)[0]
        targetted_value[mask] = gamma * target_val
      # end if
      targetted_value += rewards

      # compute the loss between estimated value and targeted value
      optimizer.zero_grad()
      loss = criterion(estimated_value, targetted_value)
      loss.backward()
      optimizer.step() # do a gradient descent on it

      c += 1
      if c == C:
        c = 0
        target_Q.load_state_dict(eval_Q.state_dict())
        epsilon = epsilon * decay
      # end if
    # end if

  # end while
  print('episode {0} reward = {1}, epsilon = {2:3g}'.format(i, episode_reward, epsilon))
  top_score = max(top_score, episode_reward)
# end for
print('top_score = {0}'.format(top_score))

In [None]:
# ask the renderer to close, then close the environment itself
# NOTE(review): the close=True argument to render() is presumably only accepted by older gym releases - confirm against the installed version
env.render(close=True)
env.close()