<a href="https://colab.research.google.com/github/KenCoder/td-gammon/blob/main/TD_Gammon.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Install requirements (only when running in Google Colab)

In [None]:
# Packages that are pip-installed one by one when the notebook detects that
# the google.colab module is loaded.
colab_requirements = [
    "numpy",
    "pandas",
]
import sys, subprocess
def run_subprocess_command(cmd):
    """Run a command and echo its standard output line by line.

    Arguments:
    cmd -- command line as a single string; it is split on whitespace, so
        arguments that themselves contain spaces are not supported
    """
    # The context manager closes the stdout pipe and waits for the child to
    # exit, so no open descriptor or unreaped process is left behind.
    with subprocess.Popen(cmd.split(), stdout=subprocess.PIPE) as process:
        for line in process.stdout:
            print(line.decode().strip())
        
# Install the requirements only when the google.colab module has already been
# loaded into this interpreter (presumably meaning the notebook runs on Colab).
if "google.colab" in sys.modules:
    for i in colab_requirements:
        run_subprocess_command("pip install " + i)

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/


Backgammon board

In [None]:
# Copyright 2021 Andrew Dunstall

import numpy as np


# Identifiers for the two players; passed as the `player` argument of the
# Board methods to select which side's checkers an operation applies to.
PLAYER_X = 0
PLAYER_O = 1


