<a href="https://colab.research.google.com/github/emm32449/MCTS-in-Python/blob/main/AddingGame_T4_GPU_TensorFlow.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Assume we have a simple game state
class GameState:
    def __init__(self, state):
        self.state = state

    def __str__(self):
        return str(self.state)

    def get_legal_moves(self):
        return [0, 1, 2, 3]
        #return [-10, -9, -8, -7, -6, -5, -4, -3, -2, -1, 0, 1, 2, 3]

    def get_initial_state(self):
        return GameState(0)

    def make_move(self, move):
        return GameState(self.state + move)

    def is_terminal(self):
        return self.state >= 10

    def get_reward(self):
        # Give a positive reward if the state is exactly 10
        if self.state == 10:
            return 1
        # Give a negative reward if the state exceeds 10
        elif self.state > 10:
            return -1
        # Give a reward based on how close the state is to 10
        else:
            return 1 - abs(self.state - 10) / 10

    def reset(self):
        self.state = 0

    def copy(self):
        return GameState(self.state)

    def to_array(self):
        return [self.state]

If you plan to process multiple samples at once (in a batch), you’ll need to ensure that your tensors are correctly shaped. For example, if you want to process a batch of B samples, each of shape (N,), you’ll need to stack them into a single tensor of shape (B, N) before passing them to the model. In this case, you won’t need the None index (expansion fcn), because the batch dimension will already be present.

In [None]:
import math
import random
import numpy as np

import tensorflow as tf

class Node:
    def __init__(self, game_state, policy, parent=None, move=None):
        self.game_state = game_state
        self.policy = policy
        self.parent = parent
        self.move = move
        self.children = []
        self.visits = 0
        self.wins = 0
    def __str__(self):
        return f"GameState: {self.game_state}, Move: {self.move}, Visits: {self.visits}, Wins: {self.wins}"

