In [36]:
import os
import json
import time
import random
import numpy as np
import pandas as pd
import tensorflow as tf
from tensorflow.keras import layers, Model
from tensorflow.keras.models import Sequential, load_model
from tensorflow.keras.callbacks import EarlyStopping, Callback

**MiniMax engine**

In [37]:
class TicTacToe:
    moves = []
    player = None
    game_status = None
    engine_X = None
    engine_0 = None
    RULES = [
        [0, 1, 2], [3, 4, 5], [6, 7, 8],  # horizontal
        [0, 3, 6], [1, 4, 7], [2, 5, 8],  # vertical
        [0, 4, 8], [2, 4, 6]  # diagonal
    ]

    def __init__(self):
        pass

    def new(self, engine_X, engine_0):
        self.player = 1
        self.moves = []
        self.game_status = None
        self.engine_X = engine_X
        self.engine_0 = engine_0

    # 1 if won <X>, -1 if won <0>, 0 if draw, None if game is not finished
    def status(self, board=None):
        if board is None:
            return self.game_status

        is_draw = True
        for rule in self.RULES:
            abs_sum = abs(board[rule[0]] + board[rule[1]] + board[rule[2]])
            if abs_sum == 3:
                return board[rule[0]]  # return player won (1, -1)
            if is_draw:
                sum_abs = abs(board[rule[0]]) + abs(board[rule[1]]) + abs(board[rule[2]])
                is_draw = False if abs_sum == sum_abs else is_draw
        return 0 if is_draw else None  # return draw or game is not finished

    def board(self, pos=None):
        board = 9 * [0.]
        pos = len(self.moves) if pos is None else pos
        last = pos if pos >= 0 else len(self.moves) + pos
        turn = 1
        for idx in self.moves[0:last]:
            board[idx] = float(turn)
            turn = -turn
        return board

    def timeseries(self):
        moves = 9 * [0.]
        time_series = [moves.copy()]
        turn = 1
        for move_idx in self.moves:
            moves[move_idx] = 1. if turn == 1 else -1.
            time_series.append(moves.copy())
            turn = -turn
        return time_series

    def is_tie(self, status=None):
        status = status if status else self.game_status
        return status is None

    def is_game_over(self, status=None):
        return not self.is_tie(status)

    def remove_illegal_moves(self, Y, moves = None):
        moves = moves if moves else self.moves
        for pos in moves:
            Y[pos] = -1
        return Y

    def auto_play(self, player_X, player_0):
        self.new(player_X, player_0)
        for i in range(0, 9):
            self.auto_move()
            self.engine_X.train(self)
            self.engine_0.train(self)
            if self.is_game_over():
                break

    def auto_move(self):
        if self.is_tie():  # game is not finished
            best_move = self.engine_X.move(self) if self.player == 1 else self.engine_0.move(self)
            if best_move in self.moves:
                print('!!!!!!!!!! Wrong move')
                exit(0)
            self.moves.append(best_move)
        # update status
        self.game_status = self.status(self.board())
        if self.is_tie():  # game is continue we change player, else left winner or last moved as is
            self.player = -self.player

    def print(self):
        board = self.board()
        print(board)
        print("\n", self.engine_X.code, self.engine_0.code)
        for r in range(0, 3):
            for c in range(0, 3):
                mark = ' '
                mark = 'X' if board[3*r + c] == 1 else mark
                mark = '0' if board[3 * r + c] == -1 else mark
                print(mark, end='')
            print()

