In [2]:
from __future__ import division
import numpy as np
import random

# Connect 4

The game is fully described by the current state of the grid, which is an array of size 6x7 initially filled with zeros. 
Players are called +1 and -1. Player +1 starts and everytime they play, their ID is placed in the grid array.

Note that... For classic Connect Four played on 6 high, 7 wide grid, there are 4,531,985,219,092 positions for all game boards populated with 0 to 42 pieces (wikipedia). So it's a bit hard a visit them all. 

In [3]:


def create_grid(sizeX=6, sizeY=7):
    return np.zeros((sizeX, sizeY), dtype=int)

def reset(grid):
    return np.zeros(grid.shape, dtype=int)

def play(grid_before_play, column, player=None):
    """
    Play at given column, if no player provided, calculate which player must play, otherwise force player to play
    Return new grid and winner
    
    grid_before_play: array containing zero if location empty, 1 or -1 if the respective player already played there. 
    column: index of the column
    player: index of the player (1 or -1)
    """
    grid = grid_before_play.copy()
    if player == None:
        player = get_player_to_play(grid)

    if can_play(grid, column):
        row = grid.shape[0] - 1 - np.sum(np.abs(grid[:, column]), dtype=int)
        grid[row, column] = player
    else:
        raise Exception('Error : Column {} is full'.format(column))
    return grid

def get_row(grid, column):
    """
    return the index of the row where the player's coin is going to be after playing column in state grid.
    """
    return grid.shape[0] - 1 - np.sum(np.abs(grid[:, column]), dtype=int)
    

def can_play(grid, column):
    """
    Check if the given column is free
    """
    return grid[0, column ] == 0

def valid_moves(grid):
    """
    returns the list of available next moves given the current state (grid)
    """
    return [i for i in range(grid.shape[1]) if can_play(grid, i)]

def has_won(grid, player, row, column):
    """
    Check if player has won after playing column. 
    Must check if the new state of the grid after (row, column) is played contains a victory_pattern.
    """
    player += 1
    grid += 1
    row_str = ''.join(grid[row, :].astype(str).tolist())
    col_str = ''.join(grid[:, column].astype(str).tolist())
    up_diag_str = ''.join(np.diagonal(grid, offset=(column - row)).astype(str).tolist())
    down_diag_str = ''.join(np.diagonal(np.rot90(grid), offset=-grid.shape[1] + (column + row) + 1).astype(str).tolist())

    grid -= 1
    victory_pattern = str(player)*4
    if victory_pattern in row_str:
        return True
    if victory_pattern in col_str:
        return True
    if victory_pattern in up_diag_str:
        return True
    if victory_pattern in down_diag_str:
        return True

    return False


def get_player_to_play(grid):
    """
    Get player to play given a grid
    """
    player_1 = 0.5 * np.abs(np.sum(grid-1))
    player_2 = 0.5 * np.sum(grid + 1)

    if player_1 > player_2:
        return 1
    else:
        return -1



def get_next_state(grid, move):
    """
    Returns the next grid after chosing column move in grid
    """
    if can_play(grid, move) == False:
        print(grid)
        print(move)
        #this should not happen if there is no bug in the selection algorithm
        #raise Exception('Error : Column {} is full'.format(move))
        print("wrong move")
        
    
    next_grid = play(grid, move)
    return next_grid


def get_winning_moves(grid, moves, player):
        return [move for move in moves if has_won(grid, player, get_row(grid,move),move)]

In [4]:
# TESTS
test_grid = create_grid()
test_grid = play(test_grid, 4)
test_grid = play(test_grid, 4)

In [5]:
test_grid = play(test_grid, 4) # the state is not preserved

In [6]:
can_play(test_grid, 3) 

True

In [7]:
valid_moves(test_grid)

[0, 1, 2, 3, 4, 5, 6]

In [8]:
test_grid = play(test_grid, 3)
test_grid = play(test_grid, 3)
test_grid = play(test_grid, 5)
test_grid = play(test_grid, 5)
test_grid = play(test_grid, 6)

In [9]:
test_grid

array([[ 0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0,  0,  0,  0],
       [ 0,  0,  0,  0, -1,  0,  0],
       [ 0,  0,  0, -1,  1, -1,  0],
       [ 0,  0,  0,  1, -1,  1,  1]])

In [10]:
has_won(test_grid, -1, 5, 6)

False

# Tree structure (Node class)