class MCTS:
    def __init__(self, model, root):
        self.model = model
        self.root = root
        self.optimizer = tf.keras.optimizers.Adam(learning_rate=0.001)
        self.scheduler = tf.keras.optimizers.schedules.ExponentialDecay(
            initial_learning_rate=0.1,
            decay_steps=100,
            decay_rate=0.1
        )
        self.optimizer.learning_rate = self.scheduler

    def choose_action(self, node):
        # Check the mode of the model
        if self.model.training:
            # If the model is in training mode, use probabilistic action selection
            visit_counts = np.array([child.visits for child in node.children])
            visit_dist = visit_counts / visit_counts.sum()
            chosen_action = node.children[np.random.choice(len(node.children), p=visit_dist)].move
        else:
            # If the model is in evaluation mode, use deterministic action selection
            max_visits = max(child.visits for child in node.children)
            chosen_action = [child.move for child in node.children if child.visits == max_visits][0]
        return chosen_action

    def selection(self, node):
        # While the current node has children (i.e., it's not a leaf node)
        while len(node.children) > 0:
            # Initialize the maximum value and the selected node
            max_value = -np.inf
            selected_node = None

            # Loop over each child of the current node
            for child in node.children:
                # Convert the game state to a tensor
                state_tensor = tf.convert_to_tensor(node.game_state.to_array(), dtype=tf.float32)
                # Use the neural network to evaluate the game state and estimate the policy and value
                policy, value = self.model(state_tensor[None, ...])

                # Compute the Q-value (average reward) of the child node
                Q = child.wins / child.visits if child.visits != 0 else 0
                # Compute the U-value (exploration term) of the child node
                # U = tf.reduce_mean(policy) * tf.math.sqrt(node.visits) / (1 + child.visits)
                U = tf.reduce_mean(policy) * tf.math.sqrt(tf.cast(node.visits, tf.float32)) / (1 + tf.cast(child.visits, tf.float32))
                # The value of the node is the sum of the Q-value and U-value
                node_value = Q + U

                # If the node's value is greater than the current maximum value
                if node_value > max_value:
                    # Update the maximum value and the selected node
                    max_value = node_value
                    selected_node = child

            # Move to the selected node
            node = selected_node

        # Return the final selected node
        return node

    def expansion(self, node):
        # Get the list of legal moves from the game state
        legal_moves = node.game_state.get_legal_moves()

        # For each legal move, create a new node and add it to the children of the current node
        for move in legal_moves:
            new_game_state = node.game_state.make_move(move)
            state_tensor = tf.convert_to_tensor(new_game_state.to_array(), dtype=tf.float32)
            policy, value = self.model(state_tensor[None, ...])
            child_node = Node(new_game_state, policy, parent=node, move=move)
            node.children.append(child_node)

        return node.children

    def simulation(self, node):
        # Make a copy of the game state
        game_state = node.game_state.copy()

        # While the game is not over
        while not game_state.is_terminal():
            # Convert the game state to a tensor
            state_tensor = tf.convert_to_tensor(game_state.to_array(), dtype=tf.float32)
            # Use the neural network to estimate the policy and value
            policy, _ = self.model(state_tensor[None, ...])
            # Convert the policy to a probability distribution
            policy_dist = tf.nn.softmax(policy).numpy()[0]
            # Get the list of legal moves
            legal_moves = game_state.get_legal_moves()
            # Choose a move based on the policy
            move = np.random.choice(legal_moves, p=policy_dist)
            # Apply the move to get the next game state
            game_state = game_state.make_move(move)

        # Return the reward associated with the terminal state
        return game_state.get_reward()

    def backpropagation(self, node, reward):
        # While node is not None
        while node is not None:
            # Update the visit count of the node
            node.visits += 1

            # Update the win count of the node
            node.wins += reward

            # Move to the parent node
            node = node.parent

    def run(self, simulations):
        for _ in range(simulations):
            # Start from the root node
            node = self.root

            # Selection
            node = self.selection(node)

            # Skip expansion, simulation, and backpropagation if a terminal node is selected
            if node.game_state.is_terminal():
                continue

            # Expansion
            if not node.game_state.is_terminal():
                node = random.choice(self.expansion(node))

            # Simulation
            reward = self.simulation(node)

            # Backpropagation
            self.backpropagation(node, reward)

            # Choose an action
            chosen_action = self.choose_action(node)

            # Get the list of children that match the chosen action
            matching_children = [child for child in node.children if child.move == chosen_action]

            # Check if there are any matching children
            if matching_children:
                # If there are, set the root to the first matching child
                self.root = matching_children[0]
            else:
                # If there are no matching children, handle the error appropriately
                print("No child found with the chosen action.")

    def print_tree(self, node, indent=""):
        print(indent + str(node))
        for child in node.children:
            self.print_tree(child, indent + "  ")

    def self_play(self, network, game, game_number, num_simulations=50):
        states = []
        policies = []
        current_state = game.get_initial_state()
        root = Node(current_state, None)

        while not current_state.is_terminal():
            # Perform MCTS simulations from the root
            for _ in range(num_simulations):
                leaf = self.selection(root)
                children = self.expansion(leaf)
                reward = self.simulation(random.choice(children))
                self.backpropagation(leaf, reward)

            # Get the visit counts of the root's children
            visit_counts = tf.convert_to_tensor([child.visits for child in root.children], dtype=tf.float32)

            # Convert the visit counts to a policy
            policy = tf.nn.softmax(visit_counts).numpy().tolist()

            # Choose an action based on the policy
            action_index = tf.random.categorical(tf.math.log(visit_counts[None, ...]), 1).numpy()[0][0]
            action = root.children[action_index].move

            # Store the numerical representation of the state and policy
            states.append(current_state.to_array())
            policies.append(policy)

            # Apply the action to get the next state
            current_state = current_state.make_move(action)

            # Update the root node to the child node corresponding to the chosen action
            root = root.children[action_index]

        # Get the reward from the final state
        reward = current_state.get_reward()

        # Perform backpropagation after the game ends
        self.backpropagation(root, reward)

        return states, policies, reward

    def train(self, states, policies, reward, epochs):
        # Convert the states, policies, and reward to TensorFlow tensors
        states = tf.convert_to_tensor(states, dtype=tf.float32)
        policies = tf.convert_to_tensor(policies, dtype=tf.float32)
        result = tf.convert_to_tensor([reward] * len(states), dtype=tf.float32)[..., None]

        # Loop over the number of training epochs
        for _ in range(epochs):
            # Get the predicted policy and value from the network
            with tf.GradientTape() as tape:
                policy_pred, value_pred = self.model(states)

                # Compute the policy loss as the KL divergence between the predicted and target policy
                policy_loss = tf.keras.losses.KLDivergence()(policies, tf.nn.softmax(policy_pred))
                # Compute the value loss as the mean squared error between the predicted and actual reward
                value_loss = tf.keras.losses.MSE(result, value_pred)
                # The total loss is the sum of the policy loss and value loss
                loss = policy_loss + value_loss

            # Compute gradients and update the network parameters
            gradients = tape.gradient(loss, self.model.trainable_variables)
            self.optimizer.apply_gradients(zip(gradients, self.model.trainable_variables))

        # Return the final loss
        return loss.numpy()

