# Imports
Importing python libraries necessary to implement DQN and render environment.

In [0]:
!apt-get install -y xvfb python-opengl > /dev/null 2>&1
!pip install gym pyvirtualdisplay > /dev/null 2>&1
!pip install gym[atari]
import gym
import numpy as np
import matplotlib.pyplot as plt
import keras
import random
from IPython import display as ipythondisplay
from pyvirtualdisplay import Display
# Start a pyvirtualdisplay virtual display (visible=0, i.e. not shown on screen).
# NOTE(review): presumably needed so env.render() works on a headless machine — confirm.
display = Display(visible=0, size=(400, 300))
display.start()

# Helper Functions
Helper functions used to preprocess the raw pixel frames of the environment.

In [0]:
def grey(img): # Returns greyscale image
  """Average the colour channels (axis 2) and return the result as uint8.

  The uint8 cast truncates fractional values, so the input must still be on
  the 0-255 intensity scale; an image already scaled to [0, 1] would collapse
  to (almost) all zeros.
  """
  return np.mean(img, axis=2).astype(np.uint8)

def resize(img): # Downsamples the size of a frame by half in both axes
  """Scale intensities by 1/255 and keep every second row and column.

  Only the first two axes are subsampled, so this works for both
  (H, W) greyscale frames and (H, W, C) colour frames.
  """
  img = img/255.0
  return img[::2, ::2]

def preprocess(img):
  """Turn a raw RGB frame into a half-resolution greyscale float frame.

  Greyscale conversion has to run BEFORE the 1/255 scaling done in resize():
  grey() casts to uint8, and applying that cast to values in [0, 1] would
  truncate every pixel except pure white to 0, producing a blank frame.

  Args:
    img: array of shape (H, W, C) with intensities in 0-255.

  Returns:
    Float array of shape (ceil(H/2), ceil(W/2)) with values in [0, 1].
  """
  return resize(grey(img))

Function that returns the value of epsilon given the iteration/time-step

In [0]:
def anneal(t, eps_0, eps_min=0.1, n_steps=1000000):
  """Linearly anneal epsilon from eps_0 towards eps_min.

  Args:
    t: current iteration / time-step.
    eps_0: epsilon at t = 0.
    eps_min: floor below which epsilon never drops (default 0.1).
    n_steps: number of steps over which the line goes from eps_0 to eps_min
      (default 1,000,000).

  Returns:
    max(eps_0 + slope * t, eps_min), where slope = (eps_0 - eps_min) / -n_steps.
  """
  slope = (eps_0 - eps_min) / (-n_steps)
  return max(slope * t + eps_0, eps_min)

In [0]:
def initialize_environment():
  """Create the 'BreakoutDeterministic-v4' gym environment and reset it.

  Returns:
    A pair (env, first) where `first` is whatever env.reset() returned.
  """
  breakout = gym.make('BreakoutDeterministic-v4')
  first = breakout.reset()
  return breakout, first

Function that returns the index of the maximum value in a NumPy array.

In [0]:
def argmax(action_values):
  """Return the index of the largest entry of ``action_values[0][0]``.

  The row is scanned in full, so the function works for any number of
  actions rather than exactly four, and no sentinel "minus infinity" value is
  needed. Ties resolve to the lowest index.

  Args:
    action_values: nested sequence / array indexed as [0][0][action].

  Returns:
    The winning action index as a plain Python int.
  """
  # NOTE: unlike a manual ">" scan, np.argmax reports the position of a NaN
  # if one is present in the row.
  return int(np.argmax(action_values[0][0]))

Function that returns the optimal (greedy) action to take given the current state.

In [0]:
def optimal_action(Q, state):
  """Pick the greedy action for `state`.

  Feeds the state together with an all-ones (1, 4) mask to Q.predict and
  returns argmax() of the prediction.
  """
  all_actions = np.ones(shape = (1, 4))
  return argmax(Q.predict([[state], [all_actions]]))

Function that returns the largest Q-value over all possible actions for a given state.

In [0]:
def Q_value(Q, state):
  """Return the largest value Q predicts for `state`.

  The state is passed to Q.predict together with an all-ones (1, 4) mask, and
  np.amax is taken over the whole prediction.
  """
  all_actions = np.ones(shape = (1, 4))
  predicted = Q.predict([[state], [all_actions]])
  best = np.amax(predicted)
  return best

Function that transforms negative/positive rewards to -1 or 1.

In [0]:
def transform(reward):
  """Reduce a reward to its sign via np.sign: negative -> -1, zero -> 0, positive -> 1."""
  clipped = np.sign(reward)
  return clipped

Function that constructs the new state given the old state and a new frame. The new state is the new frame concatenated with the most recent three frames from the previous state.

