In [1]:
# Mount Google Drive at /content/drive; the checkpoint and model paths used
# later in this notebook live under /content/drive/MyDrive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [2]:
import numpy as np
import CoGanh as cg
import tensorflow as tf
from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Flatten
from tensorflow.keras.optimizers import SGD, Adam
import time
from copy import deepcopy
import random

## Define Game Environment and Experience Buffer 

In [4]:
# Starting position on the 5x5 board. 1 and -1 are the two sides' pieces (the
# same values CoGanhEnv uses for current_player); 0 is presumably an empty
# point — confirm against the CoGanh module.
INITIAL_BOARD = np.array([
    [1, 1, 1, 1, 1],
    [1, 0, 0, 0, 1],
    [1, 0, 0, 0, -1],
    [-1, 0, 0, 0, -1],
    [-1, -1, -1, -1, -1],
])

class CoGanhEnv():
    """Turn-based game environment built on the rule helpers in `cg`.

    The environment keeps the current board, the board before the last move
    and the last move itself. Rewards are expressed from the point of view of
    the side playing the `1` pieces.
    """

    def __init__(self, start_player=1):
        """Remember which side moves first and set up a fresh game."""
        self.start_player = start_player
        self.reset()

    def reset(self):
        """Restore the starting position and give the turn to `start_player`."""
        self.board = deepcopy(INITIAL_BOARD)
        self.prev_board = deepcopy(INITIAL_BOARD)
        self.prev_move = None
        self.current_player = self.start_player

    def _update_board(self, move):
        """Apply `move` and keep a snapshot of the position it was played from.

        `move` is a `(src, dst)` pair: the old and new position of a piece.
        """
        origin, target = move
        self.prev_board = deepcopy(self.board)
        self.board = cg.board_after_move_and_capturing(origin, target, self.board)
        self.prev_move = move

    def _is_over(self):
        """Return a truthy value once one side has no pieces left on the board."""
        only_non_negative = np.all(self.board >= 0)
        return only_non_negative or np.all(self.board <= 0)

    def _get_reward(self):
        """Return 1 if no `-1` piece remains, -1 if no `1` piece remains, else 0."""
        if np.all(self.board >= 0):
            return 1
        return -1 if np.all(self.board <= 0) else 0

    def observe(self):
        """Return the current board stacked on top of the previous board (rows axis)."""
        return np.concatenate([self.board, self.prev_board])

    def act(self, move):
        """Play `move`, pass the turn, and return `(observation, reward, game_over)`."""
        self._update_board(move)
        self.current_player = -self.current_player
        outcome = self._get_reward()
        # A non-zero reward is exactly the "one side was wiped out" condition.
        return self.observe(), outcome, outcome != 0

    def get_all_possible_moves(self):
        """Ask `cg` for the legal moves of the side whose turn it is."""
        return cg.get_all_legal_moves(self.prev_board, self.board, self.current_player)

class ExperienceReplay(object):
    """Bounded FIFO memory of transitions plus Q-learning target construction.

    Each memory entry has the form
    `[[state_t, action_t, reward_t, state_tp1], game_over]`.
    """

    def __init__(self, max_memory=100, discount=.9):
        """
        Args:
            max_memory: maximum number of transitions kept; the oldest entry
                is dropped once this is exceeded.
            discount: discount factor (gamma) applied to the bootstrapped
                value of the next state.
        """
        self.max_memory = max_memory
        self.memory = list()
        self.discount = discount

    def remember(self, states, game_over):
        """Append one transition and evict the oldest one if the memory is full.

        Args:
            states: `[state_t, action_t, reward_t, state_tp1]`.
            game_over: whether `state_tp1` is terminal.
        """
        self.memory.append([states, game_over])
        if len(self.memory) > self.max_memory:
            del self.memory[0]

    def get_batch(self, model, batch_size=10, filtered = False):
        """Sample transitions and build `(inputs, targets)` for one training step.

        Transitions are drawn uniformly with replacement. For every sample the
        target row starts as the model's own prediction for `state_t`, so only
        the entry of the action actually taken contributes to the loss; that
        entry is replaced by `reward_t` for terminal transitions and by
        `reward_t + discount * max_a' Q(state_tp1, a')` otherwise.

        Args:
            model: network exposing `output_shape` and usable by
                `get_prediction`.
            batch_size: upper bound on the number of sampled transitions.
            filtered: when True, the max over next-state actions only
                considers moves that `cg.get_all_legal_moves` reports for
                player 1.

        Returns:
            Tuple `(inputs, targets)` with shapes
            `(n, *state_shape)` and `(n, num_actions)` where
            `n = min(len(memory), batch_size)`.

        Raises:
            IndexError: if nothing has been remembered yet.
        """
        len_memory = len(self.memory)
        num_actions = model.output_shape[-1]
        env_dim = self.memory[0][0][0].shape
        inputs = np.zeros((min(len_memory, batch_size), *(env_dim)))
        targets = np.zeros((inputs.shape[0], num_actions))
        for i, idx in enumerate(np.random.randint(0, len_memory,
                                                  size=inputs.shape[0])):
            state_t, action_t, reward_t, state_tp1 = self.memory[idx][0]
            game_over = self.memory[idx][1]

            inputs[i:i+1] = state_t
            # Actions that were not taken keep the model's current estimate,
            # so they produce no error signal.
            targets[i] = get_prediction(model, state=state_t, filtered_moves=None)

            if game_over:
                # Terminal transition: there is no future value to bootstrap,
                # so the next state is neither evaluated nor scanned for moves.
                targets[i, action_t] = reward_t
                continue

            all_legal_moves = None
            if filtered:
                # The first five rows of a stored state are the current board,
                # the last five the previous board (see CoGanhEnv.observe).
                all_legal_moves = cg.get_all_legal_moves(current_board=state_tp1[:5], old_board=state_tp1[5:], player=1)
            Q_sa = np.nanmax(get_prediction(model, state=state_tp1, filtered_moves=all_legal_moves))
            # reward_t + gamma * max_a' Q(s', a')
            targets[i, action_t] = reward_t + self.discount * Q_sa
        return inputs, targets

