## Config

In [1]:
import os
from collections import deque
from matplotlib import animation
import matplotlib.pyplot as plt
import numpy as np
from random import sample, choice

from gym import Env, make
from gym.spaces import Discrete, Box
from gym.utils import seeding
from IPython.display import HTML
from keras import layers, models, optimizers

IMAGE_SIZE = 4
BATCH_SIZE = 32

Using TensorFlow backend.


## Game

In [2]:
class Game(object):

    STATE_INIT                 = 0
    STATE_COLLECTED            = 1
    STATE_MOVED                = 2
    STATE_MOVED_AND_HIT_BORDER = 3
    STATE_WON                  = 4
    
    UP    = TOP    = 0
    RIGHT          = 1
    DOWN  = BOTTOM = 2
    LEFT           = 3
    
    X = 0
    Y = 1

    bd_colors = {
        "BLACK": 0,
        "GREEN": 1,
        "BLUE" : 2
    }

    im_colors = {
        "BLACK": np.array([0, 0, 0]),
        "GREEN": np.array([0, 255, 0]),
        "BLUE" : np.array([0, 0, 255])
    }

    def __init__(self, img_size=4, collectibles=4):

        self.img_size = img_size
        self.collectibles_count = collectibles
        self.collectibles_left = collectibles
        self.im = np.zeros((self.img_size, self.img_size, 3), dtype=np.uint8)
        self.bd = np.zeros((self.img_size, self.img_size), dtype=np.uint8)

    def reset(self):
        self.color_hit_after_move = "BLACK"

        self.collectibles_left = self.collectibles_count

        self.last_player_pos = [0, 0]
        self.player_pos      = self.last_player_pos.copy()

        self.game_state = Game.STATE_INIT

        self.bd = np.zeros((self.img_size, self.img_size), dtype=np.uint8)
        self.im = np.zeros((self.img_size, self.img_size, 3), dtype=np.uint8)

        self.__set_color(self.player_pos, "BLUE")

        self.__generate_collectibles()

        return self.get_frame()

    def __set_color(self, p, c):
        self.bd[p[0]][p[1]] = Game.bd_colors[c]
        self.im[p[0]][p[1]] = Game.im_colors[c]

    def __get_color(self, p):
        return self.bd[p[0]][p[1]]

    def __hit_border(self, border):

        hit = False

        if border == Game.TOP:
            if self.player_pos[Game.X] - 1 < 0:
                hit = True

        elif border == Game.RIGHT:
            if self.player_pos[Game.Y] + 1 > self.img_size - 1:
                hit = True

        elif border == Game.BOTTOM:
            if self.player_pos[Game.X] + 1 > self.img_size - 1:
                hit = True

        elif border == Game.LEFT:
            if self.player_pos[Game.Y] - 1 < 0:
                hit = True

        return hit

    def move_player(self, direction):

        hit_border = self.__hit_border(direction)

        if not hit_border:

            self.last_player_pos[Game.X] = self.player_pos[Game.X]
            self.last_player_pos[Game.Y] = self.player_pos[Game.Y]

            if direction == Game.UP:
                self.player_pos[Game.X] -= 1

            elif direction == Game.RIGHT:
                self.player_pos[Game.Y] += 1

            elif direction == Game.DOWN:
                self.player_pos[Game.X] += 1

            elif direction == Game.LEFT:
                self.player_pos[Game.Y] -= 1

        self.__update_game(hit_border)

    def get_game_state(self):
        return self.game_state

    def get_frame(self, image=False):
        if image:
            return self.im.copy()
        
        return self.bd.copy()

    def __generate_collectibles(self):

        avail_pos = [
            pos
            for pos in np.ndindex(self.img_size, self.img_size)
            if pos != (0, 0)
        ]

        for idx in range(self.collectibles_count):
            pos = avail_pos.pop(avail_pos.index(choice(avail_pos)))
            self.__set_color(pos, "GREEN")

    def __update_game(self, hit_border):
        self.color_hit_after_move = self.__get_color(self.player_pos)

        if hit_border:
            self.game_state = Game.STATE_MOVED_AND_HIT_BORDER
            
        elif not hit_border:
            if self.color_hit_after_move == self.bd_colors["BLACK"]:
                self.game_state = Game.STATE_MOVED
                
            elif self.color_hit_after_move == self.bd_colors["GREEN"]:
                self.game_state = Game.STATE_COLLECTED
                
                self.collectibles_left -= 1
                
                if(self.collectibles_left <= 0):
                    self.game_state = Game.STATE_WON

        self.__set_color(self.player_pos, "BLUE")
        self.__set_color(self.last_player_pos, "BLACK")