In [38]:
class Stats:
    STEP = 10
    WINDOW = 100

    def __init__(self, engines):
        self.counter = 0
        self.start_time = int(time.time() * 1000)
        self.engines = engines
        self.statuses = []

    def add(self, game):
        self.counter += 1

        won = game.status()
        code_x = game.engine_X.code
        code_0 = game.engine_0.code

        stats = {
            code_x: {'g': 1, 'w': 0, 'd': 0, 'l': 0},
            code_0: {'g': 1, 'w': 0, 'd': 0, 'l': 0}
        }

        if won:
            win, lost = (code_x, code_0) if won == 1 else (code_0, code_x)
            stats[win]['w'] = 1
            stats[lost]['l'] = 1
        else:
            stats[code_x]['d'] = 1
            stats[code_0]['d'] = 1

        self.statuses.append(stats)
        self.pop()

        if self.counter == 1:
            self.header()
        if self.counter % self.STEP == 0:
            self.print()

    def pop(self):
        if len(self.statuses) > self.WINDOW:
            self.statuses.pop(0)

    def header(self):
        print(f"Stats config: STEP = {self.STEP}, WINDOW = {self.WINDOW}")

    def print(self):
        loop = int(time.time() * 1000)
        p = lambda x, c: round(100*x/c)
        print(f"Game {self.counter:>5} | {int((loop - self.start_time) / 1000):>5}sec", end=' | ')
        stats_str = []
        for item in self.engines:
            engine = item.code
            cnt = win = drw = lst = 0
            for row in self.statuses:
                if engine in row:
                    cnt += row[engine]['g']
                    win += row[engine]['w']
                    drw += row[engine]['d']
                    lst += row[engine]['l']
            stats_str.append(
                f"{engine.upper()}: {p(win,cnt):>2}% wins, {p(drw,cnt):>2}% draws, {p(lst,cnt):>2}% losts")
        print(' | '.join(stats_str))


In [39]:
class EngineMiniMax:
    INF = 1000

    def __init__(self, code='MinMax', max_depth=5):
        self.code = f"{code}-{max_depth}"
        self.max_depth = max_depth
        
    def move(self, game):
        best_moves = self.__minimax(game, game.board())
        moves = self.__legal_moves(best_moves)
        return moves.index(max(moves))

    def train(self, game):
        pass

    def __legal_moves(self, best_moves):
        y = 9 * [0]
        for move in best_moves:
            y[move] = 1 + random.uniform(-0.5, 0.5)
        y = game.remove_illegal_moves(y)
        #print('MM legal', y)
        return y

    def __minimax(self, game, board, turn=1, depth=0):
        if depth:
            status = game.status(board)
            if game.is_game_over(status):
                return status / depth   # win

            if depth >= self.max_depth:
                return 0

        best_score = -self.INF if turn > 0 else self.INF
        best_moves = []

        for i in range(len(board)):
            if board[i] != 0:  # skip zero squares
                continue
            # only 1 and -1
            board[i] = turn
            score = self.__minimax(game, board, -turn, depth + 1)
            if turn > 0 and score >= best_score:
                if best_score != score:
                    best_moves = []
                best_score = score
                if depth == 0:
                    best_moves.append(i)
            elif turn < 0 and score <= best_score:
                if best_score != score:
                    best_moves = []
                best_score = score
                if depth == 0:
                    best_moves.append(i)
            board[i] = 0

        return best_score if depth else best_moves


