# Monte Carlo Tree Search Lab

In this lab, we'll be using the game connect four, as a vehicle for learning MinMax and Monte Carlo Tree Search.
We'll also introduce concepts, such as state, that'll stay relevant throughout the course.
Expect to lose in connect four to the algorithm at the end of the lab.

## Setup
This section you won't need to edit, but it is worth skimming through—this is where we declare the objects you'll be interacting with througout the lab.

In [1]:
# imports
import random
from typing import List, Tuple
import time
import numpy as np
from collections import defaultdict
from copy import deepcopy  # world -> thought

In [2]:
# world and world model
class State:
    def __init__(self, cols=4, rows=3, win_req=3): # 7, 6, 4
        self.board = [['.'] * cols for _ in range(rows)]
        self.heights = [1] * cols
        self.num_moves = 0
        self.win_req = win_req

    def get_avail_actions(self) -> List[int]:
        return [i for i in range(len(self.board[0])) if self.heights[i] <= len(self.board)]
  
    def put_action(self, action, agent):
        self.board[len(self.board) - self.heights[action]][action] = agent.name
        self.heights[action] += 1
        self.num_moves += 1

    def is_over(self):
        return self.num_moves >= len(self.board) * len(self.board[0])

    def __repr__(self):
        return self.__str__()
    
    def __str__(self):
        header  = " ".join([str(i) for i in range(len(self.board[0]))])
        line    = "".join(["-" for _ in range(len(header))])
        board   = [[e for e in row] for row in self.board]
        board   = '\n'.join([' '.join(row) for row in board])
        return  '\n' + header + '\n' + line + '\n' + board + '\n'

In [3]:
# evaluate the utility of a state
def utility(state: 'State'):
    board = state.board
    n_cols = len(board[0]) - 1
    n_rows = len(board) - 1

    def diags_pos():
        """Get positive diagonals, going from bottom-left to top-right."""
        for di in ([(j, i - j) for j in range(n_cols)] for i in range(n_cols + n_rows - 1)):
            yield [board[i][j] for i, j in di if i >= 0 and j >= 0 and i < n_cols and j < n_rows]

    def diags_neg():
        """Get negative diagonals, going from top-left to bottom-right."""
        for di in ([(j, i - n_cols + j + 1) for j in range(n_cols)] for i in range(n_cols + n_rows - 1)):
            yield [board[i][j] for i, j in di if i >= 0 and j >= 0 and i < n_cols and j < n_rows]

    cols = list(map(list, list(zip(*board))))
    rows = board
    diags = list(diags_neg()) + list(diags_pos())
    lines = rows + cols + diags
    strings = ["".join(s) for s in lines]
    for string in strings:
        if 'OOO' in string:
            return -1
        if 'XXX' in string:
            return 1
    return 0


In [4]:
# parrent class for mcts, minmax, human, and any other idea for an agent you have
class Agent:
    def __init__(self, name: str):
        self.name: str = name

    def get_action(self, state: State):
        return random.choice(state.get_avail_actions())


class Human:
    def __init__(self, name):
        self.name = name

    def get_action(self, state: State):
        given_input = int(input())
        if (given_input in state.get_avail_actions()):
            return given_input
        else:
            print('Pick a valid input')


In [5]:
# connecting states and agents
class Game:
    def __init__(self, agents: Tuple[MCTSNode, Human]):
        self.agents = agents
        self.state = State()

    def play(self):
        while utility(self.state) == 0 and not self.state.is_over():
            for agent in self.agents:
                if utility(self.state) == 0 and not self.state.is_over():
                    action = agent.get_action(self.state)
                    self.state.put_action(action, agent)
                    print(self.state)


NameError: name 'MCTSNode' is not defined

## Exercise 0: Discuss and Run game
Let's discuss if the `utility` function best belongs to the state or the agent.
Put the state, agent and game class together so that a game is run.

In [None]:
agents = (MCTSNode(state=State()), Human('X'))
game = Game(agents)
game.play()


