## SpaceInvaders Gym
*If code differs between the notebook and the curriculum, go with the notebook!!!*

### Imports

In [1]:
import gym
import numpy as np
from collections import deque
import pickle

import keras
from keras.models import Sequential
from keras.layers.core import Dense, Flatten
from keras.layers import Conv2D, MaxPooling2D
from keras.layers import Activation, Input

pygame 1.9.6
Hello from the pygame community. https://www.pygame.org/contribute.html


Using TensorFlow backend.


## Create the environment 
### & setup Q and other parameters

In [2]:
env = gym.make('SpaceInvaders-v0')
state = env.reset()
action_space = env.action_space.n
state_space = (90, 70, 4)

possible_actions = np.array(np.identity(action_space, dtype=int).tolist())

stack_size=4

learning_rate = 0.00025
decay_rate = 0.00001
eps_max = 1
eps_min = 0.01

## Frame preprocessing

In [3]:

def preprocess_frame(frame):
    cropped_downsampled = frame[20:-10:2, 10:-10:2]
    return np.mean(cropped_downsampled, axis=2).astype(np.uint8) / 255

def stack_frames(stacked_frames, new_frame, new_episode):
    stacked_state = None
    frame = preprocess_frame(new_frame)
    if new_episode:
        stacked_frames = deque([np.zeros((90, 70), dtype=np.uint8) for i in range(stack_size)], maxlen=4) # maxlen missing in curric
        for _ in range(stack_size):
            stacked_frames.append(frame)
            stacked_state = np.stack(stacked_frames, axis=2)
    else:
        stacked_frames.append(frame)
        stacked_state = np.stack(stacked_frames, axis=2)
    return stacked_state, stacked_frames

def greedy_action(model, decay_step):
    exp_tradeoff = np.random.rand()
    epsilon = eps_min + (eps_max - eps_min) * np.exp(-decay_rate * decay_step)
    if (epsilon > exp_tradeoff):
        choice = np.random.randint(action_space)
        action = possible_actions[choice]
    else:
        choice = np.argmax(model.predict(np.array(stacked_frames).reshape(1, *state_space)))
        action = possible_actions[choice]
    return action

def sample_memory(buffered_list, batch_size):
    buffer_size = len(buffered_list)
    index = np.random.choice(np.arange(buffer_size), size=batch_size, replace=False)
    return [buffered_list[i] for i in index]

## Create the model

In [6]:
# creating the model
model = Sequential([
        Conv2D(16, (8,8), strides=(4,4), data_format="channels_last", input_shape=(90,70,4), activation='relu'),
        Conv2D(32, (4,4), strides=(2,2), data_format="channels_last", activation='relu'),
        Flatten(),
        Dense(256, activation='relu'),
        Dense(action_space, activation='softmax')
    ])

model.compile(optimizer=keras.optimizers.Adam(lr=learning_rate,
                                              beta_1=eps_min,
                                              beta_2=eps_max,
                                              decay=decay_rate),
              loss='categorical_crossentropy',
              metrics=['acc'])

## The training algorithm

In [7]:
history = []

memory = deque(maxlen=1000)

stacked_frames = deque([np.zeros((90, 70), dtype=np.uint8) for i in range(stack_size)], maxlen=4)

rewards_list = []

decay_step = 0

batch_size=60

for episode in range(100):
    
    obs = env.reset()
    done = False
    total_reward = 0
    obs, stacked_frames = stack_frames(stacked_frames, obs, True)

    while not done:
        

        decay_step += 1
        
        action = np.argmax(greedy_action(model, decay_step))
        
        next_obs, reward, done, _ = env.step(action)
        
        next_obs, stacked_frames = stack_frames(stacked_frames, next_obs, False)
        memory.append((obs, action, reward, next_obs, done))
        
        # we'll add the reward to our existing total_reward
        total_reward += reward
        
        if done:
            rewards_list.append(total_reward)
            

        obs = next_obs
        

    if len(memory) > 100:

        batch = sample_memory(memory, batch_size=batch_size)
        states = np.array([item[0] for item in batch], ndmin=3)
        actions = [item[1] for item in batch]
        rewards = [item[2] for item in batch]
        next_states = np.array([item[3] for item in batch], ndmin=3)
        

        targets = [learning_rate * np.max(item) for item in model.predict(next_states)]
        targets = [targets[i] + rewards[i] for i in range(len(targets))]
        
        # creates the outputs to fit to
        target_f = [item for item in model.predict(states)]
        for i in range(len(target_f)):
            target_f[i][actions[i]] = targets[i]
            
        # train on whole batch!
        model.train_on_batch(x=np.array(states).reshape(-1, * state_space),
                                            y=np.array(target_f).reshape(-1, action_space))
        

## Saving?
After all that, you might want to save your model!

In [85]:
# pickle.dump(model, open('SpaceInvadersModel.pkl', 'wb'))

In [3]:
model = pickle.load(open('SpaceInvadersModel.pkl', 'rb'))

Instructions for updating:
Colocations handled automatically by placer.
Instructions for updating:
Use tf.cast instead.


Note: the way we've written this notebook is not necessarily best practice... OOP would make for more robust, reusable code. Extension (and more emphasis in general) could be about building out classes. Also, missing reliable and regular validation tests due to train_on_batch... need to monitor training

## Playing
This one includes a rendering option. By default it shows the game play!

In [8]:
def play_game(num_games, model=False, render=True):
    
    stacked_frames = deque([np.zeros((90, 70), dtype=np.uint8) for i in range(stack_size)], maxlen=4)
    t_score = 0
    
    for episode in range(num_games):
        
        done = False
        turn = 0
        score = 0
        obs = env.reset()
        
        obs, stacked_frames = stack_frames(stacked_frames, obs, True)

        while not done:
            
            if render:
                env.render()

            if model:
                # get our move from the model
                choice = np.argmax(model.predict(np.array(stacked_frames).reshape(1, *state_space)))
                action = np.argmax(possible_actions[choice])
                next_obs, reward, done, _ = env.step(action)  
                
                # stack it 
                next_obs, stacked_frames = stack_frames(stacked_frames, next_obs, False)
                state = next_obs
            else:
                # random agent
                _, reward, done, _ = env.step(env.action_space.sample())
                
            # tally score
            score += reward

        if render:
            env.close()
        
        print(f'Game {episode + 1} score: {score}')
            
        # get avg score for all games played... for rough analysis
        t_score += score

    print(f'Average score over {num_games} games: {t_score/num_games}')

In [9]:
play_game(5, model)

Game 1 score: 145.0
Game 2 score: 215.0
Game 3 score: 290.0
Game 4 score: 90.0
Game 5 score: 205.0
Average score over 5 games: 189.0
