In [17]:
import torch 
from tictactoe import *
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable

import copy 
from pprint import pprint
from collections import deque
from time import time 
from multiprocessing import Pool 

import random 
import torch.optim as optim
from matplotlib import pyplot as plt

# Variants of Tic-Tac-Toe (https://en.wikipedia.org/wiki/Tic-tac-toe_variants)

$m,n,k$ game = play on $m$ by $n$ board to try to get $k$ in a row.

We have that TicTacToe extends Game, meaning it must have a checkGameOver function. We modify it so that in the initializer it takes in an additional parameter, 'gameOverChecker', a function that takes a board and logger and returns $0$ if nobody won, $1$ if Player one won, or $2$ if Player $2$ won. 

This allows us to easily implement any variants of the standard 3x3 game that only modify that winning conditions. For example, Misere Tic-tac-toe, or 'inverse' Tic-Tac-Toe, is the game where Player 1 wins if Player 2 gets 3 in a row (we make wrappers for each of the game versions).

Now we would like to support variants of the game where instead of alternating turns, there is some different rule for check whose turn it is. For this, we will add another parameter to TicTacToe constructor, turnChooser. This is passed into the Game superclass. Note that $turnChooser$ is a function that takes in the current player and gives the next player. This lets us support $random$ $turn$ $tic$-$tac$-$toe$.

Now we would like to support larger boards. To do this, we pass in an additional parameter $dimension$. This then creates a board of size $dimension$ by $dimension$. Although this allows us to define boards of arbitrary size, we create $FourByFourTicTacToe$ and $FiveByFiveTicTacToe$. The most logical win condition for an $n$ by $n$ board is $n$ in a row. For this reason we modify the $checkGameOver$ function to take in $n$ - the number of pieces in a row required for a win. Note that other variants are possible - for example, getting a diamond in $4$ by $4$ could also be considered a win. We can then create an $n$ in a row win condition function by using partial functions with $checkGameOver$.

Next, we would like to support games in 3 dimensions. We do this by adding yet another paremeter to the initialization - $threeDims$. In order to make it easier to handle 2D vs 3D games, we will always assume the board is 3D - n by n by n, but for 2D we can just get the n by n board by calling board[0].

We would like easily define combinations of these variants to create custom games. In order to do this, we define a TicTacToeConfig class. Now, TicTacToe simply takes a TicTacToeConfig. This config has all of the default values so we can set any combinations of the ones we want. 


## Visualizing the Board

Looking at the board in the command line is annoying, and we would like some way of seeing what the algorithm is actually doing in a way that is easier to interpret. We will use PyGame to do this. First, we create a `display` method in TicTacToe.

The correct way to do this would be to have some event that is triggered when we make a move to update the display. Due to lazyness, we will just spawn a different thread. This then renders the grid 60 times per second and colors it according to the current board state. 

http://programarcadegames.com/index.php?lang=en&chapter=array_backed_grids
https://www.pygame.org/docs/tut/ChimpLineByLine.html

### Learning the Value Function with a Neural Network

First, we define the neural network. We use tanh to bound the result between -1 and 1 (since this is the bound of our value function). We use standard activation functions, testing first ReLu and then Leaky ReLu.

In [18]:
# Regular Feed forward network with only dense layers 
class DenseNetRELU(nn.Module):
    def __init__(self, dimension):
        super(DenseNetRELU, self).__init__()
        # 9 input features (each of the positions in the board), with a bias
        # 1 hidden layer with 9 inputs, 1 output (the value of the state)
        numStates = dimension * dimension
        self.first = nn.Linear(numStates, numStates, True)
        self.hiddenOne = nn.Linear(numStates, numStates, True)
        self.hiddenTwo = nn.Linear(numStates, 1, True)
        
    def forward(self, x):
        x = F.relu(self.first(x)) 
        x = F.relu(self.hiddenOne(x)) 
        x = F.tanh(self.hiddenTwo(x)) 
        return x
    
def weights_init_uniform(m):
    classname = m.__class__.__name__
    if classname.find('Linear') != -1:
        m.weight.data.uniform_(0.0, 1.0)
        m.bias.data.fill_(0)
    
