In [8]:
# ignore deprecation warnings ('safe' as long as we don't update packages)
from warnings import filterwarnings
filterwarnings("ignore")

from math import sqrt, log, inf
import copy

import gym
from gym_go.gogame import turn, random_weighted_action

import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F

import matplotlib.pyplot as plt
import numpy as np

# Constants

In [9]:
UCB_C = 30

# Board will be a BOARD_SIZE * BOARD_SIZE board (BOARD_SIZE**2)
BOARD_SIZE = 5
ACTIONSPACE_LENGHT = BOARD_SIZE ** 2 + 1
'''
The state object that is returned by the reset and step functions of the environment is a
6 x BOARD_SIZE x BOARD_SIZE numpy array. All values in the array are either 0 or 1.
'''

# Indication layer channel indices
BLACK = 0
WHITE = 1
TURN = 2
INVALID = 3
PASS = 4
DONE = 5

In [10]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
go_env = gym.make('gym_go:go-v0', size=BOARD_SIZE, komi=0, reward_method='heuristic')

In [11]:
class CNN(nn.Module):

    def __init__(self):
        super().__init__()
        # input: 1 x BOARD_SIZE**2 x 6 channels
        self.conv1 = nn.Conv2d(6, 6, 5)
        self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, BOARD_SIZE*BOARD_SIZE + 1)
        # output: 1 x BOARD_SIZE**2 + 1

    def forward(self, x):
        x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x    

In [12]:
net = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [13]:
def export_model(model, name="model.pth"):
    torch.save(model.state_dict(), "models/" + name)

def import_model():
    
    pass

def train_model():
    pass

In [14]:
# for epoch in range(2):  # loop over the dataset multiple times
#     running_loss = 0.0
#     for i, data in enumerate(trainloader, 0):
#         # get the inputs; data is a list of [inputs, labels]
#         inputs, labels = data

#         # zero the parameter gradients
#         optimizer.zero_grad()

#         # forward + backward + optimize
#         outputs = net(inputs)
#         loss = criterion(outputs, labels)
#         loss.backward()
#         optimizer.step()

#         # print statistics
#         running_loss += loss.item()
#         if i % 2000 == 1999:    # print every 2000 mini-batches
#             print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
#             running_loss = 0.0

# Monte Carlo Tree Search

1. Selection
    - Taverse the tree to find greatest UCB-score
2. Expansion
    - If the selected leaf node has been visited before expand by adding weighted game action
3. Rollout
    - Simulate the game until end-condition from the expanded leaf
4. Back-propagation
    - Updating the value of each ancestor node of the expanded leaf


In [15]:
# TODO: GymGo.gogame.move/validmove/elns for å nulle ut actionspace
def get_legal_move(env):
    board_shape = env.state().shape[1:]
    pass_id = np.prod(board_shape)
    action = env.action_space.sample() # pick random action
    action2d = action // board_shape[0], action % board_shape[1], action
    while action2d[2] != pass_id and env.state()[INVALID, action2d[0], action2d[1]] == 1:
        action = env.action_space.sample() # pick random action
        action2d = action // board_shape[0], action % board_shape[1], action
    return action2d[2]

def equals(state_a, state_b):
    for i in range(0, 6):
        if not np.array_equal(state_a[i], state_b[i]):
            return False
    return True

In [16]:
class Node():
    def __init__(self, env, parent, action):
        self.env : gym.Env = env # This env will be altered by the other player
        self.value : int = 0 # Value estimate
        self.trials : int = 0 # Number of trials for this node
        self.parent : Node = parent # Parent node of this node
        self.children : list[Node] = [] # List of children of this node
        self.action : int = action # The step action made by this node
    
    # calculate a Upper Confidence Bound
    def ucb(self, total_trials):
        return self.value + ( UCB_C * sqrt(log(total_trials) / self.trials) )
    
    # Add a new node to a leaf node
    def expansion(self, move_weights):
        if self.env.done:
            return
        board_shape = self.env.state().shape[1:]
        pass_id = np.prod(board_shape)
        for action in range(0, pass_id + 1):
            action2d = action // board_shape[0], action % board_shape[1]
            if action == pass_id or self.env.state()[3, action2d[0], action2d[1]] == 0:
                child_env = copy.deepcopy(self.env)
                child_env.step(action)
                child = Node(child_env, self, action)
                child.value = move_weights[action]
                self.children.append(child)

    # Simulate game from current move until end-condition returning the score
    def rollout(self, move_selection_method):
        if self.env.done:
            return self.env.reward()
        
        rollout_env = copy.deepcopy(self.env)
        rollout_result = 0
        done = False
        while not done:
            random_action = move_selection_method(rollout_env)
            _, reward, done, _ = rollout_env.step(random_action)
            rollout_result += reward
        return rollout_result

