# Deep Q Learning - Atari Space Invaders

import modules

In [None]:
import tensorflow as tf
from tensorflow import keras
import numpy as np 
from collections import deque
import random
import retro 


check GPU

In [None]:
# List the GPUs TensorFlow can see. The result is printed explicitly because
# a notebook cell only displays its last bare expression, and this call is
# not the last statement of the cell.
print(tf.config.list_physical_devices('GPU'))

# Also print every local device reported by device_lib.
# NOTE(review): device_lib is imported from the tensorflow.python namespace,
# which looks like an internal path - confirm it exists in the installed version.
from tensorflow.python.client import device_lib
print(device_lib.list_local_devices())

create environment

In [None]:
# Build the Space Invaders emulator environment, then record and print the
# observation shape and the size reported by the action space.
env = retro.make(game='SpaceInvaders-Atari2600')
input_shape = env.observation_space.shape  # shape of one raw observation
n_outputs = env.action_space.n
print(input_shape, n_outputs, sep="\n")

preprocessing functions

In [None]:
def preprocces_img_1(img):
    """Preprocess a single frame.

    Crops rows ``8:-12`` and columns ``4:-12``, divides the pixel values by
    255, converts the result to grayscale and resizes it to 100 x 84.

    Args:
        img: one frame; assumed to be a (height, width, 3) RGB array -
            TODO confirm against the environment's observation shape.

    Returns:
        The tensor returned by ``tf.image.resize``.
    """
    cropped = img[8:-12, 4:-12]
    normalized = cropped / 255.0
    gray = tf.image.rgb_to_grayscale(normalized)
    return tf.image.resize(gray, [100, 84])

In [None]:
def preprocces_img_more(img):
    """Preprocess several frames at once.

    Applies the same crop (rows ``8:-12``, columns ``4:-12``), division by
    255, grayscale conversion and 100 x 84 resize as the single-frame
    variant, while leaving the leading axis untouched.

    Args:
        img: stack of frames; assumed to be (batch, height, width, 3) -
            TODO confirm against the callers.

    Returns:
        The tensor returned by ``tf.image.resize``.
    """
    scaled_crop = img[:, 8:-12, 4:-12] / 255.0
    return tf.image.resize(tf.image.rgb_to_grayscale(scaled_crop), [100, 84])

In [None]:
# Shape sanity check: run a random batch through the same crop / scale /
# grayscale / resize steps as the batch preprocessing and inspect the shape.
x = np.random.rand(64, 210, 160, 3)
x = tf.image.resize(
    tf.image.rgb_to_grayscale(x[:, 8:-12, 4:-12] / 255.0),
    [100, 84],
)
x.shape

create neural network

In [None]:
# Q-network: three stride-2 convolutions followed by a 512-unit dense layer
# and a linear output layer with env.action_space.n units. The declared
# input is one 100 x 84 single-channel frame.
input_shape = [100, 84, 1]
model = keras.models.Sequential(
    [keras.layers.InputLayer(input_shape)]
    + [
        keras.layers.Conv2D(
            filters,
            kernel_size=kernel,
            strides=2,
            padding="valid",
            activation='elu',
            kernel_initializer='glorot_normal',
        )
        for filters, kernel in ((32, 8), (64, 4), (64, 3))
    ]
    + [
        keras.layers.Flatten(),
        keras.layers.Dense(512, activation='elu', kernel_initializer='glorot_normal'),
        keras.layers.Dense(env.action_space.n),
    ]
)

helper functions

In [None]:
def epsilon_greedy_policy(state, epsilon=0):
    """Choose an action index with an epsilon-greedy rule.

    Args:
        state: raw frame to evaluate; it is passed through
            ``preprocces_img_1`` before being fed to ``model``.
        epsilon: probability of returning a uniformly random action index
            instead of the greedy one. With the default of 0 the greedy
            branch is always taken.

    Returns:
        An integer action index.
    """
    if np.random.rand() < epsilon:
        # Explore. The 8 matches the one-hot depth used in play_one_step.
        return np.random.randint(8)
    # Exploit: evaluate the ``state`` argument rather than a global variable,
    # so the function gives the right answer for any caller.
    img = preprocces_img_1(state)
    img = np.expand_dims(img, axis=0)  # add a leading batch axis
    Q_values = model.predict(img)
    return np.argmax(Q_values[0])

In [None]:
# Replay buffer of (state, action, reward, next_state, done) tuples. It is
# capped at 1,000,000 entries; appending beyond that drops the oldest ones.
# NOTE(review): states look like they are stored as raw frames, so a full
# buffer may need a lot of memory - confirm the cap is affordable.
replay_memory = deque([], maxlen=10 ** 6)

