## Othello Funcs

In [None]:
GRID_SIZE = 8

def is_valid_move(x, y,board,current_player):
    if board[y][x] != 0:
        return False
    directions = [(0,1),(1,1),(1,0),(1,-1),(0,-1),(-1,-1),(-1,0),(-1,1)]
    for dx, dy in directions:
        if check_direction(x, y, dx, dy,board,current_player):
            return True
    return False

def check_direction(x, y, dx, dy,board,current_player):
    opponent = 1 if current_player == 2 else 2
    x, y = x + dx, y + dy
    if not (0 <= x < GRID_SIZE and 0 <= y < GRID_SIZE) or board[y][x] != opponent:
        return False
    while 0 <= x < GRID_SIZE and 0 <= y < GRID_SIZE:
        if board[y][x] == 0:
            return False
        if board[y][x] == current_player:
            return True
        x, y = x + dx, y + dy
    return False

def make_move(x, y,board,current_player):
    board[y][x] = current_player
    directions = [(0,1),(1,1),(1,0),(1,-1),(0,-1),(-1,-1),(-1,0),(-1,1)]
    for dx, dy in directions:
        if check_direction(x, y, dx, dy,board,current_player):
            flip_direction(x, y, dx, dy,board,current_player)

def flip_direction(x, y, dx, dy,board,current_player):
    x, y = x + dx, y + dy
    while board[y][x] != current_player:
        board[y][x] = current_player
        x, y = x + dx, y + dy

In [None]:
def decide_winner(board,current_player):
    black = 0
    white = 0
    for x in board:
        for y in x:
            black += (y == 2)
            white += (y == 1)
    if(black + white != 64) and current_player == 2:
        return 0
    if(black + white != 64) and current_player == 1:
        return 1
    return int(black >= 32)

## Player class

In [None]:
class Player():
    def __init__(self,**kwargs):
        pass

    def get_move(self,board,current_player):
        pass

### MCTS PLAYER STUFF

In [None]:
import random 

def get_move_random(board,current_player):
    valid_moves = []
    for x in range(GRID_SIZE):
        for y in range(GRID_SIZE):
            if is_valid_move(x,y,board,current_player): valid_moves.append((x,y))
    if(len(valid_moves) == 0): return None    
    return valid_moves[random.randint(0,len(valid_moves)-1)]

In [None]:
import math
from copy import deepcopy
import time

class Node():
    def __init__(self,x,y,player_who_decides_next):
        self.x = x
        self.y = y
        self.children = []
        self.visits = 0
        self.wins = 0
        self.player = player_who_decides_next
    
    def generate_children(self,board):
        for x in range(GRID_SIZE):
            for y in range(GRID_SIZE):
                if is_valid_move(x,y,board,self.player): 
                    if self.player == 1: self.children.append(Node(x,y,2))
                    if self.player == 2: self.children.append(Node(x,y,1))
        if len(self.children) == 0:
            self.children = None

    def select(self,C):
        if(self.player == 2):
            ret = max(self.children,key = lambda child: child.wins/(child.visits+0.001) +  C*( 
            (2.0*math.log(self.visits))/(child.visits+0.001))**0.5)
            return ret
        else:
            # USING LOSES FOR MINIMIZING PLAYER
            ret = max(self.children,key = lambda child: ((child.visits - child.wins)/(child.visits+0.0001) +  C*( 
            (2.0*math.log(self.visits))/(child.visits+0.0001))**0.5))
            return ret

    def rollout(self,depth,C,board): # 1 == simulate from children
        self.visits += 1

        if depth == 0:
            ret = self.simulate(board)
            self.wins += ret
            return ret
        
        
        if self.children == None: 
            ret = decide_winner(board,self.player)
            self.wins += ret
            return ret
        if len(self.children) == 0: self.generate_children(board)
        if self.children == None: 
            ret = decide_winner(board,self.player)
            self.wins += ret
            return ret
        
        
        child = self.select(C)
        make_move(child.x,child.y,board,self.player)
        ret = child.rollout(depth-1,C,board)
        self.wins += ret
        return ret
    
    def simulate(self,board):
        current_player = self.player
        while True:
            move = get_move_random(board,current_player)
            if(move == None): break
            make_move(move[0],move[1],board,current_player)
            if current_player == 2: current_player = 1
            else: current_player = 2
        return decide_winner(board,current_player)
    
    def give_best_move(self,currentplayer):
        if currentplayer == 2 :child = max(self.children,key = lambda child: child.wins/(child.visits+0.001))
        else: child = max(self.children,key = lambda child: (child.visits - child.wins)/(child.visits+0.001))
        return child.x,child.y