In [12]:
class Node:

    def __init__(self, grid_state, winning, move_from_parent, parent):
        self.parent = parent
        self.move = move_from_parent
        self.win = 1 if winning else 0
        self.games = 0
        
        self.state = grid_state # flattened grid containing 0, +1 and -1
        self.children = {} # Dict[int, Node], will be updated by the expand function of the agent
        self.winner = winning



    def get_state(self):
        print(self.state)
        return self.state


# Tree Search Algorithm

## Default Policy : random choice among valid moves

In [15]:
def random_play(grid, printing=False):
    """
    Play a random game starting by state and player
    Return winner
    """
    move_seq = []
    while True:
        moves = valid_moves(grid)
        if len(moves) == 0:
            return 0
        selected_move = random.choice(moves)
        move_seq.append(selected_move)
        
        player = get_player_to_play(grid)
        row = get_row(grid, selected_move )
        grid = play(grid, selected_move)
        winner = has_won(grid, player, row, selected_move)
        if winner :
            if printing:
                print("sequence of moves:", move_seq)
                print("final grid: ")
                print(grid)
            return player

In [16]:
test_grid = create_grid()
random_play(test_grid, printing=True)

sequence of moves: [0, 5, 3, 0, 6, 6, 2, 6, 3, 5, 1]
final grid: 
[[ 0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0  0]
 [ 0  0  0  0  0  0  1]
 [ 1  0  0 -1  0  1  1]
 [-1 -1 -1 -1  0  1 -1]]


-1

### Question : Can you think of a "better" Default policy ?

## UCB generic function

In [29]:
import operator

def break_ties(ucbs):
    mixer = np.random.random(ucbs.size)  # Shuffle to avoid always pulling the same arm when ties
    UCB_indices = list(np.lexsort((mixer, ucbs)))  # Sort the indices
    output = UCB_indices[::-1]  # Reverse list
    chosen_arm_index = output[0]
    chosen_arm = arms[chosen_arm_index,:]

    return chosen_arm

def ucb_selection(node, C_p, player, max_value=100.):
    """
    
    """
    #from pdb import set_trace; set_trace()
    
    moves = valid_moves(node.state)
    ucbs = np.empty(7)
    
    for move in moves:
        child = node.children[move]
        ucbs[move] = child.win/child.games + C_p * np.sqrt(2 * np.log(node.games) / child.games)
    
    return np.argmax(ucbs)
    


In [30]:
np.max(np.array([1,3,5,5,5]))

5

## MCTS

In [31]:

# define the 
class MCTS():
    def __init__(self, C_p, root_node, train_time = 100, max_value=10., printing=False):
        """
        C_p : exploration boost constant
        root_state: array representing the game before any round happened, 
        train_time : integer, number of games to be played for training using an MCTS algorithm,
        max_value: max_value of the ucbs
        """
        self.C_p = C_p
        self.max_value = max_value #remove or update
        self.root = root_node
        self.train_time = train_time
        self.printing=printing
        
        
    def select_move(self, current_node):
        """
        Must take a grid (array) as input, not a node, because it is the interacting function
        Select best move and advance
        :return:
        int : the move to be taken from current node 
        
        """
        # That's a hack :
        # check if a child is a winning state, if yes take the according move
#         winners = [current_node.children[i] for i in current_node.children.keys() if current_node.children[i].winner]
#         if len(winners) > 0:
#             return winners[0].move

#         player = get_player_to_play(current_node.state)
        