## Environment

In [3]:
class MyEnv(Env):

    def __init__(self, img_size=IMAGE_SIZE):
        self.action_space = Discrete(4)
        self.observation_space = Box(
           low=0,
           high=255,
           shape=(img_size, img_size, 3),
           dtype=np.uint8
        )
        self.done = False
        self.info = {}

        self.g = Game()

    def step(self, action):
        self.obs = self.g.get_frame()

        self.g.move_player(action)

        game_state = self.g.get_game_state()

        if game_state == self.g.STATE_COLLECTED:
            self.reward = 20

        elif game_state == self.g.STATE_MOVED:
            self.reward = 5

        elif game_state == self.g.STATE_MOVED_AND_HIT_BORDER:
            self.reward = 0

        elif game_state == self.g.STATE_WON:
            self.reward = 50
            self.done = True

        return self.obs, self.reward, self.done, self.info

    def reset(self):
        obs = self.g.reset()
        self.reward = 0
        self.done = False
        self.info = {}
        return obs

    def render(self):
        return self.g.get_frame(True)

    def seed(self, seed=None):
        self.np_random, seed = seeding.np_random(seed)
        return [seed]

    def close(self):
        pass


## Agent

In [4]:
class Memory:
    def __init__(self, size, data_type):

        self.memory = deque(maxlen=size)
        self.data_type = data_type

    def save(self, content):
        assert isinstance(content, self.data_type)
        self.memory.append(content)

    def load(self):
        return self.memory

    def erase(self):
        self.memory.clear()

        
class Agent:
    def __init__(self, input_shape, env):
        self.__env = env

        self.__hparams = {
            "EPSILON": 1.0,          # Exploration factor
            "EPSILON_DECAY": 0.995,   # Exploration factor discount
            "EPSILON_LIMIT": 0.01,   # Exploration factor min value
            "GAMMA": 0.99,           # Discount Rate for Bellman's equation
            "LEARNING_RATE": 0.001,  # Learning Rate for DNN optimizer
            "TAU": 0.125             # Target model training interval
        }

        self.__memory = Memory(2000, tuple)

        self.__input_shape = input_shape
        self.__model = self.__build_model()
        self.__target_model = self.__build_model()
        
        print(self.__model.summary())

    def choose_action(self, state):

        # Apply discount to exploration rate
        self.__hparams["EPSILON"] *= self.__hparams["EPSILON_DECAY"]

        # If exploration rate reached or exceeded limit, set it to min
        self.__hparams["EPSILON"] = max(
            self.__hparams["EPSILON_LIMIT"], self.__hparams["EPSILON"]
        )

        # Each time, it gets harder to explore and easier to exploit
        # If algorithm is exploring, return a random action
        if np.random.rand() < self.__hparams["EPSILON"]:
            return self.__env.action_space.sample()

        # Else, predict Q-values (future discounted rewards for each action) for current state
        tf_state = np.expand_dims(state, axis=0)
        Q_sa = self.__model.predict(tf_state)[0]

        # Return index of the highest Q-value
        return np.argmax(Q_sa)

#     def play(self, batch_size):

#         if len(self.__memory.load()) < batch_size:
#             return

#         # Get <batch_size> random memory entries
#         batch = sample(self.__memory.load(), batch_size)

#         for state, action, reward, new_state, done in batch:

#             # TensorFlow requires an extra dimension specifing # of inputs
#             tf_state = np.expand_dims(state, axis=0)
#             tf_new_state = np.expand_dims(new_state, axis=0)

#             Q_sa = self.__target_model.predict(tf_state)

#             if done:
#                 # Game ended, our target is the current state reward
#                 Q_sa[0][action] = reward
#             else:
#                 # Game still running, our target is the current state reward
#                 # plus next state discounted reward (Bellman's equation)
#                 Q_sa[0][action] = reward + self.__hparams["GAMMA"] \
#                     * np.max(self.__target_model.predict(tf_new_state)[0])

