In [None]:
from matplotlib import pyplot as plt
from scipy import signal
import numpy as np
import random

In [2]:
# Render matplotlib figures inline in the notebook output.
%matplotlib inline

In [3]:
# Board dimensions; used to construct the Gomoku board and to reshape per-cell network output.
shape = (19,19)

# Implement game logic

In [4]:
class Gomoku:
    """Two-player five-in-a-row game on a rectangular board.

    The board is a float array with three channels on the last axis:
    channel 0 marks stones of player 0, channel 1 marks stones of player 1
    and channel 2 marks empty cells. After ``reset`` the first move is made
    by player 0.
    """

    # Kernels for the four winning directions: both diagonals, a row, a column.
    _WIN_MASKS = (
        np.identity(5),
        np.fliplr(np.identity(5)),
        np.ones((1, 5)),
        np.ones((5, 1)),
    )

    def __init__(self, shape):
        """Create a game on a board of the given ``shape`` and reset it."""
        self.shape = shape
        self.reset()

    def reset(self):
        """Clear the board and forget the last move."""
        # last_player starts at 1 so that the first take_action flips it to 0.
        self.last_player = 1
        # Initialised here so get_last_action() is valid before the first
        # move and does not leak the final move of the previous game.
        self.last_action = None
        self.board = np.stack(
            (np.zeros(self.shape), np.zeros(self.shape), np.ones(self.shape)),
            axis=2,
        )
        self.previous_board = np.copy(self.board)

    def draw(self):
        """Show the raw three-channel board as an image."""
        plt.imshow(self.board)

    def list_actions(self):
        """Return the empty cells as a list of ``[row, col]`` lists."""
        return np.transpose(np.nonzero(self.board[:, :, 2])).tolist()

    def take_action(self, action):
        """Place a stone for the next player at ``action`` and return the reward.

        ``action`` is a ``(row, col)`` pair; a list is accepted as well. The
        cell is overwritten without checking that it is empty. Returns 1 if
        the move wins the game for the moving player, otherwise 0.
        """
        # A list index would trigger NumPy fancy indexing and overwrite whole
        # rows, so always index with a tuple.
        action = tuple(action)
        self.previous_board = np.copy(self.board)
        self.last_player = 1 - self.last_player
        pixel = np.zeros(3)
        pixel[self.last_player] = 1
        self.board[action] = pixel
        self.last_action = action
        return self.__reward()

    def get_last_action(self):
        """Return the last move as a tuple, or ``None`` if none was made since reset."""
        return self.last_action

    def get_state(self):
        """Return a copy of the board with channel 0 holding the last mover's stones."""
        return self.convert_state_for_player(self.board, self.last_player)

    def get_raw_state(self):
        """Return the board itself (not a copy), channels in absolute player order."""
        return self.board

    def get_previous_state(self):
        """Return the board before the last move, seen by the player who made that move."""
        return self.convert_state_for_player(self.previous_board, self.last_player)

    def convert_state_for_player(self, board, player):
        """Return a copy of ``board`` with channels 0 and 1 swapped when ``player`` is 1."""
        result = np.copy(board)

        if player == 1:
            # Fancy indexing yields a copy, so the swap is safe in one step.
            result[:, :, [0, 1]] = result[:, :, [1, 0]]

        return result

    def game_over(self):
        """Return True when either player has won or no empty cell is left."""
        return bool(
            self.__won(0)
            or self.__won(1)
            or np.count_nonzero(self.board[:, :, 2]) == 0
        )

    def __reward(self):
        """Reward for the move just played: 1 if it won the game, else 0."""
        return 1 if self.__won(self.last_player) else 0

    def __won(self, player):
        """Return True if ``player`` has five stones in a line in any direction."""
        board = self.board[:, :, player]
        return any(self.__has_five_by(mask, board) for mask in self._WIN_MASKS)

    def __has_five_by(self, mask, board):
        """Return True if convolving ``board`` with ``mask`` reaches 5 anywhere."""
        # Stones are exactly 0.0 or 1.0, so the sums are exact small integers.
        return np.count_nonzero(signal.convolve2d(mask, board) == 5) > 0
    