In [None]:
class ResidualBlock(tf.keras.layers.Layer):
    def __init__(self, num_channels):
        super().__init__()
        self.fc1 = tf.keras.layers.Dense(num_channels, activation='relu')
        self.fc2 = tf.keras.layers.Dense(num_channels)

    def call(self, inputs):
        x = self.fc1(inputs)
        x = self.fc2(x)
        x += inputs  # Skip connection
        return tf.nn.relu(x)

class PolicyValueResNet(tf.keras.Model):
    def __init__(self, num_actions):
        super().__init__()
        self.hidden1 = tf.keras.layers.Dense(64, activation='relu')
        self.resblock = ResidualBlock(64)
        self.hidden2 = tf.keras.layers.Dense(64, activation='relu')

        # Policy Head forms probabilities for each action, distribution
        self.policy_head = tf.keras.layers.Dense(num_actions)

        # Value Head predicts winner of game from each position, scaler
        self.value_head = tf.keras.layers.Dense(1)

    def call(self, state):
        x = self.hidden1(state)
        x = self.resblock(x)
        x = self.hidden2(x)
        policy = tf.nn.softmax(self.policy_head(x))
        value = tf.math.tanh(self.value_head(x))
        return policy, value

Initial Training Loop

In [None]:
# Create the game and network
game = GameState(0)
num_actions = len(game.get_legal_moves())
network = PolicyValueResNet(num_actions)

# Create a uniform policy
uniform_policy = [1.0 / num_actions] * num_actions

root = Node(game.get_initial_state(), uniform_policy)

# Create the MCTS
mcts = MCTS(network, root)

# Initialize a list to store the losses
losses = []

epochs = 1 # Play games
game_number = 1

# Define the optimizer
optimizer = tf.keras.optimizers.Adam()

# Define the learning rate scheduler
scheduler = tf.keras.optimizers.schedules.ExponentialDecay(
    initial_learning_rate=0.1,
    decay_steps=100,
    decay_rate=0.1
)
optimizer.learning_rate = scheduler

# Generate self-play data and train the network
for i in range(epochs):
    states, actions, reward = mcts.self_play(network, game, game_number)
    loss = mcts.train(states, actions, reward, epochs)
    losses.append(loss)
    game_number += 1

    # Step the learning rate scheduler
    scheduler(optimizer.iterations)

Save - AddingGame_MCTS_Model2.pth

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Define the checkpoint and the checkpoint manager
checkpoint = tf.train.Checkpoint(model=network, optimizer=mcts.optimizer, scheduler=scheduler)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, '/content/drive/My Drive/MCTS in Python/AddingGame_MCTS_Model2.ckpt', max_to_keep=5)

# Save the model, optimizer, game number, and losses
checkpoint.game_number = game_number
checkpoint.losses = losses
checkpoint_manager.save()

Mounted at /content/drive


ValueError: `Checkpoint` was expecting scheduler to be a trackable object (an object derived from `Trackable`), got <keras.optimizers.schedules.learning_rate_schedule.ExponentialDecay object at 0x7d9c50406b00>. If you believe this object should be trackable (i.e. it is part of the TensorFlow Python API and manages state), please open an issue.