In [40]:
class EngineDenseRL2x:
    unique_ids = {}
    EPOCHS = 30
    SAVE_EACH_TRAINS = 100

    REWARD_LR = 0.7
    REWARD_WIN = 0.90
    REWARD_DRW = 0.30
    REWARD_LST = 0.10

    def __init__(self, code='EngineDenseRL2x', unique=False):
        self.counter = 0
        self.code = code
        self.unique_ids = {} if unique else False
        self.model_file = f"../tmp/ttt/{self.code}.keras"
        self.extra_file = f"../tmp/ttt/{self.code}.extra.json"
        os.makedirs(os.path.dirname(self.model_file), exist_ok=True)
        self.__init_model()

    def __init_model(self):
        self.model = self.__load()
        if not self.model:
            self.model = Sequential([
                layers.ReLU(72, input_shape=(18,)),    #  2*9 as split X and 0 to separate cells
                #layers.Normalization(),
                layers.ReLU(72),
                layers.Dropout(0.2),
                layers.Normalization(),
                layers.Dense(9, activation='sigmoid')
            ])
            self.model.compile(optimizer='adam', loss='mse')   # binary_crossentropy

    def __save(self):
        if self.counter % self.SAVE_EACH_TRAINS == 0:
            self.model.save(self.model_file)
            extra = {'counter': self.counter}
            with open(self.extra_file, 'w') as extra_file:
                json.dump(extra, extra_file)
            print('saved')

    def __load(self):
        if os.path.exists(self.model_file):
            with open(self.extra_file, 'r') as extra_file:
                params = json.load(extra_file)
                self.counter = params['counter']
            print(f'loaded {self.code}, last game number is {self.counter}')
            return load_model(self.model_file)
        return False


    def __check_uniq(self, game):
        if self.unique_ids:
            game_id = ''.join(str(item) for item in game.moves)
            if game_id in self.unique_ids:
                return True
            self.unique_ids[game_id] = True
        return False

    def __board2x(self, game):
        board = game.board()
        board2x = 9 * [1, 0]
        # if empty cell then first is 1 to decrase count of 0 values. And 0 if cell is X moved
        for i in range(len(board)):
            if board[i] == 1:
                board2x[2 * i] = 0
            if board[i] == -1:
                board2x[2 * i + 1] = 1
        return board2x

    def __timeseries2x(self, game):
        board = game.board()
        series = 9 * [1, 0]
        timeseries2x = []
        for i in game.moves:
            if board[i] == 1:
                series[2 * i] = 0
            if board[i] == -1:
                series[2 * i + 1] = 1
            timeseries2x.append(series.copy())
        return timeseries2x

    def __legal_moves(self, game, Y):
        for i in range(len(Y)):
            Y[i] = game.remove_illegal_moves(Y[i], game.moves[:i])
        #print('NN legal', Y)
        return Y

    def move(self, game):
        X = [self.__board2x(game)]
        Y = self.model.predict(X, verbose=0)
        y = game.remove_illegal_moves(Y[0])
        return np.argmax(Y[0])

    def train(self, game):
        # this engine trains only on the end of game
        if game.is_tie():
            return

        if self.__check_uniq(game):
            return

        self.counter += 1

        X = self.__timeseries2x(game)[:-1]     # skip last series
        Y = self.model.predict(X, verbose=0)
        Y = self.__legal_moves(game, Y)

        turn_a = True
        reward_a = self.REWARD_WIN if game.status() else self.REWARD_DRW
        reward_b = self.REWARD_LST if game.status() else self.REWARD_DRW

        for t in range(len(Y) - 1, -1, -1):
            move = game.moves[t+1]
            # print(t, move, game.moves, X[t], Y[t])
            if turn_a:
                reward_a = Y[t][move] + self.REWARD_LR * (reward_a - Y[t][move])
                Y[t][move] = reward_a
            else:
                reward_b = Y[t][move] + self.REWARD_LR * (reward_b - Y[t][move])
                Y[t][move] = reward_b
            turn_a = not turn_a
        early_stopping = EarlyStopping(monitor='loss', patience=1, restore_best_weights=True)
        self.model.fit(np.array(X), np.array(Y), epochs=self.EPOCHS, batch_size=10, verbose=0, callbacks=[early_stopping])

        self.__save()