class MCTSPlayer(Player):
    def __init__(self,explorationFactor = 1.4,rollouts = 100,selectionDepth = 3,timeLimit = 5.0,timed = False):
        super().__init__()
        self.explorationFactor = explorationFactor
        self.rollouts = rollouts
        self.selectionDepth = selectionDepth
        self.timeLimit = timeLimit
        self.timed = timed

    def get_move(self,board,current_player):
        root = Node(-1,-1,current_player)
        if not self.timed:
            startTime = time.time()
            for i in range(self.rollouts):
                sim = deepcopy(board)
                root.rollout(self.selectionDepth,self.explorationFactor,sim)
            # print("ROLL ",time.time() - startTime)
        else:
            num_rollouts = 0
            startTime = time.time()
            while time.time() - startTime < self.timeLimit:
                num_rollouts+=1
                sim = deepcopy(board)
                root.rollout(self.selectionDepth,self.explorationFactor,sim)

            # print("TIME ",time.time() - startTime," ",num_rollouts)
        return root.give_best_move(current_player)

### Parrallel MCTS

In [None]:
import concurrent.futures


import parrallel_utils
    
class MCTSParallelPlayer(Player):
    def __init__(self,explorationFactor = 1.4,rollouts = 800,selectionDepth = 3,num_threads = 4,timed = False,timeLimit = 3.0):
        super().__init__()
        self.explorationFactor = explorationFactor
        self.rollouts = rollouts
        self.selectionDepth = selectionDepth
        self.num_threads = num_threads
        self.timed = timed
        self.timeLimit = timeLimit

    def get_move(self,board,current_player):
        
        params = [[deepcopy(board),current_player,self.selectionDepth,self.explorationFactor,self.rollouts//self.num_threads,self.timed,self.timeLimit] for x in range(self.num_threads)]
        results = {}
        visits = 0
     
        with concurrent.futures.ProcessPoolExecutor(max_workers=self.num_threads) as executor:
            futures = [executor.submit(parrallel_utils.run_mcts_instance,*param) for param in params]
            
            for future in concurrent.futures.as_completed(futures):
            
                result_node = future.result()
                for child in result_node.children:
                    if (child.x,child.y) not in results:
                        results[(child.x,child.y)] = {'visits' : 0,'wins' : 0}
                    visits += child.visits
                    results[(child.x,child.y)]['visits'] += child.visits
                    results[(child.x,child.y)]['wins'] += child.wins

        # print(visits)
        values  = []
        moves = []
        if(current_player == 2):
            for x,y in results.items():
                values.append(y['wins']/(y['visits'] + 0.001))
                moves.append(x)
        if(current_player == 1):
            for x,y in results.items():
                values.append(y['visits'] - y['wins']/(y['visits'] + 0.001))
                moves.append(x)

        maxi = 0
        for i in range(len(values)):
            if values[maxi] < values[i] : maxi = i

        return moves[maxi][0],moves[maxi][1]
    



### ALPHA-BETA PLAYER

In [None]:
class AlphaBetaPlayer(Player):
    
    def __init__(self,depth = 2):
        super().__init__()
        self.depth = depth
    
    def naiveValue(self,board):
        val = 0
        for x in board:
            for y in x:
                if y == 2: val += 1
                if y == 1: val -= 1
        return val

    def minimax(self,board,depth,alpha,beta,maximizingPlayer,current_player):
    
        valid_moves = []
        for x in range(GRID_SIZE):
            for y in range(GRID_SIZE):
                if is_valid_move(x,y,board,current_player): valid_moves.append((x,y))

        if depth == 0: 
            return self.naiveValue(board)

        if len(valid_moves) == 0: return decide_winner(board,current_player) * 64

        if maximizingPlayer:
            value = -100
            for x,y in valid_moves:
            
                child = deepcopy(board)
                make_move(x,y,child,2)

                value = max(value,self.minimax(child,depth-1,alpha,beta,False,1))
                if value > beta:
                    break
                alpha = max(alpha,value)
            return value
        else:
            value = 100
            for x,y in valid_moves:

                child = deepcopy(board)
                make_move(x,y,child,1)

                value = min(value,self.minimax(child,depth-1,alpha,beta,True,2))
                if value < alpha:
                    break
                beta = min(beta,value)
            return value

    def get_move(self,board,current_player):
        if(current_player == 2):
            valid_moves = []
            for x in range(GRID_SIZE):
                for y in range(GRID_SIZE):
                    if is_valid_move(x,y,board,2): valid_moves.append((x,y))
    
            points = []
            for x,y in valid_moves:
            
                sim = deepcopy(board)
                make_move(x,y,sim,2)
    
                points.append(self.minimax(sim,self.depth,-1000,+1000,False,1))
    
            maxi = 0
            for i in range(len(valid_moves)):
                if points[i] > points[maxi]:
                    maxi = i
    
            return valid_moves[maxi]
        else:
            valid_moves = []
            for x in range(GRID_SIZE):
                for y in range(GRID_SIZE):
                    if is_valid_move(x,y,board,1): valid_moves.append((x,y))
    
            points = []
            for x,y in valid_moves:
            
                sim = deepcopy(board)
                make_move(x,y,sim,1)
    
                points.append(self.minimax(sim,self.depth,-1000,+1000,True,2))
    
            mini = 0
            for i in range(len(valid_moves)):
                if points[i] < points[mini]:
                    mini = i
    
            return valid_moves[mini]
            

### Random Player

In [None]:
class RandomPlayer(Player):
    def __init__(self):
        super().__init__()

    def get_move(self, board, current_player):
        valid_moves = []
        for x in range(GRID_SIZE):
            for y in range(GRID_SIZE):
                if is_valid_move(x,y,board,current_player): valid_moves.append((x,y))
        return valid_moves[random.randint(0,len(valid_moves)-1)]

### Policy Value Player

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
import numpy as np

class PolicyValueNetwork(nn.Module):
    def __init__(self, grid_size):
        super().__init__()
        self.grid_size = grid_size
        
        # Convolutional layers for board processing
        self.conv1 = nn.Conv2d(2, 32, 3, padding=1)
        self.conv2 = nn.Conv2d(32, 64, 3, padding=1)
        self.conv3 = nn.Conv2d(64, 128, 3, padding=1)
        
        # Policy head - predicts move probabilities
        self.policy_conv = nn.Conv2d(128, 2, 1)
        self.policy_fc = nn.Linear(2 * grid_size * grid_size, grid_size * grid_size)
        
        # Value head - predicts game outcome
        self.value_conv = nn.Conv2d(128, 1, 1)
        self.value_fc1 = nn.Linear(grid_size * grid_size, 64)
        self.value_fc2 = nn.Linear(64, 1)
        
    def forward(self, x):
        # Common representation
        x = torch.relu(self.conv1(x))
        x = torch.relu(self.conv2(x))
        x = torch.relu(self.conv3(x))
        
        # Policy output
        policy = torch.relu(self.policy_conv(x))
        policy = policy.view(-1, 2 * self.grid_size * self.grid_size)
        policy = self.policy_fc(policy)
        policy = torch.softmax(policy, dim=1)
        
        # Value output
        value = torch.relu(self.value_conv(x))
        value = value.view(-1, self.grid_size * self.grid_size)
        value = torch.relu(self.value_fc1(value))
        value = torch.tanh(self.value_fc2(value))
        value = (value + 1)/2
        
        return policy, value


In [None]:
def board_to_tensor(board):
    tensor = torch.zeros(2, GRID_SIZE, GRID_SIZE)
    
    for i in range(GRID_SIZE):
        for j in range(GRID_SIZE):
            if board[i][j] == 1:
                tensor[0, i, j] = 1
            elif board[i][j] == 2:
                tensor[1, i, j] = 1
    
    return tensor.unsqueeze(0)  

In [None]:
class RLNode(Node):
    def __init__(self, x, y, player_who_decides_next, prior_prob=0):
        super().__init__(x, y, player_who_decides_next)
        self.prior_prob = prior_prob
        self.children = []
    
    def select(self, C):
        # AlphaZero-style selection with prior probabilities
        if self.player == 2:
            ret = max(self.children, key=lambda child: 
                 child.wins / (child.visits + 0.001) + 
                 C * child.prior_prob * ((self.visits) ** 0.5) / (1 + child.visits))
        else:
            ret = max(self.children, key=lambda child: 
                 (child.visits - child.wins) / (child.visits + 0.001) + 
                 C * child.prior_prob * ((self.visits) ** 0.5) / (1 + child.visits))
        return ret
    
    def generate_children(self, board, policy_network):
        # Get policy from neural network
        board_tensor = board_to_tensor(board)
        policy, _ = policy_network(board_tensor)
        policy = policy.detach().numpy().reshape(GRID_SIZE, GRID_SIZE)
        
        for x in range(GRID_SIZE):
            for y in range(GRID_SIZE):
                if is_valid_move(x, y, board, self.player):
                    prior_prob = policy[x][y]
                    if self.player == 1:
                        self.children.append(RLNode(x, y, 2, prior_prob))
                    if self.player == 2:
                        self.children.append(RLNode(x, y, 1, prior_prob))
        
        if len(self.children) == 0:
            self.children = None
    
    def rollout(self, depth, C, board, policy_value_network):
        self.visits += 1
        

        if self.children == None:
            ret = decide_winner(board, self.player)
            self.wins += ret
            return ret
        
     
        if len(self.children) == 0:
            board_tensor = board_to_tensor(board)
            _, value = policy_value_network(board_tensor)
            value = value.item()
            
            self.generate_children(board, policy_value_network)
            
            if self.children == None:
                ret = decide_winner(board, self.player)
            else:
                ret = value
            
            self.wins += ret
            return ret

        child = self.select(C)
        make_move(child.x, child.y, board, self.player)
        ret = child.rollout(depth - 1, C, board, policy_value_network)
        self.wins += ret
        return ret


In [None]:
def is_game_over(board,current_player):
    for x in range(GRID_SIZE):
        for y in range(GRID_SIZE):
            if is_valid_move(x,y,board,current_player): return False
    return True

In [None]:
class RLMCTSPlayer(Player):
    def __init__(self, explorationFactor=1.4, rollouts=400, selectionDepth=4, timeLimit = 3.0,timed = False,checkpointNUM = 0):
        super().__init__()
        self.policy_value_network = PolicyValueNetwork(GRID_SIZE)
        self.optimizer = optim.Adam(self.policy_value_network.parameters(), lr=0.001)
        self.explorationFactor = explorationFactor
        self.rollouts = rollouts
        self.selectionDepth = selectionDepth
        self.timeLimit = timeLimit
        self.timed = timed
        self.checkpointNUM = checkpointNUM
    
    def get_move(self, board, current_player):
        root = RLNode(-1, -1, current_player)
        if not self.timed:
            for i in range(self.rollouts):
                sim = deepcopy(board)
                root.rollout(self.selectionDepth, self.explorationFactor, sim, self.policy_value_network)
        else:
            num_rollouts = 0
            startTime = time.time()
            while time.time() - startTime < self.timeLimit:
                num_rollouts += 1
                sim = deepcopy(board)
                root.rollout(self.selectionDepth,self.explorationFactor,sim,self.policy_value_network)

        return root.give_best_move(current_player)
    
    def train(self, game_states, moves, outcomes):
        """Train the network using game data"""
        self.policy_value_network.train()
        
        for state, move, outcome in zip(game_states, moves, outcomes):
            state_tensor = board_to_tensor(state)
            policy, value = self.policy_value_network(state_tensor)
            
            # Create target policy (one-hot encoding of the move)
            target_policy = torch.zeros_like(policy)
            move_idx = move[0] * GRID_SIZE + move[1]
            target_policy[0, move_idx] = 1
            
            # Calculate losses
            value_loss = nn.MSELoss()(value, torch.tensor([[outcome]], dtype=torch.float))
            policy_loss = -torch.sum(target_policy * torch.log(policy + 1e-8))
            loss = value_loss + policy_loss
            
            # Update network
            self.optimizer.zero_grad()
            loss.backward()
            self.optimizer.step()
    
    def self_play(self, num_games=20):
        """Generate training data through self-play"""
        game_states = []
        moves = []
        outcomes = []
        
        for _ in range(num_games):
            board = [[0 for _ in range(GRID_SIZE)] for _ in range(GRID_SIZE)]
            board[3][4] = board[4][3] = 2  # White
            board[3][3] = board[4][4] = 1  # Black
            current_player = 1
            states = []
            actions = []
            
            while True:
                # Store current state
                states.append(deepcopy(board))
                
                # Get move
                move = self.get_move(board, current_player)
                actions.append(move)
                
                # Make move
                make_move(move[0], move[1], board, current_player)
                
                # Check if game is over
                current_player = 3 - current_player 
                
                if is_game_over(board,current_player):  
                    winner = decide_winner(board, current_player)
                    
                    # Assign outcome to each state-action pair
                    for state, action in zip(states, actions):
                        game_states.append(state)
                        moves.append(action)
                        outcomes.append(winner)
                    break
                
        # Train network with generated data
        self.train(game_states, moves, outcomes)
        torch.save(self.policy_value_network.state_dict(), 'RLMCTS_5SEC_1.4/checkpoint' + str(self.checkpointNUM) + '.pth')
        self.checkpointNUM+=1
