## Import libraries and set config variables

In [6]:
import tensorflow as tf
import numpy
import keras
from keras import Model, layers, losses

# Configuration shared by the game and the networks
BOARD_WIDTH, BOARD_HEIGHT = 5, 5  # Board size in cells: columns, rows
NUM_EPOCHS = 10                   # Number of matches the match loop plays
TRAINING_RATE = 0.1               # Passed to the Adam optimizer as learning_rate
MOMENTUM = 0.9                    # NOTE(review): not referenced by the visible code - confirm it is still needed


## Helper functions

In [7]:
# Prints the first BOARD_HEIGHT rows of a tensor from the highest row index down to
# row 0, so [0, 0] ends up in the bottom left. Meant to match the connect4 board layout.
def printTensorFlipped(tensor):
    """Print `tensor` one row per line, last row first.

    `tensor` must have at least BOARD_HEIGHT entries along its first axis,
    because rows BOARD_HEIGHT - 1 down to 0 are gathered one at a time.
    """
    rowIndex = BOARD_HEIGHT - 1
    while rowIndex >= 0:
        # Gathering with a one-element index list keeps the row as a nested list when printed
        tf.print(tf.gather(tensor, [rowIndex]), "")
        rowIndex -= 1

## Define game class

In [8]:
# Holds game state (the game board), validates moves and detects the end of a game
class GameState:
  """Connect-4 style board with move validation and win/draw detection.

  The board is a 2D numpy array indexed as ``board[row][column]``; row 0 is the
  bottom of the board. Cell values:
    0 = empty
    1 = filled by player 1
    2 = filled by player 2

  ``lastWinner`` holds the id of the player whose winning line was most recently
  detected, or 0 when no win has been detected since construction/reset.
  """

  # Number of equal discs in a straight line needed to win
  _WIN_LENGTH = 4

  def __init__(self):
    self.board = self.createEmptyBoard()
    self.lastWinner = 0

  def reset(self):
    """Clear the board and forget the previous winner."""
    self.board = self.createEmptyBoard()
    # Without this a drawn game after a reset would still report the old winner
    self.lastWinner = 0

  def isValidMove(self, column):
    """Return True if `column` exists on the board and still has an empty cell."""
    numColumns = self.board.shape[1]
    if not 0 <= column < numColumns:
      return False
    # The board is indexed [row][column], so a column is the slice [:, column]
    return bool((self.board[:, column] == 0).any())

  # Prints out the game board state so [0, 0] is in the bottom left and the highest
  # row/column index is in the top right
  def printGameState(self):
    """Print the board one row per line, top row first, as ``[a b c ...]``."""
    for row in reversed(range(self.board.shape[0])):
      print("[" + " ".join(str(cell) for cell in self.board[row]) + "]")

  def createEmptyBoard(self):
    """Return a BOARD_HEIGHT x BOARD_WIDTH integer array filled with 0 (empty)."""
    return numpy.zeros((BOARD_HEIGHT, BOARD_WIDTH), dtype=int)

  def isGameOver(self):
    """Return True if either player has won or the board is full.

    Side effect: when a winning line is found, ``lastWinner`` is set to the
    winning player's id.
    """
    numRows, numColumns = self.board.shape

    # To keep this simple just check every cell for every win type. All cells are
    # scanned for wins BEFORE the draw check so that a win made by the move that
    # fills the board is still credited to the winner.
    for row in range(numRows):
      for column in range(numColumns):
        if (self.isHorizontalWin(row, column)
            or self.isVerticalWin(row, column)
            or self.isDiagonalWin(row, column)):
          return True

    return self.isBoardFull()

  def _isLineWin(self, row, column, rowStep, columnStep):
    """Return True if a winning line starts at (row, column) in the given direction.

    The line consists of the start cell plus the next ``_WIN_LENGTH - 1`` cells
    reached by repeatedly adding (rowStep, columnStep). On success ``lastWinner``
    is set to the owner of the line.
    """
    numRows, numColumns = self.board.shape
    cellPlayer = self.board[row][column]
    if cellPlayer == 0: # An empty cell can't start a line
      return False

    # Don't want to go out of bounds: the far end of the line must be on the board
    reach = self._WIN_LENGTH - 1
    endRow = row + reach * rowStep
    endColumn = column + reach * columnStep
    if not (0 <= endRow < numRows and 0 <= endColumn < numColumns):
      return False

    for i in range(1, self._WIN_LENGTH):
      if self.board[row + i * rowStep][column + i * columnStep] != cellPlayer:
        return False

    self.lastWinner = int(cellPlayer)
    return True

  # Returns true if there are 4 slots filled by the same player horizontally here
  def isHorizontalWin(self, row, column):
    """Check the line running from (row, column) towards higher columns."""
    return self._isLineWin(row, column, 0, 1)

  # Returns true if there are 4 slots filled by the same player vertically here
  def isVerticalWin(self, row, column):
    """Check the line running from (row, column) towards higher rows."""
    return self._isLineWin(row, column, 1, 0)

  # Returns true if there are 4 slots filled by the same player diagonally here
  def isDiagonalWin(self, row, column):
    """Check both diagonals that rise from (row, column): up-right and up-left."""
    return (self._isLineWin(row, column, 1, 1)
            or self._isLineWin(row, column, 1, -1))

  def isBoardFull(self):
    """Return True if no cell on the board is empty."""
    return not bool((self.board == 0).any())

  def dropDisc(self, column, player):
    """Put `player`'s disc in the lowest empty row of `column`.

    Does nothing when the move is invalid (column off the board or full).
    """
    if not self.isValidMove(column):
      return

    for row in range(self.board.shape[0]):
      if self.board[row][column] == 0:
        self.board[row][column] = player
        return

  def getLastWinner(self):
    """Return the id of the last detected winner, or 0 if there is none."""
    return self.lastWinner