# Single game instance shared by the helper functions and training cells below.
game = Gomoku(shape)        

# Deep Q-learning

In [5]:
from keras.models import Model
from keras.layers import Input
from keras.layers.core import Dropout
from keras.layers.convolutional import Conv2D
from keras.layers.pooling import GlobalMaxPooling2D
from keras.layers.normalization import BatchNormalization
from keras.optimizers import Nadam

from IPython import display

Using TensorFlow backend.


In [6]:
# Passed as the first (filters) argument of every hidden Conv2D layer in the model cell;
# despite the name this is the number of filters per layer, not a number of layers.
hidden_layer_count = 40

In [25]:
# Fully convolutional network over the board state.
#   map_model: state -> one value per board cell (prob_map)
#   model:     state -> global maximum of that map (winner)
input_state = Input(shape=game.get_state().shape)

# Two identical stages: 7x7, 5x5 and 3x3 convolutions followed by batch norm.
x = input_state
for stage_index in range(2):
    for kernel_size in (7, 5, 3):
        x = Conv2D(
            hidden_layer_count,
            (kernel_size, kernel_size),
            padding='same',
            activation='relu',
        )(x)
    x = BatchNormalization(axis=3)(x)

# 1x1 convolution collapses the feature maps to a single value per cell.
prob_map = Conv2D(1, (1, 1), padding='same', activation='relu')(x)
winner = GlobalMaxPooling2D()(prob_map)

model = Model(input_state, winner)
map_model = Model(input_state, prob_map)

# NOTE(review): the output activation is relu, which is unbounded above and can
# be exactly 0, while `model` is trained with binary_crossentropy — confirm this
# pairing is intended. Both models also share one optimizer instance.
opt = Nadam(lr=1e-5)
model.compile(optimizer=opt, loss='binary_crossentropy')
map_model.compile(optimizer=opt, loss='mean_squared_error')

In [19]:
# Exploration rate: make_move() plays a uniformly random legal move with this probability.
epsilon = 0.05

In [20]:
def make_move():
    """Play one move chosen by the network, or at random with probability ``epsilon``.

    Returns the experience list produced by ``make_manual_move``, or ``None``
    when the game is already over.
    """
    if game.game_over():
        return None

    candidates = game.list_actions()
    if np.random.rand() < epsilon:
        action = tuple(random.choice(candidates))
    else:
        # The per-cell values come from map_model. `model` ends in
        # GlobalMaxPooling2D and returns a single value per board, which
        # cannot be reshaped to the board shape.
        q = map_model.predict(np.expand_dims(game.get_state(), axis=0)).reshape(shape)
        # reversed() keeps the tie-breaking of the former sorted(...)[-1]:
        # among equally valued cells the last one in list order wins.
        action = tuple(max(reversed(candidates), key=lambda cell: q[tuple(cell)]))

    return make_manual_move(action)

def make_manual_move(action):
    """Play ``action`` on the shared game and return the resulting experience.

    Returns ``None`` if the game has already ended. Otherwise returns the list
    ``[state_before, action, reward, state_after, game_over]``, where both
    states are taken from the viewpoint of the player who just moved.
    """
    if not game.game_over():
        reward = game.take_action(action)
        return [
            game.get_previous_state(),
            action,
            reward,
            game.get_state(),
            game.game_over(),
        ]
    return None

In [23]:
# Sanity check: run the network on the current position.
# NOTE(review): the recorded output below has 40 values and carries execution count 23,
# lower than the model cell's (25); it presumably comes from an earlier model definition — re-run to confirm.
model.predict(np.expand_dims(game.get_state(), axis=0))

array([[ 0.07759527,  0.04737442,  0.11939137,  0.08218905,  0.17152865,
         0.08011203,  0.04622764,  0.20202038,  0.07601954,  0.05204586,
         0.1211382 ,  0.0833847 ,  0.06910384,  0.11528851,  0.1069544 ,
         0.08209111,  0.11662121,  0.11993162,  0.06925429,  0.05142764,
         0.08038506,  0.10802316,  0.08072764,  0.09815552,  0.09605538,
         0.14821672,  0.1114252 ,  0.10503768,  0.08283931,  0.12878175,
         0.05962959,  0.04498002,  0.11816934,  0.12710357,  0.07555363,
         0.12029865,  0.02097337,  0.14544865,  0.02945024,  0.12941884]], dtype=float32)

