In [1]:
# Import dependencies
import matplotlib.pyplot as plt
import seaborn as sns
import gym
import random as rn
import numpy as np
import tensorflow as tf
from tensorflow.losses import huber_loss
from datetime import datetime
from keras.models import Sequential
from keras.optimizers import RMSprop
from keras.layers import Conv2D, Dense, Flatten
from matplotlib import animation
from JSAnimation.IPython_display import display_animation
import os

# Set randomization defaults for solution reproducability
np.random.seed(21)
rn.seed(21)
tf.set_random_seed(21)

# Set visualization defaults
sns.set_context('notebook')
plt.style.use('seaborn-darkgrid')
%matplotlib inline 

# Set input directory from model training, ouput director for storing visualizations
INPUT_DIR = 'DDQN_model_output/'
OUTPUT_DIR = 'Assets/'
if not os.path.exists(OUTPUT_DIR):
    os.makedirs(OUTPUT_DIR)

Declare `ReplayMemory` and `DDQNAgent` classes (mimicking earlier code); define image processing and animation helper functions.

In [2]:
# Define helper function for preprocessing an observation image
def process_image(obs):
    """Crop, downsample and greyscale one observation frame for the network.

    Rows 12..194 and all columns are kept at every second pixel, the last
    (channel) axis is averaged away, and the result is truncated to `uint8`.

    Parameters
    ----------
    obs : numpy array indexed as [row, column, channel].

    Returns
    -------
    numpy `uint8` array with a leading axis of size 1 and a trailing axis of
    size 1 wrapped around the reduced image (1 x H x L x 1 for a 3-D input).
    """
    playfield = obs[12:195:2, ::2]           # crop to playable rows, keep every other pixel
    greyscale = np.mean(playfield, axis = 2) # collapse the channel axis
    pixels = greyscale.astype(np.uint8)      # truncate averaged values to integer pixels
    batched_shape = (1, *pixels.shape, 1)    # add batch and channel axes for Keras
    return pixels.reshape(batched_shape)

# Declare replay buffer Python class
class ReplayMemory:
    """Fixed-capacity circular buffer of arbitrary Python objects.

    Once `maxlen` items have been stored, each new item overwrites the oldest
    slot. Items are kept in a NumPy object array so that a batch can be drawn
    with a single fancy-indexing operation.

    Attributes
    ----------
    maxlen : capacity of the buffer (coerced to `int`).
    buf    : NumPy object array of length `maxlen` holding the stored items.
    index  : position at which the next item will be written.
    length : number of valid items currently stored (never exceeds `maxlen`).
    """

    def __init__(self, maxlen):
        self.maxlen = int(maxlen)
        # Use the builtin `object`: the `np.object` alias was deprecated in
        # NumPy 1.20 and removed in 1.24, where it raises AttributeError.
        self.buf = np.empty(shape = self.maxlen, dtype = object)
        self.index = 0
        self.length = 0

    def append(self, data): # Define method for storing samples in replay memory buffer
        """Store `data` in the next slot, overwriting the oldest item when full."""
        self.buf[self.index] = data
        self.length = min(self.length + 1, self.maxlen)
        self.index = (self.index + 1) % self.maxlen # If `maxlen` is exceeded, replace prior memory samples with new observations

    def sample(self, batch_size, with_replacement = True): # Define method for randomly sampling replay memory; default to sampling with replacement (faster than alternative)
        """Draw stored items uniformly at random.

        Parameters
        ----------
        batch_size       : number of items requested.
        with_replacement : if True (default) indices are drawn independently,
                           so items may repeat; if False a random permutation
                           is truncated to `batch_size`, so at most `length`
                           distinct items are returned.

        Returns
        -------
        NumPy object array of the selected items.

        Raises
        ------
        ValueError (from `np.random.randint`) when sampling with replacement
        from an empty buffer.
        """
        if with_replacement:
            indices = np.random.randint(self.length, size = batch_size)
        else:
            indices = np.random.permutation(self.length)[:batch_size]
        return self.buf[indices]