3

0 1 2 3
-------
. . . .
. . . .
. . . O


0 1 2 3
-------
. . . .
. . . .
. X . O

3

0 1 2 3
-------
. . . .
. . . O
. X . O


0 1 2 3
-------
. . . .
. X . O
. X . O

3

0 1 2 3
-------
. . . O
. X . O
. X . O



## Exercise 1: Human Agent
Make a child class of `Agent` called `Human`, with the `get_action` method overwritten to take input from you. *hint*: use `int(input())`

In [None]:
class Human:
    def __init__(self, name):
        self.name = name

    def get_action(self, state: State):
        given_input = int(input())
        if (given_input in state.get_avail_actions()):
            return given_input
        else:
            print('Pick a valid input')


## Exercise 2: Gekko
Make a child class of `Agent` called `Gekko`, with a `get_action` that is very short sighted (greedy). You can basically do whatever you want here, as long as your output a valid action. You might want to make a `utility` function for the agent, and perhaps some helper functions. Write a two line comment explaining your Gekko's heuristic.

In [None]:
# Gekko works by counting the amount of already placed "O"'s and, if it is valid to continue that string towards four "O"s in a an either vertical or horizontal row, it will do so.
# It never considers blocking the opponent, or diagnals because I found them to be buggy and ignoring them gave better "greedy" results.

class Gekko:
    def __init__(self, name):
        self.name = name

    def determine_col_o_count(self, string):
        no_dots = string.replace(".", "")
        no_x_list = no_dots.split("X", 1)
        no_x_string = no_x_list[0]
        return len(no_x_string)

    def determine_row_o_count_and_moves(self, string):
        o_temp_counter = 0
        temp_iterator = 0
        valid_option_placements = []
        for char in string:
            if char == "X":
                if temp_iterator < 3:
                    o_temp_counter = 0
                    valid_option_placements.clear()
                if temp_iterator > 3:
                    break
            # Not optimal in terms of finding the best move...
            if char == "O" and o_temp_counter < 3:
                o_temp_counter = o_temp_counter + 1
            if char == ".":
                valid_option_placements.append(temp_iterator)
            temp_iterator = temp_iterator + 1
        if (len(valid_option_placements) + o_temp_counter) > 3:
            valid_option_placements.append(o_temp_counter)
            return valid_option_placements
        else:
            valid_option_placements.clear()
            return valid_option_placements

    def get_action(self, state: State):
        # Board stuff
        board = state.board
        col_height_list = state.heights
        n_cols = len(board[0]) - 1
        n_rows = len(board) - 1

        def diags_pos():
            """Get positive diagonals, going from bottom-left to top-right."""
            for di in ([(j, i - j) for j in range(n_cols)] for i in range(n_cols + n_rows - 1)):
                yield [board[i][j] for i, j in di if i >= 0 and j >= 0 and i < n_cols and j < n_rows]

        def diags_neg():
            """Get negative diagonals, going from top-left to bottom-right."""
            for di in ([(j, i - n_cols + j + 1) for j in range(n_cols)] for i in range(n_cols + n_rows - 1)):
                yield [board[i][j] for i, j in di if i >= 0 and j >= 0 and i < n_cols and j < n_rows]

        cols = list(map(list, list(zip(*board))))
        rows = board
        diags = list(diags_neg()) + list(diags_pos())
        lines = rows + cols + diags
        strings = ["".join(s) for s in lines]

        # New attempt at columns
        col_strings = ["".join(s) for s in cols]
        col_iterator = 0

        best_col_o_count = 0
        best_col_option = -1

        for col_height in col_height_list:
            # This is a valid column to pursue
            if col_strings[col_iterator][3] != "X" and col_strings[col_iterator][2] != "X" and col_strings[col_iterator][1] != "X" and col_strings[col_iterator][0] != "X" and col_height < 7:
                col_str = col_strings[col_iterator]
                o_count = self.determine_col_o_count(col_str)
                if o_count > best_col_o_count:
                    best_col_o_count = o_count
                    best_col_option = col_iterator
            # This is NOT a valid column to pursue
            else:
                if col_iterator == best_col_option:
                    best_col_option = -1
            col_iterator = col_iterator + 1

        # New attempt at rows
        row_strings = ["".join(s) for s in rows]
        row_strings.reverse()
        row_iterator = 0

        best_row_o_count = 0
        best_row_option = -1

        for row_str in row_strings:
            # This is a valid row to pursue
            if row_str[3] != "X":
                moves = self.determine_row_o_count_and_moves(row_str)
                if len(moves) > 1:
                    temp_o_count = moves[len(moves) - 1]
                    if temp_o_count > best_row_o_count:
                        best_row_o_count = temp_o_count
                        best_row_option = moves[0]

        # Due to diagnals being read wrong, I found it was best to ignore them...
        if best_col_o_count > best_row_o_count:
            if best_col_option != -1:
                return best_col_option
            else:
                return random.choice(state.get_avail_actions())
        else:
            if best_row_option != -1:
                return best_row_option
            else:
                return random.choice(state.get_avail_actions())