class Board:
    """Backgammon board holding the checkers of both players.

    Each player's points are stored from that player's own perspective: a
    checker always moves from a lower index to a higher one and is borne off
    when it lands exactly on index ``_NUM_POINTS``. The two perspectives are
    mirrored, so index ``i`` for one player is the same physical point as
    index ``_NUM_POINTS - 1 - i`` for the other.

    A position is either an integer point index in ``[0, _NUM_POINTS)`` or
    the string ``"bar"``.
    """

    _NUM_POINTS = 24
    _STATE_SIZE = 198
    _NUM_CHECKERS = 15

    _MIN_MOVE = 1
    _MAX_MOVE = 6

    def __init__(self):
        """Set up the starting layout with empty bars and nothing borne off."""
        start_layout = [
            0, 0, 0, 0, 0, 5, 0, 3, 0, 0, 0, 0,
            5, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 2
        ]
        # Two independent arrays: both sides start from the same layout (each
        # in its own perspective) but are mutated separately afterwards.
        self._x_points = np.array(start_layout)
        self._o_points = np.array(start_layout)
        self._x_bar = 0
        self._o_bar = 0
        self._x_removed = 0
        self._o_removed = 0

    def x_bar(self):
        """Return the number of X checkers on the bar."""
        return self._x_bar

    def o_bar(self):
        """Return the number of O checkers on the bar."""
        return self._o_bar

    def won(self, player):
        """Return True when ``player`` has borne off all of their checkers.

        Any value other than PLAYER_X or PLAYER_O yields False.
        """
        if player == PLAYER_X:
            return self._x_removed == self._NUM_CHECKERS
        if player == PLAYER_O:
            return self._o_removed == self._NUM_CHECKERS
        return False

    def x_points(self):
        """Return the internal array of X checker counts per point (not a copy)."""
        return self._x_points

    def o_points(self):
        """Return the internal array of O checker counts per point (not a copy)."""
        return self._o_points

    def permitted_moves(self, rolls, player):
        """List every move ``player`` may currently make with the given dice.

        Arguments:
        rolls -- iterable of dice values; duplicates are collapsed
        player -- PLAYER_X or PLAYER_O

        Returns a list of ``(position, steps)`` tuples. For each die value
        the point moves come first (in index order), followed by the
        ``"bar"`` move if it is permitted.
        """
        permitted = []
        # Ensure rolls unique.
        for steps in list(set(rolls)):
            for position in range(self._NUM_POINTS):
                if self._move_permitted(position, steps, player):
                    permitted.append((position, steps))
            if self._move_permitted("bar", steps, player):
                permitted.append(("bar", steps))

        return permitted

    def move(self, position, steps, player) -> bool:
        """Play one checker move for ``player``.

        Arguments:
        position -- point index to move from, or ``"bar"``
        steps -- die value to move by
        player -- PLAYER_X or PLAYER_O

        Returns True if the move was played. Returns False, leaving the
        board untouched, if the move is not permitted; this includes
        positions that are neither ``"bar"`` nor a valid point index.
        """
        # Validate first so that nothing below indexes with a bad position.
        if not self._move_permitted(position, steps, player):
            return False

        player_points = self._x_points if player == PLAYER_X else self._o_points
        opponent_points = self._o_points if player == PLAYER_X else self._x_points

        from_bar = position == "bar"
        # Entering from the bar with a roll of n lands on index n - 1.
        new_position = steps - 1 if from_bar else position + steps

        # Bearing off. Cannot happen from the bar: steps - 1 is at most 5.
        if new_position == self._NUM_POINTS:
            player_points[position] -= 1
            if player == PLAYER_X:
                self._x_removed += 1
            if player == PLAYER_O:
                self._o_removed += 1
            return True

        # Same physical point, expressed in the opponent's perspective.
        mirrored = self._NUM_POINTS - new_position - 1
        if opponent_points[mirrored] == 1:
            # Hit: the lone opposing checker goes to the opponent's bar.
            opponent_points[mirrored] = 0
            if player == PLAYER_X:
                self._o_bar += 1
            else:
                self._x_bar += 1

        if from_bar:
            player_points[new_position] += 1
            if player == PLAYER_X:
                self._x_bar -= 1
            if player == PLAYER_O:
                self._o_bar -= 1
        else:
            player_points[position] -= 1
            player_points[new_position] += 1
        return True

    def state(self):
        """Return the board as a dict of plain Python ints and lists."""
        return {
            "x_bar": self._x_bar,
            "o_bar": self._o_bar,
            "x_removed": self._x_removed,
            "o_removed": self._o_removed,
            "x_points": [int(n) for n in self._x_points],
            "o_points": [int(n) for n in self._o_points]
        }

    def encode_state(self, turn):
        """Encode the board as a vector of length ``_STATE_SIZE``.

        Layout: four values per point for X (indices 0-95), four values per
        point for O (96-191), then X bar / 2, O bar / 2, the fraction of X
        and of O checkers borne off, and finally ``1 - turn`` and ``turn``.

        Arguments:
        turn -- 0 or 1; written into the last two entries of the vector
        """
        state = np.zeros(self._STATE_SIZE)

        for point in range(self._NUM_POINTS):
            index = point * 4
            state[index:index+4] = encode_point(self._x_points[point])

        for point in range(self._NUM_POINTS):
            # The O block follows directly after the X block.
            index = (point + self._NUM_POINTS) * 4
            state[index:index+4] = encode_point(self._o_points[point])

        state[192] = self._x_bar / 2
        state[193] = self._o_bar / 2
        state[194] = self._x_removed / self._NUM_CHECKERS
        state[195] = self._o_removed / self._NUM_CHECKERS
        state[196] = 1 - turn
        state[197] = turn

        return state

    def _is_point(self, position) -> bool:
        """Return whether ``position`` is an integer index of a board point."""
        return (
            isinstance(position, (int, np.integer))
            and 0 <= position < self._NUM_POINTS
        )

    def _move_permitted(self, position, steps, player) -> bool:
        """Return whether ``player`` may move ``steps`` from ``position``.

        A move is refused when the die value is outside
        ``[_MIN_MOVE, _MAX_MOVE]``, when the position is neither ``"bar"``
        nor a valid point index, when the player has checkers on the bar but
        tries to move from a point (or has none but tries to move from the
        bar), when the source point is empty, when the move overshoots the
        bear-off index, or when the destination holds two or more opposing
        checkers.
        """
        if steps < self._MIN_MOVE or steps > self._MAX_MOVE:
            return False

        from_bar = position == "bar"
        # Reject bad positions up front: a negative index would otherwise
        # wrap around silently and an index >= _NUM_POINTS would raise.
        if not from_bar and not self._is_point(position):
            return False

        player_points = self._x_points if player == PLAYER_X else self._o_points
        opponent_points = self._o_points if player == PLAYER_X else self._x_points

        if from_bar:
            # Nothing on the bar to enter with.
            if player == PLAYER_X and self._x_bar == 0:
                return False
            if player == PLAYER_O and self._o_bar == 0:
                return False

            new_position = steps - 1
            n_occupied = opponent_points[self._NUM_POINTS - new_position - 1]
            if n_occupied >= 2:
                return False

            return True

        # Checkers on the bar must be entered before any other move.
        if player == PLAYER_X and self._x_bar != 0:
            return False
        if player == PLAYER_O and self._o_bar != 0:
            return False

        # No checkers to move at this position.
        if player_points[position] == 0:
            return False

        new_position = position + steps
        # Note may be 24 if bearing off.
        if new_position > self._NUM_POINTS:
            return False

        if new_position == self._NUM_POINTS:
            return True

        # Point occupied by opponent.
        n_occupied = opponent_points[self._NUM_POINTS - new_position - 1]
        if n_occupied >= 2:
            return False

        return True


