# Playing Simple Catch Videogame with RL and QLearning using Keras
Eder Santana blog post: http://edersantana.github.io/articles/keras_rl/

Eder Santana GitHub gist repo: https://gist.github.com/EderSantana/c7222daa328f0e885093

Small corrections and additions to this IPython Notebook by Claude COULOMBE - PhD candidate - TÉLUQ / UQAM - Montréal

### Introduction - A Minimum Viable Experiment (MVE)

Get started with reinforcement learning in less than 200 lines of code with Keras (Theano or TensorFlow, it’s your choice).

<img src="CatchVideogame_RL_Qlearning_Original.gif" width=200>

So you are a (Supervised) Machine Learning practitioner who was also sold on the hype of making your labels weaker and on the possibility of getting neural networks to play your favorite games. You want to do Reinforcement Learning (RL), but you find it hard to read all those full-featured libraries just to get a feeling for what is actually going on.

Here we’ve got your back: we took the game engine complexities out of the way and show a minimal Reinforcement Learning example with less than 200 lines of code. And yes, the example does use Keras, your favorite deep learning library! That's a Minimum Viable Experiment (MVE).

Before I give you a link to the code make sure you read Nervana’s blog post <a href="http://www.nervanasys.com/demystifying-deep-reinforcement-learning/">Demystifying Deep Reinforcement Learning</a>. There you will learn about Q-learning, which is one of the many ways of doing RL. Also, at this point you already know that neural nets love mini-batches and there you will see what Experience Replay is and how to use it to get you them batches - even in problems where an agent only sees one sample of the environment state at a time.
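
To make the Experience Replay idea a bit more concrete, here is a minimal sketch using only the Python standard library. It is a simplified stand-in, not the class used later in this notebook: the name `ReplayBuffer` and its methods are hypothetical, and the stored "states" are just integers.

```python
import random
from collections import deque


class ReplayBuffer:
    """Hypothetical minimal experience replay buffer (illustration only)."""

    def __init__(self, max_memory=100, seed=None):
        # A deque with maxlen silently drops the oldest transition when full
        self.memory = deque(maxlen=max_memory)
        self.rng = random.Random(seed)

    def remember(self, state, action, reward, next_state, game_over):
        self.memory.append((state, action, reward, next_state, game_over))

    def sample(self, batch_size):
        # Sample with replacement; the batch size is capped by what is stored
        n = min(batch_size, len(self.memory))
        return [self.rng.choice(self.memory) for _ in range(n)]


buffer = ReplayBuffer(max_memory=3, seed=0)
for step in range(5):
    # The agent only sees a single transition per step...
    buffer.remember(state=step, action=1, reward=0, next_state=step + 1,
                    game_over=False)

# ...but training can still draw a mini-batch of past transitions
batch = buffer.sample(batch_size=2)
```

The point is that the agent still sees one sample at a time, while the network gets a batch drawn from the recent past.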

### Code walkthrough

So here is the <a href="https://gist.github.com/EderSantana/c7222daa328f0e885093">link to our code</a>. In that code Keras plays the catch videogame, where it should catch a single pixel “fruit” using a three pixel “basket”. The fruit falls one pixel per step and the Keras network gets a reward of +1 if it catches the fruit and -1 otherwise (code below). 
``` Python
def _get_reward(self):
    fruit_row, fruit_col, basket = self.state[0]
    if fruit_row == self.grid_size-1:
        if abs(fruit_col - basket) <= 1:
            return 1
        else:
            return -1
    else:
        return 0
```
The network sees the entire 10x10 pixel grid as input (code below)
``` Python
def _draw_state(self):
    im_size = (self.grid_size,)*2
    state = self.state[0]
    canvas = np.zeros(im_size)
    canvas[state[0], state[1]] = 1  # draw fruit
    canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
    return canvas
```
and outputs three values, one for each action (move left, stay, move right).
``` Python
model.add(Dense(num_actions))
```
Since these values represent the expected accumulated future reward, we just go greedy and pick the action corresponding to the largest value (code below):
``` Python
    q = model.predict(input_tm1)
    action = np.argmax(q[0])
```
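
If you want to see what the network actually receives, the sketch below rebuilds the grid in plain Python (no NumPy), flattens it into the 100 values that go into the first layer, and then does the greedy pick. The helper name `draw_state` and the Q-values are made up for illustration; a trained network would produce the real ones.

```python
def draw_state(fruit_row, fruit_col, basket, grid_size=10):
    """Pure-Python stand-in for _draw_state: returns a grid of 0s and 1s."""
    canvas = [[0] * grid_size for _ in range(grid_size)]
    canvas[fruit_row][fruit_col] = 1            # draw fruit
    for col in range(basket - 1, basket + 2):   # draw three-pixel basket
        if 0 <= col < grid_size:
            canvas[-1][col] = 1
    return canvas


canvas = draw_state(fruit_row=0, fruit_col=4, basket=6)
flat = [pixel for row in canvas for pixel in row]   # what the network sees

q = [0.1, -0.3, 0.7]   # made-up Q-values for (left, stay, right)
action = max(range(len(q)), key=lambda a: q[a])     # greedy pick, like argmax
```

Here `flat` has 100 entries with four of them set to 1 (one fruit pixel, three basket pixels), and the greedy action is the index of the largest Q-value.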

One thing to note, though, is that this network is not quite like you in exotic restaurants: it doesn’t take the very same action every time, exploiting what it already knows. Once in a while we force the system to take a random action (code below). This would be the equivalent of you learning, by trial and error, that life is more than just `Penang Curry` with `fried Tempeh`.
``` Python
if np.random.rand() <= epsilon:
    action = np.random.randint(0, num_actions)  # scalar action index
```
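
Putting the greedy pick and the random pick together gives the usual epsilon-greedy rule. The sketch below is a self-contained version that uses the standard library `random` module instead of NumPy; the function name `choose_action` and the Q-values are hypothetical.

```python
import random


def choose_action(q_values, epsilon, rng=random):
    """Epsilon-greedy: explore with probability epsilon, otherwise exploit."""
    if rng.random() < epsilon:
        return rng.randrange(len(q_values))      # random action
    return max(range(len(q_values)), key=lambda a: q_values[a])  # greedy


rng = random.Random(0)
q_values = [0.1, 0.9, -0.2]   # made-up Q-values for (left, stay, right)

# epsilon = 0 always exploits, epsilon = 1 always explores
greedy_only = [choose_action(q_values, epsilon=0.0, rng=rng) for _ in range(100)]
explore = [choose_action(q_values, epsilon=1.0, rng=rng) for _ in range(100)]
```

With `epsilon=.1`, as in the training script, roughly one action in ten is random and the rest follow the network.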

In the link you will also find a script that plays the game with no random actions and generates the pictures for the animation above.

Enjoy!

### FAQ

#### 1) How does this Q-learning thing even work? 

C’mon, read the blog post I just mentioned above… Anyway, think like this: the fruit is almost hitting the ground and your model is just one pixel away from a “catching” position. The model will face similar cases many, many times. If it decides to stay or move left, it will be punished (imagine it smelling a bunch of rotten fruits on the ground because it was lazy). Thus, it learns to assign a small Q-value (sounds much better than just “output of neural net”, huh?) to those two actions whenever it sees that picture as input. But, since catching the fruit also gives a juicy +1 reward, the model will learn to assign a larger Q-value to the “move right” action in that case. This is what minimizing the error between the predicted Q-value and the reward does (code below).
``` Python
    # There should be no target values for actions not taken.
    # Thou shalt not correct actions not taken #deep
    targets[i] = model.predict(state_t)[0]
    Q_sa = np.max(model.predict(state_tp1)[0])
    if game_over:  # if game_over is True
        targets[i, action_t] = reward_t
    else:
        # reward_t + gamma * max_a' Q(s', a')
        targets[i, action_t] = reward_t + self.discount * Q_sa
```
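
A tiny worked example may help here. The sketch below builds the target vector for a single transition in plain Python, with made-up network outputs; `build_target` is a hypothetical helper that mirrors the logic of the snippet above rather than code from the gist.

```python
def build_target(q_state, q_next_state, action, reward, game_over, discount=0.9):
    """Copy current predictions, then overwrite only the taken action."""
    target = list(q_state)             # untouched actions give zero error
    if game_over:
        target[action] = reward        # terminal step: no future to add
    else:
        target[action] = reward + discount * max(q_next_state)
    return target


# Made-up network outputs for (left, stay, right)
q_state = [0.2, -0.1, 0.4]
q_next_state = [0.0, 0.5, 0.3]

# Last step of a game: the agent moved right (action 2) and caught the fruit
terminal = build_target(q_state, q_next_state, action=2, reward=1, game_over=True)

# Earlier step: no reward yet, so the target is the discounted best next value
midgame = build_target(q_state, q_next_state, action=0, reward=0, game_over=False)
```

In the terminal case only the third entry changes, to the reward of 1. In the mid-game case only the first entry changes, to 0.9 * 0.5 = 0.45. Everything else stays equal to the current prediction, so the mean squared error only pushes on the action that was actually taken.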

One step before that, there will be no reward in the next step.

I liked how the previous phrase sounded, so I decided to give it its own paragraph. But, although in that case there is no juicy reward right after, the model can be trained using the maximum Q-value of the future state in the next step. Think about it. If you’re in the kitchen you know that you can just open the fridge to get food. But now you’re in your bedroom writing bad jokes and feel hungry. But you have this vague memory that going to the kitchen could help with that. You just go to the kitchen and there you figure out how to help yourself. You have to learn all that by living the game. I know, being Markovian is hard! But then the rest is just propagating these reward expectations further and further into the past, assigning high values to good choices and low values to bad choices (don’t forget that sometimes you hit those random choices in college so you learn the parts of life they don’t talk about in school). For everything else, if you believe in Stochastic Gradient Descent then it is easy to see this actually making sense… I hope…
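
The bedroom/kitchen story can be played out with a lookup table instead of a neural net. The sketch below is a toy, not part of the original code: the rooms, the two actions and the `step` function are invented for illustration, and the update uses a learning rate of 1 so the numbers are easy to follow.

```python
DISCOUNT = 0.9
STATES = ["bedroom", "kitchen", "fridge"]   # reaching "fridge" ends the episode
ACTIONS = ["stay", "move"]


def step(state, action):
    """Hypothetical toy dynamics: 'move' walks one room closer to the food."""
    if action == "stay":
        return state, 0, False
    next_state = STATES[STATES.index(state) + 1]
    if next_state == "fridge":
        return next_state, 1, True          # food found: reward +1
    return next_state, 0, False


q = {(s, a): 0.0 for s in STATES[:-1] for a in ACTIONS}

for sweep in range(50):
    for state in STATES[:-1]:
        for action in ACTIONS:
            next_state, reward, game_over = step(state, action)
            if game_over:
                target = reward
            else:
                best_next = max(q[(next_state, a)] for a in ACTIONS)
                target = reward + DISCOUNT * best_next
            q[(state, action)] = target     # learning rate of 1 for clarity
```

After a few sweeps the values settle: moving from the kitchen is worth 1, moving from the bedroom is worth 0.9, and staying in the bedroom is worth only 0.81. The reward at the fridge has been propagated one room further into the past on each sweep, which is the same mechanism the network approximates.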

#### 2) How different is that from AlphaGo?

Not much… But instead of learning Q-values, AlphaGo thought it was smarter to use REINFORCE and learn to output action probabilities directly. After that, she played several games against herself, so many that she could later learn the probability of winning from each position. Using all that information, during play time she uses a search technique to look for possible actions that would take her to positions with a higher probability of winning. But she told me to mention here that she doesn’t search as many possibilities in the future as her older cousin Deep Blue did. She also said that she can play pretty well using just one GPU; the other 99 were running high-resolution Netflix series so she can catch up with human culture.

That being said, you should be able to modify this script in 2 or 3 days to get a reimplementation of AlphaGo, and Skynet should be 4 weeks away?

JK

#### 3) Your code sucks, why don’t you write something better?

<a href="https://github.com/EderSantana/X/blob/master/examples/catcher.py">I’m trying…</a>

#### 4) Did you learn that by yourself?

The bad parts, yes. The good things were taught to me by my friends Evan Kriminger and Matthew Emigh.


### Train...

``` Python
import json
import numpy as np
# Note: these imports follow the older Keras API this post was written for;
# newer Keras releases may need different import paths and optimizer names.
from keras.models import Sequential
from keras.layers.core import Dense
from keras.optimizers import sgd


class Catch(object):
    """Catch game: a fruit falls one row per step toward a three-pixel basket."""

    def __init__(self, grid_size=10):
        self.grid_size = grid_size
        self.reset()

    def _update_state(self, action):
        """
        Input: action and states
        Output: new states and reward
        """
        state = self.state
        if action == 0:  # left
            action = -1
        elif action == 1:  # stay
            action = 0
        else:
            action = 1  # right
        f0, f1, basket = state[0]
        # keep the basket center inside the grid
        new_basket = min(max(1, basket + action), self.grid_size-1)
        f0 += 1  # the fruit falls one row
        out = np.asarray([f0, f1, new_basket])
        out = out[np.newaxis]

        assert len(out.shape) == 2
        self.state = out

    def _draw_state(self):
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1  # draw fruit
        canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
        return canvas

    def _get_reward(self):
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return -1
        else:
            return 0

    def _is_over(self):
        if self.state[0, 0] == self.grid_size-1:
            return True
        else:
            return False

    def observe(self):
        # flatten the grid into a single row: shape (1, grid_size**2)
        canvas = self._draw_state()
        return canvas.reshape((1, -1))

    def act(self, action):
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        # Draw scalar positions (no size=1), so the state is a plain (1, 3)
        # integer array: [fruit_row, fruit_col, basket_center]
        n = np.random.randint(0, self.grid_size-1)
        m = np.random.randint(1, self.grid_size-2)
        self.state = np.asarray([0, n, m])[np.newaxis]


class ExperienceReplay(object):
    def __init__(self, max_memory=100, discount=.9):
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, game_over):
        # memory[i] = [[state_t, action_t, reward_t, state_t+1], game_over?]
        self.memory.append([states, game_over])
        if len(self.memory) > self.max_memory:
            del self.memory[0]  # forget the oldest transition

    def get_batch(self, model, batch_size=10):
        len_memory = len(self.memory)
        num_actions = model.output_shape[-1]
        env_dim = self.memory[0][0][0].shape[1]
        inputs = np.zeros((min(len_memory, batch_size), env_dim))
        targets = np.zeros((inputs.shape[0], num_actions))
        # sample stored transitions at random (with replacement)
        for i, idx in enumerate(np.random.randint(0, len_memory,
                                                  size=inputs.shape[0])):
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            game_over = self.memory[idx][1]

            inputs[i:i+1] = state_t
            # There should be no target values for actions not taken.
            # Thou shalt not correct actions not taken #deep
            targets[i] = model.predict(state_t)[0]
            Q_sa = np.max(model.predict(state_tp1)[0])
            if game_over:  # if game_over is True
                targets[i, action_t] = reward_t
            else:
                # reward_t + gamma * max_a' Q(s', a')
                targets[i, action_t] = reward_t + self.discount * Q_sa
        return inputs, targets


if __name__ == "__main__":
    # parameters
    epsilon = .1  # exploration
    num_actions = 3  # [move_left, stay, move_right]
    epoch = 1000
    max_memory = 500
    hidden_size = 100
    batch_size = 50
    grid_size = 10

    model = Sequential()
    model.add(Dense(hidden_size, input_shape=(grid_size**2,), activation='relu'))
    model.add(Dense(hidden_size, activation='relu'))
    model.add(Dense(num_actions))
    model.compile(sgd(lr=.2), "mse")

    # If you want to continue training from a previous model, just uncomment the line below
    # model.load_weights("model.h5")

    # Define environment/game
    env = Catch(grid_size)

    # Initialize experience replay object
    exp_replay = ExperienceReplay(max_memory=max_memory)

    # Train
    win_cnt = 0
    for e in range(epoch):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        while not game_over:
            input_tm1 = input_t
            # get next action (epsilon-greedy)
            if np.random.rand() <= epsilon:
                action = np.random.randint(0, num_actions)
            else:
                q = model.predict(input_tm1)
                action = np.argmax(q[0])

            # apply action, get rewards and new state
            input_t, reward, game_over = env.act(action)
            if reward == 1:
                win_cnt += 1

            # store experience
            exp_replay.remember([input_tm1, action, reward, input_t], game_over)

            # adapt model
            inputs, targets = exp_replay.get_batch(model, batch_size=batch_size)

            loss += model.train_on_batch(inputs, targets)
        print("Epoch {:03d}/{:03d} | Loss {:.4f} | Win count {}".format(
            e, epoch - 1, loss, win_cnt))

    # Save trained model weights and architecture, this will be used by the visualization code
    model.save_weights("model.h5", overwrite=True)
    with open("model.json", "w") as outfile:
        json.dump(model.to_json(), outfile)

print("Training completed!")
```

    Epoch 000/999 | Loss 0.0660 | Win count 1
    Epoch 001/999 | Loss 0.1317 | Win count 1
    Epoch 002/999 | Loss 0.1505 | Win count 1
    Epoch 003/999 | Loss 0.1930 | Win count 1
    Epoch 004/999 | Loss 0.2347 | Win count 1
    Epoch 005/999 | Loss 0.1623 | Win count 1
    Epoch 006/999 | Loss 0.1986 | Win count 1
    Epoch 007/999 | Loss 0.1931 | Win count 2
    Epoch 008/999 | Loss 0.2256 | Win count 2
    Epoch 009/999 | Loss 0.2228 | Win count 2
    Epoch 010/999 | Loss 0.2068 | Win count 2
    Epoch 011/999 | Loss 0.2234 | Win count 2
    Epoch 012/999 | Loss 0.1819 | Win count 2
    Epoch 013/999 | Loss 0.1722 | Win count 2
    Epoch 014/999 | Loss 0.1817 | Win count 2
    Epoch 015/999 | Loss 0.1620 | Win count 3
    Epoch 016/999 | Loss 0.1731 | Win count 3
    Epoch 017/999 | Loss 0.1244 | Win count 4
    Epoch 018/999 | Loss 0.1605 | Win count 4
    Epoch 019/999 | Loss 0.1705 | Win count 4
    Epoch 020/999 | Loss 0.2126 | Win count 5
    Epoch 021/999 | Loss 0.1582 | Win count 5
    Epoch 022/999 | Loss 0.1830 | Win count 5
    Epoch 023/999 | Loss 0.1779 | Win
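
Note that the `Win count` column is cumulative, so it only ever goes up. If you want to see how often the agent wins per game, you can difference it. The sketch below parses log lines in the format shown above; the helper name `wins_per_epoch` is hypothetical, and it assumes the log starts at epoch 0. Incomplete lines are simply skipped.

```python
import re

LOG_LINE = re.compile(r"Epoch (\d+)/\d+ \| Loss ([\d.]+) \| Win count (\d+)")


def wins_per_epoch(log_text):
    """Turn the cumulative 'Win count' column into per-epoch wins (0 or 1)."""
    wins, previous = [], 0
    for match in LOG_LINE.finditer(log_text):
        total = int(match.group(3))
        wins.append(total - previous)
        previous = total
    return wins


# First eight lines of the training log above
sample = """\
Epoch 000/999 | Loss 0.0660 | Win count 1
Epoch 001/999 | Loss 0.1317 | Win count 1
Epoch 002/999 | Loss 0.1505 | Win count 1
Epoch 003/999 | Loss 0.1930 | Win count 1
Epoch 004/999 | Loss 0.2347 | Win count 1
Epoch 005/999 | Loss 0.1623 | Win count 1
Epoch 006/999 | Loss 0.1986 | Win count 1
Epoch 007/999 | Loss 0.1931 | Win count 2
"""
per_epoch = wins_per_epoch(sample)
win_rate = sum(per_epoch) / len(per_epoch)
```

Each game ends with exactly one +1 or -1 reward, so every entry of `per_epoch` is 0 or 1, and averaging over a window of recent epochs gives a win rate you can track during training.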

### Test...

``` Python
import json
import matplotlib.pyplot as plt
import numpy as np
from keras.models import model_from_json
# Catch comes from the training script (imported here as the module qlearn);
# inside this notebook it is already defined by the training cell above.
from qlearn import Catch

if __name__ == "__main__":
    # Make sure this grid size matches the value used for training
    grid_size = 10

    with open("model.json", "r") as jfile:
        model = model_from_json(json.load(jfile))
    model.load_weights("model.h5")
    model.compile("sgd", "mse")

    # Define environment, game
    env = Catch(grid_size)
    c = 0  # frame counter, used to name the saved images
    for e in range(10):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        plt.imshow(input_t.reshape((grid_size,)*2),
                   interpolation='none', cmap='gray')
        plt.savefig("%03d.png" % c)
        c += 1
        while not game_over:
            input_tm1 = input_t

            # get next action (always greedy, no random exploration)
            q = model.predict(input_tm1)
            action = np.argmax(q[0])

            # apply action, get rewards and new state
            input_t, reward, game_over = env.act(action)

            plt.imshow(input_t.reshape((grid_size,)*2),
                       interpolation='none', cmap='gray')
            plt.savefig("%03d.png" % c)
            c += 1

print("Prediction and images generation completed!")
```

    Prediction and images generation completed!


### Create an animated GIF on Mac OS X with ImageMagick (similar on Linux)

    > brew install imagemagick --build-from-source
    > brew upgrade imagemagick
    > for f in *.png; do mv "$f" "${f#0}"; done
    # Note: Take the last 25 images since the agent has learned a lot and plays better.
    > convert -resize 50% -delay 10 -loop 0 {75..99}.png CatchVideogame_RL_Qlearning_Me.gif

The resulting animated GIF image:

<img src="CatchVideogame_RL_Qlearning_Me.gif" width=200>