In [41]:
class EngineDenseRL3x:
    unique_ids = {}

    EPOCHS = 300

    REWARD_LR = 0.7
    REWARD_WIN = 0.90
    REWARD_DRW = 0.30
    REWARD_LST = 0.10

    def __init__(self, code='EngineDenseRL3x', unique=False):
        self.code = code
        self.unique_ids = {} if unique else False
        self.model = Sequential([
            layers.ReLU(81, input_shape=(27,)),    #  2*9 as split X and 0 to separate cells
            layers.Normalization(),
            layers.ReLU(243),
            layers.Dropout(0.2),
            layers.Normalization(),
            layers.Dense(9, activation='sigmoid')
        ])
        self.model.compile(optimizer='adam', loss='binary_crossentropy')   # binary_crossentropy
                        
    def __check_uniq(self, game):
        if self.unique_ids:
            game_id = ''.join(str(item) for item in game.moves)
            if game_id in self.unique_ids:
                return True
            self.unique_ids[game_id] = True
        return False

    def __board3x(self, game):
        board = game.board()
        board3x = 9 * [0, 0, 0]
        for i in range(len(board)):
            if board[i] == 1:
                board3x[3 * i] = 1
            if board[i] == -1:
                board3x[3 * i + 1] = 1
            if board[i] == 0:
                board3x[3 * i + 2] = 1
        return board3x

    def __timeseries3x(self, game):
        board = game.board()
        series = 9 * [0, 0, 0]
        timeseries3x = []
        for i in game.moves:
            if board[i] == 1:
                series[3 * i] = 1
            if board[i] == -1:
                series[3 * i + 1] = 1
            if board[i] == 0:
                series[3 * i + 2] = 1
            timeseries3x.append(series.copy())
        return timeseries3x

    def __legal_moves(self, game, Y):
        for i in range(len(Y)):
            Y[i] = game.remove_illegal_moves(Y[i], game.moves[:i])
        #print('NN legal', Y)
        return Y

    def move(self, game):
        X = [self.__board3x(game)]
        Y = self.model.predict(X, verbose=0)
        y = game.remove_illegal_moves(Y[0])
        return np.argmax(Y[0])

    def train(self, game):
        # this engine trains only on the end of game
        if game.is_tie():
            return

        if self.__check_uniq(game):
            return

        X = self.__timeseries3x(game)[:-1]     # skip last series
        Y = self.model.predict(X, verbose=0)
        Y = self.__legal_moves(game, Y)

        turn_a = True
        reward_a = self.REWARD_WIN if game.status() else self.REWARD_DRW
        reward_b = self.REWARD_LST if game.status() else self.REWARD_DRW

        for t in range(len(Y) - 1, -1, -1):
            move = game.moves[t+1]
            #print(t, move, game.moves, X[t], Y[t])
            if turn_a:
                reward_a = Y[t][move] + self.REWARD_LR * (reward_a - Y[t][move])
                Y[t][move] = reward_a
            else:
                reward_b = Y[t][move] + self.REWARD_LR * (reward_b - Y[t][move])
                Y[t][move] = reward_b
            turn_a = not turn_a
        early_stopping = EarlyStopping(monitor='loss', patience=1, restore_best_weights=True)
        self.model.fit(np.array(X), np.array(Y), epochs=self.EPOCHS, batch_size=10, verbose=0, callbacks=[early_stopping])

**Game loop**

In [42]:
engines = [
    EngineMiniMax('Mini-Max', max_depth=5),
    EngineDenseRL2x('RL-2x', unique=False),
    #EngineDenseRL3x('RL-3x', unique=False),
]

game = TicTacToe()
stats = Stats(engines)

for game_num in range(1, 20000):
    engine_X, engine_0 = random.sample(engines, k=2)
    game.auto_play(engine_X, engine_0)
    stats.add(game)

loaded RL-2x, last game number is 2600


Stats config: STEP = 10, WINDOW = 100
Game    10 |     5sec | MINI-MAX-5: 90% wins, 10% draws,  0% losts | RL-2X:  0% wins, 10% draws, 90% losts
Game    20 |    10sec | MINI-MAX-5: 80% wins, 20% draws,  0% losts | RL-2X:  0% wins, 20% draws, 80% losts
Game    30 |    14sec | MINI-MAX-5: 77% wins, 23% draws,  0% losts | RL-2X:  0% wins, 23% draws, 77% losts
Game    40 |    19sec | MINI-MAX-5: 78% wins, 22% draws,  0% losts | RL-2X:  0% wins, 22% draws, 78% losts
Game    50 |    23sec | MINI-MAX-5: 78% wins, 22% draws,  0% losts | RL-2X:  0% wins, 22% draws, 78% losts
Game    60 |    28sec | MINI-MAX-5: 80% wins, 20% draws,  0% losts | RL-2X:  0% wins, 20% draws, 80% losts
Game    70 |    32sec | MINI-MAX-5: 81% wins, 19% draws,  0% losts | RL-2X:  0% wins, 19% draws, 81% losts
Game    80 |    37sec | MINI-MAX-5: 80% wins, 20% draws,  0% losts | RL-2X:  0% wins, 20% draws, 80% losts
Game    90 |    42sec | MINI-MAX-5: 79% wins, 21% draws,  0% losts | RL-2X:  0% wins, 21% draws, 79% losts

KeyboardInterrupt: 