## Define the neural network class

In [9]:
# Layer sizes used when building the network
numInputs = BOARD_HEIGHT * BOARD_WIDTH  # Cell count of the board (the whole board state)
(
  numNeuronsLayer0,
  numNeuronsLayer1,
  numNeuronsLayer2,
  numNeuronsLayer3,
  numNeuronsLayer4,
) = (128,) * 5  # Every hidden layer has the same width
numOutputs = BOARD_WIDTH  # One output per column a disc can be dropped down

class NeuralNetwork:
  """Fully-connected network that chooses a column for one player.

  The model receives the whole board flattened into a single sample of
  numInputs values and produces a softmax score for each of the numOutputs
  columns. ``roundState`` logs every (board snapshot, chosen column) pair of the
  current match.
  """

  def __init__(self, playerId):
    """Build and compile the model for the player with id `playerId`."""
    self.model = keras.Sequential([
      # One sample is the flattened board, so the network sees every cell at once
      layers.Dense(numNeuronsLayer0, activation=tf.nn.relu, input_shape=(numInputs,)),
      layers.Dense(numNeuronsLayer1, activation=tf.nn.relu),
      layers.Dense(numNeuronsLayer2, activation=tf.nn.relu),
      layers.Dense(numNeuronsLayer3, activation=tf.nn.relu),
      layers.Dense(numNeuronsLayer4, activation=tf.nn.relu),
      layers.Dense(numOutputs, activation=tf.nn.softmax)
    ])
    optimizer = keras.optimizers.Adam(learning_rate=TRAINING_RATE)
    # metrics is passed as a list, the form compile() documents
    self.model.compile(loss=losses.CategoricalCrossentropy(),
                       optimizer=optimizer,
                       metrics=[tf.metrics.BinaryAccuracy(threshold=0.0)])

    self.roundState = [] #Array of each step of the current match
    self.playerId = playerId

  def resetRoundData(self):
    """Forget the moves logged for the previous match."""
    self.roundState = []

  def _boardToInput(self, gameState):
    """Return the board as a float32 batch holding one flattened sample."""
    return gameState.board.reshape(1, -1).astype("float32")

  def getNextMove2(self, gameState):
    """Debug helper: print the model output while filling the board's diagonal.

    Mutates `gameState.board` (cells [0][0]..[3][3] alternate between 1 and 2),
    printing the model output and the board after each change, then prints the
    model summary.
    """
    val = 1
    for i in range(0, 4):
      out = self.model(self._boardToInput(gameState))
      gameState.board[i][i] = val
      val = 2 if val == 1 else 1

      print("\n\nModel output: ")
      tf.print(out)
      print('Board state:')
      gameState.printGameState()

    print("\n")
    self.model.summary()

  def getNextMove(self, gameState):
    """Play this network's move: the valid column with the highest score.

    The board as it was BEFORE the move and the chosen column are appended to
    ``roundState``. If no column is valid, nothing is played or logged.
    """
    # Run current gamestate through model and get output
    output = self.model(self._boardToInput(gameState))

    maxScore = None
    maxScoredColumn = -1
    for index, prediction in enumerate(output[0]):
      if not gameState.isValidMove(index):
        continue
      if maxScore is None or prediction > maxScore:
        maxScore = prediction
        maxScoredColumn = index

    if maxScoredColumn == -1:
      # Never drop a disc into column -1 (numpy would treat it as the last column)
      print(f'Player {self.playerId} has no valid move')
      return

    # Store a copy: the live board keeps changing as the match goes on
    self.roundState.append((gameState.board.copy(), maxScoredColumn))
    gameState.dropDisc(maxScoredColumn, self.playerId)

    print(f'Player {self.playerId} put disc in column {maxScoredColumn}')

  def train(self, board, winnerId):
    """Report a win for this network; actual training is not implemented yet.

    `board` is the end-of-match state handed over by the caller and is
    currently unused. `winnerId` is the id of the winning player.
    """
    if winnerId == self.playerId:
      print(f'Player {self.playerId} won! Training based on match data')

  def saveState(self):
    """Placeholder for persisting the model."""
    print('Not yet implemented') # Putting these here since python cries about empty funcs