## Define Machine Learning model

In [6]:
def create_model(hidden_size=100, num_actions=5 * 5 * 8, num_hidden_layers=3, learning_rate=0.01):
    """Build and compile the fully connected Q-network.

    The network flattens a (10, 5) observation, passes it through
    `num_hidden_layers` ReLU layers of width `hidden_size` and ends in a
    linear layer with one output per action. Calling it without arguments
    yields the original architecture (3 x 100 hidden units, 200 outputs,
    Adam with learning rate 0.01).

    Args:
        hidden_size: number of units in each hidden layer.
        num_actions: size of the output layer (5 * 5 cells x 8 directions by default).
        num_hidden_layers: how many hidden ReLU layers to stack.
        learning_rate: learning rate passed to the Adam optimizer.

    Returns:
        The compiled Keras `Sequential` model (MSE loss).
    """
    model = Sequential()
    model.add(Flatten(input_shape=(10, 5,)))
    for _ in range(num_hidden_layers):
        model.add(Dense(hidden_size, activation='relu'))
    # Linear output: one unbounded Q-value per action.
    model.add(Dense(num_actions))

    # clipnorm bounds the gradient norm of each update.
    optimizer = Adam(learning_rate=learning_rate, clipnorm=1.0)
    model.compile(optimizer, "mse")
    return model

# Global Q-network used by the training functions below.
model = create_model()

# Print the layer-by-layer summary.
model.summary()

Model: "sequential"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
flatten (Flatten)            (None, 50)                0         
_________________________________________________________________
dense (Dense)                (None, 100)               5100      
_________________________________________________________________
dense_1 (Dense)              (None, 100)               10100     
_________________________________________________________________
dense_2 (Dense)              (None, 100)               10100     
_________________________________________________________________
dense_3 (Dense)              (None, 200)               20200     
Total params: 45,500
Trainable params: 45,500
Non-trainable params: 0
_________________________________________________________________


In [7]:
# Offset added to a source coordinate for each of the 8 directions; the list
# position is the direction code stored in `action % 8`.
move_map = [(-1, 1), (0, 1), (1, 1), (-1, 0), (1, 0), (-1, -1), (0, -1), (1, -1)]
def get_move_from_action_number(action):
    """Decode a flat action index into a `(src, dst)` move.

    The index is laid out as `(src_row * 5 + src_col) * 8 + direction`, where
    `direction` selects one of the offsets in `move_map`:

        0 -> (-1, 1)    1 -> (0, 1)     2 -> (1, 1)
        3 -> (-1, 0)                    4 -> (1, 0)
        5 -> (-1, -1)   6 -> (0, -1)    7 -> (1, -1)

    No bounds check is performed, so `dst` may lie outside the 5x5 board.
    """
    cell_index, direction = divmod(action, 8)
    row, col = divmod(cell_index, 5)
    d_first, d_second = move_map[direction]
    return (row, col), (row + d_first, col + d_second)