In [17]:
class Monte_Carlo_Tree_Search():
    def __init__(self, size, ml_model):
        self.env : gym.Env = gym.make('gym_go:go-v0', size=size, reward_method='heuristic')
        self.env.reset()
        self.number_of_trials : int = 0
        self.root = Node(self.env, None, None)
        self.ml_model = ml_model
    
    # Gets the weights of all moves from the Machine Learning model
    def get_move_weights(self, state):
        # Do something to get this from ml_model
        move_weights = np.ones(ACTIONSPACE_LENGHT)
        
        board_shape = state.shape[1:]
        for i in range(len(move_weights) - 1):
            action2d = i // board_shape[0], i % board_shape[1], i
            if state[INVALID, action2d[0], action2d[1]] == 1:
                move_weights[i] = 0.0

        return move_weights

    # Gets a weighted move for the given env
    def get_weighted_move(self, env : gym.Env):
        move_weights = self.get_move_weights(env.state())
        return random_weighted_action(move_weights)

    # Update scores of all parent nodes after rollout
    def back_propagation(self, rollout_node: Node, rollout_result):
        current_node = rollout_node
        while current_node != None:
            current_node.trials += 1
            if turn(self.env.state()) == BLACK:
                current_node.value -= rollout_result
            if turn(self.env.state()) == WHITE:                
                current_node.value += rollout_result
            current_node = current_node.parent
        self.number_of_trials += 1
    
    # Find and return the leaf node with the highest UCB-score 
    def selection(self):
        selected_child = self.root
        current_node = self.root
        while len(current_node.children) > 0:
            current_best_ucb = -inf
            for child in current_node.children:
                if child.trials == 0:
                    return child

                child_ucb = child.ucb(self.number_of_trials)

                if not child.env.done and child_ucb > current_best_ucb:
                    selected_child = child
                    current_best_ucb = child_ucb

            current_node = selected_child

        return selected_child
    
    # Explores the tree for the given number of iterations
    def run(self, iterations):
        selected_node = self.root
        selected_node.expansion(self.get_move_weights(selected_node.env.state()))
        selected_node = self.root.children[0]

        run = 0
        while run < iterations:
            selected_node = self.selection()
            # print("Run:", run, ": Selection", selected_node.action)

            # if selected_node.env.done:
            #     self.back_propagation(selected_node, selected_node.env.reward())
            #     run += 1
            #     continue

            if selected_node.trials > 0:
                # print("Run:", run, ": Expansion:")
                selected_node.expansion(self.get_move_weights(selected_node.env.state()))
                selected_node = selected_node.children[0]

            rollout_result = selected_node.rollout(self.get_weighted_move)
            # print("Run:", run, ": Rollout", rollout_result)
            
            self.back_propagation(selected_node, rollout_result)
            run += 1
    
    # searches the tree for a spesific state
    def find_node_from_state(self, state, node: Node = None):
        if node is None:
            node = self.root
        if equals(node.env.state(), state):
            return node

        for child in node.children:
            if equals(child.env.state(), state):
                return child
            
            res = self.find_node_from_state(state, child)
            if res != None and equals(res.env.state(), state):
                return res

    # Attempts to find the best move from the tree by searching for the state and finding the best child for that state
    def get_move_from_env(self, env, node: Node = None):
        if node is None:
            node = self.root
        node = self.find_node_from_state(env.state(), node)
        
        if node != None:
            # print("Found node for state:")
            node.env.render()
            # print("Finding best move from:", len(node.children), "available explored moves.")
            best_child = None
            current_best_value = -inf
            for child in node.children:
                if child.value > current_best_value:
                    best_child = child
                    current_best_value = child.value

            if best_child != None:
                return best_child
        
        return None

    def get_training_data_from_tree(self):
        training_data = []
        self.get_training_data_from_node(training_data, self.root)
        return training_data
        
    def get_training_data_from_node(self, training_data : list, current_node):
        node_tuple = [current_node.env.state(), [0] * (BOARD_SIZE ** 2 + 1)]
        for child in current_node.children:
            node_tuple[1][child.action] = child.value
            training_data.append(node_tuple)
            self.get_training_data_from_node(training_data, child)
        training_data.append(node_tuple)

In [18]:
def play_game_no_render(advesary_function, model : Monte_Carlo_Tree_Search, go_env: gym.Env):
    go_env.reset()
    done = go_env.done
    turn_nr = 0
    while not done:
        action = advesary_function(go_env)
        _, _, done, _ = go_env.step(action)

        if done:
            continue

        node = model.get_move_from_env(go_env)
        action = model.get_weighted_move(go_env)
        if node != None:
            action = node.action  
        _, _, done, _ = go_env.step(action)
        turn_nr += 1
        if turn_nr > 300:
            break

    if node != None:
        model.back_propagation(node, go_env.reward())
    
    return go_env

In [19]:
model = Monte_Carlo_Tree_Search(BOARD_SIZE, None)
model.run(100)