class DenseNetLeakyRELU(nn.Module):
    def __init__(self, dimension):
        super(DenseNetLeakyRELU, self).__init__()
        
        numStates = dimension * dimension
        self.first = nn.Linear(numStates, numStates, True)
        self.hiddenOne = nn.Linear(numStates, numStates, True)
        self.hiddenTwo = nn.Linear(numStates, 1, True)
        
    def forward(self, x):
        x = F.leaky_relu(self.first(x)) 
        x = F.leaky_relu(self.hiddenOne(x)) 
        x = F.tanh(self.hiddenTwo(x)) 
        return x
    
# Convolutional neural network (for 2D)
class ConvNet(nn.Module):
    def __init__(self, dimension): 
        super(ConvNet, self).__init__()
        
        
        # input is a single channel, we perform 5 convolutions on each entry 
        # with kernel size of 2 
        if dimension == 3:
            self.first = nn.Conv2d(1, 5, 2, padding=0) # This gives us a 2x2
            self.flattenLayer = nn.Flatten()
            self.hidden = nn.Linear(20, 1, True)
        else: 
            print("NOT YET IMPLEMENTED")
            self.first = nn.Conv2d(1, 5, 3, padding=1)

    def forward(self, x):
        x = F.relu(self.first(x))
        x = self.flattenLayer(x)
        x = F.tanh(self.hidden(x)) # Make sure output is between -1 and 1 
        return x 
    
class ConvNetLeaky(nn.Module):
    def __init__(self, dimension):
        super(ConvNetLeaky, self).__init__()
        
        if dimension == 3:
            self.first = nn.Conv2d(1, 5, 2, padding=0) # This gives us a 2x2
            self.flattenLayer = nn.Flatten()
            self.hidden = nn.Linear(20, 1, True)
        else: 
            print("NOT YET IMPLEMENTED")
            self.first = nn.Conv2d(1, 5, 3, padding=1)
        
    def forward(self, x):
        x = F.leaky_relu(self.first(x))
        x = self.flattenLayer(x)
        x = F.tanh(self.hidden(x)) # Make sure output is between -1 and 1
        return x 
        

nets = dict()

for dim in range(3, 11): 
    netOne   = DenseNetRELU(dim)
    netTwo   = DenseNetLeakyRELU(dim)
    netThree = ConvNet(dim)
    netFour  = ConvNetLeaky(dim)
    
    nets[dim] = [netOne, netTwo, netThree, netFour]

print(nets[10])

NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
NOT YET IMPLEMENTED
[DenseNetRELU(
  (first): Linear(in_features=100, out_features=100, bias=True)
  (hiddenOne): Linear(in_features=100, out_features=100, bias=True)
  (hiddenTwo): Linear(in_features=100, out_features=1, bias=True)
), DenseNetLeakyRELU(
  (first): Linear(in_features=100, out_features=100, bias=True)
  (hiddenOne): Linear(in_features=100, out_features=100, bias=True)
  (hiddenTwo): Linear(in_features=100, out_features=1, bias=True)
), ConvNet(
  (first): Conv2d(1, 5, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
), ConvNetLeaky(
  (first): Conv2d(1, 5, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
)]


We first define our game tree by defining a node class.

In [19]:
class Node:
    ''' 
    Node is a single board state in our game tree.
    '''
    def __init__(self, board): 
        self.children = []
        self.parent = None
        self.board = board
        self.currTurn = 0

We then make a generic function which will take some starting node and a game constructor (which must create an object that extends the Game abstractclass) and fills out the game tree. It does this more efficiently by using a thread pool. Note that this also returns a list of all the nodes in the tree, which allows us to choose a random node much more easily.

In [47]:
def getGameTree(game, root, num_nodes): 
    p = Pool(4)
    return findAll(game, root, num_nodes)
    
def findAll(game, startingNode, num_nodes):
    '''
    findAll performs BFS from the startingNode. It uses a 
    '''
    visited = [0] * num_nodes
    index = 0 

    # Initialize a queue with the starting node. 
    unvisited = deque()
    unvisited.append(startingNode)
    
    # Continue until there are no more unvisited nodes.
    while len(unvisited) > 0:
        if index % 5000 == 0:
            print(index, len(unvisited))
        # Store the new visited nodes          
        currNode = unvisited.popleft()
        visited[index] = currNode
        index += 1
        
        # If game is over, do not add the children
        game.board = currNode.board
        res = game.checkGameOver()
        if res != 0: 
            continue
            
        # Find all of the children 
        for action in game.getAllActions():
            child = Node(copy.deepcopy(currNode.board))
            child.currTurn = game.turnChooser(currNode.currTurn)
            pieceToPlay = 1 if currNode.currTurn == 0 else 2
            child.board[action[0]][action[1]][action[2]] = pieceToPlay
            child.parent = currNode
            
            unvisited.append(child)
            currNode.children.append(child)
    
    return visited

Now we generate the game tree for all game variants so we have data to train the neural network. Note that for many games the game tree is small enough to store in memory. There are 9 places to place the first piece, then 8 to place the second, etc.., so there are $9! = 362880$ states, many of which are not reachable because someone would win. 

In [48]:
root = Node([[[0,0,0],[0,0,0],[0,0,0]]])
NUM_NODES = 1000000
game = StandardTicTacToe(None, None, None)

vertices = findAll(game, root, NUM_NODES)

0 1
5000 18910
10000 32510
15000 45990
20000 58526
25000 67392
30000 76372
35000 85376
40000 94254
45000 103240
50000 112202
55000 121119
60000 130090
65000 139004
70000 147942
75000 156896
80000 159952
85000 162668
90000 165360
95000 168206
100000 170946
105000 173480
110000 176312
115000 179304
120000 181878
125000 184384
130000 187504
135000 190046
140000 193158
145000 195620
150000 198312
155000 201310
160000 204126
165000 206434
170000 209454
175000 212266
180000 215102
185000 217690
190000 220430
195000 223132
200000 226066
205000 228560
210000 231324
215000 234390
220000 237014
225000 239398
230000 242588
235000 243152
240000 242330
245000 241412
250000 240272
255000 239326
260000 238279
265000 237481
270000 236437
275000 235557
280000 234400
285000 233445
290000 232616
295000 231752
300000 230555
305000 229645
310000 228504
315000 227830
320000 226750
325000 225701
330000 224936
335000 224094
340000 222911
345000 221937
350000 220888
355000 220108
360000 219059
365000 218172
37

## Clean up the data (if we allocated too much space)

In [49]:
for v in range(len(vertices)):
    if vertices[v] == 0:
        vertices = vertices[0:v]
        break 
        
for v in vertices: 
    v.board_torch = torch.FloatTensor(v.board) 

1000000


## Split into Training Set, Validation Set, and Test Set 

In [56]:
random.shuffle(vertices)

training_set_size = int(size*0.8)
training_set = vertices[0:training_set_size]

test_set_end_index = 0.9size
test_set = vertices[training_set_size:]



537876


First we implement minimax so that we can evaluate a state. We define player 2 winning as -1, and player 1 winning as 1. If currTurn is 0, it's player 1 to move.

In [8]:
memo = dict() # maps each node to a value

def minimax(node): 
    if node in memo: 
        return memo[node] 
    
    game.board = node.board 
    res = game.getAllActions()
    res2 = game.checkGameOver()
    if res2 == 1:
        memo[node] = 1
        return 1 
    elif res2 == 2: 
        memo[node] = -1 
        return -1
    elif len(res) == 0: 
        memo[node] = 0 
        return 0
        
    children = node.children
    if node.currTurn == 0: 
        val = -float('inf')
    else: 
        val = float('inf')
        
    for child in children: 
        child_val = minimax(child)
        if node.currTurn == 0: 
            val = max(val, child_val)
        else: 
            val = min(val, child_val)

    memo[node] = val
    return val

Next we can train it

## = Training on the GPU = (https://medium.com/dsnet/training-deep-neural-networks-on-a-gpu-with-pytorch-11079d89805)

In [10]:
def get_default_device():
    if torch.cuda.is_available():
        return torch.device('cuda')
    else:
        return torch.device('cpu')
    
device = get_default_device()
print(device)

device = torch.device('cpu')

cuda


In [12]:
def to_device(data, device):
    if isinstance(data, (list, tuple)):
        return [to_device(x, device) for x in data]
    return data.to(device, non_blocking=True)

class DeviceDataLoader(): 
    def __init__(self, dl, device):
        self.dl = dl
        self.device = device
        
    def __iter__(self):
        for b in self.dl:
            yield to_device(b, self.device)
        
    def __len__(self):
        return len(self.dl)

In [14]:
import multiprocessing as mp

st = time() 

trainCount = 1000000

def train(net, trainCount, batchCount, num_epochs, start_lr, flatten=True):
    # Move to GPU if necessary 
    to_device(net, device)
    
    start  = start_lr / num_epochs
    for lr_var in reverse(arange(start, start_lr + start, start)):
    
        criterion = nn.MSELoss() # Using mean square error 
        optimizer = optim.SGD(net.parameters(), lr_var) #  create the optimizer 
        batchSize = trainCount // batchCount
        L2_loss = [0] * trainCount 

        for batch in range(batchCount):   

            # Get training data for one batch 
            nodes    = [0] * batchSize 
            expected = [0] * batchSize 
            outputs  = [0] * batchSize

            nodes = random.sample(vertices, batchSize)
            for i in range(batchSize): 

                # Compute the expected result.
                if nodes[i] not in memo: 
                    minimax(nodes[i])
                expected[i] = torch.FloatTensor([memo[nodes[i]]])

                # Put the board in a format the neural net can use 
                if flatten:
                    nodes[i] = nodes[i].board_torch.reshape(-1)
                else:
                    nodes[i] = nodes[i].board_torch.reshape(1, 1, 3, 3)      

            # Move training data to GPU if necessary 
            train_dl = DeviceDataLoader(nodes, device) 

            # Compute the actual and expected results 
            i = 0
            for data in train_dl:
                # Print output to user 
    #             if poo % (batchSize // 2) == 0:
    #                 print(f"Batch {batch}/{batchCount}, {poo}/{batchSize} - {poo/batchSize} - {time() - st}s", end="\r")

                outputs[i] = net(data)
                i += 1

    #       Update the neural network
            for i in range(batchSize):
                optimizer.zero_grad()
                out = outputs[i].to(torch.device('cpu'))
                loss = criterion(out, expected[i])
                loss.backward()
                optimizer.step() 
                L2_loss[i + batch * batchSize] = loss.item()

            if batch % 2 == 0:
                print(f"Batch {batch}/{batchCount} - {batchSize} - {time() - st}s - {len(memo.keys())}", end="\r")
        return L2_loss

results = dict() 
for dim in range(3, 4):
    dimNets = nets[dim]
    for netIndex in [0, 1, 2, 3]:
        if netIndex == 2 or netIndex == 3:
            r = train(dimNets[netIndex], trainCount, 11000, False)
        else:
            r = train(dimNets[netIndex], trainCount, 11000, True)
        results[(dim, netIndex)] = r
        print("")
print("Done!")

ValueError: Sample larger than population or is negative

# Visualizing the Results

In [16]:
def do_plot(L, chunkSize): 
    L2 = []
    for i in range(0, len(L), chunkSize):
        if len(L) - i < chunkSize:
            amount = len(L) - i
        else: 
            amount = chunkSize 
        L2.append(sum(L[i:i+chunkSize])/amount)

    plt.plot(L2)
    plt.show()

CHUNK_SIZE = 600
do_plot(result[(3,0)], CHUNK_SIZE)
do_plot(result[(3,1)], CHUNK_SIZE)
do_plot(result[(3,2)], CHUNK_SIZE)
do_plot(result[(3,3)], CHUNK_SIZE)


NameError: name 'result' is not defined

Finally we pass this network into an agent

In [9]:
def netValueFunction(board): 
    tBoard = torch.FloatTensor(board).reshape(-1)
    print(tBoard) 
    out = net(tBoard)
    for i, v in enumerate(out): 
        if v == out.max():
            return i
        
    return None
            
nAgent = ValueAgent(0, netValueFunction, 3)

NameError: name 'ValueAgent' is not defined

### Comparisons of Greedy Policy to Baseline

Here, we compare the greedy policy from the learned value function to an agent that plays random policies and a minimax agent (which plays perfectly).

In [10]:
game = StandardTicTacToe(nAgent, DumbAgent(1), Log(2))

results = []
for i in range(0, 10): 
    foo = game.play()
    if foo == 0: 
        print("Tie")
        results.append(-1)
    else: 
        results.append(foo)
    game.reset()
print(results)

NameError: name 'StandardTicTacToe' is not defined