## *Optional exercise: MinMax (useful to have done for exercise 3)*
Make a MinMax agent:

In [None]:
class MinMax:
  def __init__(self, name):
      self.name = name

  def get_action(self, state: State):
    best_option = -1
    currently_mazimizing = True
    best_score = -10
    for action in state.get_avail_actions():
      list_test = state.get_avail_actions()
      print(list_test[0])
      state_copy = deepcopy(state)
      state_copy.put_action(action, self)
      temp_score = self.minimax(state_copy, not currently_mazimizing)
      if temp_score > best_score:
        best_option = action
        best_score = temp_score
    return best_option

  def minimax(self, state: State, maximizing):
    # Determine if we are done
    if state.is_over() and maximizing:
      return 1
    elif state.is_over() and not maximizing:
      return -1
    elif maximizing:
      temp_best_score = -10
      for action in state.get_avail_actions():
        state_copy = deepcopy(state)
        state_copy.put_action(action, self)
        score = self.minimax(state_copy, not maximizing)
        if score > temp_best_score:
          temp_best_score = score
      return temp_best_score
    elif not maximizing:
      temp_best_score = 10
      for action in state.get_avail_actions():
        state_copy = deepcopy(state)
        state_copy.put_action(action, self)
        score = self.minimax(state_copy, maximizing)
        if score > temp_best_score:
          temp_best_score = score
      return score
    else:
      return 0


In [None]:
# This did not make a big improvement, I might not have implemented it right.
# It only runs the 3 x 3 game
class MinMaxAB:
  def __init__(self, name):
      self.name = name

  def get_action(self, state: State):
    best_option = -1
    currently_mazimizing = True
    best_score = -10
    for action in state.get_avail_actions():
      list_test = state.get_avail_actions()
      print(list_test[0])
      state_copy = deepcopy(state)
      state_copy.put_action(action, self)
      alpha = -10
      beta = 10
      temp_score = self.minimax(
          state_copy, not currently_mazimizing, alpha, beta)
      if temp_score > best_score:
        best_option = action
        best_score = temp_score
    return best_option

  def minimax(self, state: State, maximizing, alpha, beta):
    # Determine if we are done
    if state.is_over() and maximizing:
      return 1
    elif state.is_over() and not maximizing:
      return -1
    elif maximizing:
      temp_best_score = -10
      for action in state.get_avail_actions():
        state_copy = deepcopy(state)
        state_copy.put_action(action, self)
        score = self.minimax(state_copy, not maximizing, alpha, beta)
        if score < beta:
            if score > temp_best_score:
                temp_best_score = score
        else:
            break
      return temp_best_score
    elif not maximizing:
      temp_best_score = 10
      for action in state.get_avail_actions():
        state_copy = deepcopy(state)
        state_copy.put_action(action, self)
        score = self.minimax(state_copy, maximizing, alpha, beta)
        if score > alpha:
            if score < temp_best_score:
                temp_best_score = score
        else:
            break
      return score
    else:
      return 0