def encode_point(n_checkers):
    """Encode the checker count of one point as four float features.

    The features are, in order: 1 for exactly one checker (a blot), 1 for
    two or more checkers (a made point), 1 for exactly three checkers, and
    ``(n_checkers - 3) / 2`` when there are more than three checkers.
    Features that do not apply are 0.
    """
    is_blot = 1.0 if n_checkers == 1 else 0.0
    is_made_point = 1.0 if n_checkers >= 2 else 0.0
    is_exactly_three = 1.0 if n_checkers == 3 else 0.0
    surplus = (n_checkers - 3) / 2 if n_checkers > 3 else 0.0
    return np.array([is_blot, is_made_point, is_exactly_three, surplus])


Game

In [None]:
# Copyright 2021 Andrew Dunstall

import logging
import random
import time

class Game:
    """A single game played by two agents on a fresh board."""

    def __init__(self, agent1, agent2):
        """Create a game.

        Arguments:
        agent1 -- agent stored in slot 0
        agent2 -- agent stored in slot 1
        """
        self._agents = [agent1, agent2]
        self._board = Board()

    def won(self, player):
        """Return whether ``player`` has won on this game's board."""
        return self._board.won(player)

    def play(self):
        """Alternate turns between the two agents until one player has won.

        A random value picks the starting slot; each loop iteration switches
        slot, lets that agent play its turn and then calls its ``update``.
        """
        start = time.time()

        turn = random.randint(0, 1)
        logging.debug(f"game started [player = {turn}]")
        steps = 0
        # `turn` is only a slot index into self._agents. An agent may play as
        # a player id different from its slot, so the game is over as soon as
        # either player has won, not only the one whose id equals `turn`.
        while not self._finished():
            turn = 1 - turn

            self._agents[turn].turn(self._board)
            self._agents[turn].update(self._board)
            steps += 1

        # Read the winner from the board rather than from the slot index.
        winner = 0 if self._board.won(0) else 1
        duration = time.time() - start
        logging.info(f"game complete [duration = {duration}s], [winner = {winner}] [steps = {steps}]")

    def _finished(self):
        """Return whether either player has won."""
        return self._board.won(0) or self._board.won(1)


In [None]:
# Copyright 2021 Andrew Dunstall

import abc
import random