# Define helper function for drawing a training minibatch from a `ReplayMemory`-style buffer
def sample_memories(memory_store, batch_size):
    """Sample experiences and regroup them field-by-field.

    Each sampled memory is read as up to five fields, in the order
    (state, action, reward, next_state, done).

    Parameters
    ----------
    memory_store : object exposing `sample(batch_size)` that yields memories.
    batch_size   : number of memories to request from `memory_store`.

    Returns
    -------
    Tuple `(states, actions, rewards, next_states, dones)` of NumPy arrays;
    `rewards` and `dones` are reshaped into single-column 2-D arrays.
    """
    field_buckets = [[] for _ in range(5)]

    # Transpose the batch: one bucket per experience field
    for experience in memory_store.sample(batch_size):
        for bucket, field_value in zip(field_buckets, experience):
            bucket.append(field_value)

    states, actions, rewards, next_states, dones = (np.array(bucket) for bucket in field_buckets)
    return states, actions, rewards.reshape(-1, 1), next_states, dones.reshape(-1, 1)

# Declare Double Deep-Q Learning Network Agent class
class DDQNAgent:
    """Deep-Q agent with an online network and a separate target network.

    Parameters
    ----------
    image_shape       : input shape of one processed frame, passed to the first Conv2D layer.
    action_space      : number of discrete actions (size of the output layer).
    learning_rate     : RMSprop learning rate.
    rho               : RMSprop `rho` parameter.
    optimizer_epsilon : RMSprop `epsilon` parameter.
    replay_memory     : buffer exposing `append(item)` and `sample(batch_size)`.
    gamma             : discount rate applied to the bootstrapped future Q-value.
    epsilon_max       : initial ε for ε-greedy action selection.
    epsilon_min       : floor for ε; `act` never uses a smaller value.
    epsilon_steps     : number of `train` calls over which ε decays linearly from max to min.
    """

    def __init__(self, image_shape, action_space, learning_rate, rho, optimizer_epsilon, replay_memory, gamma, epsilon_max, epsilon_min, epsilon_steps):
        self.image_shape = image_shape
        self.action_space = action_space
        self.learning_rate = learning_rate
        self.rho = rho
        self.optimizer_epsilon = optimizer_epsilon
        self.memory = replay_memory
        self.gamma = gamma
        self.epsilon_max = epsilon_max
        self.epsilon_min = epsilon_min
        self.epsilon_decay = (epsilon_max - epsilon_min)/epsilon_steps # set step size for decay from `epsilon_max` to `epsilon_min` over `epsilon_step` steps
        self.ε = epsilon_max # set initial ε-greedy learning `ε` parameter value equal to `epsilon_max`
        self.model, self.target_model = self._build_model() # instantiate core model, target model (same initial weights)

    def _build_network(self): # Define ANN architecture for Deep-Q Network Learning
        """Build and compile one network: three Conv2D layers, a 512-unit dense layer, and a linear output per action."""
        model = Sequential()

        model.add(Conv2D(32, 8, strides = 4, padding = "valid", activation = "relu", input_shape = self.image_shape, data_format = "channels_last"))
        model.add(Conv2D(64, 4, strides = 2, padding = "valid", activation = "relu"))
        model.add(Conv2D(64, 3, strides = 1, padding = "valid", activation = "relu"))
        model.add(Flatten())
        model.add(Dense(512, activation = "relu"))
        model.add(Dense(self.action_space))

        model.compile(loss = huber_loss, optimizer = RMSprop(lr = self.learning_rate, rho = self.rho, epsilon = self.optimizer_epsilon))
        return model

    def _build_model(self):
        """Return `(online_model, target_model)` as two distinct networks with identical initial weights.

        The networks must be separate objects: returning the same model twice
        would make the target network track every online update and turn
        `update_target_model` into a no-op.
        """
        online_model = self._build_network()
        target_model = self._build_network()
        target_model.set_weights(online_model.get_weights())
        return online_model, target_model

    def act(self, state): # Define ε-greedy learning action method
        """Choose an action for the raw observation `state`.

        With probability max(`epsilon_min`, ε) a uniformly random action index
        is returned; otherwise the index of the highest Q-value predicted by
        the online model.
        """
        ## Apply image processing helper function to current environment state; convert image pixel values to [0,1]
        state_proc = process_image(state)
        state_scaled = state_proc/255

        ## Perform action based on ε value (never below `epsilon_min`):
        ε = max(self.epsilon_min, self.ε)

        ### If Agent acting randomly, select random action from available action-space
        if np.random.rand() < ε:
            return rn.randrange(self.action_space)

        ### If Agent not acting randomly, choose action based on policy of selecting action yielding highest Q-value
        Q_value = self.model.predict(state_scaled)[0]
        return np.argmax(Q_value)

    def remember(self, state, action, reward, next_state, done): # Define method for appending agent experiences to memory buffer
        """Append one `(state, action, reward, next_state, done)` tuple to the replay memory."""
        self.memory.append((state, action, reward, next_state, done))

    @staticmethod
    def _td_targets(q_values, next_q_values, action, reward, done, gamma):
        """Return a copy of `q_values` with each taken action's entry replaced by its training target.

        For row i the entry at column `action[i]` becomes `reward[i]` when
        `done[i]` is true, otherwise `reward[i] + gamma * max(next_q_values[i])`.
        `action`, `reward` and `done` may be 1-D or single-column 2-D arrays.
        The input `q_values` array is not modified.
        """
        actions = np.asarray(action, dtype = int).reshape(-1)
        rewards = np.asarray(reward, dtype = float).reshape(-1)
        finished = np.asarray(done, dtype = bool).reshape(-1)

        best_next = np.max(next_q_values, axis = 1)
        targets = np.where(finished, rewards, rewards + gamma * best_next)

        updated = np.array(q_values, copy = True)
        updated[np.arange(len(actions)), actions] = targets
        return updated

    def train(self, batch_size): # Define method for Agent to train DNN with experiences sampled from memory buffer
        """Fit the online model for one epoch on a minibatch sampled from replay memory.

        Returns the Keras `History` object from `fit`. ε is reduced by one
        decay step per call while it is above `epsilon_min`.
        """
        ## Extract minibatch samples from memory buffer
        state, action, reward, next_state, done = sample_memories(self.memory, batch_size)

        ## Convert current, next state observation pixel values to [0,1] and
        ## collapse to one batch axis.
        ## NOTE(review): assumes each stored state holds exactly one frame of
        ## shape `image_shape` (e.g. the 1 x H x L x 1 output of `process_image`) - confirm against the training loop.
        batch_shape = (-1,) + tuple(self.image_shape)
        states_scaled = (state/255).reshape(batch_shape)
        next_states_scaled = (next_state/255).reshape(batch_shape)

        ## Predict Q-values for the whole minibatch: online model for current states, target model for next states
        q_values = self.model.predict(states_scaled)
        next_q_values = self.target_model.predict(next_states_scaled)

        ## Update Q-value for each selected action (reward only if episode finished, else reward + discounted best future Q-value)
        q_targets = self._td_targets(q_values, next_q_values, action, reward, done, self.gamma)

        ## Fit online model on the full minibatch; update ε-greedy learning parameter value
        history = self.model.fit(states_scaled, q_targets, batch_size = batch_size, epochs = 1, verbose = 0)

        if self.ε > self.epsilon_min:
            self.ε -= self.epsilon_decay

        return history

    def update_target_model(self): # Define method to copy online DQN model feature weights to target model
        """Copy the online model's weights into the target model."""
        model_weights = self.model.get_weights()
        self.target_model.set_weights(model_weights)

    def save(self, name): # Define method for saving model weights
        """Write the online model's weights to the file `name`."""
        self.model.save_weights(name)

    def load(self, name): # Define method for loading saved model weights
        """Load weights from the file `name` into the online model and mirror them into the target model."""
        self.model.load_weights(name)
        self.update_target_model() # keep both networks in agreement after a restore