## Exercise 3: MCTS
Same but for Monte Carlo Tree Search. See if you can beat it with a `Human`.

In [None]:
class MCTSNode:
    def __init__(self, state, parent=None, parent_action=None):
        self.name = "O"
        self.state = state
        self.parent = parent
        self.parent_action = parent_action
        self.children = []
        self._number_of_visits = 0
        self._results = defaultdict(int)
        self._results[1] = 0
        self._results[-1] = 0
        self._untried_actions = None
        self._untried_actions = self.untried_actions()
        return

    def untried_actions(self):
        self._untried_actions = self.state.get_avail_actions()
        return self._untried_actions

    def q(self):
        wins = self._results[1]
        loses = self._results[-1]
        return wins - loses

    def n(self):
        return self._number_of_visits

    def expand(self):
        action = self._untried_actions.pop()
        state_copy = deepcopy(self.state)
        state_copy.put_action(action, self)
        child_node = MCTSNode(
            state_copy, parent=self, parent_action=action)

        self.children.append(child_node)
        return child_node
    
    def is_terminal_node(self):
        return self.state.is_over()

    def rollout(self):
        current_rollout_state = self.state

        while not current_rollout_state.is_over():

            possible_moves = current_rollout_state.get_avail_actions()

            action = self.rollout_policy(possible_moves)
            current_rollout_state = deepcopy(current_rollout_state)
            current_rollout_state.put_action(action, self)
        return utility(current_rollout_state)
        

    def backpropagate(self, result):
        self._number_of_visits += 1.
        self._results[result] += 1.
        if self.parent:
            self.parent.backpropagate(result)

    def is_fully_expanded(self):
        return len(self._untried_actions) == 0

    def best_child(self, c_param=0.1):
        choices_weights = [(c.q() / c.n()) + c_param *
                       np.sqrt((2 * np.log(self.n()) / c.n())) for c in self.children]
        return self.children[np.argmax(choices_weights)]

    def rollout_policy(self, possible_moves):
        return possible_moves[np.random.randint(len(possible_moves))]

    def _tree_policy(self):
        current_node = self
        while not current_node.is_terminal_node():

            if not current_node.is_fully_expanded():
                return current_node.expand()
            else:
                current_node = current_node.best_child()
        return current_node

    def best_action(self):
        simulation_no = 1000

        for i in range(simulation_no):

            v = self._tree_policy()
            reward = v.rollout()
            v.backpropagate(reward)

        return self.best_child(c_param=0.)

    def get_action(self, initial_state: State):
        root = MCTSNode(state=initial_state)
        selected_node = root.best_action()
        print(str(selected_node.parent_action))
        return selected_node.parent_action
        

# class Node:
#     def __init__(self, state: State, parent: 'Node' = None):
#         self.children: List['Nodes'] = []
#         self.parent: 'Node' = parent
#         self.state: State = state

In [None]:
# class MCTS(Agent):
#    def __init__(self, name):
#        super(MCTS, self).__init__(name)

## *Optional exercise: Dynamic Programming*
Then use dynamic programming to make your AI more efficient. You can use the class below (or not)

In [None]:
class TranspositionTable:
    def __init__(self, size=1_000_000):
        self.size = size
        self.vals = [None] * size

    def board_str(self, state: State):
        return ''.join([''.join(c) for c in state.board])

    def put(self, state: State, utility: float):
        bstr = self.board_str(state)
        idx = hash(bstr) % self.size
        self.vals[idx] = (bstr, utility)

    def get(self, state: State):
        bstr = self.board_str(state)
        idx = hash(bstr) % self.size
        stored = self.vals[idx]
        if stored is None:
            return None
        if stored[0] == bstr:
            return stored[1]
        else:
            return None