# Space Invaders trained with Deep Q Network

## Step 0: Import necessary packages

In [1]:
import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
import gym
import random
import cv2

## Step 1: Activate the environment and examine state/action spaces

In [2]:
# Create the Atari Space Invaders environment that the later cells interact with
env = gym.make('SpaceInvaders-v0')

In [3]:
# Show the raw observation shape, the action space and the action names, one per line
for description in (
    env.observation_space.shape,
    env.action_space,
    env.unwrapped.get_action_meanings(),
):
    print(description)

(210, 160, 3)
Discrete(6)
['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']


In [4]:
# Cache the raw observation shape and the number of discrete actions
state_shape, action_size = env.observation_space.shape, env.action_space.n

Let's take some random actions in the environment and see what happens...

In [5]:
# Play 100 episodes with uniformly random actions to get a baseline score.
scores = []
for i_episode in range(100):
    env.reset()
    episode_return = 0
    # Each episode is capped at 1000 steps; `step` is 1-based so it can be
    # reported directly as the number of timesteps played.
    for step in range(1, 1001):
        env.render()
        _obs, reward, done, _info = env.step(env.action_space.sample())
        episode_return += reward
        if done:
            print("Episode finished after {0} timesteps with a score of {1}".format(step, episode_return))
            break
    # Episodes that hit the step cap are still recorded, just not printed
    scores.append(episode_return)
env.close()

Episode finished after 374 timesteps with a score of 60.0
Episode finished after 839 timesteps with a score of 410.0
Episode finished after 982 timesteps with a score of 165.0
Episode finished after 415 timesteps with a score of 30.0
Episode finished after 397 timesteps with a score of 30.0
Episode finished after 402 timesteps with a score of 90.0
Episode finished after 504 timesteps with a score of 15.0
Episode finished after 548 timesteps with a score of 105.0
Episode finished after 548 timesteps with a score of 60.0
Episode finished after 500 timesteps with a score of 60.0
Episode finished after 489 timesteps with a score of 50.0
Episode finished after 697 timesteps with a score of 155.0
Episode finished after 959 timesteps with a score of 240.0
Episode finished after 611 timesteps with a score of 55.0
Episode finished after 651 timesteps with a score of 80.0
Episode finished after 656 timesteps with a score of 120.0
Episode finished after 809 timesteps with a score of 155.0
Episode

In [7]:
# Mean total reward of the random policy over the recorded episodes
average_score = sum(scores) / len(scores)
print("Average score after 100 episodes = {}".format(average_score))

Average score after 100 episodes = 157.6


## Step 2: Create and train the agent

In [8]:
from agent import DQN
from collections import deque

In [9]:
# Grayscale the image, downsample it, crop out the irrelevant portion of the game, pad it into a square
def preprocess(img):
    """Turn a raw RGB game frame into a normalised, padded grayscale image.

    Pipeline: grayscale -> keep every second row and column -> keep the first
    100 rows -> divide by 255 -> zero-pad 10 columns on the left and on the
    right. For the (210, 160, 3) frames this environment reports, the result
    is a (100, 100) float array.

    Args:
        img: Colour frame in RGB channel order, as produced by the
            environment's ``reset()`` / ``step()``.

    Returns:
        2-D float array; values lie in [0, 1] assuming 8-bit input frames.
    """
    # Gym delivers frames in RGB order, so the RGB conversion code is the
    # correct one; COLOR_BGR2GRAY would swap the red and blue luminance weights.
    gray = cv2.cvtColor(img, cv2.COLOR_RGB2GRAY)
    # Downsample by 2 in both directions, then drop the rows below index 100
    cropped = gray[::2, ::2][:100, :]
    scaled = cropped / 255.0
    # Pad only the width (10 px each side) with zeros
    return cv2.copyMakeBorder(scaled, 0, 0, 10, 10, borderType=cv2.BORDER_CONSTANT, value=0.0)

In [None]:
# Define all hyperparameters here
LR = 1e-3
RANDOM_SEED = 42
# Buffer capacity is a count, so keep it an integer (1e5 alone is a float,
# which integer-only APIs such as deque(maxlen=...) reject).
BUFFER_SIZE = int(1e5)
BATCH_SIZE = 64
GAMMA = 0.95
TAU = 1e-3
N_TIME_STEPS = 1
N_LEARN_UPDATES = 1
# Exploration schedule: epsilon starts at EPS_START and is multiplied by
# EPS_DECAY after every episode until it reaches EPS_END.
EPS_START = 1.0
EPS_END = 0.01
EPS_DECAY = 0.995