# Define helper function for animating game history
def display_frames_as_gif(frame_history, filename_gif = None):
    """Render a sequence of frames as a looping animation in the notebook.

    Parameters
    ----------
    frame_history : indexable sequence of image arrays; the first frame sets
                    the figure size (one figure point per pixel at 72 dpi).
    filename_gif  : optional output path; when truthy the animation is also
                    saved there with the 'imagemagick' writer at 20 fps.
    """
    first_frame = frame_history[0]
    frame_height, frame_width = first_frame.shape[0], first_frame.shape[1]

    fig = plt.figure(figsize = (frame_width / 72.0, frame_height / 72.0), dpi = 72)
    image_artist = plt.imshow(first_frame)
    plt.axis('off')

    def _draw_frame(frame_index):
        # Swap the displayed pixels for the requested frame
        image_artist.set_data(frame_history[frame_index])

    anim = animation.FuncAnimation(fig, _draw_frame, frames = len(frame_history), interval = 50)

    if filename_gif:
        anim.save(filename_gif, writer = 'imagemagick', fps = 20)

    display(display_animation(anim, default_mode = 'loop'))

Instantiate DDQN Agent; load weights from trained agent.

In [3]:
# Set DQN Agent parameters
## Set RL environment parameters
INPUT_SHAPE = (92, 80, 1)
ACTION_SPACE = 6