## Make sure TensorFlow is using a GPU, then set up the networks and run the matches

In [10]:
# Note: Make sure Edit > Notebook settings > Hardware accelerator == GPU
# Make sure TF is using a gpu for training.
# list_physical_devices returns an empty list when no GPU is found, in which case
# the block below is skipped and nothing is printed.
gpus = tf.config.experimental.list_physical_devices('GPU')
if gpus:
  # Restrict TensorFlow to only use the first GPU
  try:
    tf.config.experimental.set_visible_devices(gpus[0], 'GPU')
    logical_gpus = tf.config.experimental.list_logical_devices('GPU')
    # Report how many GPUs exist physically vs. how many TF will actually use
    print(len(gpus), "Physical GPUs,", len(logical_gpus), "Logical GPU\n")
  except RuntimeError as e:
    # Visible devices must be set before GPUs have been initialized
    print(e)


# Create one network per player (player ids 1 and 2)
network0 = NeuralNetwork(1)
network1 = NeuralNetwork(2)

# Play NUM_EPOCHS matches of connect4; network0 always makes the first move
for currentEpoch in range(NUM_EPOCHS):
  print('\n\nStarting new game with network0 vs network1')
  # Every match starts from a fresh, empty board whose contents are printed
  gameState = GameState()
  print(f'Created empty game board. Dimensions: [{BOARD_WIDTH}, {BOARD_HEIGHT}]')
  print('Initial board state:')
  gameState.printGameState()

  # Drop the move logs of the previous match
  network0.resetRoundData()
  network1.resetRoundData()

  # Let the networks alternate moves; the game-over check runs before every move
  mover, waiter = network0, network1
  while not gameState.isGameOver():
    mover.getNextMove(gameState)
    mover, waiter = waiter, mover

  # Give both networks the result so the winner can train on the match data
  winnerId = gameState.getLastWinner()
  network0.train(gameState, winnerId)
  network1.train(gameState, winnerId)
  print('End board state:')
  gameState.printGameState()

# Debug calls, kept disabled:
#network0.getNextMove2(gameState)
#network0.getNextMove(gameState)
#print(network0.roundState)


1 Physical GPUs, 1 Logical GPU



Starting new game with network0 vs network1
Created empty game board. Dimensions: [5, 5]
Initial board state:
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
Player 1 put disc in column 0
Player 2 put disc in column 2
Player 1 put disc in column 4
Player 2 put disc in column 2
Player 1 put disc in column 4
Player 2 put disc in column 2
Player 1 put disc in column 4
Player 2 put disc in column 2
Player 2 won! Training based on match data
End board state:
[0 0 0 0 0]
[0 0 2 0 0]
[0 0 2 0 1]
[0 0 2 0 1]
[1 0 2 0 1]


Starting new game with network0 vs network1
Created empty game board. Dimensions: [5, 5]
Initial board state:
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
[0 0 0 0 0]
Player 1 put disc in column 0
Player 2 put disc in column 2
Player 1 put disc in column 4
Player 2 put disc in column 2
Player 1 put disc in column 4
Player 2 put disc in column 2
Player 1 put disc in column 4
Player 2 put disc in column 2
Player 2 won! Training b