In [1]:
import keras
import numpy as np
import matplotlib.pyplot as plt
import pandas as pd

Using TensorFlow backend.


In [2]:
width = 5
height = 5
n_mines = 3

def get_init_stage(width=width, height=height, n_mines=n_mines):
    # set stage
    stage = np.zeros((width, height, 3), dtype='int8')
    # assign mines
    selected_long = np.zeros((width * height), dtype='bool')
    selected_long[np.random.choice(range(0, width * height), n_mines, replace=False)] = True
    selected = selected_long.reshape((width, height))
    stage[selected, 1] = 1
    # set neighbour mine counts
    mines_padded = np.zeros((width + 2, height + 2), dtype='int8')
    mines_padded[1:-1, 1:-1] = stage[:, :, 1]
    for x in range(width):
        for y in range(height):
            if stage[x, y, 1] == 0:
                stage[x, y, 2] = np.sum(mines_padded[x: x + 3, y: y + 3])
    return stage          

def print_stage(stage):
    return stage[:, :, 2] - stage[:, :, 1]

def show_visible(stage):
    return stage[:, :, 0] * print_stage(stage)

def expand(stage, coord):
    mines_padded = np.zeros((width + 2, height + 2), dtype='int8') - 1
    mines_padded[1:-1, 1:-1] = stage[:, :, 1]
    n = []
    n.append((coord[0], coord[1] + 1))
    n.append((coord[0] + 1, coord[1]))
    n.append((coord[0] + 2, coord[1] + 1))
    n.append((coord[0] + 1, coord[1] + 2))
    return [(x[0] - 1, x[1] - 1) for x in n if mines_padded[x[0], x[1]] == 0
           and stage[x[0] - 1, x[1] - 1, 0] == 0]

def step_on(stage, coord):
    new_stage = stage.copy()
    new_stage[coord[0], coord[1], 0] = 1
    # if not mine, expand all (directly) neighbouring non-mine tiles
    if is_dead(new_stage):
        return False, new_stage
    elif stage[coord[0], coord[1], 2] == 0:
        eligible_neighbours = expand(new_stage, coord)
        for c in eligible_neighbours:
            new_stage[c[0], c[1], 0] = 1
            if new_stage[c[0], c[1], 2] == 0:
                eligible_neighbours += expand(new_stage, c)
        return True, new_stage
    else:
        return True, new_stage
    
def is_dead(stage):
    return np.sum(show_visible(stage) < 0)

def pretty(stage):
    pres = pd.DataFrame(print_stage(stage))
    pres = pres.replace(0, '.')
    for c in np.argwhere(stage[:, :, 0] == 0):
        pres.loc[c[0], c[1]] = '*'
    return pres

def is_won(stage):
    return np.sum(stage[:, :, 0]) == width * height - n_mines

In [3]:
# test simulator
temp_stage = get_init_stage()
print(print_stage(temp_stage))
print(pretty(temp_stage))
alive, temp_after = step_on(temp_stage, (2, 2))
print(alive)
print(pretty(temp_after))
print(show_visible(temp_after))

[[ 0  0  0  0  0]
 [ 2  2  1  0  0]
 [-1 -1  2  1  1]
 [ 2  2  2 -1  1]
 [ 0  0  1  1  1]]
   0  1  2  3  4
0  *  *  *  *  *
1  *  *  *  *  *
2  *  *  *  *  *
3  *  *  *  *  *
4  *  *  *  *  *
True
   0  1  2  3  4
0  *  *  *  *  *
1  *  *  *  *  *
2  *  *  2  *  *
3  *  *  *  *  *
4  *  *  *  *  *
[[0 0 0 0 0]
 [0 0 0 0 0]
 [0 0 2 0 0]
 [0 0 0 0 0]
 [0 0 0 0 0]]


In [4]:
def border_padding(stage):
    padded = np.zeros((3, width + 4, height + 4), dtype='int8')
    padded[0, 2:-2, 2:-2] = 1
    padded[1, 2:-2, 2:-2] = stage[:, :, 0]
    padded[2, 2:-2, 2:-2] = show_visible(stage)
    return padded