## Set RMSProp optimizer learning parameters
LEARNING_RATE = 0.00025
RHO = 0.95
OPTIMIZER_EPSILON = 0.01

## Set Q-learning γ discount-rate parameter
GAMMA = 0.99

## Set replay memory size parameter; instantiate memory buffer
REPLAY_MEMORY_SIZE = 6 * 10**4
REPLAY_MEMORY = ReplayMemory(REPLAY_MEMORY_SIZE)

## Set ε-greedy learning parameters for actor-agent
EPSILON_MAX = 1
EPSILON_MIN = 0.1
DECAY_STEPS = 2 * 10**4

# Instantiate Deep-Q Learning Agent; load weights from previously trained trial
# (pass EPSILON_MAX as `epsilon_max` - previously EPSILON_MIN was passed twice, leaving EPSILON_MAX unused)
agent = DDQNAgent(INPUT_SHAPE, ACTION_SPACE, LEARNING_RATE, RHO, OPTIMIZER_EPSILON, REPLAY_MEMORY, GAMMA, EPSILON_MAX, EPSILON_MIN, DECAY_STEPS)
agent.load(INPUT_DIR + 'weights_100K_frames.hdf5')
# NOTE(review): `DDQNAgent.act` uses max(epsilon_min, ε), so with ε = 0 the agent
# still takes a random action with probability EPSILON_MIN - confirm this is the intended evaluation policy.
agent.ε = 0

Play one game of ATARI 2600 Space Invaders:

In [4]:
# Set game defaults
env = gym.make('SpaceInvaders-v0') # Instantiate Space Invaders gym environment

try:
    state = env.reset() # Initialize environment
    done = False

    frameshistory = [state] # Raw frames of the whole game, starting with the initial observation

    # Loop over game instance
    while not done:

        env.render() # Render environment

        # Act within environment, record resulting frame, update state variable
        action = agent.act(state)
        next_state, _, done, _ = env.step(action)

        frameshistory.append(next_state)
        state = next_state
finally:
    env.close() # Always release the environment, even if the game loop raises

end_time = datetime.now() # Timestamp taken once the game has finished

# Report game length, save result
display_frames_as_gif(frameshistory, OUTPUT_DIR + 'game_run.gif')