# Playing Simple Catch Videogame with Keras and Evolution Strategy

Eder Santana blog post: https://goo.gl/chauJT

Eder Santana GitHub repo: https://github.com/EderSantana/EvolutionMVE

Small correction and addition of this iPython Notebook by Claude COULOMBE - PhD candidate - TÉLUQ / UQAM - Montréal

### Introduction

Whenever I want to learn a new machine learning method, I write a single file example with a Minimum Viable Experiment (MVE). Last time I did an MVE to learn <a href="http://edersantana.github.io/articles/keras_rl/">RL and Q-Learning</a>. This time I want to learn <a href="https://arxiv.org/abs/1703.03864">Evolution Strategies</a>.

<img src="CatchVideogame_ES_Original.gif" width=200>
Before I show you the code I came up with, let us talk a bit about a fun recurring equation. Say you want to create a model of a variable  $\textbf{y}$ (ex. price of a stock, say if an image shows a cat or a dog, the steering of a self driving car, etc). You go ahead and use a model  $\textbf{f(x)}$  to approximate $\textbf{y}$. Where $\textbf{x}$ are inputs (ex. previous stock values, a jpeg image, a video of the road, etc). That model $\textbf{f}$ also have parameters $\textbf{w}$ to be tuned. Just like in a neural network that at this point everybody heard about.

A simple way to measure how well your model performs is to calculate the average value of 
$L = (y-f(x))^2$. Then, you can update the parameters $\textbf{w}$ using information about the gradient of L: 

$\nabla L = -2(y-f(x))*\nabla f(x)$.

