In [1]:
import sys, os, random, shutil

import numpy as np
from tqdm  import tqdm
import tensorflow as tf

# sys.path.append(os.path.dirname(os.path.dirname(os.path.dirname(os.path.abspath(__file__)))))
sys.path.append('../')
sys.path.append('../../')

In [2]:
X_train = np.load('../../ds/1000/boards.npy')
y_train = np.load('../../ds/1000/scores.npy')

print(X_train.shape)
print(y_train.shape)

(1000, 4, 5, 5)
(1000,)


In [3]:
def build_model(conv_size, conv_depth):
    model = tf.keras.Sequential([
        tf.keras.layers.Input(shape=(4, 5, 5)),
        tf.keras.layers.Conv2D(filters= conv_size, kernel_size= (3, 3), padding='same', activation='relu'),
        tf.keras.layers.Conv2D(filters= conv_size, kernel_size= (3, 3), padding='same', activation='relu'),
        tf.keras.layers.Conv2D(filters= conv_size, kernel_size= (3, 3), padding='same', activation='relu'),
        tf.keras.layers.Conv2D(filters= conv_size, kernel_size= (3, 3), padding='same', activation='relu'),
        tf.keras.layers.Flatten(),
        tf.keras.layers.Dense(64, activation='relu'),
        tf.keras.layers.Dense(1, activation='sigmoid')
    ])

    return model

model = build_model(5, 4)
tf.keras.utils.plot_model(model, to_file='model_plot.png', show_shapes=True, show_layer_names=False)