class Agent(abc.ABC):
    """Abstract base for players: subclasses supply ``turn`` and ``update``."""

    @abc.abstractmethod
    def turn(self, board):
        """Play this agent's turn on ``board``."""

    @abc.abstractmethod
    def update(self, board):
        """Hook called with ``board`` after the agent's turn."""

    def _roll(self):
        """Roll two six-sided dice.

        Returns a list with the two values, or with the value repeated four
        times when both dice show the same number.
        """
        first = random.randint(1, 6)
        second = random.randint(1, 6)
        # Doubles are played four times.
        return [first, second] if first != second else [first] * 4


In [None]:
# Copyright 2021 Andrew Dunstall

import random
import logging

class RandomAgent(Agent):
    """Agent that picks each move at random from the permitted moves."""

    def __init__(self, player):
        """Remember which player id this agent moves for."""
        self._player = player

    def turn(self, board):
        """Roll the dice and play random permitted moves until none remain.

        The turn ends when every die has been used or when the board
        reports no permitted move for the remaining dice.
        """
        remaining = self._roll()

        while remaining:
            options = board.permitted_moves(remaining, self._player)
            if not options:
                return

            chosen = random.choice(options)
            if board.move(*chosen, self._player):
                # Consume the die that was just played.
                remaining.remove(chosen[1])
            else:
                logging.error("td-gammon player requested invalid move")

    def update(self, board):
        """Nothing to learn: the random agent ignores the outcome."""
        pass


In [None]:
# Copyright 2021 Andrew Dunstall

import logging

class TDGammonAgent(Agent):
    """Agent that asks a model for every move and feeds it board updates."""

    def __init__(self, model, player):
        """Store the model to consult and the player id to move for."""
        self._model = model
        self._player = player

    def turn(self, board):
        """Roll the dice and play the model's chosen move for each die.

        The turn ends when every die has been used or when the model
        returns no move.
        """
        remaining = self._roll()

        while remaining:
            chosen = self._model.action(board, remaining, self._player)
            # When no moves remaining end the turn.
            if chosen is None:
                return

            if board.move(*chosen, self._player):
                # Consume the die that was just played.
                remaining.remove(chosen[1])
            else:
                logging.error("td-gammon player requested invalid move")

    def update(self, board):
        """Pass the board and this agent's player id to the model's update."""
        self._model.update(board, self._player)


In [None]:
# Copyright 2021 Andrew Dunstall

import copy
import datetime
import logging
import os
import random
import time

import numpy as np
import tensorflow as tf

