In [11]:
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import math

In [12]:
class TicTacToe:
    def __init__(self):
        self.board = np.zeros((3, 3), dtype=int)
        self.current_player = 1
    
    def make_move(self, row, col):
        if self.board[row, col] == 0:
            self.board[row, col] = self.current_player
            self.current_player = 3 - self.current_player  # Switch player
            return True
        return False

    def check_winner(self):
        for i in range(3):
            if np.all(self.board[i, :] == self.current_player):
                return self.current_player
            if np.all(self.board[:, i] == self.current_player):
                return self.current_player
        
        if self.board[0, 0] == self.board[1, 1] == self.board[2, 2] == self.current_player:
            return self.current_player
        
        if self.board[0, 2] == self.board[1, 1] == self.board[2, 0] == self.current_player:
            return self.current_player
        
        return 0

    def is_draw(self):
        return np.all(self.board != 0)

    def is_game_over(self):
        return self.check_winner() or self.is_draw()

    def get_valid_moves(self):
        return [(i, j) for i in range(3) for j in range(3) if self.board[i, j] == 0]

    def display(self):
        print(self.board)

    def clone(self):
        clone = TicTacToe()
        clone.board = self.board.copy()
        clone.current_player = self.current_player
        return clone
    
def move_to_index(move):
    return move[0] * 3 + move[1]

In [13]:
class ResNetBlock(nn.Module):
    def __init__(self, in_channels, out_channels, stride = 1):
        super(ResNetBlock, self).__init__()
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size = 3, stride = stride, padding = 1)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size = 3, padding = 1)
        self.bn2 = nn.BatchNorm2d(out_channels)
        self.shortcut = nn.Sequential()
        if stride != 1 or in_channels != out_channels:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size = 1, stride = stride),
                nn.BatchNorm2d(out_channels)
            )
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out += self.shortcut(x)
        out = F.relu(out)
        return out

In [14]:
class ResNet(nn.Module):
    def __init__(self, block, num_blocks, num_classes=9):
        super(ResNet, self).__init__()
        self.in_channels = 64
        self.conv1 = nn.Conv2d(1, 64, kernel_size = 3, stride = 1, padding = 1)
        self.bn1 = nn.BatchNorm2d(64)
        self.layer1 = self._make_layer(block, 64, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, 128, num_blocks[1], stride=2)
        self.fc1 = nn.Linear(128, num_classes)
        self.fc2 = nn.Linear(128, 1)  # For winning probability

    def _make_layer(self, block, out_channels, num_blocks, stride):
        strides = [stride] + [1] * (num_blocks - 1)
        layers = []
        for stride in strides:
            layers.append(block(self.in_channels, out_channels, stride))
            self.in_channels = out_channels
        return nn.Sequential(*layers)
    
    def forward(self, x):
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = F.adaptive_avg_pool2d(out, (1, 1))
        out = out.view(out.size(0), -1)
        policy = self.fc1(out)
        value = torch.tanh(self.fc2(out))
        return policy, value


In [15]:
def ResNet18():
    return ResNet(ResNetBlock, [2, 2])

In [16]:
class MCTSNode:
    def __init__(self, state, parent=None):
        self.state = state
        self.parent = parent
        self.children = {}
        self.visits = 0
        self.value = 0.0
        self.prior = 0.0

    def is_leaf(self):
        return len(self.children) == 0
    
    def expand(self, action_priors):
        for action, prior, in action_priors:
            if action not in self.children:
                self.children[action] = MCTSNode(self.state.clone(), parent=self)
                self.children[action].prior = prior

In [17]:
class MCTS:
    def __init__(self, model, c_puct=1.0, n_playouts=1600):
        self.model = model
        self.c_puct = c_puct
        self.n_playouts = n_playouts

    def _uct_select(self, node):
        return max(node.children.items(), key=lambda act_node: act_node[1].value / (1 + act_node[1].visits) + self.c_puct * act_node[1].prior * math.sqrt(node.visits) / (1 + act_node[1].visits))
    
    def _playout(self, state):
        node = self.root
        while not node.is_leaf():
            action, node = self._uct_select(node)
            state.make_move(*action)

        action_probs, _ = self.model(torch.FloatTensor(state.board).unsqueeze(0).unsqueeze(0))
        action_probs = torch.softmax(action_probs, dim=1).detach().numpy().flatten()
        valid_moves = state.get_valid_moves()
        action_priors = [(move, action_probs[move_to_index(move)]) for  move in valid_moves]
        node.expand(action_priors) 

        leaf_value = self._evaluate(state)
        self._backpropagate(node, leaf_value)

    def _evaluate(self, state):
        _, value = self.model(torch.FloatTensor(state.board).unsqueeze(0).unsqueeze(0))
        return value.item()
    
    def _backpropagate(self, node, value):
        while node:
            node.visits += 1
            node.value += value
            node = node.parent

    def get_move(self, state):
        self.root = MCTSNode(state)
        for _ in range(self.n_playouts):
            self._playout(state.clone())
        return max(self.root.children.items(), key=lambda act_node: act_node[1].visits)[0]