In [0]:
def get_state(state, new_frame):
  """Shift the frame stack one slot towards index 0 and store new_frame last.

  Slots 0..2 along the last axis receive the contents of slots 1..3, then
  slot 3 is overwritten with `new_frame`.

  Note: the shift is done IN PLACE on `state`, and that same array object is
  returned. Callers that need the previous stack must copy it beforehand.
  """
  for slot in (0, 1, 2):
    # Copy the next-newer frame down into this slot.
    state[:, :, slot] = state[:, :, slot + 1]
  state[:, :, 3] = new_frame
  return state

# Memory Buffer for Experience Replay
I implemented replay memory as a circular array that supports the following methods:


*   Initialize circular array with fixed size.
*   Append/overwrite empty/oldest memory.
*   Return a randomly sampled batch of memories of size n



In [0]:
class Buffer:
  """Fixed-capacity circular buffer used as replay memory.

  New elements fill empty slots first; once the buffer is full, each append
  overwrites the oldest element.
  """

  def __init__(self, size):
    """Allocate `size` empty slots.

    Raises:
      ValueError: if `size` is not positive (such a buffer could never store
        anything; append() would otherwise fail with a division by zero).
    """
    if size <= 0:
      raise ValueError("Buffer size must be a positive integer")
    self.data = [None] * (size)
    self.end = 0     # index of the slot the next append() writes to
    self.length = 0  # number of slots currently holding an element

  def append(self, element):
    """Store `element` in the next slot, wrapping around when the end is reached."""
    capacity = len(self.data)
    self.data[self.end] = element
    self.end = (self.end + 1) % capacity
    self.length = min(self.length + 1, capacity)

  def sample(self, n):
    """Return a list of `n` stored elements drawn uniformly WITH replacement.

    Raises:
      ValueError: if `n` > 0 and nothing has been appended yet.
    """
    if n > 0 and self.length == 0:
      raise ValueError("cannot sample from an empty Buffer")
    # Only the first `length` slots are populated, so restrict indices to them.
    return [self.data[random.randrange(self.length)] for _ in range(n)]

# Q-Network Definition
The definition of the function approximator (Deep Q-Network) as outlined by DeepMind's paper.

In [0]:
def atari_model(n_actions):
  """Build and compile the Q-network.

  The network takes two inputs: a (105, 80, 4) stack of frames and a
  (1, n_actions) mask. Two convolutional layers and a 256-unit dense layer
  feed a linear layer with one output per action; that output is multiplied
  by the mask so that only the selected actions contribute to the loss.

  Args:
    n_actions: number of discrete actions (width of the output and the mask).

  Returns:
    A Keras Model compiled with RMSprop and mean-squared-error loss.
  """

  frame_shape = (105, 80, 4)

  frames_input = keras.layers.Input(frame_shape, name='frames')
  actions_input = keras.layers.Input((1, n_actions), name='mask')

  conv_1 = keras.layers.Conv2D(16, (8, 8), activation='relu', strides=(4, 4))(frames_input)
  conv_2 = keras.layers.Conv2D(32, (4, 4), activation='relu', strides=(2, 2))(conv_1)
  # Flatten is exposed directly under keras.layers; the keras.layers.core path
  # is an internal module location.
  conv_flattened = keras.layers.Flatten()(conv_2)
  hidden = keras.layers.Dense(256, activation='relu')(conv_flattened)
  output = keras.layers.Dense(n_actions)(hidden)
  filtered_output = keras.layers.Multiply()([output, actions_input])

  # Keras 2 names these keyword arguments `inputs` / `outputs`; the singular
  # `input=` / `output=` spellings are the deprecated Keras 1 form.
  Q = keras.models.Model(inputs=[frames_input, actions_input], outputs=filtered_output)
  optimizer = keras.optimizers.RMSprop()
  Q.compile(optimizer, loss='mse')

  return Q

# Gradient Descent/Back Propagation
This function samples a batch of 32 random memories, uses the memories to build the feature vector X, and uses the Bellman Identity to compute the values of the labels by feeding the final state of each memory into the Q-Network and adding reward.

