# Temporal Difference Learning for 3D Tic Tac Toe

This notebook contains the implementation of a Temporal Difference (TD) learning model using a Deep Q-Network (DQN) for playing 3D 4x4x4 Tic Tac Toe. The implementation is based on the approach outlined in the provided paper.


In [1]:
# Import necessary libraries
import numpy as np
import os
import random

# Setting Directory
os.chdir('C:/Users/Talha/OneDrive - Higher Education Commission/Documents/GitHub/reinforcement_learning/Project/')

from python_scripts import state_formulation, utils, algorithm, tictactoe_4x4
import torch
import torch.nn as nn
import torch.optim as optim
from torchinfo import summary
import torch.nn.init as init
from tqdm.autonotebook import tqdm

In [2]:
# class customDotProduct(nn.Module):
#     def __init__(self, input_size, output_size, block_size = 4):
#         super(customDotProduct, self).__init__()
#         self.input_size = input_size
#         self.output_size = output_size
#         self.block_size = block_size
#         # Convert structure_weight to nn.Parameter
#         self.structure_weight = torch.zeros((self.output_size, self.input_size))
#         self.structure_weight = self.get_block_weights(self.structure_weight, block_size)
#         self.structure_weight = nn.ParameterList([nn.Parameter(sw.float()) for sw in self.structure_weight])

#     def get_block_weights(self, weight_list, block_size):
#         for i in range(0, 304, block_size):
#             weight_list[i: i + block_size, i: i + block_size] = init.xavier_normal_(torch.randn(block_size, block_size))
#         learnable_blocks = [weight_list[i:i + block_size, i:i + block_size] for i in range(0, weight_list.shape[0], block_size)]
#         updated = [block for block in learnable_blocks]
#         return updated
    
#     def forward(self, feature_map):
#         self.feature_map = [fm.float() for fm in feature_map]
#         # Calculate dot products and concatenate along dim = 1
#         concatenated_products = torch.cat([torch.matmul(fm.unsqueeze(0), sw) for fm, sw in zip(self.feature_map, self.structure_weight)], dim = 1)
#         return concatenated_products

In [9]:
# # Testing Code
# weights = torch.zeros((304, 304))

# block_size = 4
# for i in range(0, 304, block_size):
#     weights[i: i + block_size, i: i + block_size] = torch.ones(block_size, block_size)
# learnable_blocks = [weights[i:i + block_size, i:i + block_size] for i in range(0, weights.shape[0], block_size)]
# weights = [init.xavier_normal_(block) for block in learnable_blocks]

# print(f'Before Update: Weights = {weights[0]} \n')

# # Assume some loss function and optimizer have been defined
# custom_dot_product_module = customDotProduct(304, 304)
# loss_function = nn.MSELoss()
# optimizer = torch.optim.SGD(custom_dot_product_module.parameters(), lr = 0.01)

# # Example training loop iteration
# optimizer.zero_grad()  # Clear gradients
# output = custom_dot_product_module(rows)  # Perform forward pass
# loss = loss_function(output, torch.randn(1, 304))  # Compute loss
# loss.backward()  # Perform backward pass
# optimizer.step()  # Update weights
# print(f'After Update: Weights = {custom_dot_product_module.structure_weight[0]}')


In [None]:
# class StructuredLinear(nn.Module):
#     def __init__(self):
#         super(StructuredLinear, self).__init__()

#     def get_rows(self, input_tensor):
#         # Get diagonals (across 2 faces),digonals (across 3 faces) and horizontal and vertical rows
#         diag_two_faces = []
#         diag_two_faces.extend(
#             [torch.diagonal(input_tensor[i, :, :]), torch.diagonal(input_tensor[:, i, :]), torch.diagonal(input_tensor[:, :, i]), 
#             torch.diagonal(torch.fliplr(input_tensor)[i, :, :]), torch.diagonal(torch.fliplr(input_tensor)[:, i, :]), torch.diagonal(torch.fliplr(input_tensor)[:, :, i])] 
#             for i in range(input_tensor.shape[0]))
#         diag_two_faces = [item for sublist in diag_two_faces for item in sublist]
        
