# Tic Tac Toe
With Neural Network for State Value estimation

### Outline of approach:
1. Pretrain: Play atleast 100 games and get the training data for state and values as list.  \
    Train a 2 value predictor network on this data - one for P1 and another for P2  
2. RL Train: In a loop \
    a) play n games using the trained networks and epsilon greedy approach \
    b) record the outcomes and compute state values \
    c) use this data to retrain the two networks \
3. Train till convergence

Reference url for Tic Tac Toe environment: https://github.com/MJeremy2017/reinforcement-learning-implementation/blob/master/TicTacToe/ticTacToe.py 

### Basic package imports

In [1]:
import numpy as np
import pickle
import pandas as pd
import matplotlib.pyplot as plt
import re
import keras
from keras import layers
from keras.layers import Dense, Activation
BOARD_ROWS = 3
BOARD_COLS = 3

2024-01-28 13:45:58.301394: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcudart.so.11.0'; dlerror: libcudart.so.11.0: cannot open shared object file: No such file or directory
2024-01-28 13:45:58.301411: I tensorflow/stream_executor/cuda/cudart_stub.cc:29] Ignore above cudart dlerror if you do not have a GPU set up on your machine.


### Classes and Keras Models

In [2]:
class State:
    '''
    Definition of a Tic-Tac-Toe board
    '''
    def __init__(self, p1, p2):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.p1 = p1
        self.p2 = p2
        self.isEnd = False
        self.boardHash = None
        # init p1 plays first
        self.playerSymbol = 1

    # get unique hash of current board state
    def getHash(self):
        self.boardHash = str(self.board.reshape(BOARD_COLS * BOARD_ROWS))
        return self.boardHash

    def winner(self):
        # row
        for i in range(BOARD_ROWS):
            if sum(self.board[i, :]) == 3:
                self.isEnd = True
                return 1
            if sum(self.board[i, :]) == -3:
                self.isEnd = True
                return -1
        # col
        for i in range(BOARD_COLS):
            if sum(self.board[:, i]) == 3:
                self.isEnd = True
                return 1
            if sum(self.board[:, i]) == -3:
                self.isEnd = True
                return -1
        # diagonal
        diag_sum1 = sum([self.board[i, i] for i in range(BOARD_COLS)])
        diag_sum2 = sum([self.board[i, BOARD_COLS - i - 1] for i in range(BOARD_COLS)])
        diag_sum = max(abs(diag_sum1), abs(diag_sum2))
        if diag_sum == 3:
            self.isEnd = True
            if diag_sum1 == 3 or diag_sum2 == 3:
                return 1
            else:
                return -1

        # tie
        # no available positions
        if len(self.availablePositions()) == 0:
            self.isEnd = True
            return 0
        # not end
        self.isEnd = False
        return None

    def availablePositions(self):
        positions = []
        for i in range(BOARD_ROWS):
            for j in range(BOARD_COLS):
                if self.board[i, j] == 0:
                    positions.append((i, j))  # need to be tuple
        return positions

    def updateState(self, position):
        self.board[position] = self.playerSymbol
        # switch to another player
        self.playerSymbol = -1 if self.playerSymbol == 1 else 1

    # only when game ends
    def giveReward(self):
        result = self.winner()
        # backpropagate reward
        if result == 1:
            self.p1.feedReward(1)
            self.p2.feedReward(-1)
        elif result == -1:
            self.p1.feedReward(-1)
            self.p2.feedReward(1)
        else:
            self.p1.feedReward(0)
            self.p2.feedReward(0)

    # board reset
    def reset(self):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.boardHash = None
        self.isEnd = False
        self.playerSymbol = 1

    def play(self, rounds=100):
        winlist = []
        for i in range(rounds):
            if i % 1000 == 0:
                print("Rounds {}".format(i))
            if i % 100 == 0:
                self.p1.setEps(rounds, i)
                self.p2.setEps(rounds, i)
            while not self.isEnd:
                # Player 1
                positions = self.availablePositions()
                p1_action = self.p1.chooseAction(positions, self.board, self.playerSymbol)
                # take action and upate board state
                self.updateState(p1_action)
                board_hash = self.getHash()
                self.p1.addState(board_hash)
                # check board status if it is end

                win = self.winner()
                if win is not None:
                    # self.showBoard()
                    # ended with p1 either win or draw
                    self.giveReward()
                    self.p1.reset()
                    self.p2.reset()
                    self.reset()
                    break

                else:
                    # Player 2
                    positions = self.availablePositions()
                    p2_action = self.p2.chooseAction(positions, self.board, self.playerSymbol)
                    self.updateState(p2_action)
                    board_hash = self.getHash()
                    self.p2.addState(board_hash)

                    win = self.winner()
                    if win is not None:
                        # self.showBoard()
                        # ended with p2 either win or draw
                        self.giveReward()
                        self.p1.reset()
                        self.p2.reset()
                        self.reset()
                        break
            winlist.append(win)
        return (winlist)
    # play with human
    def play2(self):
        while not self.isEnd:
            # Player 1
            positions = self.availablePositions()
            p1_action = self.p1.chooseAction(positions, self.board, self.playerSymbol)
            # take action and upate board state
            self.updateState(p1_action)
            self.showBoard()
            # check board status if it is end
            win = self.winner()
            if win is not None:
                if win == 1:
                    print(self.p1.name, "wins!")
                else:
                    print("tie!")
                self.reset()
                break

            else:
                # Player 2
                positions = self.availablePositions()
                p2_action = self.p2.chooseAction(positions)

                self.updateState(p2_action)
                self.showBoard()
                win = self.winner()
                if win is not None:
                    if win == -1:
                        print(self.p2.name, "wins!")
                    else:
                        print("tie!")
                    self.reset()
                    break
                    
                    
    def NNPlay(self, rounds=500, innerrounds=50):
        '''
        train innerrounds, capture replay buffer and train the s value networks on this data
        after every inner round the replay buffer is emptied and the process repeated again
        '''
        train_batches = int(rounds/innerrounds) + 1
        
        print ('training for {} rounds, with {} training batches and {} inner rounds'.format(rounds, train_batches, innerrounds))
        
        for i in range(train_batches):
            print ('training batch', i)
            self.play(innerrounds)
            # train player p1
            self.p1.sVNNtrain()
            # train player p2
            self.p2.sVNNtrain()
        return ()
    
    def showBoard(self):
        # p1: x  p2: o
        for i in range(0, BOARD_ROWS):
            print('-------------')
            out = '| '
            for j in range(0, BOARD_COLS):
                if self.board[i, j] == 1:
                    token = 'x'
                if self.board[i, j] == -1:
                    token = 'o'
                if self.board[i, j] == 0:
                    token = ' '
                out += token + ' | '
            print(out)
        print('-------------')


