In [1]:
import numpy as np


In [2]:
import numpy as np


class Board():        
    # private:
    def __init__(self):
        self.reset()
        
    def determine_board_occupation(self, board_status=None):
        """from the board, determine which player own the cell
        """
        if board_state is None:
            board_status = self.state['board'].copy()
            
        new_board_occupation = np.zeros(9)
        
        for cell in range(9):
            cell_status = board_status[cell]
            
            if cell_status[2] == 0:
                if cell_status[1] == 0:
                    if cell_status[0] == 0:
                        new_board_occupation[cell] = 0
                    elif cell_status[0] == 1:
                        new_board_occupation[cell] = 1
                    elif cell_status[0] == 2:
                        new_board_occupation[cell] = 2
                    else:
                        print('Error in def update_board_occupation()\n')
                        return False
                elif cell_status[1] == 1:
                    new_board_occupation[cell] = 1
                elif cell_status[1] == 2:
                    new_board_occupation[cell] = 2
                else:
                    print('Error in def update_board_occupation()\n')
                    return False
            elif cell_status[2] == 1:
                new_board_occupation[cell] = 1
            elif cell_status[2] == 2:
                new_board_occupation[cell] = 2
            else:
                print('Error in def update_board_occupation()\n')
                return False
            
        return new_board_occupation
    
    def update_board_occupation(self):
        """update board_occupation
        """
        self.state['board_occupy'] = self.determine_board_occupation()
         
        return True       
    
    def is_valid_action(self, action, player):
        move_from = action.get('from')
        move_to = action.get('to')
        move_size = action.get('size')        
        board = self.state['board'].copy()
        
        # if no inventory, then can't move
        if move_from == 9 and self.state['%sp_inventory' % player][action.get('size')] == 0:
            print('Invalid move! Player %s does not have size %s chess in the inventory!\n' % (player, move_size))
            return False
        # if the chess is not on the board, cannot move
        elif board[move_from][move_size] != player:
            print('Invalid move! On cell %s size %s, the chess does not belong to player %s! \n' % (move_from, move_size, player))
            return False
        
        # if size smaller than the current block, can't move
        target_cell = board[move_to]
        is_small_size_occupy_target = (target_cell[0] != 0)
        is_medium_size_occupy_target = (target_cell[1] != 0)
        is_large_size_occupy_target = (target_cell[2] != 0)
        
        if move_size == 0 and (is_small_size_occupy_target or is_medium_size_occupy_target or is_large_size_occupy_target):
            print('Invalid move! Cannot move to a cell that have same size or larger size chess occupied!\n')
            return False
        
        elif move_size == 1 and (is_medium_size_occupy_target or is_large_size_occupy_target):
            print('Invalid move! Cannot move to a cell that have same size or larger size chess occupied!\n')
            return False
        
        elif move_size == 2 and (is_large_size_occupy_target):
            print('Invalid move! Cannot move to a cell that have same size or larger size chess occupied!\n')
            return False
        
        # cannot move chess that is under other chess
        origin_cell = board[move_from]
        is_medium_size_occupy_origin = (target_cell[1] != 0)
        is_large_size_occupy_origin = (target_cell[2] != 0)
        
        if move_size == 0 and (is_medium_size_occupy_origin or is_large_size_occupy_origin):
            print('Invalid move! Cannot move to a cell that is under another chess!\n')
            return False
        
        elif move_size == 1 and (is_large_size_occupy_origin):
            print('Invalid move! Cannot move to a cell that is under another chess!\n')
            return False
            
        # if opponent wins due to this move, then can't move
        board[move_from][move_size] = 0
        board_occupation = self.determine_board_occupation(board)
        is_terminal_state, winner = self.check_if_terminal_state(board_occupation)
        if is_terminal_state:
            print('Invalid move! Opponent will win due to this move! \n')
            return False
             
        return True
     
    def update_state(self, action):
        """given a valid movement, update the board state
        action: {'from': 0, 'to': 0, 'size': 0}
        player: 1 or 2
        """
        move_from = action.get('from')
        move_to = action.get('to')
        move_size = action.get('size')
        
        player = self.state['player_turn']
        
        if move_from == 9:
            # if from = 9 -> move chess from inventory
            self.state['%sp_inventory' % player][action.get('size')] -= 1
        else:
            # if move from the board, remove the chess on that cell 
            self.state['board'][move_from][move_size] = 0    
            
        # put the chess on the new cell
        self.state['board'][move_to][move_size] = player
        
        # update board occupation
        self.update_board_occupation()

        # change player turn
        if self.state['player_turn'] == 1:
            self.state['player_turn'] = 2
        else:
            self.state['player_turn'] = 1
        
        return True
    
    def determine_reward(self):
        is_terminal_state, winner = self.check_if_terminal_state()
        if is_terminal_state:
            if winner == 1:
                return (100, -100)
            elif winner == 2:
                return (-100, 100)
            else:
                print("Error in determining winner. Current winner is %s.\n" % winner)
        else:
            ### TODO: set reward for occupying cell?
            return (0, 0)
        
    def update_observation_from_state(self):   
        # update observation        
        self.observation['board'] = self.state['board'].copy()
        self.observation['1p_inventory'] = self.state['1p_inventory'].copy()
        self.observation['2p_inventory'] = self.state['2p_inventory'].copy()
        return True
    
    def get_activate_player(self):
        pass
    
    
    
    # public:
    def reset(self):
        self.state = {}
        self.state['board'] = np.zeros([9, 3])
        self.state['1p_inventory'] = np.array([2, 2, 2])
        self.state['2p_inventory'] = np.array([2, 2, 2])
        self.state['board_occupy'] = np.zeros(9)
        # 1st player moves first
        self.state['player_turn'] = 1
        
        self.observation = {}
        self.observation['board'] = np.zeros([9, 3])
        self.observation['1p_inventory'] = np.array([2, 2, 2])
        self.observation['2p_inventory'] = np.array([2, 2, 2])
    
    def check_if_terminal_state(self, board_occupation=None):
        # if no input board occupation, then get board occupation from self.state['board_occupy']
        if board_occupation is None:
            board_occupation = self.state['board_occupy']
        
        for row in range(3):
            current_row = board_occupation[row*3:(row+1)*3-1]
            
            if np.array_equal(current_row, [1, 1, 1]):
                return [True, 1]
                
            if np.array_equal(current_row, [2, 2, 2]):
                print('2nd player wins!')
                return [True, 2]
            
        for col in range(3):
            current_col = board_occupation[col, col+3, col+6]
            
            if np.array_equal(current_col, [1, 1, 1]):
                return [True, 1]
            
            if np.array_equal(current_col, [2, 2, 2]):
                return [True, 2]
            
        if (board_occupation[[0, 4, 8]] == [1, 1, 1]) or (board_occupation[[2, 4, 6]] == [1, 1, 1]):
            return [True, 1]
        
        if (board_occupation[[0, 4, 8]] == [2, 2, 2]) or (board_occupation[[2, 4, 6]] == [2, 2, 2]):
            return [True, 2]
        
        # if both win, the mover loss (because it removes the chess and let the other player wins first)
        
        # if no one wins
        return [False, 0]     
    
    def step(self, action):
        """ get the activate player from self memory. assume the action is valid, perform the action and 
        update the state. then swap the active player. return the new observation, reward
        """
        if not is_valid_action(action):
            print('Error in def step. Current state is %s. Current action is %s.\n' % (self.state, action))
            return None
        
        is_terminal_state, winner = self.check_if_terminal_state()
        
        self.update_state(action, player)
        
        self.update_observation_from_state()
        
        reward = self.determine_reward()
               
        return self.observation, self.state['player_turn'], reward, is_terminal_state
    
    def get_current_info(self):
        return self.observation, self.state['player_turn'], 0, is_terminal_state

    
    def calculate_reward(self):
        pass
    