You must install pydot (`pip install pydot`) and install graphviz (see instructions at https://graphviz.gitlab.io/download/) for plot_model to work.


In [4]:
import tensorflow.keras.callbacks as callbacks

model.compile(optimizer=tf.keras.optimizers.Adam(5e-4), loss='mean_squared_error')
model.summary()
model.fit(X_train, y_train,
          batch_size=20,
          epochs=1000,
          verbose=1,
          validation_split=0.1,
          callbacks=[callbacks.ReduceLROnPlateau(monitor='loss', patience=10),
                     callbacks.EarlyStopping(monitor='loss', patience=15, min_delta=1e-4)])

model.save('model.h5')

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 conv2d (Conv2D)             (None, 4, 5, 5)           230       
                                                                 
 conv2d_1 (Conv2D)           (None, 4, 5, 5)           230       
                                                                 
 conv2d_2 (Conv2D)           (None, 4, 5, 5)           230       
                                                                 
 conv2d_3 (Conv2D)           (None, 4, 5, 5)           230       
                                                                 
 flatten (Flatten)           (None, 100)               0         
                                                                 
 dense (Dense)               (None, 64)                6464      
                                                                 
 dense_1 (Dense)             (None, 1)                 6

In [36]:
from algorithms.problem import State, Problem
MAX_DEPTH = 5

def move(prev_board, board, player, remain_time_x, remain_time_y):
    '''
        Get random move

        Input
        ----------
            board: map(5*5);
            player: 1 or -1, represent for player
            remain_time_x: Time remain (ms)
            remain_time_y: Time remain (ms)
        Output  
        ----------
            optimize action from all possible action.
            eg. ((1,1),(1,2)).  
    '''

    state = State(board, player)
    prev_state = State(prev_board, -player) if prev_board is not None else None
    problem = Problem()
    visited_states = {}

    def _hash_state(state: State):
        hash_value = ''
        for coor_y in range(state.height):
            for coor_x in range(state.width):
                hash_value +=   chr(98+state.board[coor_y][coor_x])
        hash_value += chr(98+state.player)
        return hash_value

    def _is_visited(hased_state, depth):
        return hased_state in visited_states and \
            visited_states[hased_state][0] >= depth

    def _add_visited_state(hased_state, depth, score):
        if hased_state not in visited_states:
            visited_states[hased_state] = [depth,score]
        elif visited_states[hased_state][0] < depth:
            visited_states[hased_state][0] = depth
            visited_states[hased_state][1] = score

    def _calculate_score(state: State):
        return np.sum(state.board)
    
    def minimax_eval(prev_state, state):
        board3d = np.zeros((4, 5, 5))

        if state.player == 1:
            board3d[0] = (state.board == -1).astype(np.int8)
            board3d[1] = (state.board == 1).astype(np.int8)
            board3d[2] = (prev_state.board == -1).astype(np.int8)
            board3d[3] = (prev_state.board == 1).astype(np.int8)

        else:
            board3d[0] = (state.board == 1).astype(np.int8)
            board3d[1] = (state.board == -1).astype(np.int8)
            board3d[2] = (prev_state.board == 1).astype(np.int8)
            board3d[3] = (prev_state.board == -1).astype(np.int8)
        
        board3d = np.expand_dims(board3d, 0)

        result = model.predict(board3d)[0][0]
        return result
        
    def _minimax(prev_state, state, depth, alpha, beta):
        if state.check_winning_state() != 0:
            return (), 1000*state.check_winning_state()

        if(depth == 0):
            # return (), _calculate_score(state)
        # if(depth == 0):
            return (), minimax_eval(prev_state, state)
        # if (depth == 0 or state.check_winning_state() != 0):
            # return (), minimax_eval(prev_state, state)

        hased_state = _hash_state(state)

        # Get all possible actions
        dict_possible_moves = problem.get_possible_moves(prev_state, state)

        # Get all possible state and their info (move to get, score)
        next_states_info = []
        for start, possible_ends in dict_possible_moves.items():
            for end in possible_ends:
                next_move = (start, end)
                next_state = problem.move(state, next_move)
                # score = _calculate_score(next_state)
                score = minimax_eval(state, next_state)
                next_states_info.append((score,next_move,next_state))

        best_move = None
        best_score  = 0

        if(state.player == 1):
            next_states_info.sort(key=lambda x: x[0], reverse=True) # sort by score

            best_score = -1000
            for _, next_move, next_state in next_states_info:
                hased_next_state = _hash_state(next_state)

                if(_is_visited(hased_next_state, depth-1)):
                    value = visited_states[hased_next_state][1]
                else:
                    _, value = _minimax(state, next_state, depth-1, alpha, beta)
                    _add_visited_state(hased_next_state, depth-1, value)

                if value > best_score:
                    best_score = value
                    best_move = next_move
                    
                if alpha < best_score:
                    alpha = best_score

                if(beta <= alpha): break
            
        else:
            next_states_info.sort(key=lambda x: x[0], reverse=False) # sort by score

            best_score = 1000
            for _, next_move, next_state in next_states_info:
                hased_next_state = _hash_state(next_state)

                if(_is_visited(hased_next_state, depth-1)):
                    value = visited_states[hased_next_state][1]
                else:
                    _, value = _minimax(state, next_state, depth-1, alpha, beta)
                    _add_visited_state(hased_next_state, depth-1, value)

                if value < best_score:
                    best_score = value
                    best_move = next_move

                if beta > best_score:
                    beta = best_score

                if(beta <= alpha): break

        if best_move is None:
            for start in dict_possible_moves.keys():
                best_move = (start,dict_possible_moves[start][0])
                break

        return best_move, best_score

    action, value = _minimax(prev_state, state, MAX_DEPTH, -1000, 1000)
    return action    

In [37]:
init_game = {
    "prev_board": [[ 0,  1,  1,  1,  1],
                  [ 1,  1,  0,  0,  1],
                  [ 1,  0,  0,  0, -1],
                  [-1,  0,  0,  0, -1],
                  [-1, -1, -1, -1, -1]],
    "board": [[ 0,  1,  1,  1,  1],
              [ 1,  1,  0,  0,  1],
              [ 1,  0,  0,  0, -1],
              [-1,  0,  0,  -1, -1],
              [-1, -1, -1, -1, 0]],     
}

# prev_board = np.array(init_game["prev_board"])
# board = np.array(init_game["board"])


prev_board = init_game["prev_board"]
board = init_game["board"]

# board3d = np.zeros((4, 5, 5))

# # if state.player == 1:
# board3d[0] = (board == -1).astype(np.int8)
# board3d[1] = (board == 1).astype(np.int8)
# board3d[2] = (prev_board == -1).astype(np.int8)
# board3d[3] = (prev_board == 1).astype(np.int8)

# # else:
# #     board3d[0] = (state.board == 1).astype(np.int8)
# #     board3d[1] = (state.board == -1).astype(np.int8)
# #     board3d[2] = (prev_state.board == 1).astype(np.int8)
# #     board3d[3] = (prev_state.board == -1).astype(np.int8)

# board3d = np.expand_dims(board3d, 0)

# print(model.predict(board3d)[0][0])


action = move(prev_board, board, player= 1, remain_time_x=1, remain_time_y=1)
print(action)



KeyboardInterrupt: 