In [3]:
class Player:
    '''
    Class for one Tic Tac Toe player
    '''
    def __init__(self, name, eps_decay=False, start_exp_rate=0.3, end_exp_rate=0.05):
        self.name = name
        self.states = []  # record all positions taken
        self.lr = 0.3
        self.exp_rate = start_exp_rate
        
        self.decay_gamma = 0.9
        self.states_value = {}  # state -> value

        self.eps_decay = eps_decay
        self.start_exp_rate = start_exp_rate
        self.end_exp_rate = end_exp_rate
        self.state_value_model = self.sValueNN()
        
    def getHash(self, board):
        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
        return boardHash
    
    def getSVal(self, board):
        bs = board.reshape(BOARD_COLS * BOARD_ROWS)
        bost = np.reshape(bs, (-1, 9))
        sval = self.state_value_model.predict(bost, verbose=False)[0][0]
        return sval
    
    def sValueNN(self):
        model = keras.models.Sequential()
        model.add(Dense(units=4, input_dim=9, activation='linear'))
        model.add(Dense(1, activation='linear'))
        model.compile(loss='mean_squared_error', optimizer='sgd')
        return (model)
    
    def getbuffer(self):
        data = self.states_value
        ll = []
        for k in data.keys():
            yy = re.findall( r'[-/+]?\d+\.*\d*', k)
            zz = data.get(k)
            yy.append(zz)
            ll.append(yy)
        lldf = pd.DataFrame(ll)
        cols = ['x'+str(i) for i in range(9)]
        cols.append('val')
        lldf.columns = cols
        lldf.to_csv('Buffer.csv', index=False)
        return 

    def sVNNtrain(self, Xin = None, Yin = None):
        # call this function to create buffer
        self.getbuffer()
        # read the buffer
        df = pd.read_csv('Buffer.csv')
        print ('length of buffer is', len(df))
        traincols=['x' + str(i) for i in range(9)]
        testcol = 'val'
        if Xin is None:    
            Xin = df[traincols]
            Yin = df[testcol]
        self.state_value_model.fit(Xin, Yin, epochs=10, verbose=False)
        
        # empty the replay buffer after this 
        self.states_value = {}
        return 

    def loadSVNNmodel(self, path):
        self.state_value_model = keras.models.load_model(path)
    
    def setEps(self, total_games, current_game):
        if (self.eps_decay == False):
            return
        else:
            if int(current_game/100) > 0:   
            #if (np.mod(current_game+1, 100) == 0):
                self.exp_rate = self.start_exp_rate * (1. - current_game/total_games) + self.end_exp_rate * (current_game/total_games)
                if (np.mod(current_game, 1000) == 0):
                    print ('decay rate modified at {} with current value of {}'.format(str(current_game), str(self.exp_rate)))
        return 

    def chooseAction(self, positions, current_board, symbol):
        if np.random.uniform(0, 1) <= self.exp_rate:
            # take random action
            idx = np.random.choice(len(positions))
            action = positions[idx]
        else:
            value_max = -999
            for p in positions:
                next_board = current_board.copy()
                next_board[p] = symbol
                next_boardHash = self.getHash(next_board)
                #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # Default method uses a dictionary to get the value of a state
                #value = 0 if self.states_value.get(next_boardHash) is None else self.states_value.get(next_boardHash)
                
                # New method use the keras model to predict the state value 
                value = self.getSVal(next_board)
                #~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
                # print("value", value)
                if value >= value_max:
                    value_max = value
                    action = p
        # print("{} takes action {}".format(self.name, action))
        return action

    # append a hash state
    def addState(self, state):
        self.states.append(state)

    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        for st in reversed(self.states):
            if self.states_value.get(st) is None:
                self.states_value[st] = 0
            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
            reward = self.states_value[st]

    def reset(self):
        self.states = []

    def savePolicy(self):
        fw = open('policy_' + str(self.name), 'wb')
        pickle.dump(self.states_value, fw)
        fw.close()

    def loadPolicy(self, file):
        fr = open(file, 'rb')
        self.states_value = pickle.load(fr)
        fr.close()