#             self.__model.fit(tf_state, Q_sa, epochs=1, verbose=0)

    def play(self, batch_size):

        if len(self.__memory.load()) < batch_size:
            return

        # Get <batch_size> random memory entries
        batch = sample(self.__memory.load(), batch_size)
        states = np.zeros((batch_size, state.shape[1:]))
        target = np.zeros((batch_size, self.env.action_space.n))
        
        i = 0
        for state, action, reward, new_state, done in batch:
            
            # TensorFlow requires an extra dimension specifing # of inputs
            tf_state = np.expand_dims(state, axis=0)
            tf_new_state = np.expand_dims(new_state, axis=0)

            states[i] = tf_state
            target_y  = self.__model.predict(tf_state)[0]
            Q_sa      = self.__model.predict(tf_new_state)[0]
            
            print(states)
            print(target_y)
            print(Q_sa)

            if done:
                # Game ended, our target is the current state reward
                target_y[i][action] = reward
            else:
                # Game still running, our target is the current state reward
                # plus next state discounted reward (Bellman's equation)
                target_y[i][action] = reward + self.__hparams["GAMMA"] * np.max(Q_sa)

            self.__model.fit(states, target_y, epochs=1, verbose=0)

    def record(self, content):
        self.__memory.save(content)

#     def target_train(self):
#         # Copy weights from main model to target model

#         model_weights = self.__model.get_weights()
#         target_model_weights = self.__target_model.get_weights()
#         tau = self.__hparams["TAU"]

#         for i in range(len(target_model_weights)):
#             target_model_weights[i] = \
#                 model_weights[i] * tau + target_model_weights[i] * \
#                 (1 - tau)

#         self.__target_model.set_weights(target_model_weights)

    def __build_model(self):
        model = models.Sequential()
        
        l_dens1 = layers.Dense(32, activation="relu", input_shape=self.__input_shape)
        l_dens2 = layers.Dense(32, activation="relu")
        l_flat1 = layers.Flatten()
        l_dens3 = layers.Dense(self.__env.action_space.n, activation="sigmoid")

        model.add(l_dens1)
        model.add(l_dens2)
        model.add(l_flat1)
        model.add(l_dens3)

        opt = optimizers.Adam(lr=self.__hparams["LEARNING_RATE"])

        model.compile(loss="mse", optimizer=opt)

        return model

    def load(self, name):
        self.__model.load_weights(name)

    def save(self, name):
        self.__model.save_weights(name)

## Train

In [5]:
EPISODES = 100
STEPS = 40
INPUT_SHAPE = (IMAGE_SIZE, IMAGE_SIZE)

env = MyEnv()

agent = Agent(INPUT_SHAPE, env)

# Play <EPISODES> games
for ep in range(EPISODES):

    state = env.reset()

    # Each game takes <STEPS> steps
    for st in range(STEPS):

        # Get an action and execute it in the environment
        action = agent.choose_action(state)
        new_state, reward, done, _ = env.step(action)

        # Store experience acquired by the "player"
        # after executing an action
        agent.record((state, action, reward, new_state, done))

        # Train main model based on recorded states
        agent.play(BATCH_SIZE)

        # Update target model
#         agent.target_train()

        state = new_state.copy()

        if done:
            agent.save("weights.h5")
            break

agent.save("weights.h5")
print("\nDone!")

_________________________________________________________________
Layer (type)                 Output Shape              Param #   
dense_1 (Dense)              (None, 4, 32)             160       
_________________________________________________________________
dense_2 (Dense)              (None, 4, 32)             1056      
_________________________________________________________________
flatten_1 (Flatten)          (None, 128)               0         
_________________________________________________________________
dense_3 (Dense)              (None, 4)                 516       
Total params: 1,732
Trainable params: 1,732
Non-trainable params: 0
_________________________________________________________________
None


UnboundLocalError: local variable 'state' referenced before assignment

## Play

In [13]:
try:
    agent.load("weights.h5")
except (OSError, ValueError) as e:
    exit(e)

STEPS = 40
    
frames = []
obs = env.reset()
frames.append(env.render())

for st in range(STEPS):
    action = agent.choose_action(obs)

    new_obs, reward, done, info = env.step(action)
    frames.append(env.render())

    obs = new_obs.copy()
    
    if done:
        print("DONE")
        break

## Create gameplay GIF

In [14]:
ANIMATION_INTERVAL = 200

fig, ax = plt.subplots(figsize=(IMAGE_SIZE, IMAGE_SIZE))

def update(i):
    frame = frames[i]
    ax.imshow(frame)
    ax.set_axis_off()

anim = animation.FuncAnimation(fig, update, frames=len(frames), interval=ANIMATION_INTERVAL)
anim.save('play.gif', dpi=80, writer='imagemagick')
plt.close()

### [Click here to view generated GIF](./play.gif)