In [5]:
print(get_init_stage().shape)
border_padding(get_init_stage()).shape

(5, 5, 3)


(3, 9, 9)

In [6]:
# define model for q evaluation
from keras.models import Sequential, Model
from keras.layers import Dense, Activation
from keras.layers.core import Reshape
from keras.layers.core import Flatten
from keras.layers.core import Dropout
from keras.layers.core import Lambda
from keras.layers import Input, merge, Convolution2D
from keras.layers.normalization import BatchNormalization
from keras.optimizers import Adam, RMSprop, SGD
from rl.agents.dqn import DQNAgent
from rl.agents.ddpg import DDPGAgent
from rl.policy import LinearAnnealedPolicy, EpsGreedyQPolicy
from rl.memory import SequentialMemory
import random
from IPython.display import clear_output
from keras.utils.visualize_util import plot

In [7]:
model = Sequential()
model.add(Convolution2D(nb_filter=10, nb_row=5, nb_col=5, border_mode='valid', 
                        input_shape=(3, width + 4, height + 4)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Convolution2D(nb_filter=5, nb_row=5, nb_col=5, border_mode='same', 
                        input_shape=(3, width, height)))
model.add(Activation('relu'))
model.add(Dropout(0.2))
model.add(BatchNormalization())
model.add(Convolution2D(nb_filter=1, nb_row=1, nb_col=1, border_mode='same', 
                        input_shape=(3, width, height)))
model.add(Activation('linear'))
model.compile(optimizer='rmsprop', loss='mse')

target_model = Sequential()
target_model.add(Convolution2D(nb_filter=10, nb_row=5, nb_col=5, border_mode='valid', 
                        input_shape=(3, width + 4, height + 4)))
target_model.add(Activation('relu'))
target_model.add(Dropout(0.2))
target_model.add(BatchNormalization())
target_model.add(Convolution2D(nb_filter=5, nb_row=5, nb_col=5, border_mode='same', 
                        input_shape=(3, width, height)))
target_model.add(Activation('relu'))
target_model.add(Dropout(0.2))
target_model.add(BatchNormalization())
target_model.add(Convolution2D(nb_filter=1, nb_row=1, nb_col=1, border_mode='same', 
                        input_shape=(3, width, height)))
target_model.add(Activation('linear'))
target_model.compile(optimizer='rmsprop', loss='mse')

In [8]:
plot(model, to_file='model.png', show_shapes=True)

In [9]:
example = border_padding(get_init_stage())
print(example.shape)
pred = model.predict(example.reshape(1, 3, width+4, height+4))
print(pred)
print(pred[0, 0, :, :].shape)
np.unravel_index(pred[0, 0, :, :].argmax(), pred[0, 0, :, :].shape)

(3, 9, 9)
[[[[ 0.05631768 -0.00087915 -0.01718659 -0.0118984  -0.01615041]
   [ 0.01571422 -0.00163792  0.00555199 -0.00901891 -0.01382114]
   [ 0.02998096  0.00398127  0.03865868 -0.00086712 -0.00327395]
   [ 0.03632027  0.02880283  0.08948079  0.04337301  0.02176034]
   [ 0.03509468  0.05766553  0.08709553  0.04773781  0.03326273]]]]
(5, 5)


(3, 2)

In [10]:
penalty = 50

In [11]:
def perform_action(stage, coord):
    alive, next_stage = step_on(stage, coord)
    if not alive:
        return False, -penalty, next_stage
    else:
        return True, np.sum(next_stage[:, :, 0]) - np.sum(stage[:, :, 0]), next_stage

In [12]:
def prepare_input(stage):
    padded = border_padding(stage)
    return padded.reshape(1, 3, width+4, height+4)

In [13]:
def pick_action(stage, epsilon):
    if epsilon == 0 or random.random() > epsilon:
        values = model.predict(prepare_input(stage)) - 100000 * stage[:, :, 0]
        ind = np.unravel_index(values[0, 0, :, :].argmax(), values[0, 0, :, :].shape)
        return ind
    else:
        candidates = np.where(stage[:, :, 0] == 0)
        selected_id = random.randint(0, len(candidates[0]) - 1)
        ind = (candidates[0][selected_id], candidates[1][selected_id])
        return ind

In [14]:
pick_action(get_init_stage(), 1)

(0, 0)

In [15]:
def simulate_games(init_eps=1, epochs=1000, win_bonus=100, gamma=0.99, learn_interval=20,
                   batch_size=20, memory_size=500, update_interval=100, warmup=100, training=True):
    epsilon = init_eps
    memory = []
    n_wins = 0
    n_losses = 0
    memory_counter = 0
    target_model.set_weights(model.get_weights())
    for i in range(epochs):
        if i % update_interval == 0 and i > warmup: # time to update target model
            target_model.set_weights(model.get_weights())
        clear_output(wait=True)
        print("Epoch = %s" % i)
        print("wins: %s" % n_wins)
        print("losses: %s" % n_losses)
        state = get_init_stage()
        is_finished = False
        step_counter = 0
        while not is_finished:
            action = pick_action(state, epsilon)
            alive, delta, next_state = perform_action(state, action)
            if alive:
                done = is_won(next_state)
                is_finished = done
                reward = delta + win_bonus * done
            else:
                done = False
                is_finished = True
                reward = delta
            if is_finished:
                n_wins += done
                n_losses += (not alive)
            if len(memory) < memory_size:
                memory.append((state, action, reward, next_state, is_finished))
            else:
                if memory_counter < (memory_size - 1):
                    memory_counter += 1
                else:
                    memory_counter = 0
                memory[memory_counter] = (state, action, reward, next_state, is_finished)
                if training and i > warmup and i % learn_interval == 0: # time to train
                    minibatch = random.sample(memory, batch_size)
                    X_train = []
                    y_train = []
                    for experience in minibatch:
                        s0, a, r, s1, f = experience
                        q0 = target_model.predict(prepare_input(s0))
                        q1 = target_model.predict(prepare_input(s1))
                        max_q1 = q1.max()
                        y = np.zeros((width, height))
                        y[:] = q0[:]
                        if not f:
                            update = r + gamma * max_q1
                        else:
                            update = r
                        y[a[0], a[1]] = update
                        X_train.append(prepare_input(s0).reshape(3, width+4, height+4))
                        y_train.append(y)
                    X_train = np.array(X_train)
                    y_train = np.array(y_train).reshape(batch_size, 1, width, height)
                    model.train_on_batch(X_train, y_train)
            state = next_state
            step_counter += 1
        if epsilon > 0.1:
            epsilon -= (1/epochs)

In [16]:
simulate_games(init_eps=1, epochs=2000, win_bonus=100, gamma=0.99, learn_interval=5,
                   batch_size=10, memory_size=100, update_interval=20)

Epoch = 1999
wins: 178
losses: 1821


In [17]:
temp_stage = get_init_stage()
print_stage(temp_stage)

array([[ 0,  0,  1, -1,  1],
       [ 1,  1,  3,  2,  2],
       [ 1, -1,  2, -1,  1],
       [ 1,  1,  2,  1,  1],
       [ 0,  0,  0,  0,  0]], dtype=int8)

In [18]:
alive, temp_stage = step_on(temp_stage, (0, 0))
print(print_stage(temp_stage))
print(pretty(temp_stage))

[[ 0  0  1 -1  1]
 [ 1  1  3  2  2]
 [ 1 -1  2 -1  1]
 [ 1  1  2  1  1]
 [ 0  0  0  0  0]]
   0  1  2  3  4
0  .  .  1  *  *
1  1  1  *  *  *
2  *  *  *  *  *
3  *  *  *  *  *
4  *  *  *  *  *


In [19]:
print(model.predict(prepare_input(temp_stage)))

[[[[-0.6047172  -0.61043119 -0.60587913 -0.61504924  0.08700705]
   [-0.6047172  -0.61043119 -0.60587913 -0.61504924 -0.44591853]
   [-1.12930512 -0.84289938 -0.60587913 -0.61504924 -0.60199255]
   [-0.6047172  -0.81678778 -0.60587913 -0.61504924 -0.60199255]
   [-0.74663669 -1.02203429 -0.72721362 -0.84854138 -0.7439276 ]]]]
