Tic Tac Toe
---
Two players against each other

<img style="float:left" src="board.png" alt="drawing" width="200"/>

In [1]:
import numpy as np
import pickle
import random
from collections import Counter

In [None]:
BOARD_ROWS = 3
BOARD_COLS = 3

### Board State
---
Reflect & Judge the state

2 players p1 and p2; p1 uses symbol 1 and p2 uses symbol 2, vacancy as 0

In [None]:
class State:
    def __init__(self, p1, p2):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.p1 = p1
        self.p2 = p2
        self.isEnd = False
        self.boardHash = None
        # init p1 plays first
        self.playerSymbol = 1
        self.results = []
        
    
    # get unique hash of current board state
    def getHash(self):
#         print('state', self.board)
#        board_str = ''
#        for pos in self.board.reshape(BOARD_COLS*BOARD_ROWS):
#            if pos == -1:
#                board_str += '2'
#            else:
#                board_str += str(int(pos))
#       self.boardHash = board_str
        self.boardHash = str(self.board.reshape(BOARD_COLS*BOARD_ROWS))
        return self.boardHash
    
    def winner(self):
        # row
        for i in range(BOARD_ROWS):
            if sum(self.board[i, :]) == 3:
                self.isEnd = True
                return 1
            if sum(self.board[i, :]) == -3:
                self.isEnd = True
                return -1
        # col
        for i in range(BOARD_COLS):
            if sum(self.board[:, i]) == 3:
                self.isEnd = True
                return 1
            if sum(self.board[:, i]) == -3:
                self.isEnd = True
                return -1
        # diagonal
        diag_sum1 = sum([self.board[i, i] for i in range(BOARD_COLS)])
        diag_sum2 = sum([self.board[i, BOARD_COLS-i-1] for i in range(BOARD_COLS)])
        diag_sum = max(diag_sum1, diag_sum2)
        if diag_sum == 3:
            self.isEnd = True
            return 1
        if diag_sum == -3:
            self.isEnd = True
            return -1
        
        # tie
        # no available positions
        if len(self.availablePositions()) == 0 and self.collapsable == 0:

            self.isEnd = True
            return 0
        # not end
        self.isEnd = False
        return None
    
    def collapsable(self):
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self.board[i, j] == 0.5 or self.board[i, j] == -0.5:
                    return 1
        return 0
    
    def collapse(self):
        index1 =np.argwhere(self.board == 0.5)
        index2 =np.argwhere(self.board == -0.5)
        seed1 = random.sample(range(0,len(index1)),int((len(index1)/2)))
        seed2 = random.sample(range(0,len(index2)),int((len(index2)/2)))

        for i1, j1 in enumerate(seed1):
            self.board[index1[j1][0], index1[j1][1]] = 1
        for i2, j2 in enumerate(seed2):
            self.board[index2[j2][0], index2[j2][1]] = -1
        for m in range(BOARD_ROWS):
            for n in range(BOARD_COLS):
                if self.board[m, n] != 1 and self.board[m,n] != -1:
                    self.board[m,n] = 0
      
        return self.board
    
    def availablePositions(self):
        positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self.board[i, j] == 0:
                    positions.append((i, j))  # need to be tuple
        return positions
    
    def updateState(self, position1, position2):
        self.board[position1] += self.playerSymbol * 0.5 
        self.board[position2] += self.playerSymbol * 0.5
        if len(self.availablePositions()) == 0 and self.collapsable == 1:
            self.board = self.collapse()
        # switch to another player
        self.playerSymbol = -1 if self.playerSymbol == 1 else 1

    
    # only when game ends
    def giveReward(self):
        result = self.winner()
        # backpropagate reward
        if result == 1:
            self.p1.feedReward(1)
            self.p2.feedReward(0)
        elif result == -1:
            self.p1.feedReward(0)
            self.p2.feedReward(1)
        else:
            self.p1.feedReward(0.1)
            self.p2.feedReward(0.5)
    
    # board reset
    def reset(self):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.boardHash = None
        self.isEnd = False
        self.playerSymbol = 1
    
    def play(self, rounds=100):
        for i in range(rounds):
