In [16]:
### Imports

import math,sys,time
import numpy
import scipy,scipy.optimize,scipy.spatial,scipy.spatial.distance,scipy.stats
import json
import sqlite3
from collections import *
from enum import Enum
import random

from keras.models import Sequential
from keras.layers.core import Dense, Dropout, Activation
from keras.optimizers import SGD
from keras import backend as K

import theano,theano.tensor

In [17]:
### Constants

NUM_PLAYERS=2  # X -> player 0, O -> player 1
SQUARES=9 # 9 Squares
SQUARE_VALUES=3 # -1 -> empty, 0 -> X, 1 -> O
NUM_FEATURES = NUM_PLAYERS*SQUARES*SQUARE_VALUES+1  # Total number of features (inputs to the DNN)
NUM_ACTIONS=SQUARES # Maximum number of actions (outputs from the DNN)
CHANCE_TO_RANDOM_ACTION = 0.1 # Chance to move randomly to handle exploration
DISCOUNT_FACTOR = 0.9 # Discounts future rewards based on time from the current state


## Encode State

This function takes data about the game and encodes it as an array that can be entered into a DNN. This is where the "secret sauce" goes, where we describe the state of the board game in a way that is conducive to successful training.

A few items worth noting about the way I encoded Tic Tac Toe:

### All of the features are binary

Continuous (i.e. floating point) features are only effective if they represent a gradient that (somewhat) relates linearly to the output (or linearly to the odds of the output when using sigmoid/tanh).  Note that, although I'm representing a board square as -1/0/+1 for empty/X/O respectively, trying to cram this information into a single square assumes that "empty" is the opposite of "O".  Even if I re-ordered so "X" could be the opposite of "O", this will still cause strange interactions that will make the network ineffective (try it and see for yourself!).

### Initiative is encoded by duplicating the entire game state

Obviously, initiative (whose turn it is to move) is incredibly important.  I encoded this by duplicating the entire board state, and only activating one of the copies at a time.  This is similar to having a separate DNN for player X and player O, except that by having a single network, it can learn some common ideas.  I believe having two separate networks would actually perform better, but haven't tried it.



In [18]:
def EncodeState(currentPlayer, board):
    features = [0]*(NUM_FEATURES)
    playerShift = (currentPlayer)*SQUARES*SQUARE_VALUES
    for x in range(0,SQUARES):
        squareValue = board[x]+1;
        features[playerShift + (x*SQUARE_VALUES) + squareValue] = 1.0
    features[NUM_PLAYERS*SQUARES*SQUARE_VALUES] = 1.0 # Bias term
    return features

In [19]:
# This contains the encoded state and the action/player so value can be assigned from future rewards.
class TrainingExample:
    def __init__(self,turn,features,action,playerIndex):
        self.turn = turn
        self.features = features
        self.action = action
        self.playerIndex = playerIndex

# Takes an action and updates the state
def TakeAction(board, player, move):
    board[move] = player

# Setermines if the current state is terminal
def GetWinner(board):
    for player in range(1,3):
        if board[0]==player and board[1]==player and board[2]==player:
            return player
        if board[3]==player and board[4]==player and board[5]==player:
            return player
        if board[6]==player and board[7]==player and board[8]==player:
            return player
        if board[0]==player and board[3]==player and board[6]==player:
            return player
        if board[1]==player and board[4]==player and board[7]==player:
            return player
        if board[2]==player and board[5]==player and board[8]==player:
            return player
    return -1
    
# Plays a single match and stores training examples in X_input and Y_input.
# Prints some information about the game ifdebug is set to True
def PlayMatch(models, X_input, Y_input, debug, explore):
    trainingExamples = []
    board = [-1]*9
    for turn in range(0,9):
        currentPlayer = turn%2
        encodedState = EncodeState(currentPlayer, board)
        if models is None or models[currentPlayer] is None or (explore and numpy.random.ranf() < CHANCE_TO_RANDOM_ACTION):
            # Play in a random square
            move = -1
            while move == -1:
                move = numpy.random.randint(9)
                if board[move]>=0:
                    move = -1
        else:
            # Query the model
            model = models[currentPlayer]
            features = numpy.array([encodedState,])
            actionValues = model.predict(features, batch_size=1, verbose=0)[0]
            # Choose the action with the highest expected value
            if debug:
                print(currentPlayer,board,actionValues)
            move = -1000000000
            while move == -1000000000:
                move = numpy.argmax(actionValues)
                if board[move]>=0:
                    actionValues[move] = -1000000000
                    move = -1000000000
        if debug:
            print("player",currentPlayer,"makes move",move)
        trainingExamples.append(TrainingExample(turn, encodedState, move, currentPlayer))
        TakeAction(board,currentPlayer,move)
        winner = GetWinner(board)
        if winner > -1:
            break
    if debug:
        print('winner is',winner)
    playerValues = []
    for pIndex in range(0,NUM_PLAYERS):
        if winner == -1:
            value = 8.0 # 8 Reward for a tie
        elif winner == pIndex:
            value = 32.0 # 32 for a win
        else:
            value = 2.0 # 2 for a loss
        playerValues.append(value)
    for t in trainingExamples:
        discountedValue = playerValues[t.playerIndex] * (DISCOUNT_FACTOR ** (turn - t.turn))
        X_input.append(t.features)
        labels = [0]*SQUARES
        labels[t.action] = discountedValue
        Y_input.append(labels)
    return winner