#         diag_three_faces = []
#         diag_three_faces = [[[[input_tensor[i, i, i], input_tensor[3 - i, i, i], input_tensor[i, 3 - i, i], input_tensor[i, i, 3 - i]] 
#                             for i in range(4)][k][j] for j in range(4) for k in range(4)][l:l + 4] for l in range(0, 16, 4)]
#         diag_three_faces = [torch.tensor([t.item() for t in row]) for row in diag_three_faces]

#         horizontal_and_vertical_rows = []
#         horizontal_and_vertical_rows.extend([input_tensor[i, j, :], input_tensor[i, :, j], input_tensor[:, i, j]]
#                                             for i in range(input_tensor.shape[0]) for j in range(input_tensor.shape[0]))
#         horizontal_and_vertical_rows = [item for sublist in horizontal_and_vertical_rows for item in sublist]
        
#         return horizontal_and_vertical_rows + diag_two_faces + diag_three_faces

#     def forward(self, x):
#         rows = self.get_rows(x)
#         return rows

# class MyNeuralNetwork(nn.Module, tictactoe_4x4.TicTacToe4x4x4):
#     def __init__(self, num_detectors):
#         super(MyNeuralNetwork, self).__init__()

#         self.num_detectors = num_detectors
#         self.structured_layer = StructuredLinear()
        
#         self.custom_operation_layer = customDotProduct(input_size = num_detectors * 4, output_size = num_detectors * 4)

#         self.second_layer = nn.Linear(num_detectors * 4, 32, bias = False)
#         init.xavier_normal_(self.second_layer.weight)

#         self.output_layer = nn.Linear(32, 1, bias = False)
#         self.act = nn.Tanh()

#     def forward(self, x):
#         x = self.structured_layer(x)
#         x = self.custom_operation_layer(x)
#         x = self.act(x) # --> Tanh
#         x = self.second_layer(x)
#         x = self.act(x)
#         x = self.output_layer(x)
#         return x
    
# # Instantiate Model
# model = MyNeuralNetwork(num_detectors = 4)


In [2]:
class customDotProduct(nn.Module):
    def __init__(self, input_size, output_size, block_size = 4):
        super(customDotProduct, self).__init__()
        self.input_size = input_size
        self.output_size = output_size
        self.block_size = block_size
        # Convert structure_weight to nn.Parameter
        self.structure_weight = torch.zeros((self.output_size, self.input_size))
        self.structure_weight = self.get_block_weights(self.structure_weight, block_size)
        self.structure_weight = nn.ParameterList([nn.Parameter(sw.float()) for sw in self.structure_weight])

    def get_block_weights(self, weight_list, block_size):
        for i in range(0, 304, block_size):
            weight_list[i: i + block_size, i: i + block_size] = init.xavier_normal_(torch.randn(block_size, block_size))
        learnable_blocks = [weight_list[i:i + block_size, i:i + block_size] for i in range(0, weight_list.shape[0], block_size)]
        updated = [block for block in learnable_blocks]
        return updated
    
    def forward(self, feature_map):
        self.feature_map = [fm.float() for fm in feature_map]
        # Calculate dot products and concatenate along dim = 1
        concatenated_products = torch.cat([torch.matmul(fm.unsqueeze(0), sw) for fm, sw in zip(self.feature_map, self.structure_weight)], dim = 1)
        return concatenated_products