class Model:
    """
    Model wraps the neural net and provides methods for training and
    action selection by the agent.

    The network maps a 198-value board encoding through a 40-unit sigmoid
    hidden layer to a single sigmoid output. According to the comment in
    ``action``, that output is read as the probability of player 0 winning.
    """
    # Decay factor applied to the eligibility traces in update().
    _LAMBDA = 0.7
    # Step size applied to the TD error in update().
    _ALPHA = 0.1

    def __init__(self, restore_path = None):
        """Construct a model with random weights.

        Arguments:
        restore_path -- path to stored checkpoint to restore if given
            (default None)
        """
        inputs = tf.keras.Input(shape=(198,))
        x = tf.keras.layers.Dense(40, activation="sigmoid")(inputs)
        outputs = tf.keras.layers.Dense(1, activation="sigmoid")(x)
        self._model = tf.keras.Model(inputs=inputs, outputs=outputs)

        # Lazily initialize trace once the shape of the gradients is known.
        self._trace = []

        # A throwaway game is built only to read the encoding of the starting
        # board; it is never played.
        game = Game(
            TDGammonAgent(self, 0),
            TDGammonAgent(self, 1)
        )
        # Last encoded state and the network's value for it; update() uses
        # the stored value as the previous prediction in the TD error.
        self._state = tf.Variable(game._board.encode_state(0))
        self._value = tf.Variable(self._model(self._state[np.newaxis]))

        if restore_path is not None:
            self.load(restore_path)

    def train(self, n_episodes=5000, n_validation=500, n_checkpoint=500, n_tests=1000):
        """Trains the model by self-play.

        Each episode is one game between two TDGammonAgents that share this
        model; the eligibility traces are reset after every game.

        Arguments:
        n_episodes -- number of episodes to train (default 5000)
        n_validation -- number of episodes between testing the model
            (default 500)
        n_checkpoint -- number of episodes between saving the model
            (default 500)
        n_tests -- number of episodes to test (default 1000)
        """
        logging.info("training model [n_episodes = %d]", n_episodes)
        for episode in range(1, n_episodes + 1):
            logging.info("running episode [episode = %d]", episode)
            # Validation and checkpointing run before this episode's game.
            if episode % n_validation == 0:
                self.test(n_tests)
            if episode > 1 and episode % n_checkpoint == 0:
                self.save()

            # Randomise which player id sits in which agent slot of the game.
            player = random.randint(0, 1)
            game = Game(
                TDGammonAgent(self, player),
                TDGammonAgent(self, 1 - player)
            )
            game.play()

            # NOTE(review): only the traces are reset here; _state and _value
            # carry over into the next episode - confirm this is intended.
            self._reset_trace()

        self.save()

    def test(self, n_episodes=100):
        """Tests the model against a random agent.

        Logs the running win count after each game and the final win ratio.
        Note that the TDGammonAgent still calls update() during these games,
        so the weights keep changing while testing.

        Arguments:
        n_episodes -- number of episodes to test (default 100)
        """
        logging.info("testing model [n_episodes = %d]", n_episodes)
        wins = 0
        for episode in range(1, n_episodes + 1):
            player = random.randint(0, 1)
            game = Game(
                TDGammonAgent(self, player),
                RandomAgent(1 - player)
            )
            game.play()

            if game.won(player):
                wins += 1

            logging.info("game complete [model wins %d] [episodes %d]", wins, episode)

        logging.info("test complete [model win ratio %f]", wins/n_episodes)

    def action(self, board, roll, player):
        """Predicts the optimal move given the current state.

        This calculates each afterstate for all possible moves given the
        current state and selects the action that leads to the state with
        the greatest afterstate value.

        Arguments:
        board -- board containing the game state
        roll -- list of dice rolls left in the players turn
        player -- number of the player

        Returns the chosen ``(position, steps)`` move, or None when the
        board reports no permitted move.
        """
        start = time.time()

        max_move = None
        max_prob = -np.inf
        permitted = board.permitted_moves(roll, player)
        for move in permitted:
            # Try the move on a copy so the real board is left untouched.
            afterstate = copy.deepcopy(board)
            if not afterstate.move(*move, player):
                logging.error("model requested an invalid move")
                continue

            state = afterstate.encode_state(player)[np.newaxis]
            prob = tf.reduce_sum(self._model(state))
            # The network gives the probability of player 0 winning so must
            # change if player 1.
            prob = 1 - prob if player == 1 else prob

            if prob > max_prob:
                max_prob = prob
                max_move = move

        # NOTE(review): _state and _value are assigned in __init__ and in
        # update() and are never set to None in the visible code, so these
        # two branches look unreachable - confirm before relying on them.
        if self._state is None:
            self._state = tf.Variable(board.encode_state(player))
        if self._value is None:
            self._value = tf.Variable(self._model(self._state[np.newaxis]))

        duration = time.time() - start
        logging.debug("playing move [player = %d] [move = %s] [winning prob = %f] [duration = %ds]", player, str(max_move), max_prob, duration)
        return max_move

    def update(self, board, player):
        """Updates the model given the current state and reward.

        This is expected to be called after the player has made their move.

        The aim is to move the predicted values towards the actual reward
        using TD-lambda.

        Arguments:
        board -- board containing the game state
        player -- number of the player that has just moved
        """
        start = time.time()

        x_next = board.encode_state(player)
        with tf.GradientTape() as tape:
            value_next = self._model(x_next[np.newaxis])

        trainable_vars = self._model.trainable_variables
        grads = tape.gradient(value_next, trainable_vars)

        # Lazily initialize when gradient shape known.
        if len(self._trace) == 0:
            for grad in grads:
                self._trace.append(tf.Variable(
                    tf.zeros(grad.get_shape()), trainable=False
                ))

        # The reward is non-zero only when player 0 has just won.
        if player == 0 and board.won(player):
            reward = 1
        else:
            reward = 0

        # NOTE(review): value_next is added to the reward even on the winning
        # step - confirm whether the terminal target should be the reward alone.
        td_error = tf.reduce_sum(reward + value_next - self._value)
        for i in range(len(grads)):
            # Decay the trace and accumulate the newest gradient.
            self._trace[i].assign((self._LAMBDA * self._trace[i]) + grads[i])

            grad_trace = self._ALPHA * td_error * self._trace[i]
            self._model.trainable_variables[i].assign_add(grad_trace)

        # Remember this state and value as the "previous" ones for the next
        # call.
        self._state = tf.Variable(x_next)
        self._value = tf.Variable(value_next)

        duration = time.time() - start
        logging.debug("updating model [player = %d] [duration = %ds]", player, duration)

    def load(self, path):
        """Restore the network, stored state and stored value from a checkpoint.

        Arguments:
        path -- checkpoint path handed to tf.train.Checkpoint.restore
        """
        logging.info("loading checkpoint [path = %s]", path)

        ckpt = tf.train.Checkpoint(model=self._model, state=self._state, value=self._value)
        ckpt.restore(path)

    def save(self):
        """Save the network, stored state and stored value as a checkpoint.

        Creates ``checkpoint/`` and a timestamped ``model-...`` directory
        below it if they do not exist, and returns the path reported by
        tf.train.Checkpoint.save.
        """
        if not os.path.exists('checkpoint'):
            os.mkdir('checkpoint')

        directory = 'checkpoint/model-' + str(datetime.datetime.now()).replace(' ', '_')
        if not os.path.exists(directory):
            os.mkdir(directory)

        ckpt = tf.train.Checkpoint(model=self._model, state=self._state, value=self._value)
        # NOTE(review): `directory` is passed as the checkpoint file prefix,
        # so the files may be written next to the directory rather than
        # inside it - confirm against the tf.train.Checkpoint.save docs.
        path = ckpt.save(directory)

        logging.info("saving checkpoint [path = %s]", path)

        return path

    def _reset_trace(self):
        """Zero every eligibility trace in place."""
        for i in range(len(self._trace)):
            self._trace[i].assign(tf.zeros(self._trace[i].get_shape()))