#             if i%1000 == 0:
#                 print("Rounds {}".format(i))
            while not self.isEnd:
                # Player 1
                positions = self.availablePositions()
                p1_action1,p1_action2 = self.p1.chooseAction(positions, self.board, self.playerSymbol)
                # take action and upate board state
                self.updateState(p1_action1, p1_action2)

                board_hash = self.getHash()
                self.p1.addState(board_hash)
                # check board status if it is end

                win = self.winner()
                if win is not None:
                    # self.showBoard()
                    # ended with p1 either win or draw
                    self.results.append(win)
                    self.giveReward()
                    self.p1.reset()
                    self.p2.reset()
                    self.reset()
                    break

                else:
                    # Player 2
                    positions = self.availablePositions()
                    p2_action1,p2_action2 = self.p2.chooseAction(positions, self.board, self.playerSymbol)
                    self.updateState(p2_action1, p2_action2)
                    board_hash = self.getHash()
                    self.p2.addState(board_hash)
                    
                    win = self.winner()
                    if win is not None:
                        # self.showBoard()
                        # ended with p2 either win or draw
                        self.results.append(win)
                        self.giveReward()
                        self.p1.reset()
                        self.p2.reset()
                        self.reset()
                        break
    
    # play with human
    def play2(self):
        while not self.isEnd:
            # Player 1
            positions = self.availablePositions()
            p1_action1,p1_action2 = self.p1.chooseAction(positions, self.board, self.playerSymbol)
            # take action and upate board state
            self.updateState(p1_action1,p1_action2)
            self.showBoard()
            # check board status if it is end
            win = self.winner()
            if win is not None:
                if win == 1:
                    print(self.p1.name, "wins!")
                else:
                    print("tie!")
                self.reset()
                break

            else:
                # Player 2
                positions = self.availablePositions()
                p2_action1,p2_action2 = self.p2.chooseAction(positions)

                self.updateState(p2_action1,p2_action2)
                self.showBoard()
                win = self.winner()
                if win is not None:
                    if win == -1:
                        print(self.p2.name, "wins!")
                    else:
                        print("tie!")
                    self.reset()
                    break

    def showBoard(self):
        # p1: x  p2: o
        for i in range(0, BOARD_ROWS):
            print('-------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                if self.board[i, j] == 1:
                    token = 'X'
                if self.board[i, j] == -1:
                    token = 'O'
                if self.board[i, j] == 0.5:
                    token = 'x'
                if self.board[i, j] == -0.5:
                    token = 'o'
                if self.board[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('-------------')    

In [None]:
class Player:
    def __init__(self, name, exp_rate=0.3, player_symbol=1, update_method='sarsa'):
        self.name = name
        self.states = []  # record all positions taken
        self.lr = 0.2
        self.exp_rate = exp_rate
        self.decay_gamma = 0.9
        self.states_value = {}  # state -> value
        self.player_symbol = player_symbol
        self.update_method = update_method
    
#     def getHash(self, board):
#         boardHash = str(board.reshape(BOARD_COLS*BOARD_ROWS))
#         return boardHash
    
        # get unique hash of current board state
    def getHash(self, board):
#         print('self', board)
        board_str = ''
        for pos in board.reshape(BOARD_COLS*BOARD_ROWS):
            if pos == -1:
                board_str += '2'
            else:
                board_str += str(int(pos))
        self.boardHash = board_str
#         self.boardHash = str(self.board.reshape(BOARD_COLS*BOARD_ROWS))
#         print(self.boardHash)
        return self.boardHash
    
    def chooseAction(self, positions, current_board, symbol):
        # take random action
        idx1 = np.random.choice(len(positions))
        action1 = positions[idx1]
        idx2 = np.random.choice(len(positions))
        action2 = positions[idx2]
        
        if np.random.uniform(0, 1) > self.exp_rate:
            value_max = -999
            for p1 in positions:
              for p2 in positions:
                next_board = current_board.copy()
                next_board[p1] += symbol * 0.5
                next_board[p2] += symbol * 0.5
                next_boardHash = self.getHash(next_board)
                value = 0 if self.states_value.get(next_boardHash) is None else self.states_value.get(next_boardHash)
                # print("value", value)
                if value >= value_max:
                    value_max = value
                    action1 = p1
                    action2 = p2

        # print("{} takes action {}".format(self.name, action))
        return action1, action2
    
    # append a hash state
    def addState(self, state):
        self.states.append(state)
        
    # Find the distance between states
    def is_next_state(self, old_state, new_state):
#         print('Compare', old_state, new_state)
        diff = []
        for pos1, pos2 in zip(old_state, new_state):
            if pos1 == pos2:
                continue
            if pos1 == '0' and pos2 != '0':
                diff.append(pos2)
            else:
                return False
        print('diff', diff)
        return sorted(diff) == ['1','2']
    
    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        if self.update_method == 'sarsa':
            for st in reversed(self.states):
                if self.states_value.get(st) is None:
                    self.states_value[st] = 0
                self.states_value[st] += self.lr*(self.decay_gamma*reward - self.states_value[st])
                reward = self.states_value[st]
                
        if self.update_method == 'expected_sarsa':
#             print('ss', self.states, 'reward', reward)
            for st in reversed(self.states):
                if self.states_value.get(st) is None:
                    self.states_value[st] = 0
                possible_states = []
                if reward is None:
                    for next_st in self.states_value:
                        if self.is_next_state(st, next_st):
                            possible_states.append((next_st, self.states_value[next_st]))
                    reward = max([_[1] for _ in possible_states])
#                     print(possible_states, reward)
                self.states_value[st] += self.lr*(self.decay_gamma*reward - self.states_value[st])
#                 print(self.states_value)
                reward = None
                
#             reward = self.states_value[st]
            
        # at the end of game, backpropagate and update states value
#     def feedReward(self, reward):
# #         print("Feeding reward", self.player_symbol)
# #         print(self.states)
#         for st in reversed(self.states):
# #             print(st)
#             if self.states_value.get(st) is None:
#                 self.states_value[st] = 0
#             possible_states = []
#             if reward is None:
#                 for next_st in self.states_value:
#                     if self.find_dist(st, next_st) == 1:
#                         possible_states.append(next_st, self.states_value[st])
#                 reward = max([_[1] for _ in possible_states])
#                 reward = None
# #             print(reward)
#             self.states_value[st] += self.lr*(self.decay_gamma*reward - self.states_value[st])
# #             reward = self.states_value[st]
# #         print(possible_states, reward)
# #         print(self.states_value)
            
    def reset(self):
        self.states = []
        
    def savePolicy(self):
        fw = open('policy_' + str(self.name), 'wb')
        pickle.dump(self.states_value, fw)
        fw.close()

    def loadPolicy(self, file):
        fr = open(file,'rb')
        self.states_value = pickle.load(fr)
        fr.close()

In [None]:
p1 = Player("p1", update_method='sarsa')
p2 = RandomPlayer("p2")
st = State(p1, p2)
st.play(1)

ValueError: ignored

In [None]:
p1 = Player("p1", update_method='sarsa')
# p1 = Player("p1")
p2 = Player("p2", update_method='sarsa')
# p1 = RandomPlayer("p1")
# p2 = RandomPlayer("p2")

st = State(p1, p2)
# print("training...")
# st.play(1000)

for i in range(10):
    # Train
    st.results = []
    p1.exp_rate = 0.3
    p2.exp_rate = 0.3
    st.play(2000)
    print('Train', Counter(st.results)[1] / 2000 * 100, '%, Tie', Counter(st.results)[0] / 2000 * 100, '%')
    
    # Eval
    st.results = []
    p1.exp_rate = 0
    p2.exp_rate = 0
    st.play(200)
#     print(Counter(st.results))
#     print(Counter(st.results)[1])
#     Counter(st.results)[0]
    print('Eval P1 win', Counter(st.results)[1] / 200 * 100, '%, Tie', Counter(st.results)[0] / 200 * 100, '%')
    
    
    

ValueError: ignored

In [None]:
class HumanPlayer:
    def __init__(self, name):
        self.name = name 
    
    def chooseAction(self, positions):
        while True:
            row1 = int(input("Input your action row:"))
            col1 = int(input("Input your action col:"))
            action1 = (row1, col1)
            row2 = int(input("Input your action row:"))
            col2 = int(input("Input your action col:"))
            action2 = (row2, col2)
            if action1 in positions and action2 in posotions:
                return action1,action2

            
    
    # append a hash state
    def addState(self, state):
        pass
    
    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        pass
            
    def reset(self):
        pass

In [None]:
class RandomPlayer:
    def __init__(self, name):
        self.name = name 
    
    def chooseAction(self,  positions, current_board, symbol):
    #    print(positions)
        return positions[np.random.choice(len(positions))], positions[np.random.choice(len(positions))]
    
    # append a hash state
    def addState(self, state):
        pass
    
    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        pass
            
    def reset(self):
        pass

### Training

In [None]:
p1 = Player("p1", player_symbol=1)
p2 = Player("p2", player_symbol=-1)

st = State(p1, p2)
print("training...")
st.play(1000)

training...


AttributeError: ignored

In [None]:
# sorted(p1.states_value.items(), key=lambda x:x[1], reverse=True)

### Train and Eval

In [None]:
p1 = Player("p1", update_method='expected_sarsa')
# p2 = Player("p2")
# p1 = RandomPlayer("p1")
p2 = RandomPlayer("p2")

st = State(p1, p2)
# print("training...")
# st.play(1000)

for i in range(10):
    # Train
    st.results = []
    p1.exp_rate = 0.3
    p2.exp_rate = 0.3
    st.play(200)
    print('Train', Counter(st.results))
    
    # Eval
    st.results = []
    p1.exp_rate = 0
    p2.exp_rate = 0
    st.play(200)
#     print(Counter(st.results))
#     print(Counter(st.results)[1])
#     Counter(st.results)[0]
    print('Eval P1 win', Counter(st.results)[1] / 200 * 100, '%, Tie', Counter(st.results)[0] / 200 * 100, '%')
    
    
    

ValueError: max() arg is an empty sequence

In [None]:
p1.savePolicy()
p2.savePolicy()

AttributeError: 'RandomPlayer' object has no attribute 'savePolicy'

In [None]:
p1.loadPolicy("policy_p1")

### Human vs Computer

In [None]:
p1 = Player("computer", exp_rate=0)
p1.loadPolicy("policy_p1")

p2 = HumanPlayer("human")

st = State(p1, p2)
st.play2()

-------------
|   | x |   | 
-------------
|   |   |   | 
-------------
|   |   |   | 
-------------
Input your action row:2
Input your action col:1
-------------
|   | x |   | 
-------------
|   |   |   | 
-------------
|   | o |   | 
-------------
-------------
|   | x |   | 
-------------
|   |   |   | 
-------------
| x | o |   | 
-------------
Input your action row:3
Input your action col:1
Input your action row:2
Input your action col:2
-------------
|   | x |   | 
-------------
|   |   |   | 
-------------
| x | o | o | 
-------------
-------------
|   | x | x | 
-------------
|   |   |   | 
-------------
| x | o | o | 
-------------
Input your action row:1
Input your action col:1
-------------
|   | x | x | 
-------------
|   | o |   | 
-------------
| x | o | o | 
-------------
-------------
| x | x | x | 
-------------
|   | o |   | 
-------------
| x | o | o | 
-------------
computer wins!


In [None]:
p1 = Player("computer", exp_rate=0)
p1.loadPolicy("policy_p1")

p2 = HumanPlayer("human")

st = State(p1, p2)
st.play2()

-------------
|   | x |   | 
-------------
|   |   |   | 
-------------
|   |   |   | 
-------------


KeyboardInterrupt: Interrupted by user