# tf.test.is_gpu_available() is deprecated in TensorFlow 2; listing the
# physical GPU devices is the supported way to check for a GPU.
if tf.config.list_physical_devices('GPU'):
    DEVICE = "/GPU:0"
else:
    DEVICE = "/device:CPU:0"

In [None]:
# NOTE(review): preprocess() returns 2-D (100, 100) arrays while state_size
# declares a trailing channel axis -- presumably DQN adds it; confirm in agent.py.
state_size = (100, 100, 1) # After pre-processing images are 100x100x1
action_size = env.action_space.n # Here, 6 available actions
# All hyperparameters are passed positionally; their order must match the
# DQN constructor signature defined in agent.py.
agent = DQN(state_size, action_size, LR,
            RANDOM_SEED, BUFFER_SIZE, BATCH_SIZE,
            GAMMA, TAU, N_TIME_STEPS, N_LEARN_UPDATES, DEVICE) # Instantiate the agent

In [None]:
def dqn(n_episodes=50000, print_every=100, target_score=3000.0):
    """Train the global ``agent`` on the global ``env`` with epsilon-greedy exploration.

    Args:
        n_episodes: Maximum number of training episodes.
        print_every: Length of the rolling score window; a persistent
            progress line is also printed every ``print_every`` episodes.
        target_score: Training stops early once the rolling average over a
            completely filled window reaches this value.

    Returns:
        List containing the total reward of every episode that was played.
    """
    scores_deque = deque(maxlen=print_every)
    scores = []
    eps = EPS_START

    for i_episode in range(1, n_episodes+1):
        state = preprocess(env.reset())

        score = 0
        t = 0  # step counter within the episode, forwarded to agent.step

        while True:
            t += 1

            action = agent.act(state, eps)
            next_state, reward, done, _ = env.step(action)
            next_state = preprocess(next_state)

            agent.step(t, state, action, reward, next_state, done)

            state = next_state.copy()
            score += reward

            if done:
                break

        scores_deque.append(score)
        scores.append(score)

        # Reduce exploration as training progresses, but never below EPS_END
        # (a plain multiply could undershoot the floor on the last decay step).
        eps = max(EPS_END, eps * EPS_DECAY)

        average = np.mean(scores_deque)
        print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, average), end="")
        # Checkpoint after every episode so an interrupted run loses little work
        agent.q_local.model.save('checkpoint_q_network.h5')
        if i_episode % print_every == 0:
            print('\rEpisode {}\tAverage Score: {:.2f}'.format(i_episode, average))

        # Only trust the rolling average once the window is full; otherwise a
        # single lucky early episode would end training immediately.
        window_full = len(scores_deque) == print_every
        if window_full and average >= target_score:
            print('\nHigh score so stopping training in {:d} episodes!\tAverage Score: {:.2f}'.format(i_episode - print_every, average))
            # The checkpoint for this episode has already been written above
            break

    return scores

scores = dqn()

# Plot the total reward obtained in each training episode
fig, ax = plt.subplots()
episode_numbers = np.arange(1, len(scores) + 1)
ax.plot(episode_numbers, scores)
ax.set_ylabel('Score')
ax.set_xlabel('Episode #')
plt.show()

## Step 3: See the trained agent in action

In [None]:
from tensorflow.keras.models import load_model

# Load the Q-network checkpoint written during training. The training loop
# saves to 'checkpoint_q_network.h5', so that is the file to read back here.
trained_model = load_model('checkpoint_q_network.h5', compile=False)

In [None]:
# Let the trained network play 20 episodes greedily (no exploration).
for i_episode in range(20):
    # The network must see the same pre-processed frames it was trained on,
    # not the raw 210x160x3 observation.
    state = preprocess(env.reset())

    score = 0.0

    for t in range(999):
        env.render()
        # Add batch and channel axes: (100, 100) -> (1, 100, 100, 1).
        # NOTE(review): assumes the network input matches
        # state_size = (100, 100, 1) -- confirm against agent.py.
        model_input = state[np.newaxis, ..., np.newaxis].astype(np.float32)
        q_values = trained_model(model_input).numpy()[0]
        # The network outputs one value per action; the environment expects
        # the index of the chosen action, so act greedily via argmax.
        action = int(np.argmax(q_values))
        next_state, reward, done, info = env.step(action)
        state = preprocess(next_state)
        score += reward
        if done:
            break

    print("Episode {0} finished after {1} timesteps. Total score: {2}".format(i_episode+1, t+1, score))

env.close()