In [3]:
def swap_observation_view(observation, player_id):
    '''
    player_id either 1 or -1. Swap the observation such that 1 means self and -1 means the opponent 
    e.g. 
    if player_id = 1, no need to swap the view,
    input = array([[1, -1, 0],
                   [0, 0, 0],
                   [-1, -1, 1],
                   [1, -1, 1],
                   [-1, 1, -1]])
           
    output = array([[-1, 1, 0],
                    [0, 0, 0],
                    [1, 1, -1],
                    [-1, 1, -1],
                    [1, -1, 1]])
    
    '''
    swapped_view_observation = observation.copy()
    np.putmask(swapped_view_observation, swapped_view_observation == 1, 3)
    np.putmask(swapped_view_observation, swapped_view_observation == 2, 1)
    np.putmask(swapped_view_observation, swapped_view_observation == 3, 2)
    
    return swapped_view_observation

if False:
    observation = np.array([[1,2,0], [0,0,0], [2,2,1], [1,2,1], [2,1,2]])
    observation
    swap_observation_view(observation)

In [4]:
def process_memory(memory, is_terminal_state):
    if (len(memory) == memory_size + 1) and (is_terminal_state):
        # special handle
        pass
    
    # do all the shifting 
    for each record:
        if player is 2:
            swap observation, next_observation, reward
        
    pass