Note that there are two main terms multiplied in the last equation. A quality measure $-2(y-f(x))$ and a model sensitivity $\nabla f(x)$. If we forget precise formulations for a bit and just think about matching possible quality measures and model sensitivity, we can see that this same equation is used elsewhere, especially in reinforcement learning (RL). For example, the REINFORCE algorithm also updates model parameters according to $R*\nabla(log(p(x))$ where $\textbf{R}$ is a reward measure and $log(p(x))$ is a log-probability of certain actions of a policy function.

Taking this to an extreme, where we assume that a model sensitivity is solely defined by small random jitters $\textbf{n}$  we add to our model parameters $\textbf{w}$, we get $\nabla L = R * n$. Where again $\textbf{R}$ are rewards defined by completing a given task. Obviously that approximation is not really precise and should have a lot of errors when estimating good values to update $\textbf{w}$. Good thing about it is we can usually compensate for lack of precision with exhaustive trial, error and lots of data.

And that's it! We can now try to solve the same catch videogame from the previous post, but this time with a much simpler formulation. The trade off is more compute. Now you can <a href="https://github.com/EderSantana/EvolutionMVE">check out the code</a>. It is pretty short and should be simple to understand, but let me know if something isn’t clear.

P.-S.: After you are done understanding the basics of evolution strategies, I’d recommend using more serious code bases like <a href="https://github.com/openai/evolution-strategies-starter">OpenAI ES</a>, <a href="https://github.com/hardmaru/estool">estools</a> or <a href="https://github.com/alirezamika/evostra">evostra</a> for further experiments. Also check out <a href="http://blog.otoro.net/2017/10/29/visual-evolution-strategies/">A Visual Guide to Evolution Strategies</a> for more evolution strategies and nice visualizations.

In [1]:
import json
import numpy as np
import tensorflow as tf
import keras

Sequential =  keras.models.Sequential
Dense = keras.layers.Dense
sgd = keras.optimizers.SGD

class Catch(object):
    def __init__(self, grid_size=10):
        self.grid_size = grid_size
        self.reset()

    def _update_state(self, action):
        """
        Input: action and states
        Ouput: new states and reward
        """
        state = self.state
        if action == 0:  # left
            action = -1
        elif action == 1:  # stay
            action = 0
        else:
            action = 1  # right
        f0, f1, basket = state[0]
        new_basket = min(max(1, basket + action), self.grid_size-1)
        f0 += 1
        out = np.asarray([f0, f1, new_basket])
        out = out[np.newaxis]

        assert len(out.shape) == 2
        self.state = out

    def _draw_state(self):
        im_size = (self.grid_size,)*2
        state = self.state[0]
        canvas = np.zeros(im_size)
        canvas[state[0], state[1]] = 1  # draw fruit
        canvas[-1, state[2]-1:state[2] + 2] = 1  # draw basket
        return canvas

    def _get_reward(self):
        fruit_row, fruit_col, basket = self.state[0]
        if fruit_row == self.grid_size-1:
            if abs(fruit_col - basket) <= 1:
                return 1
            else:
                return 0 
        else:
            return 0

    def _is_over(self):
        if self.state[0, 0] == self.grid_size-1:
            return True
        else:
            return False

    def observe(self):
        canvas = self._draw_state()
        return canvas.reshape((1, -1))

    def act(self, action):
        self._update_state(action)
        reward = self._get_reward()
        game_over = self._is_over()
        return self.observe(), reward, game_over

    def reset(self):
        n = np.random.randint(0, self.grid_size-1, size=1)
        m = np.random.randint(1, self.grid_size-2, size=1)
        self.state = np.asarray([0, n, m])[np.newaxis]


def play_round(model, grid_size):
    # Define environment/game
    env = Catch(grid_size)
    input_t = env.observe()
    game_over = False

    while not game_over:
        input_tm1 = input_t
        # get next action
        q = model.predict(input_tm1)
        action = np.argmax(q[0])

        # apply action, get rewards and new state
        input_t, reward, game_over = env.act(action)

    return reward

if __name__ == "__main__":
    # parameters
    num_actions = 3  # [move_left, stay, move_right]
    epoch = 5000  # how many runs 
    hidden_size = 100
    population_size = 100  # how many different random agents per run
    learning_rate = .1
    sigma = .5  # std of the noise that defines weight perturbation (with large sigma you explore further, but may never really get anywhere)
    grid_size = 10
    games_per_agent = 5

    # this model is smaller than the Q-learning model (see the comment oriented programming, COP, down there?)
    model = Sequential()
    model.add(Dense(hidden_size, input_shape=(grid_size**2,), activation='relu'))
    #model.add(Dense(hidden_size, activation='relu'))
    model.add(Dense(num_actions))
    model.compile(sgd(lr=.2), "mse")

    # If you want to continue training from a previous model, just uncomment the line bellow (I love COP)
    # model.load_weights("model.h5")

    # Train
    for e in range(epoch):
        weights = model.get_weights()
        all_eps = []
        rewards = np.zeros(population_size)
        for i, p in enumerate(range(population_size)):
            new_weights = []
            eps = []
            for w in weights:
                eps.append(np.random.randn(*w.shape))
                new_weights.append(w + sigma * eps[-1])
            all_eps.append(eps)
            model.set_weights(new_weights) 
            for j in range(games_per_agent):
                rewards[i] += play_round(model, grid_size)

        # total reward of everything explored
        win_cnt = rewards.sum()
        reward_norm = (rewards - rewards.mean()) / rewards.std()

        # correlate reward with noise and update weights
        updated_weights = []
        for index, w in enumerate(weights):
            X = np.array([eps[index] for eps in all_eps])  # all noise added to weight[index]
            w = w + learning_rate / (population_size * sigma) * X.T.dot(reward_norm).T
            updated_weights.append(w)

        # your new model is good to go
        model.set_weights(updated_weights)

        # decay learning rate
        # learning_rate *= .993

        print("Epoch {:03d}/{:04d} | Win count {}".format(e, epoch-1, win_cnt))
             
    # Save trained model weights and architecture, this will be used by the visualization code
    model.save_weights("model.h5", overwrite=True)
    with open("model.json", "w") as outfile:
        json.dump(model.to_json(), outfile)
        
    print("Training completed!")

  from ._conv import register_converters as _register_converters
Using TensorFlow backend.


Epoch 000/4999 | Win count 145.0
Epoch 001/4999 | Win count 142.0
Epoch 002/4999 | Win count 146.0
Epoch 003/4999 | Win count 153.0
Epoch 004/4999 | Win count 140.0
Epoch 005/4999 | Win count 118.0
Epoch 006/4999 | Win count 166.0
Epoch 007/4999 | Win count 153.0
Epoch 008/4999 | Win count 172.0
Epoch 009/4999 | Win count 156.0
Epoch 010/4999 | Win count 159.0
Epoch 011/4999 | Win count 146.0
Epoch 012/4999 | Win count 155.0
Epoch 013/4999 | Win count 179.0
Epoch 014/4999 | Win count 143.0
Epoch 015/4999 | Win count 175.0
Epoch 016/4999 | Win count 141.0
Epoch 017/4999 | Win count 167.0
Epoch 018/4999 | Win count 161.0
Epoch 019/4999 | Win count 156.0
Epoch 020/4999 | Win count 157.0
Epoch 021/4999 | Win count 171.0
Epoch 022/4999 | Win count 165.0
Epoch 023/4999 | Win count 143.0
Epoch 024/4999 | Win count 161.0
Epoch 025/4999 | Win count 156.0
Epoch 026/4999 | Win count 181.0
Epoch 027/4999 | Win count 157.0
Epoch 028/4999 | Win count 185.0
Epoch 029/4999 | Win count 159.0
Epoch 030/

In [2]:
import json
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf
import keras
from evolve import Catch

model_from_json = keras.models.model_from_json

if __name__ == "__main__":
    # Make sure this grid size matches the value used fro training
    grid_size = 10

    with open("model.json", "r") as jfile:
        model = model_from_json(json.load(jfile))
    model.load_weights("model.h5")
    model.compile("sgd", "mse")

    # Define environment, game
    env = Catch(grid_size)
    c = 0
    for e in range(10):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        plt.imshow(input_t.reshape((grid_size,)*2),
                   interpolation='none', cmap='gray')
        plt.axis("off")
        for n in range(5):
            plt.savefig("%04d.png" % c)
            c += 1
        while not game_over:
            input_tm1 = input_t

            # get next action
            q = model.predict(input_tm1)
            action = np.argmax(q[0])

            # apply action, get rewards and new state
            input_t, reward, game_over = env.act(action)

            plt.imshow(input_t.reshape((grid_size,)*2),
                       interpolation='none', cmap='gray')
            plt.axis("off")
            for n in range(5):
                plt.savefig("%04d.png" % c)
                c += 1

print("Prediction and images generation completed!")

Prediction and images generation completed!


### Create an animated Gif on Mac OS X with imagemagick (similar on Linux)

    > brew install imagemagick --build-from-source
    > brew upgrade imagemagick
    > for f in *.png; do mv "$f" "${f#0}"; done
    # Note: Take the last 150 images since the agent has learned a lot and plays better.
    > convert -resize 50% -delay 10 -loop 0 {350..499}.png CatchVideogame_ES_Me.gif

The resulting animated gif image:

<img src="CatchVideogame_ES_Me.gif" width=200>