In [26]:
# Per-cell output of map_model for the current position, reshaped to the board.
# NOTE(review): the ValueError recorded below (size 14440 = 19*19*40) does not match the
# single-channel prob_map in the model cell; presumably a stale output — re-run to confirm.
color_map = map_model.predict(np.expand_dims(game.get_state(), axis=0)).reshape(shape)

ValueError: cannot reshape array of size 14440 into shape (19,19)

In [17]:
from IPython.core.display import display as core_display
from IPython.core.display import HTML 

def state_for_display():
    """Print the current board as an HTML table for the notebook front end.

    Each cell is coloured by the network's per-cell value (green = high,
    blue = low, after min/max scaling to 0..255); the last move additionally
    gets a full red component. Empty cells carry an ``onclick`` handler that
    calls the JavaScript ``make_move(i, j)``. The markup is written to stdout
    because the front end reads it from the kernel's output stream.
    """
    raw_state = game.get_raw_state()
    try:
        last_action = game.get_last_action()
    except AttributeError:
        # No move has been played on this game object yet.
        last_action = None
    if last_action is not None:
        last_action = tuple(last_action)

    # The per-cell map comes from map_model; `model` is max-pooled to a single
    # value per board and cannot be reshaped to the board shape.
    color_map = map_model.predict(np.expand_dims(game.get_state(), axis=0)).reshape(shape)
    color_map = color_map - np.min(color_map)
    peak = np.max(color_map)
    if peak > 0:
        # Skipped for a constant map to avoid dividing by zero.
        color_map = color_map / peak
    color_map = (color_map * 255).astype(int)

    parts = ['<table>']
    for i in range(shape[0]):
        parts.append('<tr>')
        for j in range(shape[1]):
            red = '255' if last_action == (i, j) else '0'
            green = str(color_map[i, j])
            blue = str(255 - color_map[i, j])

            parts.append("<td style='border:1px solid gray; width:25px; height:25px; font-weight:bold; text-align:center; ")
            parts.append("background-color:rgb(" + red + ", " + green + ", " + blue + ");")
            parts.append("'")
            if raw_state[i, j, 2]:
                parts.append(" onclick='make_move(" + str(i) + "," + str(j) + ")'")
            parts.append(">")

            if raw_state[i, j, 0] == 1:
                parts.append('X')
            if raw_state[i, j, 1] == 1:
                parts.append('O')
            if raw_state[i, j, 2] == 1:
                # Filler so that empty cells still have content.
                parts.append('&nbsp;')

            parts.append("</td>")
        parts.append('</tr>')

    parts.append('</table>')
    if game.game_over():
        parts.append('<h3>Game Over</h3>')
    print(''.join(parts))

# Start from an empty board before the widget is shown.
game.reset()



# Front-end widget: a container for the board plus three buttons. The script talks to the
# kernel through IPython.notebook.kernel.execute and renders whatever state_for_display()
# prints. Clicking an empty cell calls the JavaScript make_move(i, j), which plays that
# move and schedules an AI reply via next().
html = """

<div id='display_div'></div>
<button onclick='next()'>Make Next AI Move</button>
<button onclick='play()'>Autoplay</button>
<button onclick='reset()'>New Game</button>

<script type="text/Javascript">

    function display_state(out) {
        document.getElementById('display_div').innerHTML = out.content.text;
    }

    function next() {
        var kernel = IPython.notebook.kernel;
        kernel.execute('make_move()');
        kernel.execute('state_for_display()', {"iopub" : {"output":display_state}});
    }
    
    function make_move(i,j) {
        var kernel = IPython.notebook.kernel;
        kernel.execute('make_manual_move(('+i+', '+j+'))');
        kernel.execute('state_for_display()', {"iopub" : {"output":display_state}});
        setTimeout(next, 300);
    }
    
    function play() {
        var kernel = IPython.notebook.kernel;
        next();
        kernel.execute('print(game.game_over(), end="")', {"iopub" : {"output":function(out) {
            if(out.content.text == "False") {
                setTimeout(play, 100);
            }
        }}});
        
    }
    
    function reset() {
        var kernel = IPython.notebook.kernel;
        kernel.execute('game.reset()');
        next()
    }
    
    next()
</script>
"""