SyntaxError: invalid syntax (<ipython-input-4-ab283bd9a1f2>, line 7)

In [5]:
memory = np.array([])

next_observation, player_turn, reward, is_terminal_state = env.step(action)
memory.append([original_observation, action, reward, next_observation, player])

if (len(memory) == memory_size + 1) or (is_terminal_state):
    # shift reward, state, done, and assign 1 for my round and 2 for the opponent round
    memory_for_training = process_memory(memory, is_terminal_state)
    if is_terminal_state:
        memory = np.array([])
    else:
        memory = memory[-1]
        
    agent.learn(memory_for_training)

NameError: name 'env' is not defined

In [None]:
# main flow

load_weight_path = 'dummy_path'
model_weight_path = 'dummy_path'

env = make_env()
bot1 = Bot(load_weight_path)
bot2 = Bot()
bot_list = [bot1, bot2]
unprocess_memory = np.array([])

num_episode = 10


for episode in range(num_episode):
    env.reset()
    
    # get initial state
    observation, player_turn, reward, is_terminal_state = env.get_current_info()
    
    while not is_terminal_state:
        # if now is player 2's turn, swap the observation
        if player_turn == 2:
            swapped_observation = swap_observation_view(observation)
            action = bot_list[player_turn-1].select_action(swapped_observation)
        elif player_turn == 1:
            action = bot_list[player_turn-1].select_action(observation)
        else:
            print('Error in player turn. Current player turn is %s.\n' % player_turn)
        
        next_observation, next_player_turn, reward, is_terminal_state = env.step(action)
        
        unprocess_memory.append([observation, action, reward, next_observation, player_turn])
        
        if (len(unprocess_memory) == memory_size + 1) or (is_terminal_state):
            # shift reward, state, done, and assign 1 for my round and 2 for the opponent round
            memory_for_training = process_memory(unprocess_memory, is_terminal_state)
            if is_terminal_state:
                unprocess_memory = np.array([])
            else:
                unprocess_memory = unprocess_memory[-1]

            bot1.learn(memory_for_training)
            
        observation, player_turn = next_observation, next_player_turn
        
    # save the model weight periodically
    if (episode + 1) % 50 == 0:
        bot1.save_weight(model_weight_path)

In [None]:
class Bot():
    def __init__(self, load_weight_path=None):
        self.observation['board'] = np.zeros([9, 3])
        self.observation['1p_inventory'] = np.array([2, 2, 2])
        self.observation['2p_inventory'] = np.array([2, 2, 2])
                
        self.model = self.build_model()
        if load_weight_path is not None:
            self.model = self.load_weight()
        
        self
        pass
    
    def build_model(self):
        pass
    
    def load_model(self):
        pass
    
    def choose_action(self, observation):
        pass

In [None]:
bot1 = Bot()
bot2 = Bot()

player_list = [bot1, bot2]
save_list = []

for episode in range(100):
    # reset environment
    env.reset()
    
    # start from the 1st player
    player_turn = 1
    observation = env.observation
    while game not end:
        action = player_list[state['player_turn']].pick_action(observation)
        
        # if the action is not valid, the program will be break      
        state, next_observation, reward = env.step(action)
        
        save_list.append([observation, action, next_observation, reward])
        observation = next_observation
        

        
        

In [4]:
board = np.zeros([9, 3])
np.zeros(3)

array([0., 0., 0.])

In [6]:
temp = np.array([1,2,3,4,5,6,7,8,9])

array([2, 4, 6])