In [0]:
def gradient_descent(Q, memory, gamma, batch_size=32, n_actions=4):
  """Run one fitting step of Q on a random batch of replayed experiences.

  Each experience is a tuple (state, action, next_state, reward, game_over).
  The regression target for the taken action is `reward`, plus
  gamma * max_a Q(next_state, a) when the experience is not terminal.
  Targets and mask are zero for every other action.

  Args:
    Q: model exposing predict([frames, mask]) and fit([frames, mask], targets).
    memory: object exposing sample(n) that returns a list of n experiences.
    gamma: discount factor.
    batch_size: number of experiences per update (default 32).
    n_actions: number of discrete actions (default 4).
  """
  batch = memory.sample(batch_size)
  frame_shape = batch[0][0].shape

  # Create 'Label' vector Y and 'Feature' X with Mask
  Y = np.zeros(shape = (batch_size, 1, n_actions))
  X = np.zeros(shape = (batch_size,) + frame_shape)
  NEXT = np.zeros(shape = (batch_size,) + frame_shape)
  MASK = np.zeros(shape = (batch_size, 1, n_actions))
  for i, (state, action, next_state, reward, game_over) in enumerate(batch):
    X[i] = state
    NEXT[i] = next_state
    MASK[i][0][action] = 1
    Y[i][0][action] = reward

  # Evaluate all next states in ONE forward pass (with every action unmasked)
  # instead of issuing a separate predict call per experience.
  all_actions = np.ones(shape = (batch_size, 1, n_actions))
  next_values = Q.predict([NEXT, all_actions])
  best_next = next_values.reshape(batch_size, -1).max(axis=1)

  for i, (state, action, next_state, reward, game_over) in enumerate(batch):
    # Bootstrap only from non-terminal transitions.
    if game_over == 0:
      Y[i][0][action] += gamma * best_next[i]

  Q.fit([X, MASK], Y, verbose = 0)

# Main Function
This function is a literal translation of the DQN algorithm pseudocode outlined by DeepMind.

In [0]:
def main():
  """Train a Q-network on Atari Breakout with epsilon-greedy exploration and experience replay.

  For each episode the state is initialised with four copies of the first
  frame; at every step an action is chosen epsilon-greedily, the resulting
  transition is stored in replay memory, and one gradient step is taken on a
  sampled batch.

  Returns:
    The trained Q-network.
  """

  n_actions = 4 # For Atari Breakout
  eps_0 = 0.8
  M = 15 # Number of Episodes
  N = 5000000 # Size of Replay Memory
  T = 10 # Number of Steps per Episode
  gamma = 0.8 # Discount Factor
  iteration = 0 # Counter to keep track of iteration count
  replay_memory = Buffer(N)
  Q = atari_model(n_actions) # Initialize Model
  env = gym.make('BreakoutDeterministic-v4') # Declare Environment - Atari Breakout with 4-frame skipping

  for episode in range(M):

    # Fill state with first 4 frames - All 4 frames are the initial frame of the environment.
    # A fresh array is allocated for every episode: the arrays from the previous
    # episode are still referenced by replay memory and must not be overwritten.
    frame = preprocess(env.reset())
    new_state = np.zeros(shape = (105, 80, 4)) # State of the agent/environment - Last 4 frames
    for i in range(4):
      new_state[:, :, i] = frame

    for t in range(T):
      state = new_state

      # epsilon-greedy
      eps = anneal(iteration, eps_0)
      iteration += 1
      if random.random() < eps:
        action = env.action_space.sample()
      else:
        action = optimal_action(Q, state)
      # Get new frame after performing selected action
      new_frame, reward, game_over, _ = env.step(action)

      # Construct new_state using the new frame.
      # get_state may shift the array it is given in place, so hand it a copy:
      # otherwise `state` and `new_state` would be the same array and every
      # stored experience would have identical "before" and "after" states.
      new_state = get_state(state.copy(), preprocess(new_frame))

      # Make tuple of new memory/experience
      experience = (state, action, new_state, transform(reward), game_over)

      # Append tuple to replay memory
      replay_memory.append(experience)

      gradient_descent(Q, replay_memory, gamma)

      if game_over:
        break
  return Q

In [0]:
# Run training; main() returns the Q-network it built and fitted.
Q = main()

# Testing
Helper code to test the trained model Q. It runs the model greedily on one episode and renders every frame so that progress can be inspected visually.

In [0]:
# Play one episode greedily with the trained network Q and render every frame.
env = gym.make('BreakoutDeterministic-v4')
new_state = np.zeros(shape = (105, 80, 4))
frame = preprocess(env.reset())
# Initial state: four copies of the first frame.
for i in range(4):
  new_state[:, :, i] = frame
game_over = False
while not game_over:
  state = new_state
  action = optimal_action(Q, state)
  new_frame, reward, game_over, _ = env.step(action)
  # Carry the updated frame stack forward explicitly through new_state, so the
  # loop does not depend on get_state modifying its argument in place.
  new_state = get_state(state, preprocess(new_frame))
  screen = env.render(mode = 'rgb_array')
  plt.imshow(screen)
  ipythondisplay.clear_output(wait=True)
  ipythondisplay.display(plt.gcf())