def to_action_number(move):
    """Encode a `(src, dst)` move as a flat action index.

    Inverse of `get_move_from_action_number`: the result is
    `(src_row * 5 + src_col) * 8 + direction`, where `direction` is the
    position of `dst - src` in `move_map`.

    Raises:
        ValueError: if `dst - src` is not one of the offsets in `move_map`.
    """
    origin, target = move
    offset = tuple(target[axis] - origin[axis] for axis in (0, 1))
    direction = move_map.index(offset)
    return (origin[0] * 5 + origin[1]) * 8 + direction

In [3]:
def get_prediction(model, state, filtered_moves = None):
    """Return the model's Q-values for a single state, optionally masked.

    Args:
        model: object exposing a Keras-style `predict(batch, verbose=...)`.
        state: one observation; a leading batch axis of size 1 is added
            before it is passed to the model.
        filtered_moves: optional iterable of `(src, dst)` moves. When it is
            None or empty, the raw Q-values are returned. Otherwise every
            action that does not correspond to one of these moves is set to
            NaN, so callers can use `np.nanmax` / `np.nanargmax`.

    Returns:
        1-D array with one entry per action.
    """
    # verbose=0 keeps Keras from printing a progress bar for every call;
    # this function is invoked many times per training step.
    q_values = model.predict(state.reshape(1, *state.shape), verbose=0)[0]

    # Materialise the moves so the emptiness test also works for NumPy arrays
    # (whose truth value is ambiguous) and one-shot iterables.
    moves = [] if filtered_moves is None else list(filtered_moves)
    if not moves:
        return q_values

    legal_actions = [to_action_number(move) for move in moves]
    masked = np.full_like(q_values, np.nan)
    masked[legal_actions] = q_values[legal_actions]
    return masked

## Training Process

In [None]:
def train_with_policy_punishment(epsilon=.1):
    """Train the global `model` with epsilon-greedy play, punishing illegal moves.

    Player 1 is controlled by the model and the opponent plays random legal
    moves. The greedy action is taken over all outputs without masking; if it
    decodes to an illegal move the episode ends immediately with reward -2.
    Every step stores one transition in the global `exp_replay` and performs
    one `train_on_batch` update on a batch sampled from it.

    Uses the globals `env`, `model` and `exp_replay`.

    Args:
        epsilon: probability of playing a random legal move instead of the
            model's greedy choice.
    """
    epoch = 100
    win_cnt = 0
    for e in range(epoch):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        start = time.time()
        while not game_over:
            input_tm1 = input_t
            # get next action for player 1
            all_legal_moves = env.get_all_possible_moves()
            if np.random.rand() <= epsilon:
                move = random.choice(all_legal_moves)
                # The replay buffer stores the action index, so the random
                # move has to be encoded as well; otherwise `action` is
                # undefined (or stale from a previous step) when remembered.
                action = to_action_number(move)

                # apply action, get rewards and new state
                input_t, reward, game_over = env.act(move)
            else:
                q = get_prediction(model, state=input_tm1, filtered_moves=None)
                action = np.argmax(q)
                move = get_move_from_action_number(action)
                if move not in all_legal_moves:
                    # Illegal choice: punish and end the episode without
                    # touching the board (input_t stays at the old state).
                    reward = -2
                    game_over = True
                else:
                    input_t, reward, game_over = env.act(move)

            if reward == 1:
                win_cnt += 1

            if not game_over:
                # Opponent replies with a random legal move; the stored
                # reward and next state are those after this reply.
                opponent_move = random.choice(env.get_all_possible_moves())
                input_t, reward, game_over = env.act(opponent_move)

            # store experience
            exp_replay.remember([input_tm1, action, reward, input_t], game_over)

            # adapt model
            inputs, targets = exp_replay.get_batch(model, batch_size=50)

            loss += model.train_on_batch(inputs, targets)
        duration = time.time() - start
        # Report the real epoch total instead of a hard-coded "999".
        print("Epoch {:03d}/{} | Loss {:.4f} | Win count {} | Duration {:.4f}".format(e + 1, epoch, loss, win_cnt, duration))