# Inject the widget into the cell output; the trailing next() in the script plays the first AI move.
core_display(HTML(html))

# Loading real games

In [None]:
import os
from tqdm import tqdm

# Replay buffer; the training cells read each entry as
# [state, action, reward, next_state, game_over].
experiences = []
# Sampled together with `experiences` in the first training cell.
# NOTE(review): presumably the experiences that won a game — confirm.
wins = []

def letter_to_index(letter):
    """Map a column letter to a zero-based index.

    Characters before 'J' map to ``ord(letter) - 65`` ('A' -> 0). 'J' and later
    characters are shifted down by one more, so 'I' and 'J' both give 8.
    """
    code = ord(letter)
    offset = 65 if code < ord('J') else 66
    return code - offset

def game_log_to_tuples(log):
    """Convert log lines such as ``"H8\\n"`` into ``(letter_index, number - 1)`` tuples.

    ``log`` is an iterable of text lines, each a column letter followed by a
    1-based number. Surrounding whitespace is stripped and blank lines are
    skipped, so a final line without a trailing newline is parsed in full
    instead of losing its last digit.
    """
    moves = []
    for line in log:
        entry = line.strip()
        if not entry:
            continue
        moves.append((letter_to_index(entry[0]), int(entry[1:]) - 1))
    return moves

# Replay every recorded game (*.log) on the shared board and store the resulting
# experiences. Games with 200 or more moves are skipped and counted.
games_dir = '/data/bsd_gomoku_games'
files = os.listdir(games_dir)

skipped_files = 0

for filename in tqdm(files):
    if not filename.endswith('.log'):
        continue

    with open(os.path.join(games_dir, filename)) as f:
        content = game_log_to_tuples(f.readlines())

    if len(content) >= 200:
        skipped_files += 1
        continue

    game.reset()
    for move in content:
        experience = make_manual_move(move)
        if experience is None:
            # The game already ended; any remaining moves in the log are ignored.
            break
        # Previously the returned experience was dropped, which left the
        # replay buffer empty for the training cells.
        experiences.append(experience)
        # NOTE(review): `wins` is assumed to hold the winning moves (reward == 1) — confirm.
        if experience[2] == 1:
            wins.append(experience)

print(skipped_files)

In [None]:
discount = 0.9

# Supervised warm-up on recorded games: each batch mixes the winning moves with
# an equally sized random sample of the buffer. The target map is zero
# everywhere except the played cell, which gets that move's reward.
for k in tqdm(range(1000)):
    seed_batch = random.sample(experiences, len(wins)) + wins
    states = np.array([item[0] for item in seed_batch])

    # Targets have the shape of map_model's output: (batch, rows, cols, 1).
    # `model` is max-pooled to one value per board and cannot take per-cell targets.
    desired_q = np.zeros((len(seed_batch),) + tuple(shape) + (1,))
    for index, item in enumerate(seed_batch):
        desired_q[index][tuple(item[1])] = item[2]

    loss = map_model.train_on_batch(states, desired_q)
    

In [None]:
from keras import backend as K

discount = 0.9