In [4]:
class HumanPlayer:
    '''
    Class for a human player
    '''
    def __init__(self, name):
        self.name = name

    def chooseAction(self, positions):
        while True:
            row = int(input("Input your action row:"))
            col = int(input("Input your action col:"))
            action = (row, col)
            if action in positions:
                return action

    # append a hash state
    def addState(self, state):
        pass

    # at the end of game, backpropagate and update states value
    def feedReward(self, reward):
        pass

    def reset(self):
        pass
    ''


def definemodel():
    model = keras.models.Sequential()
    model.add(Dense(units=4, input_dim=9, activation='linear'))
    model.add(Dense(1, activation='linear'))
    model.compile(loss='mean_squared_error', optimizer='sgd')
    return (model)

Define the NN models for P1 and P2, one time. \
They do not change during the course of training, only the weights get updated

In [5]:
p1 = Player("p1", eps_decay = False, start_exp_rate=0.2, end_exp_rate=0.02)
p2 = Player("p2", eps_decay = False, start_exp_rate=0.2, end_exp_rate=0.02)
#p2.setEps(5000, 209)
print (p1.exp_rate, p2.exp_rate)

0.2 0.2


2024-01-28 13:45:59.299386: W tensorflow/stream_executor/platform/default/dso_loader.cc:64] Could not load dynamic library 'libcuda.so.1'; dlerror: libcuda.so.1: cannot open shared object file: No such file or directory
2024-01-28 13:45:59.299411: W tensorflow/stream_executor/cuda/cuda_driver.cc:269] failed call to cuInit: UNKNOWN ERROR (303)
2024-01-28 13:45:59.299427: I tensorflow/stream_executor/cuda/cuda_diagnostics.cc:156] kernel driver does not appear to be running on this host (dsrivallabha-PC): /proc/driver/nvidia/version does not exist
2024-01-28 13:45:59.299881: I tensorflow/core/platform/cpu_feature_guard.cc:151] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 AVX512F FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.