In [18]:
def self_play(game, model, mcts, n_games=1000):
    all_game_data = []
    
    for _ in range(n_games):
        game_data = []  # Store (board, move) pairs during the game
        state = game()
        
        while not state.is_game_over():
            move = mcts.get_move(state)
            game_data.append((state.board.copy(), move))
            state.make_move(*move)
        
        winner = state.check_winner()
        final_game_data = []  # Store (board, policy_target, reward) pairs after the game
        
        for board, move in game_data:
            if winner == state.current_player:
                reward = 1
            elif winner == 3 - state.current_player:
                reward = -1
            else: 
                reward = 0
            
            policy_target = np.zeros(9)
            policy_target[move_to_index(move)] = 1
            
            final_game_data.append((board, policy_target, reward))
        
        all_game_data.extend(final_game_data)
    
    return all_game_data


In [19]:
def train(model, data, epochs, lr=0.01):
    optimizer = optim.Adam(model.parameters(), lr=lr)
    criterion_policy = nn.CrossEntropyLoss()
    criterion_value = nn.MSELoss()

    for epoch in range(epochs):
        total_loss = 0
        total_policy_loss = 0
        total_value_loss = 0
        num_batches = 0
        print(f"\nEpoch {epoch+1}/{epochs} started...")

        for i, (board, policy_target, reward) in enumerate(data):
            board = torch.FloatTensor(board).unsqueeze(0).unsqueeze(0)
            policy_target = torch.FloatTensor(policy_target).unsqueeze(0)
            reward = torch.FloatTensor([reward]).unsqueeze(1)

            optimizer.zero_grad()
            policy, value = model(board)
            
            policy_loss = criterion_policy(policy, policy_target)
            value_loss = criterion_value(value, reward)
            loss = policy_loss + value_loss
            
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            total_policy_loss += policy_loss.item()
            total_value_loss += value_loss.item()
            num_batches += 1
            
            # Print statement for individual batch loss
            print(f"  Batch {i+1}/{len(data)} - Loss: {loss.item():.4f}, "
                  f"Policy Loss: {policy_loss.item():.4f}, Value Loss: {value_loss.item():.4f}")

        avg_loss = total_loss / num_batches if num_batches > 0 else 0
        avg_policy_loss = total_policy_loss / num_batches if num_batches > 0 else 0
        avg_value_loss = total_value_loss / num_batches if num_batches > 0 else 0

        # Print statement for epoch summary
        print(f"Epoch {epoch+1}/{epochs} completed. "
              f"Avg Loss: {avg_loss:.4f}, Avg Policy Loss: {avg_policy_loss:.4f}, Avg Value Loss: {avg_value_loss:.4f}")

In [20]:
game = TicTacToe
model = ResNet18()
mcts = MCTS(model)
data = self_play(game, model, mcts, n_games=10)
train(model, data, epochs=10, lr=0.01)


Epoch 1/10 started...
  Batch 1/80 - Loss: 3.1337, Policy Loss: 2.2102, Value Loss: 0.9235
  Batch 2/80 - Loss: 3.1724, Policy Loss: 2.2144, Value Loss: 0.9579
  Batch 3/80 - Loss: 2.3934, Policy Loss: 2.3324, Value Loss: 0.0610
  Batch 4/80 - Loss: 3.4094, Policy Loss: 3.4069, Value Loss: 0.0025
  Batch 5/80 - Loss: 4.6340, Policy Loss: 4.6338, Value Loss: 0.0002
  Batch 6/80 - Loss: 5.0135, Policy Loss: 5.0135, Value Loss: 0.0000
  Batch 7/80 - Loss: 5.2369, Policy Loss: 5.2368, Value Loss: 0.0000
  Batch 8/80 - Loss: 5.4310, Policy Loss: 5.4310, Value Loss: 0.0000
  Batch 9/80 - Loss: 5.8637, Policy Loss: 5.8637, Value Loss: 0.0000
  Batch 10/80 - Loss: 2.4082, Policy Loss: 2.4082, Value Loss: 0.0000
  Batch 11/80 - Loss: 2.4523, Policy Loss: 2.4523, Value Loss: 0.0000
  Batch 12/80 - Loss: 2.0827, Policy Loss: 2.0827, Value Loss: 0.0000
  Batch 13/80 - Loss: 1.7236, Policy Loss: 1.7236, Value Loss: 0.0000
  Batch 14/80 - Loss: 1.7928, Policy Loss: 1.7928, Value Loss: 0.0000
  Batc