# Q-learning on the recorded experiences: for every sampled transition the
# value of the played cell is moved towards
# reward + discount * max(next-state map), with no bootstrap term on terminal moves.
for k in tqdm(range(1000)):
    # Divide the learning rate by ten every 300 iterations. The previous
    # `if k % 300:` was true on every iteration that is NOT a multiple of 300.
    if k > 0 and k % 300 == 0:
        # Update the backend variable in place; rebinding opt.lr would replace
        # the variable with a new tensor.
        K.set_value(opt.lr, K.get_value(opt.lr) / 10)

    seed_batch = random.sample(experiences, min(2000, len(experiences)))
    states = np.array([item[0] for item in seed_batch])
    next_states = np.array([item[3] for item in seed_batch])

    # Per-cell maps are needed here, so use map_model rather than the
    # max-pooled `model`. Output shape: (batch, rows, cols, 1).
    desired_q = map_model.predict(states)
    new_q = map_model.predict(next_states)

    # Largest cell value of each next-state map, shape (batch,).
    new_q_max = np.squeeze(np.max(np.max(new_q, axis=1), axis=1), axis=1)

    revards = np.array([item[2] for item in seed_batch])
    game_over = np.array([item[4] for item in seed_batch]).astype(int)
    calculated_q = revards + discount * new_q_max * (1 - game_over)

    # Only the played cell gets a new target; every other cell keeps its prediction.
    for index, item in enumerate(seed_batch):
        desired_q[index][tuple(item[1])] = calculated_q[index]

    loss = map_model.train_on_batch(states, desired_q)

    display.clear_output(wait=True)
    print(loss)
    

# Training

In [None]:
# Experiences with index below protect_until are always kept when the self-play loop trims the buffer.
protect_until = len(experiences)

In [None]:
# Display the number of protected experiences.
protect_until

In [None]:
# Exploration rate for the self-play loop below (same value as set earlier in the notebook).
epsilon = 0.05

In [None]:
# Lower the learning rate for self-play training.
# NOTE(review): this rebinds the attribute to a plain float; it may not affect a training
# function Keras has already built — presumably K.set_value(opt.lr, 1e-7) is needed; confirm.
opt.lr = 1e-7

In [None]:
# Discount factor applied to the next-state value when building Q targets in the loop below.
discount = 0.9

In [None]:
# Self-play training: play one epsilon-greedy game, append its transitions to
# the replay buffer, then run three Q-learning updates on a mix of protected
# (recorded) and fresh (self-play) experiences.
for i in tqdm(range(12000)):
    game.reset()
    game_length = 0
    while not game.game_over():
        candidates = game.list_actions()
        if np.random.rand() < epsilon:
            action = tuple(random.choice(candidates))
        else:
            # Per-cell values come from map_model; `model` yields one value per board.
            q = map_model.predict(np.expand_dims(game.get_state(), axis=0)).reshape(shape)
            # reversed() keeps the tie-breaking of the former sorted(...)[-1].
            action = tuple(max(reversed(candidates), key=lambda cell: q[tuple(cell)]))

        revard = game.take_action(action)

        experiences.append([game.get_previous_state(), action, revard, game.get_state(), game.game_over()])
        game_length += 1

    # Penalise the loser's last move, but only if the game was actually won
    # (not a draw) and the loser made a move during this game.
    if game_length >= 2 and experiences[-1][2] == 1:
        experiences[-2][2] = -1

    for k in range(3):
        protected = experiences[:protect_until]
        fresh = experiences[protect_until:]
        seed_batch = (
            random.sample(protected, min(1000, len(protected)))
            + random.sample(fresh, min(1000, len(fresh)))
        )

        states = np.array([item[0] for item in seed_batch])
        next_states = np.array([item[3] for item in seed_batch])

        # NOTE(review): the original used an undefined `training_model` here. This
        # follows the map-based update of the earlier Q-learning cell instead:
        # `model` gives the maximum of the next-state map, and map_model is trained
        # on the played cell only — confirm this matches the intended training model.
        desired_q = map_model.predict(states)
        new_q_max = np.squeeze(model.predict(next_states), axis=1)

        revards = np.array([item[2] for item in seed_batch])
        game_over = np.array([item[4] for item in seed_batch]).astype(int)
        # No bootstrap term for terminal transitions.
        calculated_q = revards + discount * new_q_max * (1 - game_over)

        for index, item in enumerate(seed_batch):
            desired_q[index][tuple(item[1])] = calculated_q[index]

        loss = map_model.train_on_batch(states, desired_q)

    display.clear_output(wait=True)
    print(i, game_length, loss, '(', len(experiences), ')')

    if len(experiences) > 100000:
        # Keep all protected experiences and at most 50000 random fresh ones.
        fresh = experiences[protect_until:]
        experiences = experiences[:protect_until] + random.sample(fresh, min(50000, len(fresh)))
    