In [1]:
import numpy as np
import theano
import theano.tensor as T
import lasagne

Using gpu device 0: Quadro K4000 (CNMeM is enabled with initial size: 75.0% of memory, cuDNN 5004)


In [2]:
FIELD_SIZE = (3, 3)
SIZE_OF_LINE_TO_WIN = 3

class TicTacToe:
    def __init__(self, field_size, size_of_line_to_win):
        self.field = np.zeros(field_size, dtype = np.int8)
        self.field_size = self.field.shape
        self.size_of_line_to_win = size_of_line_to_win
        self.move = 1
    
    def make_move(self, action):
        """
            action is tuple (row, col)
        """
        self.field[action] = self.move
        self.move = -self.move
        return abs(self.check_win_condition())
        
    def avaliable_moves(self):
        """
            Avaliable moves array of (row, col)
        """
        return zip(*np.where(self.field == 0))
    
    def players(self):
        return [-1, 1]
    
    def player(self):
        return self.move
    
    def finish(self): 
        return self.check_win_condition() != 0 or np.all(self.field != 0)
    
    def action_as_array(self, action):
        res = np.zeros((1, self.field_size[0], self.field_size[1]), dtype = np.float32)
        res[:, action[0], action[1]] = 1
        return res
    
    def state_as_array(self):
        return np.expand_dims(np.array([self.field == -self.move, self.field == 0, self.field == self.move],
                                       dtype = np.float32), axis = 0)
    
    def check_win_condition(self):
        """
            If someone allready win, return winer, otherwise zero
        """
        ### For every submaxtix of size (SIZE_OF_LINE_TO_WIN, SIZE_OF_LINE_TO_WIN) check if there is a winner
        for i in range(self.field_size[0] - self.field_size[0] + 1):
            for j in range(self.field_size[1] - self.field_size[1] + 1):
                sub_matrix = self.field[i:(i + self.size_of_line_to_win),j:(j + self.size_of_line_to_win)]
                #If any equal to SIZE_OF_LINE_TO_WIN we have a winner
                #check rows
                row_sum = sub_matrix.sum(axis = 1)
                #check columns
                col_sum = sub_matrix.sum(axis = 0)
                #check diagonals
                diag_sum = np.array([np.diag(sub_matrix).sum(), np.diag(np.fliplr(sub_matrix)).sum()])
                all_sum = np.hstack([row_sum, col_sum, diag_sum])
                
                if np.any(all_sum == self.size_of_line_to_win):
                    return 1
                if np.any(all_sum == -self.size_of_line_to_win):
                    return -1
        return 0

    
    def __str__(self):
        res = np.empty(FIELD_SIZE, dtype = object)
        res[self.field == 0] = "_"
        res[self.field == 1] = "X"
        res[self.field == -1] = "0"
        return "\n".join(map(lambda x: "".join(x), res))

In [3]:
class TicTacToeStrategy:
    def __init__(self, field_size):
        self.field_size = field_size
        self.state = T.tensor4("state", dtype='float32')
        self.action = T.tensor3("action", dtype='float32')
        self.reward = T.vector("reward", dtype='float32')
    
    def get_net(self):        
        state_input = lasagne.layers.InputLayer(shape = (None, 3, FIELD_SIZE[0], FIELD_SIZE[1]),
                                        input_var = self.state, name = 'state')
        action_input = lasagne.layers.InputLayer(shape = (None, FIELD_SIZE[0], FIELD_SIZE[1]),
                                         input_var = self.action, name = 'action')
        action_reshaped = lasagne.layers.ReshapeLayer(action_input, shape = (-1, 1, FIELD_SIZE[0], FIELD_SIZE[1]))

        net = lasagne.layers.concat([state_input, action_reshaped], axis = 1)

        net = lasagne.layers.DenseLayer(net, num_units = 60, name='hiden1')

        return lasagne.layers.DenseLayer(net, num_units = 1,
                        name='output', nonlinearity=lasagne.nonlinearities.identity)
    
    def compile(self):
        """
            Compile a network, return theano function for training from triplets (state, action, reward)
            and for predicting from (state, action)
        """
        self.net = self.get_net()
        reward_predicted = lasagne.layers.get_output(self.net).reshape((-1, ))
        self.all_weights = lasagne.layers.get_all_params(self.net)
        loss = lasagne.objectives.squared_error(reward_predicted, self.reward).mean()
        updates_sgd = lasagne.updates.sgd(loss, self.all_weights, 0.1)
        self.train_fun = theano.function([self.state, self.action, self.reward], loss, updates = updates_sgd)
        self.predict_fun = theano.function([self.state, self.action], reward_predicted)
    
    def predict_rewards(self, game, actions, batch_size = 10):
        rewards_predicted = np.empty(len(actions), dtype = np.float32)
        for begin in xrange(0, len(actions), batch_size):
            end = min(len(actions), begin + batch_size)
            state_batch = np.vstack([game.state_as_array() for _ in actions[begin:end]])
            action_batch = np.vstack([game.action_as_array(action) for action in actions[begin:end]])
            rewards_predicted[begin:end] = self.predict_fun(state_batch, action_batch)
        return rewards_predicted    

    def train_on_random_batch(self, states, actions, rewards, batch_size = 10):
        index = np.random.choice(len(rewards), min(batch_size, len(rewards)), replace = False)
        return self.train_fun(states[index], actions[index], rewards[index])        