In [20]:
for i in range(1,10):
    env = play_game_no_render(get_legal_move, model, copy.deepcopy(model.env))
    if env.done:
        print("Game finished within 300 turns:")
    else:
        print("Game stopped after 300 turns:")
        
    if env.reward() < 0:
        print("White won!")
    if env.reward() > 0:
        print("Black won!")
    if env.reward() == 0:
        print("It's a draw!")

	0 1 2 3 4 
0	╔═╤═╤═╤═╗
1	╟─┼─┼─┼─○
2	╟─┼─┼─┼─╢
3	╟─┼─┼─┼─╢
4	╚═╧═╧═╧═╝
	Turn: WHITE, Game State (ONGOING|PASSED|END): ONGOING
	Black Area: 25, White Area: 0

Game finished within 300 turns:
Black won!
	0 1 2 3 4 
0	╔═╤═╤═╤═╗
1	╟─┼─┼─┼─╢
2	╟─┼─┼─┼─╢
3	╟─┼─┼─┼─╢
4	╚═○═╧═╧═╝
	Turn: WHITE, Game State (ONGOING|PASSED|END): ONGOING
	Black Area: 25, White Area: 0

Game finished within 300 turns:
Black won!
	0 1 2 3 4 
0	╔═╤═╤═╤═╗
1	╟─┼─┼─┼─╢
2	╟─┼─○─┼─╢
3	╟─┼─┼─┼─╢
4	╚═╧═╧═╧═╝
	Turn: WHITE, Game State (ONGOING|PASSED|END): ONGOING
	Black Area: 25, White Area: 0

Game finished within 300 turns:
Black won!
	0 1 2 3 4 
0	╔═○═╤═╤═╗
1	╟─┼─┼─┼─╢
2	╟─┼─┼─┼─╢
3	╟─┼─┼─┼─╢
4	╚═╧═╧═╧═╝
	Turn: WHITE, Game State (ONGOING|PASSED|END): ONGOING
	Black Area: 25, White Area: 0

Game finished within 300 turns:
White won!
	0 1 2 3 4 
0	╔═╤═╤═╤═╗
1	╟─┼─┼─┼─╢
2	╟─┼─┼─┼─╢
3	╟─┼─┼─┼─╢
4	╚═○═╧═╧═╝
	Turn: WHITE, Game State (ONGOING|PASSED|END): ONGOING
	Black Area: 25, White Area: 0

Game finished within 300 turns:
W

# Convolutional Neural Network

In [21]:
# Run on GPU if possible
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print(f"Using device '{device}'")

Using device 'cpu'


In [22]:
class CNN(nn.Module):

    def __init__(self):
        super().__init__()
        # self.conv1 = nn.Conv2d(6, 6, 5)
        # self.pool = nn.MaxPool2d(2, 2)
        self.conv2 = nn.Conv2d(6, 16, 5)
        self.fc1 = nn.Linear(16 * 5 * 5, 120)
        self.fc2 = nn.Linear(120, 84)
        self.fc3 = nn.Linear(84, BOARD_SIZE**2 + 1)

    def forward(self, x):
        x = torch.from_numpy(x)
        x = x.to(device)
        x = x.float()
        # x = self.pool(F.relu(self.conv1(x)))
        x = self.pool(F.relu(self.conv2(x)))
        # x = torch.flatten(x, 1) # flatten all dimensions except batch
        x = F.softmax(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = self.fc3(x)
        return x    

In [23]:
net = CNN()
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

def export_cnn(name="net"):
    torch.save(net.state_dict(), "cnns/" + name + ".pth")

def import_cnn(path):
    net = CNN()
    net.load_state_dict(torch.load(path))

def train_cnn(data):
    net.to(device)
    for epoch in range(2):  # loop over the dataset multiple times
        running_loss = 0.0
        
        for i in range(len(data)):
            inputs, labels = data[0][i], data[1][i]
            optimizer.zero_grad()
            outputs = net(inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()
            running_loss += loss.item()
            if i % 2000 == 1999:    # print every 2000 mini-batches
                print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
                running_loss = 0.0

        # for i, data in enumerate(trainloader, 0):
        #     # get the inputs; data is a list of [inputs, labels]
        #     inputs, labels = data

        #     # zero the parameter gradients
        #     optimizer.zero_grad()

        #     # forward + backward + optimize
        #     outputs = net(inputs)
        #     loss = criterion(outputs, labels)
        #     loss.backward()
        #     optimizer.step()

        #     # print statistics
        #     running_loss += loss.item()
        #     if i % 2000 == 1999:    # print every 2000 mini-batches
        #         print(f'[{epoch + 1}, {i + 1:5d}] loss: {running_loss / 2000:.3f}')
        #         running_loss = 0.0


In [24]:
data = model.get_training_data_from_tree()
net = CNN() # reset neural network
train_cnn(data)

AttributeError: 'CNN' object has no attribute 'pool'