In [None]:
def sample_experiences(batch_size):
    """Draw a random batch of experiences from ``replay_memory``.

    Indices are drawn with ``np.random.randint``, so the same experience can
    appear more than once in a batch.

    Args:
        batch_size: number of experiences to draw.

    Returns:
        A tuple ``(states, actions, rewards, next_states, dones)`` of NumPy
        arrays, each stacking one field of the sampled experiences.
    """
    picks = np.random.randint(len(replay_memory), size=batch_size)
    sampled = [replay_memory[pick] for pick in picks]
    # Turn the list of 5-field experiences into one array per field.
    columns = []
    for field in range(5):
        columns.append(np.array([experience[field] for experience in sampled]))
    states, actions, rewards, next_states, dones = columns
    return states, actions, rewards, next_states, dones

In [None]:
def play_one_step(env, state, epsilon):
    """Play one environment step and store the transition in ``replay_memory``.

    Args:
        env: environment whose ``step`` method is called with the action.
        state: current observation, forwarded to ``epsilon_greedy_policy``.
        epsilon: exploration rate, forwarded to ``epsilon_greedy_policy``.

    Returns:
        The ``(next_state, reward, done, info)`` values unpacked from
        ``env.step``.
    """
    chosen_index = epsilon_greedy_policy(state, epsilon)
    # The environment is stepped with a depth-8 one-hot encoding of the index,
    # and the same encoding is what gets stored in the replay buffer.
    one_hot_action = tf.one_hot(chosen_index, 8)
    outcome = env.step(one_hot_action)
    next_state, reward, done, info = outcome
    replay_memory.append((state, one_hot_action, reward, next_state, done))
    return next_state, reward, done, info

In [None]:

# Training hyper-parameters.
batch_size = 64        # experiences sampled per training step
discount_rate = 0.95   # factor applied to the best next-state Q-value
# `learning_rate` is the supported keyword; the `lr` alias is deprecated and
# rejected by newer Keras releases.
optimizer = keras.optimizers.Adam(learning_rate=1e-3)
loss_fn = keras.losses.mean_squared_error

def training_step(batch_size):
    """Run one gradient update of ``model`` on a sampled batch.

    Targets are ``reward + (1 - done) * discount_rate * max Q(next_state)``,
    with the next-state Q-values predicted by ``model`` itself. The loss is
    the mean of ``loss_fn`` between those targets and the Q-values of the
    actions that were taken.

    Args:
        batch_size: number of experiences to sample from the replay buffer.
    """
    states, actions, rewards, next_states, dones = sample_experiences(batch_size)

    # Targets are computed outside the tape, so no gradient flows through them.
    next_q = model.predict(preprocces_img_more(next_states))
    best_next_q = np.max(next_q, axis=1)
    not_done = 1 - dones
    targets = (rewards + not_done * discount_rate * best_next_q).reshape(-1, 1)

    with tf.GradientTape() as tape:
        q_all = model(preprocces_img_more(states))
        # The stored actions are used directly as the mask (play_one_step
        # stores them one-hot encoded), keeping only the Q-value of the
        # action that was taken.
        q_taken = tf.reduce_sum(q_all * actions, axis=1, keepdims=True)
        loss = tf.reduce_mean(loss_fn(targets, q_taken))

    gradients = tape.gradient(loss, model.trainable_variables)
    optimizer.apply_gradients(zip(gradients, model.trainable_variables))

In [None]:
# Per-episode history and the best value seen so far; the training loop
# appends each episode's final step index to `rewards`.
rewards, best_score = [], 0

training 

In [None]:
# Recreate the emulator before training.
# NOTE(review): presumably the old environment must be closed first because
# only one emulator instance may exist at a time - confirm against the
# retro documentation.
env.close()
env = retro.make(game='SpaceInvaders-Atari2600')

maxsteps = 50000  # upper bound on the number of steps in one episode
for episode in range(2):  #change range for desired number of episodes
    obs = env.reset()
    # Exploration rate decays linearly with the episode index, floored at
    # 0.01. It does not depend on the step, so it is computed once per episode.
    epsilon = max(1 - episode / 100, 0.01)
    for step in range(maxsteps):
        obs, reward, done, info = play_one_step(env, obs, epsilon)
        if done:
            break
    # The index of the last step played serves as the episode score.
    rewards.append(step)
    if step > best_score:
        best_weights = model.get_weights()
        best_score = step
        model.save('best_model_v1')
    print("\rEpisode: {}, Steps: {}, eps: {:.3f}".format(episode, step + 1, epsilon), end="")
    if episode > 0:
        training_step(batch_size)
    # Checkpoint on every fifth episode (0, 5, 10, ...).
    if episode % 5 == 0:
        model.save('model_v1_chckpnt')

saving the model

In [None]:
# Save the model in its current state under the name 'test_v1'.
model.save('test_v1')