In [None]:
from IPython.display import clear_output
class Trainer:
    def __init__(self, game_factory, strategy, epsilon_policy = lambda i_game: 0.5, future_rewards_decay = 0.9,
                 batch_size = 25, replay_memory_size = 100, n_games = 10001):
        
        self.strategy = strategy
        self.strategy.compile()
        self.game_factory = game_factory 
        
        game = self.game_factory()
        game_state_shape = game.state_as_array().shape
        action_state_shape = game.action_as_array(game.avaliable_moves()[0]).shape
        
        self.replay_states = np.empty([replay_memory_size] + list(game_state_shape)[1:], dtype = np.float32)
        self.replay_actions = np.empty([replay_memory_size] + list(action_state_shape)[1:], dtype = np.float32)
        self.replay_rewards = np.empty([replay_memory_size], dtype = np.float32)
        
        self.replay_memory_size = replay_memory_size
        self.replay_next_index = 0
        self.replays_in_memory = 0
        self.epsilon_policy = epsilon_policy
        self.n_games = n_games
        self.gamma = future_rewards_decay
        
        self.batch_size = batch_size
    
    def chose_action(self, game, epsilon):
        actions = game.avaliable_moves()
        rewards_predicted = self.strategy.predict_rewards(game, actions, self.batch_size)
        probs_action = [epsilon / len(rewards_predicted)] * len(rewards_predicted)
        probs_action[np.argmax(rewards_predicted)] += (1 - epsilon)  
        i_action = np.random.choice(range(len(actions)), 1, p = probs_action)
        return actions[i_action]
    
    def store_in_replay_memory(self, state, action, reward):        
        self.replay_states[self.replay_next_index] = state
        self.replay_actions[self.replay_next_index] = action
        self.replay_rewards[self.replay_next_index] = reward
        self.replays_in_memory += 1
        self.replays_in_memory = min(self.replays_in_memory, self.replay_memory_size)
        self.replay_next_index = (self.replay_next_index + 1) % self.replay_memory_size
        
    def play_random_replays(self):        
        return self.strategy.train_on_random_batch(self.replay_states[:self.replays_in_memory],
                                                   self.replay_actions[:self.replays_in_memory],
                                                   self.replay_rewards[:self.replays_in_memory],
                                                   self.batch_size)
    
    def train(self, verbose = True):
        loss = 0
        for i_game in xrange(self.n_games):
            game = self.game_factory()
            epsilon = self.epsilon_policy(i_game)

            finished = False
            while not finished:
                action = self.chose_action(game, epsilon)
                state_memory, action_memory = game.state_as_array(), game.action_as_array(action)
                reward_memory = game.make_move(action)
                finished = game.finish()
                if not finished:                    
                    reward_memory -= self.gamma * np.max(self.strategy.predict_rewards(game, game.avaliable_moves()))
                
                self.store_in_replay_memory(state_memory, action_memory, reward_memory)
                loss += self.play_random_replays()
                
            if i_game % 50 == 0 and verbose:        
                print "Game number %s, loss is %s, epsilon %s" % (i_game, loss / 50, epsilon)
                loss = 0
                clear_output(wait = True)

In [None]:
strategy = TicTacToeStrategy(field_size = FIELD_SIZE)
epsilon_policy = lambda i_game: max(0.3, 0.99 * (0.99) ** (i_game / 100))
trainer = Trainer(lambda: TicTacToe(FIELD_SIZE, SIZE_OF_LINE_TO_WIN), strategy, epsilon_policy=epsilon_policy)
trainer.train()

Game number 3150, loss is 0.808865172789, epsilon 0.724980335958


In [None]:
class Bot:
    def __init__(self, strategy):
        self.strategy = strategy
    
    def get_next_action(self, game):
        moves = game.avaliable_moves()
        rewards = strategy.predict_rewards(game, moves, 10)
        return moves[np.argmax(rewards)]

In [None]:
class RandomBot:
    def get_next_action(self, game):
        index = np.random.choice(len(game.avaliable_moves()))
        return game.avaliable_moves()[index]

In [None]:
import time
players = {1 : RandomBot(), -1 : Bot(strategy)}
game = TicTacToe(FIELD_SIZE, SIZE_OF_LINE_TO_WIN)
while not game.finish():
    action = players[game.player()].get_next_action(game)
    game.make_move(action)
    print game
    clear_output(wait = True)
    time.sleep(2)