In [3]:
class StructuredLinear(nn.Module):
    def __init__(self):
        super(StructuredLinear, self).__init__()

    def get_rows(self, input_tensor):
        # Get diagonals (across 2 faces),digonals (across 3 faces) and horizontal and vertical rows
        diag_two_faces = []
        diag_two_faces.extend(
            [torch.diagonal(input_tensor[i, :, :]), torch.diagonal(input_tensor[:, i, :]), torch.diagonal(input_tensor[:, :, i]), 
            torch.diagonal(torch.fliplr(input_tensor)[i, :, :]), torch.diagonal(torch.fliplr(input_tensor)[:, i, :]), torch.diagonal(torch.fliplr(input_tensor)[:, :, i])] 
            for i in range(input_tensor.shape[0]))
        diag_two_faces = [item for sublist in diag_two_faces for item in sublist]
        
        diag_three_faces = []
        diag_three_faces = [[[[input_tensor[i, i, i], input_tensor[3 - i, i, i], input_tensor[i, 3 - i, i], input_tensor[i, i, 3 - i]] 
                            for i in range(4)][k][j] for j in range(4) for k in range(4)][l:l + 4] for l in range(0, 16, 4)]
        diag_three_faces = [torch.tensor([t.item() for t in row]) for row in diag_three_faces]

        horizontal_and_vertical_rows = []
        horizontal_and_vertical_rows.extend([input_tensor[i, j, :], input_tensor[i, :, j], input_tensor[:, i, j]]
                                            for i in range(input_tensor.shape[0]) for j in range(input_tensor.shape[0]))
        horizontal_and_vertical_rows = [item for sublist in horizontal_and_vertical_rows for item in sublist]
        
        return horizontal_and_vertical_rows + diag_two_faces + diag_three_faces

    def forward(self, x):
        rows = self.get_rows(x)
        return rows

class MyNeuralNetwork(nn.Module):
    def __init__(self):
        super(MyNeuralNetwork, self).__init__()
        self.structured_layer = StructuredLinear()
        
        self.custom_operation_layer = customDotProduct(input_size = 304, output_size = 304)

        self.second_layer = nn.Linear(304, 32, bias = False)
        init.xavier_normal_(self.second_layer.weight)

        self.output_layer = nn.Linear(32, 1, bias = False)
        self.act = nn.Tanh()

    def forward(self, x):
        x = self.structured_layer(x)
        x = self.custom_operation_layer(x)
        x = self.act(x) # --> Tanh
        x = self.second_layer(x)
        x = self.act(x)
        x = self.output_layer(x)
        return x
    
# Example usage
model= MyNeuralNetwork()

In [4]:
summary(model, input_size = [4, 4, 4], col_names = ['input_size', 'output_size', 'num_params'])

Layer (type:depth-idx)                   Input Shape               Output Shape              Param #
MyNeuralNetwork                          [4, 4, 4]                 [1, 1]                    --
├─StructuredLinear: 1-1                  [4, 4, 4]                 [4]                       --
├─customDotProduct: 1-2                  [4]                       [1, 304]                  1,216
├─Tanh: 1-3                              [1, 304]                  [1, 304]                  --
├─Linear: 1-4                            [1, 304]                  [1, 32]                   9,728
├─Tanh: 1-5                              [1, 32]                   [1, 32]                   --
├─Linear: 1-6                            [1, 32]                   [1, 1]                    32
Total params: 10,976
Trainable params: 10,976
Non-trainable params: 0
Total mult-adds (Units.MEGABYTES): 0.01
Input size (MB): 0.00
Forward/backward pass size (MB): 0.00
Params size (MB): 0.04
Estimated Total Size (MB): 0

In [5]:
# Training parameters
learning_rate = 0.001
optimizer = optim.Adam(model.parameters(), lr = learning_rate)
loss_function = nn.SmoothL1Loss()
EPSILON = 0.1
GAMMA = 0.9

In [10]:
def e_greedy(value_dict):
    if np.random.random() > EPSILON:
        return max(value_dict, key = lambda k: value_dict[k])
    else:
        return random.choice(list(value_dict.items()))[0]

def func_modify(afterstate):
    return torch.tensor([[[1 if cell == "X" else -1 if cell == "O" else 0 for cell in row] for row in layer] for layer in afterstate])

def benchmark_policy_for_player2(action_list):
    return np.random.choice(action_list)

