# Imports

In [1]:
import numpy as np
import _pickle as pickle
import gym
import time
import matplotlib.pyplot as plt
import tensorflow as tf
import tensorflow.keras.layers as layers
import tensorflow.keras.initializers as initializers

# Memory

In [2]:
class Memory:
    def __init__(self, max_memory):
        self.max_memory = max_memory
        self.samples = []
    
    def add_sample(self, sample):
        self.samples.append(sample)
        if len(self.samples) > self.max_memory:
            self.sample.pop(0)
            
    def sample(self, no_samples):
        if no_samples > len(self.samples):
            return random.sample(self.samples, len(self.samples))
        else:
            return random.sample(self.samples, no_samples)

# Model

In [3]:
initializer = initializers.GlorotNormal

x_in = layers.Input(shape = (6400,))
x = layers.Dense(200, kernel_initializer= initializer, activation="relu")(x_in)
x_out = layers.Dense(1, activation="sigmoid")(x)

model = tf.keras.Model(x_in, x_out)

model.compile(optimizer = "adam", loss = "binary_crossentropy", metrics = ["acc"])

In [4]:
model.summary()

Model: "model"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
input_1 (InputLayer)         [(None, 6400)]            0         
_________________________________________________________________
dense (Dense)                (None, 200)               1280200   
_________________________________________________________________
dense_1 (Dense)              (None, 1)                 201       
Total params: 1,280,401
Trainable params: 1,280,401
Non-trainable params: 0
_________________________________________________________________


# GameRunner

In [16]:
class GameRunner:
    def __init__(self, env, model, memory, epsilon, max_eps, min_eps, game_dimensions, epsilon_greedy_frames, resume = False, render = True):
        self.env = env
        self.model = model
        self.memory = memory
        self.eps = epsilon
        self.max_eps = max_eps
        self.min_eps = min_eps
        self.render = render
        self.resume = resume
        self.epsilon_greedy_frames = epsilon_greedy_frames
        self.gameDimensions = game_dimensions
        self.rewards = []
        self.max_x = []
    
    def run(self):
        observation = self.env.reset()
        reward_sum = 0
        running_reward = None
        prev_frame = None
        episode_number = 0
        
        while True:
            if self.render:
                env.render()

            if self.resume:
                model.load_weights("ModelWeights")
            
            curr_frame = self.prepro(observation)
            change_in_frame = curr_frame - prev_frame if prev_frame is not None else np.zeros(self.gameDimensions)
            prev_frame = curr_frame
            
            action = self.choose_action(change_in_frame)
            
            observation, reward, done, _ = self.env.step(action) 

            y = 0 if action == 2 else 1

            self.memory.add_sample((change_in_frame, y, reward))
            #self.replay()

            # Decay probability of taking random action
            epsilon_interval = (self.max_eps - self.min_eps)
            self.eps -= epsilon_interval / self.epsilon_greedy_frames
            self.eps = max(self.eps, self.min_eps)
            
            reward_sum += reward

            if done:
                running_reward = reward_sum if running_reward is None else running_reward * 0.99 + reward_sum * 0.01
                print ('resetting env. episode reward total was %f. running mean: %f' % (reward_sum, running_reward))
                reward_sum = 0
                observation = env.reset() # reset env
                prev_frame = None
                episode_number += 1

                model.save_weights("ModelWeights")

            if reward != 0: # Pong has either +1 or -1 reward exactly when game ends.
                print('ep %d: game finished, reward: %f, epsilon: %g' % (episode_number, reward, self.eps) + ('' if reward == -1 else ' !!!!!!!!'))
                
    def choose_action(self, state):
        if np.random.random() < self.eps:
                return np.random.randint(2, 4)
        else:
            change_in_frame = change_in_frame.reshape((1,6400))
            up_prob = model.predict(change_in_frame)
            if up_prob >= .5:
                return 2
            else:
                return 3
            
    def prepro(self, input_frame):
        """ prepro 210x160x3 uint8 frame into 6400 (80x80) 1D float vector """
        input_frame = input_frame[34:194] # crop
        input_frame = input_frame[::2,::2,0] # downsample by factor of 2 (halves the resolution of the image)
        #This takes every other pixel in the image
        input_frame[input_frame == 144] = 0 # erase background (background type 1)
        input_frame[input_frame == 109] = 0 # erase background (background type 2)
        input_frame[input_frame != 0] = 1 # everything else (paddles, ball) just set to 1
        return input_frame.astype(np.float).ravel()
    
    def discount_rewards(self, rewards):
        """ take 1D float array of rewards and compute discounted reward """
        discounted_r = np.zeros_like(rewards)
        running_add = 0
        for t in reversed(range(0, rewards.size)):
            if rewards[t] != 0: running_add = 0 # reset the sum, since this was a game boundary (pong specific!)
            running_add = running_add * gamma + rewards[t]
            discounted_r[t] = running_add
        return discounted_r

# Hyperparamters

In [6]:
gamma = 0.99
batch_size = 10

# Main Code

In [17]:
env = gym.make("Pong-v0")

mem = Memory(5000)

eps = 1.0
max_eps = 0.000001
min_eps = 1.0
eps_greedy_frames = 100000.0

game_dimensions = 80*80

gr = GameRunner(env, model, mem, eps, max_eps, min_eps, game_dimensions, eps_greedy_frames, False, True)

gr.run()

ep 0: game finished, reward: -1.000000, epsilon: 1.00085
ep 0: game finished, reward: -1.000000, epsilon: 1.00132
ep 0: game finished, reward: -1.000000, epsilon: 1.00177
ep 0: game finished, reward: -1.000000, epsilon: 1.00223
ep 0: game finished, reward: -1.000000, epsilon: 1.00269
ep 0: game finished, reward: -1.000000, epsilon: 1.00318
ep 0: game finished, reward: -1.000000, epsilon: 1.00366
ep 0: game finished, reward: -1.000000, epsilon: 1.00412
ep 0: game finished, reward: -1.000000, epsilon: 1.00455
ep 0: game finished, reward: -1.000000, epsilon: 1.00502
ep 0: game finished, reward: -1.000000, epsilon: 1.00548
ep 0: game finished, reward: -1.000000, epsilon: 1.00598
ep 0: game finished, reward: -1.000000, epsilon: 1.00641
ep 0: game finished, reward: -1.000000, epsilon: 1.00687
ep 0: game finished, reward: -1.000000, epsilon: 1.0082
ep 0: game finished, reward: -1.000000, epsilon: 1.00865
ep 0: game finished, reward: -1.000000, epsilon: 1.00912
ep 0: game finished, reward: -1.

AttributeError: 'function' object has no attribute 'pop'

In [5]:
env.observation_space

Box(0, 255, (210, 160, 3), uint8)

In [6]:
env.unwrapped.get_action_meanings()

# NOOP is the same as FIRE (standing still)
# LEFT is the same as LEFTFIRE (down)
# RIGHT is the same as RIGHTFIRE (up)

['NOOP', 'FIRE', 'RIGHT', 'LEFT', 'RIGHTFIRE', 'LEFTFIRE']

In [8]:
env.unwrapped.get_keys_to_action()

{(): 0, (32,): 1, (100,): 2, (97,): 3, (32, 100): 4, (32, 97): 5}