In [None]:
# Training: build a model with fresh weights and train it for 5 self-play
# episodes (a checkpoint is saved when training finishes).
m = Model()
m.train(5)

In [None]:
import logging
# Lower the root logger threshold to DEBUG so the per-move and per-update
# logging.debug messages are emitted as well.
logging.getLogger().setLevel(logging.DEBUG)


In [None]:
# Play 5 evaluation games of the trained model against a RandomAgent.
m.test(5)

INFO:root:testing model [n_episodes = 5]
DEBUG:root:game started [player = 0]
DEBUG:root:playing move [player = 1] [move = (5, 2)] [winning prob = 0.047180] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = (5, 3)] [winning prob = 0.047462] [duration = 0s]
DEBUG:root:updating model [player = 1] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = (23, 1)] [winning prob = 0.050074] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = (7, 6)] [winning prob = 0.049836] [duration = 0s]
DEBUG:root:updating model [player = 1] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = ('bar', 4)] [winning prob = 0.050004] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = (13, 6)] [winning prob = 0.052590] [duration = 0s]
DEBUG:root:updating model [player = 1] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = (12, 5)] [winning prob = 0.052117] [duration = 0s]
DEBUG:root:playing move [player = 1] [move = (17, 2)] [winning prob = 0.051685] [dura