# Examine the defined model 

In [6]:
p1.state_value_model.summary()

Model: "sequential"
_________________________________________________________________
 Layer (type)                Output Shape              Param #   
 dense (Dense)               (None, 4)                 40        
                                                                 
 dense_1 (Dense)             (None, 1)                 5         
                                                                 
Total params: 45
Trainable params: 45
Non-trainable params: 0
_________________________________________________________________


In [7]:
mm = p1.state_value_model
for layer in mm.layers: print(layer.get_config(), layer.get_weights())

{'name': 'dense', 'trainable': True, 'batch_input_shape': (None, 9), 'dtype': 'float32', 'units': 4, 'activation': 'linear', 'use_bias': True, 'kernel_initializer': {'class_name': 'GlorotUniform', 'config': {'seed': None}}, 'bias_initializer': {'class_name': 'Zeros', 'config': {}}, 'kernel_regularizer': None, 'bias_regularizer': None, 'activity_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None} [array([[-0.24497217, -0.43987298, -0.27615482, -0.03567499],
       [ 0.6102681 ,  0.15904182, -0.16795164,  0.3927878 ],
       [ 0.18159759,  0.46008253, -0.59124386, -0.11502939],
       [-0.25175968,  0.40120637,  0.60146165,  0.24036628],
       [ 0.17378628,  0.64320064, -0.64446485,  0.6148833 ],
       [-0.17706686, -0.11982399, -0.02513134, -0.4752825 ],
       [-0.26079535, -0.6009986 ,  0.57542455, -0.56894886],
       [ 0.1557194 ,  0.43464303, -0.33834165, -0.4720738 ],
       [-0.26055256,  0.1727522 , -0.4327246 ,  0.39834523]],
      dtype=float32), array([0

In [8]:
st = State(p1, p2)
st.NNPlay(2000)

training for 2000 rounds, with 41 training batches and 50 inner rounds
training batch 0
Rounds 0
length of buffer is 66
length of buffer is 59
training batch 1
Rounds 0
length of buffer is 68
length of buffer is 64
training batch 2
Rounds 0
length of buffer is 66
length of buffer is 55
training batch 3
Rounds 0
length of buffer is 58
length of buffer is 51
training batch 4
Rounds 0
length of buffer is 60
length of buffer is 54
training batch 5
Rounds 0
length of buffer is 56
length of buffer is 55
training batch 6
Rounds 0
length of buffer is 54
length of buffer is 55
training batch 7
Rounds 0
length of buffer is 65
length of buffer is 68
training batch 8
Rounds 0
length of buffer is 56
length of buffer is 53
training batch 9
Rounds 0
length of buffer is 50
length of buffer is 45
training batch 10
Rounds 0
length of buffer is 67
length of buffer is 66
training batch 11
Rounds 0
length of buffer is 53
length of buffer is 48
training batch 12
Rounds 0
length of buffer is 56
length of buf

()

In [9]:
mm = st.p1.state_value_model
for layer in mm.layers: print(layer.get_config(), layer.get_weights())

{'name': 'dense', 'trainable': True, 'batch_input_shape': (None, 9), 'dtype': 'float32', 'units': 4, 'activation': 'linear', 'use_bias': True, 'kernel_initializer': {'class_name': 'GlorotUniform', 'config': {'seed': None}}, 'bias_initializer': {'class_name': 'Zeros', 'config': {}}, 'kernel_regularizer': None, 'bias_regularizer': None, 'activity_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None} [array([[-0.19710356, -0.44819626, -0.28192848, -0.03402208],
       [ 0.08384394,  0.26053673, -0.14118801,  0.373423  ],
       [ 0.1395128 ,  0.4672206 , -0.579222  , -0.11965451],
       [-0.02389818,  0.3548066 ,  0.5967817 ,  0.24781917],
       [ 0.2193257 ,  0.6259165 , -0.6420938 ,  0.6109241 ],
       [-0.05803855, -0.13624585, -0.02749629, -0.4687885 ],
       [-0.20110391, -0.60669804,  0.56789   , -0.56202483],
       [ 0.11005239,  0.43870124, -0.3401159 , -0.47298747],
       [-0.14242919,  0.14559047, -0.44263825,  0.4016926 ]],
      dtype=float32), array([ 

In [10]:
st.p1.state_value_model.save('p1model.keras')

In [11]:
st.p2.state_value_model.save('p2model.keras')

In [12]:
mm = st.p2.state_value_model
for layer in mm.layers: print(layer.get_config(), layer.get_weights())

{'name': 'dense_2', 'trainable': True, 'batch_input_shape': (None, 9), 'dtype': 'float32', 'units': 4, 'activation': 'linear', 'use_bias': True, 'kernel_initializer': {'class_name': 'GlorotUniform', 'config': {'seed': None}}, 'bias_initializer': {'class_name': 'Zeros', 'config': {}}, 'kernel_regularizer': None, 'bias_regularizer': None, 'activity_regularizer': None, 'kernel_constraint': None, 'bias_constraint': None} [array([[-0.4538742 , -0.17369892,  0.17416154, -0.34442285],
       [ 0.31063217, -0.48849082,  0.21157224,  0.62748605],
       [ 0.5303052 , -0.03440766, -0.44713694,  0.62045246],
       [-0.27354816,  0.11483011, -0.49350196, -0.4947174 ],
       [-0.22272362,  0.6493644 , -0.4952509 , -0.5718568 ],
       [-0.5238724 , -0.44496045,  0.09623165, -0.32418066],
       [ 0.5510279 , -0.5399643 ,  0.02654815,  0.13130866],
       [ 0.67515653, -0.28145   , -0.47561413,  0.30561885],
       [ 0.09916825,  0.17728382, -0.38563403, -0.42024794]],
      dtype=float32), array(

# Check learning by playing against opponent 

In [13]:
p3 = Player("p3", eps_decay = False, start_exp_rate=0, end_exp_rate=0)

In [14]:
p3.loadSVNNmodel('p1model.keras')

In [15]:
p4 = Player("p4", eps_decay = False, start_exp_rate=1, end_exp_rate=0)

In [16]:
human = HumanPlayer('Human')

In [17]:
st = State(p3, p4)

In [18]:
output = st.play()

Rounds 0


In [19]:
np.mean(output)

0.82

In [20]:
print (output.count(1), output.count(-1), output.count(0))

89 7 4


In [23]:
# Check learning by playing two random players against each other
p5 = Player("p5", eps_decay = False, start_exp_rate=1, end_exp_rate=0)
p6 = Player("p6", eps_decay = False, start_exp_rate=1, end_exp_rate=0)

In [24]:
st = State(p5, p6)
output = st.play(100)
print (output.count(1), output.count(-1), output.count(3))

Rounds 0
57 29 0