In [None]:
def train_with_policy_filter(epsilon=.1, epsilon_decay=0, epoch=100, start_epoch=71,
                             checkpoint_dir='/content/drive/MyDrive/AI-checkpoints'):
    """Train the global `model` with epsilon-greedy play restricted to legal moves.

    Player 1 is controlled by the model; its greedy choice is the best
    Q-value among the legal moves only. The opponent plays random legal
    moves. Every step stores one transition in the global `exp_replay` and
    performs one `train_on_batch` update with legal-move-filtered targets.
    Weights are saved whenever the epoch index is a multiple of 10.

    Uses the globals `env`, `model` and `exp_replay`.

    Args:
        epsilon: initial probability of playing a random legal move.
        epsilon_decay: after each epoch epsilon is multiplied by
            `exp(-epsilon_decay)` and then clamped to be at least 0.1.
        epoch: exclusive upper bound of the epoch index.
        start_epoch: first epoch index. The default of 71 reproduces the
            resumed run this notebook was written for; pass 0 to train from
            the beginning.
        checkpoint_dir: directory receiving the `cpt-<epoch>` weight files.
    """
    win_cnt = 0
    for e in range(start_epoch, epoch):
        loss = 0.
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        start = time.time()
        while not game_over:
            input_tm1 = input_t
            # get next action for player 1
            all_legal_moves = env.get_all_possible_moves()
            if np.random.rand() <= epsilon:
                move = random.choice(all_legal_moves)
                action = to_action_number(move)

            else:
                # Illegal actions come back as NaN, hence nanargmax.
                q = get_prediction(model, state=input_tm1, filtered_moves=all_legal_moves)
                action = np.nanargmax(q)
                move = get_move_from_action_number(action)

            input_t, reward, game_over = env.act(move)

            if reward == 1:
                win_cnt += 1

            if not game_over:
                # Opponent replies with a random legal move; the stored
                # reward and next state are those after this reply.
                opponent_move = random.choice(env.get_all_possible_moves())
                input_t, reward, game_over = env.act(opponent_move)

            # store experience
            exp_replay.remember([input_tm1, action, reward, input_t], game_over)

            # adapt model
            inputs, targets = exp_replay.get_batch(model, batch_size=50, filtered=True)

            loss += model.train_on_batch(inputs, targets)
        duration = time.time() - start
        print("Epoch {:03d}/{} | Loss {:.4f} | Win count {} | Duration {:.4f}".format(e+1, epoch, loss, win_cnt, duration))
        # Exponential decay with a floor of 0.1 so some exploration always remains.
        epsilon = max(epsilon * np.exp(-epsilon_decay), .1)
        if e % 10 == 0:
            # The checkpoint name uses the 0-based index e, while the log line
            # above shows e + 1.
            model.save_weights('{}/cpt-{}'.format(checkpoint_dir, e))
            print('Checkpoint saved at epoch {}'.format(e))


In [5]:
# Size of the flat action space: 5 x 5 source cells times 8 move directions.
num_actions = 5 * 5 * 8
# Environment and replay buffer read as globals by the training functions.
env = CoGanhEnv()
exp_replay = ExperienceReplay()

In [None]:
# Resumed run: the loop inside train_with_policy_filter starts at epoch 71, and
# epsilon is pre-decayed to exp(-0.025 * 71), i.e. the value 71 decay steps
# would produce — presumably from an initial epsilon of 1.0 (TODO confirm).
train_with_policy_filter(epsilon=np.exp(-0.025) ** 71, epsilon_decay=0.025)

Epoch 072/100 | Loss 6.4227 | Win count 0 | Duration 533.4799
Epoch 073/100 | Loss 0.6693 | Win count 1 | Duration 63.3699
Epoch 074/100 | Loss 3.0619 | Win count 2 | Duration 310.9576
Epoch 075/100 | Loss 2.0223 | Win count 3 | Duration 209.1687
Epoch 076/100 | Loss 6.1562 | Win count 3 | Duration 603.5773
Epoch 077/100 | Loss 3.1458 | Win count 3 | Duration 380.6569
Epoch 078/100 | Loss 4.5278 | Win count 3 | Duration 484.2449
Epoch 079/100 | Loss 0.5422 | Win count 4 | Duration 50.5954
Epoch 080/100 | Loss 6.5304 | Win count 5 | Duration 679.2170
Epoch 081/100 | Loss 13.5875 | Win count 5 | Duration 1566.3799
Checkpoint saved at epoch 80
Epoch 082/100 | Loss 1.2644 | Win count 6 | Duration 188.5893
Epoch 083/100 | Loss 2.1473 | Win count 7 | Duration 290.0671
Epoch 084/100 | Loss 2.2993 | Win count 8 | Duration 319.2916
Epoch 085/100 | Loss 0.8984 | Win count 8 | Duration 117.3280
Epoch 086/100 | Loss 1.8739 | Win count 9 | Duration 208.2245
Epoch 087/100 | Loss 13.0737 | Win count 

In [None]:
# Save the full trained model to Drive.
# NOTE(review): the evaluation section loads "model-policy-filter-decay-eps-1.h5",
# which is not the filename written here — confirm which file is intended.
model.save("/content/drive/MyDrive/AI-checkpoints/model-decay-epsilon-1.h5")