# Play several matches with a fixed seed
def PlayMatches(X_input, Y_input, model, numMatches):
    timeSeed = int(time.time())
    random.seed(timeSeed)
    numpy.random.seed(timeSeed)
    for x in range(0,numMatches):
        PlayMatch([model, model], X_input, Y_input, False, True)
    return X_input,Y_input

# This plays against a pure-random opponent and returns (win, lose, tie)
def TestModel(model):
    random.seed(1)
    numpy.random.seed(1)
    winCount=0
    loseCount=0
    tieCount=0
    for x in range(0,1000):
        winner = PlayMatch([model, None], [], [], False, False)
        if winner == -1:
            tieCount = tieCount+1
        elif winner == 0:
            winCount = winCount+1
        elif winner == 1:
            loseCount = loseCount+1
        else:
            print(winner)
    for x in range(0,1000):
        winner = PlayMatch([None, model], [], [], False, False)
        if winner == -1:
            tieCount = tieCount+1
        elif winner == 0:
            loseCount = loseCount+1
        elif winner == 1:
            winCount = winCount+1
        else:
            print(winner)
    return (winCount,loseCount,tieCount)

# Play a match with no model (pure random) and debug just to show what happens.
PlayMatch(None, [], [], True, True)


player 0 makes move 1
player 1 makes move 7
player 0 makes move 4
player 1 makes move 3
player 0 makes move 0
player 1 makes move 6
player 0 makes move 8
player 1 makes move 5
player 0 makes move 2
winner is -1


-1

In [20]:
model = Sequential()
model.add(Dense(512, input_dim=NUM_FEATURES, init='he_normal', activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(256, init='he_normal', activation='relu'))
model.add(Dropout(0.1))
model.add(Dense(128, init='he_normal', activation='relu'))
model.add(Dropout(0.1))
# Although the values can never be negative, using a relu instead of linear here
#    results in significantly worse performance.  Maybe a leaky relu would help?
model.add(Dense(NUM_ACTIONS, init='he_normal', activation='linear'))

# This is a special loss function that does not consider zero outputs.  The reason why we need this is because there
#     is one output for each action, but the network wants to propagate error for all outputs simulatenously.  This
#     isn't possible here because we only know the value from taking a single action.  This loss function fixes
#     the issue by ignoring all zero outputs and only propagating error for the action that was actually taken.
def non_zero_mean_squared_error(y_true, y_pred):
    total = theano.tensor.sum(theano.tensor.square(abs(theano.tensor.sgn(y_true))*(y_pred - y_true)), axis=-1)
    count = theano.tensor.sum(abs(theano.tensor.sgn(y_true)))
    return total/count

model.compile(loss=non_zero_mean_squared_error, optimizer='sgd')

# Print a model to the console
def DumpModel(model):
    print("LAYERS:",len(model.layers))
    for layer in model.layers:
        print("NUM WEIGHT ARRAYS",len(layer.get_weights()))
        print(layer.get_weights()[1])
        weights = layer.get_weights()[0] # list of numpy arrays
        for x in range(0,NUM_FEATURES):
            print(weights[x])

# This keeps a ring buffer of training examples & labels
X_train = []
Y_train = []
for simulation in range(0,500):
    simulationModel = model
    if simulation == 0:
        # For the first iteration, use a pure-random player
        simulationModel = None
    PlayMatches(X_train, Y_train, simulationModel, 200)
    
    
    print("# Training Examples",len(X_train))
    # Trim the ring buffer
    X_train = X_train[-64000:]
    Y_train = Y_train[-64000:]
    history = model.fit(numpy.array(X_train), numpy.array(Y_train), nb_epoch=20, shuffle=True, batch_size=1024, verbose=0)
    model = history.model
    #DumpModel(model)
    print('Finished epoch',simulation)
    sys.stdout.flush()
    results = TestModel(model)
    print('MODEL TEST (W/L/T):',results)
    sys.stdout.flush()


# Training Examples 1720
Finished epoch 0
MODEL TEST (W/L/T): (164, 230, 1606)
# Training Examples 3489
Finished epoch 1
MODEL TEST (W/L/T): (169, 230, 1601)
# Training Examples 5264
Finished epoch 2
MODEL TEST (W/L/T): (208, 252, 1540)
# Training Examples 7027
Finished epoch 3
MODEL TEST (W/L/T): (205, 240, 1555)
# Training Examples 8786
Finished epoch 4
MODEL TEST (W/L/T): (188, 259, 1553)
# Training Examples 10537
Finished epoch 5
MODEL TEST (W/L/T): (247, 280, 1473)
# Training Examples 12308
Finished epoch 6
MODEL TEST (W/L/T): (340, 375, 1285)
# Training Examples 14072
Finished epoch 7
MODEL TEST (W/L/T): (359, 343, 1298)
# Training Examples 15839
Finished epoch 8
MODEL TEST (W/L/T): (377, 362, 1261)
# Training Examples 17606
Finished epoch 9
MODEL TEST (W/L/T): (355, 284, 1361)
# Training Examples 19376
Finished epoch 10
MODEL TEST (W/L/T): (298, 207, 1495)
# Training Examples 21148
Finished epoch 11
MODEL TEST (W/L/T): (405, 176, 1419)
# Training Examples 22536
Finished epoch 12