Load & Continue - AddingGame_MCTS_Model2.pth

In [None]:
from google.colab import drive
drive.mount('/content/drive')

# Create the game and network
game = GameState(0)
num_actions = len(game.get_legal_moves())
network = PolicyValueResNet(num_actions)

# Create a uniform policy
uniform_policy = [1.0 / num_actions] * num_actions

root = Node(game.get_initial_state(), uniform_policy)

# Create the MCTS
mcts = MCTS(network, root)

# Define the checkpoint and the checkpoint manager
checkpoint = tf.train.Checkpoint(model=network, optimizer=mcts.optimizer, scheduler=mcts.scheduler)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, '/content/drive/My Drive/MCTS in Python/AddingGame_MCTS_Model2.ckpt', max_to_keep=5)

# Restore the latest checkpoint
checkpoint.restore(checkpoint_manager.latest_checkpoint)

game_number = checkpoint.game_number.numpy()
losses = checkpoint.losses.numpy().tolist()

# Continue training for...<epochs> more games
epochs=800

for _ in range(epochs):
    states, actions, reward = mcts.self_play(network, game, game_number)
    loss = mcts.train(states, actions, reward, epochs)
    losses.append(loss)
    game_number += 1

    # Step the learning rate scheduler
    mcts.scheduler(mcts.optimizer.iterations)

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


Plot

In [None]:
import matplotlib.pyplot as plt
# Print and plot the losses
print(f"Average Loss: {sum(losses) / len(losses)}, Max Loss: {max(losses)}, Min Loss: {min(losses)}")
plt.plot(losses)
plt.xlabel('Game')
plt.ylabel('Loss')
plt.title('Loss per Game')
plt.show()

Play, Eval() - AddingGame_MCTS_Model2.pth

In [None]:
# Initialize a list to store the results of each game
win_rates = []

from google.colab import drive
drive.mount('/content/drive')

# See the model playout 10 games using MCTS

# Create the game and network
game = GameState(0)
num_actions = len(game.get_legal_moves())
network = PolicyValueResNet(num_actions)

# Create a uniform policy
uniform_policy = [1.0 / num_actions] * num_actions

root = Node(game.get_initial_state(), uniform_policy)

# Create the MCTS
mcts = MCTS(network, root)

# Define the checkpoint and the checkpoint manager
checkpoint = tf.train.Checkpoint(model=network, optimizer=mcts.optimizer, scheduler=mcts.scheduler)
checkpoint_manager = tf.train.CheckpointManager(checkpoint, '/content/drive/My Drive/MCTS in Python/AddingGame_MCTS_Model2.ckpt', max_to_keep=5)

# Restore the latest checkpoint
checkpoint.restore(checkpoint_manager.latest_checkpoint)

game_number = checkpoint.game_number.numpy()
losses = checkpoint.losses.numpy().tolist()

# Simulate 10 games
num_games = 10

for i in range(num_games):
    print(f"Game {i + 1}:")

    # Initialize the game and the root node
    game_state = game.get_initial_state()
    root = Node(game_state, None)

    # Make sure that the MCTS and the root node persist throughout each game
    mcts = MCTS(network, root)

    while not game_state.is_terminal():
        # Perform MCTS simulations from the root
        for _ in range(num_games):
            leaf = mcts.selection(root)
            children = mcts.expansion(leaf)
            reward = mcts.simulation(random.choice(children))
            mcts.backpropagation(leaf, reward)

        # Choose the action that leads to the most visited child node
        action = mcts.choose_action(root)

        # Apply the action to get the next state
        game_state = game_state.make_move(action)

        print(f"Action taken: {action}")

    print(f"Final reward: {reward}")

    # Append the result of the game to the win_rates list
    # Assume a reward of 1 is a win, 0 is a draw, and -1 is a loss
    win_rates.append(reward)

# Calculate the win rate
win_rate = win_rates.count(1) / num_games

# Print and plot the win rates
print(f"Win Rate: {win_rate}")
plt.plot(range(1, num_games + 1), win_rates)
plt.xlabel('Game')
plt.ylabel('Win Rate')
plt.title('Win Rate per Game')
plt.show()