## Test and Evaluation

In [16]:
def test_self_play(player_model=None, opponent_model=None, match_count=100, epsilon=.1, swap_side=True):
    win_cnt = 0
    models = {
        1: player_model,
        -1: opponent_model
    }
    def play_match(env):
        env.reset()
        game_over = False
        # get initial input
        input_t = env.observe()

        moves_made = 0
        while not game_over:
            input_tm1 = input_t
            model = models[env.current_player]
            # get next action for player 1
            all_legal_moves = env.get_all_possible_moves()
            if model is None or np.random.rand() <= epsilon:
                move = random.choice(all_legal_moves)

            else:
                q = get_prediction(model, state=env.current_player * input_tm1, filtered_moves=all_legal_moves)
                action = np.nanargmax(q)
                move = get_move_from_action_number(action)

            input_t, reward, game_over = env.act(move)
            moves_made += 1

            if reward == 1:
                return moves_made, 1
        
        return moves_made, 0
    
    env = CoGanhEnv(start_player=1)
    epoch = match_count // 2 if swap_side else match_count
    win_cnt = 0
    print("Player goes first")
    for e in range(epoch):
        moves_made, win = play_match(env)
        win_cnt += win
        win_rate = win_cnt / (e + 1)
        print("Match {:03d}/{} | Moves made {} | Win count {} | Win rate {:.4f}".format(e, match_count-1, moves_made, win_cnt, win_rate))
    if swap_side:
        print("---------------------------------------------------")
        print("Opponent goes first")
        env = CoGanhEnv(start_player=-1)
        for e in range(epoch, match_count):
            moves_made, win = play_match(env)
            win_cnt += win
            win_rate = win_cnt / (e + 1)
            print("Match {:03d}/{} | Moves made {} | Win count {} | Win rate {:.4f}".format(e, match_count-1, moves_made, win_cnt, win_rate))

In [11]:
# Load the two pretrained models from Drive for evaluation: one saved as
# "fixed-eps" and one as "decay-eps" (names taken from the file names).
PATH = "/content/drive/MyDrive/AI-checkpoints/"
model_fixed_eps = tf.keras.models.load_model(PATH + "model-policy-filter-fixed-eps-1.h5")
model_decay_eps = tf.keras.models.load_model(PATH + "model-policy-filter-decay-eps-1.h5")



In [17]:
# Test fixed-epsilon model against random-based opponent.
# opponent_model=None makes the opponent pick a random legal move each turn;
# swap_side=True lets the opponent move first in the second half of the matches.
test_self_play(player_model=model_fixed_eps, opponent_model=None, match_count=1000, swap_side=True)

Player goes first
Match 000/999 | Moves made 121 | Win count 1 | Win rate 1.0000
Match 001/999 | Moves made 138 | Win count 1 | Win rate 0.5000
Match 002/999 | Moves made 59 | Win count 2 | Win rate 0.6667
Match 003/999 | Moves made 149 | Win count 3 | Win rate 0.7500
Match 004/999 | Moves made 61 | Win count 4 | Win rate 0.8000
Match 005/999 | Moves made 746 | Win count 4 | Win rate 0.6667
Match 006/999 | Moves made 209 | Win count 5 | Win rate 0.7143
Match 007/999 | Moves made 230 | Win count 5 | Win rate 0.6250
Match 008/999 | Moves made 297 | Win count 6 | Win rate 0.6667
Match 009/999 | Moves made 65 | Win count 7 | Win rate 0.7000
Match 010/999 | Moves made 124 | Win count 7 | Win rate 0.6364
Match 011/999 | Moves made 474 | Win count 7 | Win rate 0.5833
Match 012/999 | Moves made 21 | Win count 8 | Win rate 0.6154
Match 013/999 | Moves made 166 | Win count 8 | Win rate 0.5714
Match 014/999 | Moves made 1009 | Win count 9 | Win rate 0.6000
Match 015/999 | Moves made 152 | Win cou

In [None]:
# Test decayed-epsilon model against random-based opponent.
# opponent_model=None makes the opponent pick a random legal move each turn;
# swap_side=True lets the opponent move first in the second half of the matches.
test_self_play(player_model=model_decay_eps, opponent_model=None, match_count=1000, swap_side=True)

In [None]:
# Test decayed-epsilon model (player) against the fixed-epsilon model (opponent).
# Both sides are model-driven here; neither is the random baseline.
test_self_play(player_model=model_decay_eps, opponent_model=model_fixed_eps, match_count=1000, swap_side=True)