def train_td_model(model, num_episodes):
    overall_loss = []
    for episode in tqdm(range(num_episodes)):
        env = tictactoe_4x4.TicTacToe4x4x4()

        terminated = False
        current_state = torch.zeros((64))
        reward = 0
        player_turn = "X"
        prev_afterstate = None
        time_idx = 0
        loss_list = []

        while not terminated:
            time_idx += 1
            # Get actions space
            action_space = env.get_action_space()
            value_dict = {}
            for action in action_space:
                copy_tensor = current_state.detach().clone()
                copy_tensor[action] = 1
                value_dict[action] = model.forward(copy_tensor.view(4, 4, 4))
            
            # Here we choose action based on epsilon greedy
            action = e_greedy(value_dict)
            
            current_state, reward, terminated, player_turn = env.step(action) # afterstate

            if time_idx != 1:
                v_new = reward + (GAMMA * model.forward(func_modify(current_state)))
                v = model.forward(func_modify(prev_afterstate))
                loss = loss_function(v, v_new)
                loss_list.append(loss.item())
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

            prev_afterstate = current_state
            # Player 2 -- use benchmark
            player_2_action_space = env.get_action_space()
            player_2_move = benchmark_policy_for_player2(player_2_action_space)
            current_state, reward, terminated, player_turn = env.step(player_2_move)
            
            current_state = func_modify(current_state).view(64)

        mean_loss = np.mean(loss_list)
        print(f'Episode: {episode + 1}, Loss: {mean_loss}')
        overall_loss.append(mean_loss)

In [11]:
# Example usage
num_episodes = 1000  # Number of episodes for training
loss = train_td_model(model, num_episodes)  # Train the model

# Save the trained model
# os.makedirs('C:/Users/Talha/OneDrive - Higher Education Commission/Documents/GitHub/reinforcement_learning/Project/Phase_3_3D_Tic_Tac_Toe/models', exist_ok = True)
# model_path = 'C:/Users/Talha/OneDrive - Higher Education Commission/Documents/GitHub/reinforcement_learning/Project/Phase_3_3D_Tic_Tac_Toe/models/td_tictactoe_model.pth'
# save_model(model, model_path)

# model_path

# 1) Turn all of this to device compatible - (cuda) (train for 1000 episodes and store weights every 100)
# 2) Once the model has been trained, make a function which recieves the models weights and the Neural Network Object and the current action space and state
# the function takes the weights, and inserts them into the neural network object and computes forward pass for each of the possible transitions and returns
# the action corresponding to the maximum value. Be careful as the state that is being passed is a list of lists of lists. So to make that compatible to the
# forward pass, use the func_modify function to get it into a 3D tensor form. Also remember to convert each of the actions (in integer form) to coardinates
# form using the get_coardinates function. Then transition using those coardinates. Once you have now computed the maximum action value, convert it back
# to integer using get_position and that will be returned

# Inference Loop - u will play ur policy (gotten from above function) with another baseline policy like random for the other player. Count the number of
# ones/ minus ones/zeros and report % win, % loss, % draw.


  0%|          | 0/1000 [00:00<?, ?it/s]

Episode: 1, Loss: 0.027332383097308503
Episode: 2, Loss: 5.1807438116956526e-05
Episode: 3, Loss: 0.00019809241865957681
Episode: 4, Loss: 0.0003479837768656366
Episode: 5, Loss: 0.025106080801919006
Episode: 6, Loss: 0.01963167903727005
Episode: 7, Loss: 0.021663222897347605
Episode: 8, Loss: 0.0006128902910542978
Episode: 9, Loss: 0.000210248991799705
Episode: 10, Loss: 0.021815336566850786
Episode: 11, Loss: 0.02595562695856451
Episode: 12, Loss: 0.0006484965807445064
Episode: 13, Loss: 0.002475650533122699
Episode: 14, Loss: 7.743617253671194e-05
Episode: 15, Loss: 0.0008059130394450711
Episode: 16, Loss: 0.0001395504700754244
Episode: 17, Loss: 0.028112671797819195
Episode: 18, Loss: 0.0013305847987094938
Episode: 19, Loss: 5.7880113849427696e-05
Episode: 20, Loss: 0.00010876695018320207
Episode: 21, Loss: 0.00022659868182017817
Episode: 22, Loss: 0.00014484774108068117
Episode: 23, Loss: 0.029176112286971628
Episode: 24, Loss: 0.03018881896342825
Episode: 25, Loss: 0.029532805950