#         other_player = -1 * player
#         valid_moves_other_player = valid_moves(current_node.state)
#         winning_moves = get_winning_moves(current_node.state, valid_moves_other_player, other_player)
        
        return ucb_selection(current_node, self.C_p, player, self.max_value )
        
        
        
            
    def simulate(self, current_node):
        """
        from current node, generates the sequence of nodes selected by UCT and returns the last one,
        that is the one whose UCT move is does not correspond to any existing child. 
        
        return: Tuple(Node, int) : Last visited node and the chosen move for future expansion.
        
        """
        
        player = get_player_to_play(current_node.state)
        
        while True:
            moves = valid_moves(current_node.state)
            for move in moves:
                if move not in current_node.children:
                    return current_node, move
            
            
            move = ucb_selection(current_node, self.C_p, player, self.max_value)
            
            if move not in current_node.children:
                
                return current_node, move
            
            current_node = current_node.children[move] # update current node to next stage in the tree
            
        
            
    def expand(self, current_node, current_player, move):
        """
        creates a new node whose parent is current node and that is reached by choosing column move
        """
        child_state = get_next_state(current_node.state, move)
        winning = 0
        child_has_won = 0
        row = get_row(current_node.state, move)
        if has_won(child_state, current_player, row, move):
            winning = 1 # that move was winning
            child_has_won = 1
        else :
            winner = random_play(child_state)
            if winner == current_player:
                child_has_won = 1 #that move was winning        
            
        new_node = Node(child_state, winning, move, current_node)
        new_node.games += 1
        new_node.win += child_has_won
          
        current_node.children[move] = new_node # current node now points to its new child
        
        
        return new_node
    
    def backward_propagation(self, expanded_node):
        """ propagates back the value received by the expanded node following rollout"""
        winning = expanded_node.win
        current_node = expanded_node
        
        while current_node.parent is not None: 
            
            
            current_node.parent.win += winning
            
            current_node.parent.games += 1
            current_node = current_node.parent
            winning = 1 - winning # propagate opposite values along the path for each player
            
    def get_children_values(self, node):
        """
        returns a dict containing the UCBs of the children of the given node
        """
        values = dict()
        for move, child in node.children.items():
            values[move] = child.win / child.games + self.C_p * np.sqrt(np.log(node.games)/child.games)
            
        return values
            
        
        


    def train(self, node):
        """
        Builds and searches the tree and learns the node values to lead the decisions
        
        """
        
        
        t=0
        while t < self.train_time:
            
            # Select initial path in the "known" part of the tree
            node_to_expand, move = self.simulate(node)
            
            
            if can_play(node_to_expand.state, move):
                
                next_grid = get_next_state(node_to_expand.state, move)

                player = get_player_to_play(node_to_expand.state)

                new_node = self.expand(node_to_expand, player, move)
                self.backward_propagation(new_node)
                t += 1
                if (t % 100)==0 and self.printing:
                    print("100 more training steps")
            else:
                pass

    
    

In [32]:
root = create_grid()
root_node = Node(root, 0, None, None)
mcts = MCTS(2, root_node, train_time = 100, printing=True)


mcts.train(mcts.root)

100 more training steps


In [33]:
ucb_selection(root_node,2, -1)

5

In [34]:
mcts.train(mcts.root)

100 more training steps


In [35]:
print(mcts.root.games)

200


In [36]:
mcts.get_children_values(mcts.root)

{0: 1.350602471000455,
 1: 1.3210100743226403,
 2: 1.264267874075378,
 3: 1.242686352884689,
 4: 1.264267874075378,
 5: 1.3071678954223052,
 6: 1.3138118153593648}

# Playing with the trained tree

The print function below allows you to visualize the grid. Feel free to use this function anywhere you need for debugging or testing.  

In [38]:

def utils_print(grid):
    print_grid = grid.astype(str)
    print_grid[print_grid == '-1'] = 'X'
    print_grid[print_grid == '1'] = 'O'
    print_grid[print_grid == '0'] = ' '
    res = str(print_grid).replace("'", "")
    res = res.replace('[[', '[')
    res = res.replace(']]', ']')
    print(' ' + res)
    print('  ' + ' '.join('0123456'))




In [39]:
root_state = create_grid() 
root_node = Node(root_state, 0, None, None)
ai = MCTS(2., root_node, train_time=1000)

ai.train(root_node)


print('training finished')

training finished


In [40]:
ai.train(ai.root)

In [41]:
N_pretrainings = 1000

In [None]:
for t in range(N_pretrainings):
    ai.train(ai.root)
    

In [None]:
ai.root.games

In [None]:
print(ai.get_children_values(ai.root))
#print("interesting, it seems like playing in the middle is a bit better on average")

In [215]:
utils_print(ai.root.children[3].children[5].children[4].children[0].state)

 [             ]
 [             ]
 [             ]
 [             ]
 [             ]
 [O     X X O  ]
  0 1 2 3 4 5 6


In [223]:
print(ai.get_children_values(ai.root.children[3].children[5].children[4].children[0]))

{}


In [None]:

while True:
    # test AI with real play
     
    rounds = 0
    current_node = ai.root
    utils_print(root_node.state)
    while True:
        player = get_player_to_play(current_node.state)
        if (rounds % 2) == 0:
            #player = -1
            print('You play, enter a column index')
            move = int(input()) # the human player chooses a move in the interface
            while move not in valid_moves(current_node.state):
                print("Invalid move, please select another column")
                move = int(input())
        else:
            print('AI plays')
            #player = 1
            print(ai.get_children_values(current_node))
            move = ai.select_move(current_node) #ai move
            
        
        # compute the next grid state after the move occurs    
        next_state = play(current_node.state, move)
        row = get_row(current_node.state, move) # just for the has_won function
        
        
        # check if this was a winning move
        if has_won(next_state, player, row, move) == True:
            if player == -1:
                
                print(ai.get_children_values(current_node))
                print(" You win ! Congrats")
                print("Number of rounds : ")
                print(rounds)
                # use this experience to train your AI: 
                #TODO
            else:
                print("you were just beaten by a mediocre AI")
                print("Number of rounds : ")
                print(rounds)
            
            utils_print(next_state) #display last grid before restarting a game
            break
        
        if move in current_node.children:
            current_node = current_node.children[move] 
        else:
            # the chosen move has never been seen by the AI... let's allow it to quickly train
            next_node = ai.expand(current_node, player,move)
            ai.train(current_node)
            current_node = next_node

        utils_print(next_state)
    
        
        
        rounds += 1


#from pdb import set_trace; set_trace()


 [             ]
 [             ]
 [             ]
 [             ]
 [             ]
 [             ]
  0 1 2 3 4 5 6
You play, enter a column index
4
 [             ]
 [             ]
 [             ]
 [             ]
 [             ]
 [        X    ]
  0 1 2 3 4 5 6
AI plays
{0: 0.9054622998379851, 1: 0.9229887723764063, 2: 0.8194148726887243, 3: 0.8286991577880327, 4: 0.8696122782262095, 5: 0.9054622998379851, 6: 0.9143539574243043}
 [             ]
 [             ]
 [             ]
 [             ]
 [        O    ]
 [        X    ]
  0 1 2 3 4 5 6
You play, enter a column index
3
 [             ]
 [             ]
 [             ]
 [             ]
 [        O    ]
 [      X X    ]
  0 1 2 3 4 5 6
AI plays
{0: 2.050548270340074, 1: 2.1934054131972167, 2: 2.097631871758759, 3: 1.9309652050920927, 4: 2.1934054131972167, 5: 2.097631871758759, 6: 2.264298538425426}
 [             ]
 [             ]
 [             ]
 [             ]
 [        O    ]
 [      X X   O]
  0 1 2 3 4 5 6
You pl

### Question: What do you think of the behavior of the AI. What is good ? What could be improved ? 


### Question: For the moment, all the games you play with the AI are not really used to train it because you do not backpropagate the outcomes of your games. Can you code this ? 


## Evaluate your AI

To evaluate the quality of your player, you want to make it play against several other ones and get a winning rate. The first obvious adversary is the random player. 

### Question : Can you code a simple test that returns a winning score after playing against a uniform random player ? 

In [None]:
N_games = 5
wins = 0
nb_rounds = 0

for n in range(N_games):
    if (n % 10 )==0 and n!=0:
        print("Played 1 more games.")
        
    current_node = ai.root
   
    rounds = 0
    while True:
        player = get_player_to_play(current_node.state)
        if (round % 2) == 0:  
            #player = -1
            moves = valid_moves(current_node.state)
            move = random.choice(moves)
            
        else: 
            #player = 1
            move = ai.select_move(current_node) #ai move
            

        next_state = play(current_node.state, move)
        row = get_row(current_node.state, move)

        if has_won(next_state, player, row, move) == True:
            if player == 1:
                wins +=1
            nb_rounds += rounds
            print(rounds)
            
            
            break

        if move in current_node.children:
            current_node = current_node.children[move] 
        else:
            next_node = ai.expand(current_node, player,move)
            ai.train(current_node)
            current_node = next_node

        rounds += 1

# compute score
score = wins / N_games
print(" The score of your AI is:")
print(score)
print("---------------------------")
print("The average number of rounds per game is: ")
print(nb_rounds / N_games)

Indeed, playing against uniform random is a bit of a cheat... it would be better to test your AI against a better player... but who ? You could play against a human (i.e. you) a hundred times and report how many times you won. But that's quite long. 

### Bonus Question : Can you think of any other way to evaluate an agent ? Could you code it ?