In [1]:
import numpy as np
from numpy.random import rand
import math
from random import randint
import itertools
import matplotlib.pyplot as plt
%matplotlib inline
import random
from copy import deepcopy 
import copy

In [2]:
class ConnectN:
    """
    Connect N game simulator for two players, 1 and -1.

    The board is stored as ``grid[col, row]``; 0 marks an empty cell and a
    token dropped into a column lands on the lowest empty row index, so row 0
    is the bottom of the board (print_grid rotates the array accordingly).

    Inputs:
    grid_size - creates a grid_size x grid_size square board
    n - number of tokens a player must connect to win the game
    """

    # (d_col, d_row) steps for the four win directions:
    # vertical, horizontal, rising diagonal, falling diagonal.
    _DIRECTIONS = ((0, 1), (1, 0), (1, 1), (1, -1))

    def __init__(self, grid_size, n):
        self.n = n
        self.grid_size = grid_size
        self.grid = np.zeros([grid_size, grid_size])
        self.finished = 0
        self.turn_num = 0

    def reset(self):
        """
        Clears the board and counters, keeping the same grid size and N.
        """
        self.__init__(self.grid_size, self.n)

    def _column_height(self, col):
        """
        Returns the number of tokens already in the column as a plain int,
        so it can safely be used as an array index.
        """
        return int(np.count_nonzero(self.grid[col]))

    def _run_length(self, col, row, d_col, d_row, player):
        """
        Counts consecutive tokens of player found when stepping away from
        (col, row) in direction (d_col, d_row); (col, row) itself is not counted.
        """
        count = 0
        c, r = col + d_col, row + d_row
        while (0 <= c < self.grid_size and 0 <= r < self.grid_size
               and self.grid[c, r] == player):
            count += 1
            c += d_col
            r += d_row
        return count

    def check_win(self, col, row, player):
        """
        Checks if given player has connected N tokens in a line passing
        through (col, row).

        Returns 1 (and marks the game as finished) on a win, 0 otherwise.
        """
        if self.grid[col, row] != player:
            return 0
        for d_col, d_row in self._DIRECTIONS:
            run = (1
                   + self._run_length(col, row, d_col, d_row, player)
                   + self._run_length(col, row, -d_col, -d_row, player))
            if run >= self.n:
                self.finished = 1
                return 1
        return 0

    def move(self, col, player):
        """
        Given player and column to move in, modifies board and increments the turn counter.

        Returns a tuple, where first value is return message and second value is reward:
        (1, 50) if the game is (or already was) won, (-1, -1) if the column is
        full, (0, 0) for any other legal move.

        The turn counter is incremented on every call, including rejected moves.
        """
        self.turn_num += 1

        if self.finished == 1:
            return 1, 50
        row = self._column_height(col)
        if row == self.grid_size:
            return -1, -1
        self.grid[col, row] = player
        if self.check_win(col, row, player) == 1:
            return 1, 50
        return 0, 0

    def simulate_move(self, col, player):
        """
        Tests a move without modifying the board.

        Returns 1 if the column is FULL (the move is invalid) and 0 if the
        move is possible. The player argument is not used.
        """
        if self._column_height(col) == self.grid_size:
            return 1
        return 0

    def turn(self):
        """
        Returns which player's turn it is. First turn is player 1, second turn is player -1.
        """
        if self.turn_num % 2 == 0:
            return 1
        return -1

    def next_possible_moves(self):
        """
        Returns list of columns that still have at least one empty cell
        """
        return [col for col in range(self.grid_size) if 0 in self.grid[col]]

    def all_tokens_placed(self):
        """
        Returns location of all tokens (column, row) that have been placed,
        as a list of {"location": [col, row], "player": value} dicts
        """
        all_tokens = []
        for col in range(self.grid_size):
            for row in range(self.grid_size):
                if self.grid[col][row] != 0:
                    all_tokens.append({"location": [col, row], "player": self.grid[col][row]})
        return all_tokens

    def is_empty(self, col, row):
        """
        Returns if a given spot (column, row) is empty
        """
        return self.grid[col][row] == 0

    # The streak functions below inspect the window of N cells that starts at
    # (col, row) on the given board and extends in one of the four win
    # directions. Each returns:
    #   0 if the window does not fit on the board or an opponent token is met
    #     before the first empty cell,
    #   i if the first empty cell is the i-th cell of the window,
    #   N if no cell of the window is empty or an opponent token.

    def _streak(self, board, col, row, d_col, d_row, player):
        """
        Shared implementation of the four streak functions for the window
        starting at (col, row) and stepping by (d_col, d_row).
        """
        last = self.n - 1
        width = len(board)
        if not (0 <= col < width and 0 <= col + d_col * last < width):
            return 0
        height = len(board[col])
        if not (0 <= row < height and 0 <= row + d_row * last < height):
            return 0
        for i in range(self.n):
            cell = board[col + i * d_col][row + i * d_row]
            if cell == -1 * player:
                return 0
            if cell == 0:
                return i
        return self.n

    def streakVertical(self, board, col, row, player):
        """Streak in the window going up the column from (col, row)."""
        return self._streak(board, col, row, 0, 1, player)

    def streakHorizontal(self, board, col, row, player):
        """Streak in the window going towards higher columns from (col, row)."""
        return self._streak(board, col, row, 1, 0, player)

    def streakDiagonalUp(self, board, col, row, player):
        """Streak in the window going to higher columns and higher rows."""
        return self._streak(board, col, row, 1, 1, player)

    def streakDiagonalDown(self, board, col, row, player):
        """Streak in the window going to higher columns and lower rows."""
        return self._streak(board, col, row, 1, -1, player)

    def print_grid(self):
        """
        Prints the board rotated so that row 0 is shown at the bottom
        """
        print(np.rot90(self.grid))

In [3]:
def grid_to_key(grid):
    """
    Serialises a ConnectN grid into a string usable as a dict key.

    The grid is rotated with np.rot90 and its cells are then read row by
    row, each cell contributing its integer value ("0", "1" or "-1").
    """
    rotated = np.rot90(grid)
    return "".join(str(int(cell)) for line in rotated for cell in line)

In [4]:
class ConnectDict(dict):
    """
    dict subclass whose entries are created lazily.

    Looking up a key that is not present stores, and returns, a fresh
    np.zeros(num_states) array for it, so the table never has to be
    initialised for every state up front.
    """

    def __init__(self, num_states, *arg, **kw):
        self.num_states = num_states
        dict.__init__(self, *arg, **kw)

    def __missing__(self, key):
        # Called by dict's item lookup only when `key` is absent.
        fresh = np.zeros(self.num_states)
        dict.__setitem__(self, key, fresh)
        return fresh

In [5]:
class TD_Learner(object):
    """
    Base class for Temporal Difference Learners, like Sarsa and Q learning.

    Inputs:
    task - board object; only its grid_size is read here
    value_table - existing table to keep learning on, or None for a new ConnectDict
    epsilon, discount_factor, learning_rate, trace_size - learning hyperparameters
    player - token (1 or -1) this learner plays
    """

    def __init__(self, task, value_table=None, epsilon=.1, discount_factor=.9, learning_rate=.5, player=1, trace_size=.1):

        self.num_states = task.grid_size
        self.num_actions = task.grid_size
        self.epsilon = epsilon
        self.discount_factor = discount_factor
        self.learning_rate = learning_rate

        # Identity test: `== None` would dispatch to the table's own __eq__.
        if value_table is None:
            self.value_table = ConnectDict(self.num_states)
        else:
            self.value_table = value_table

        # Eligibility traces, keyed like the value table.
        self.e = ConnectDict(self.num_states)
        self.player = player
        self.trace_size = trace_size
        self.last_board_state = None
        self.last_action = None

#     def reset(self):
#         self.last_board_state = None
#         self.last_action = None

    def softmax(self, next_board_state, tau=.5):
        """
        Implementation of Softmax Policy, which weights towards better actions rather
        than sampling uniformly across all possible actions (epsilon-greedy)

        Samples a column with probability proportional to exp(value / tau),
        using the values stored for the given board. Columns that are full
        are not excluded here.

        Returns the sampled column index as an int.
        """
        key_val = grid_to_key(next_board_state.grid)
        vals = np.asarray(self.value_table[key_val], dtype=float)

        # Subtracting the maximum leaves the probabilities unchanged but keeps
        # the exponentials <= 1, so large values cannot overflow.
        weights = np.exp((vals - vals.max()) / tau)
        cumulative = np.cumsum(weights / weights.sum())

        # Inverse-CDF sampling: first index whose cumulative mass reaches the draw.
        return int(np.searchsorted(cumulative, rand() * cumulative[-1]))
        

In [443]:
def run_trial(agent, MIN_ITERATIONS, MIN_EPISODES, player):
    """
    Runs the ConnectN Simulator for one player given an agent and number of iterations and episodes

    The game is played on the module-level `task` board, which is reset at
    the start of every episode. Every move is made with the token `player`.
    Play continues until at least MIN_ITERATIONS moves have been made and
    at least MIN_EPISODES episodes have been completed.

    Returns a tuple of four lists:
    rewards per iteration, rewards per episode, and the running totals of each.
    Per-iteration data is only recorded for the first MIN_ITERATIONS moves and
    per-episode data for the first MIN_EPISODES episodes.
    """

    rewards_by_iteration = []
    rewards_by_episode = []
    cumu_rewards_by_iteration = []
    cumu_rewards_by_episode = []

    iteration = episode = 0

    while iteration < MIN_ITERATIONS or episode < MIN_EPISODES:

        task.reset()
        board_state = task
        reward = None
        cumulative_reward = 0

        while iteration < MIN_ITERATIONS or episode < MIN_EPISODES:

            action = agent.interact(reward, board_state)

            # Play the chosen action exactly once and keep both results.
            return_val, reward = task.move(action, player)

            if iteration < MIN_ITERATIONS:
                rewards_by_iteration.append(reward)
                previous_total = cumu_rewards_by_iteration[-1] if cumu_rewards_by_iteration else 0
                cumu_rewards_by_iteration.append(previous_total + reward)

            cumulative_reward += reward
            iteration += 1

            if return_val == 1:
                # Let the agent see the terminal reward before the board is reset.
                agent.interact(reward, board_state)
                print("Won!")
                break

            if not task.next_possible_moves():
                # Board is full without a winner: end the episode instead of
                # retrying rejected moves forever.
                break

        if episode < MIN_EPISODES:
            rewards_by_episode.append(cumulative_reward)
            previous_total = cumu_rewards_by_episode[-1] if cumu_rewards_by_episode else 0
            cumu_rewards_by_episode.append(previous_total + cumulative_reward)
        episode += 1

    return rewards_by_iteration, rewards_by_episode, cumu_rewards_by_iteration, cumu_rewards_by_episode

In [6]:
class Q_Learner(TD_Learner):
    """
    Implementation of Q Learning, inheriting from TD Learner base class.

    known_states - when truthy, the first move of a game seeds the value
    table with heuristic values (see _seed_known_states).
    """

    def __init__(self, task, value_table, known_states, epsilon=.1, discount_factor=.9, learning_rate=.5, player=1, trace_size=.1):
        TD_Learner.__init__(self, task, value_table, epsilon, discount_factor, learning_rate, player, trace_size)
        self.known_states = known_states

    def _best_window_streak(self, board, cells, col, row, player):
        """
        Returns the largest streak reported for any horizontal or diagonal
        window of board.n cells that covers (col, row) on `cells`.

        Windows whose start would fall off the board are skipped rather than
        passed on as negative or too-large indices.
        """
        best = 0
        for i in range(board.n):
            start_col = col - i
            if start_col < 0:
                break
            best = max(best, board.streakHorizontal(cells, start_col, row, player))
            if row - i >= 0:
                best = max(best, board.streakDiagonalUp(cells, start_col, row - i, player))
            if row + i < board.grid_size:
                best = max(best, board.streakDiagonalDown(cells, start_col, row + i, player))
        return best

    def _seed_known_states(self, board):
        """
        Approximation of known states. Since there are too many states, for the
        given board each possible move gets value 15 if it creates a streak of
        length N-1 or N for this player, and value 20 if it prevents an
        opponent win (20 overrides 15).
        """
        n = board.n
        me = self.player
        opponent = -1 * self.player
        values = self.value_table[grid_to_key(board.grid)]

        for col in board.next_possible_moves():
            # Landing row of a token dropped in this column (plain int index).
            row = int(np.count_nonzero(board.grid[col]))

            # Own tokens directly below the landing cell.
            if row >= n - 2 and board.streakVertical(board.grid, col, row - (n - 2), me) >= n - 2:
                values[col] = 15
            own_cells = board.grid.copy()
            own_cells[col][row] = me
            if self._best_window_streak(board, own_cells, col, row, me) >= n - 1:
                values[col] = 15

            # Opponent tokens directly below the landing cell: one more wins for them.
            if row >= n - 1 and board.streakVertical(board.grid, col, row - (n - 1), opponent) == n - 1:
                values[col] = 20
            opponent_cells = board.grid.copy()
            opponent_cells[col][row] = opponent
            if self._best_window_streak(board, opponent_cells, col, row, opponent) == n:
                values[col] = 20

    def interact(self, reward, next_board_state):
        """
        Chooses the next column to play and learns from the previous move.

        reward - None on the first call of a game, 50 after a win, otherwise
                 the reward returned for the previous move
        next_board_state - the board object as it currently stands

        Returns the chosen column. After a reward of 50 no new action is
        chosen; the last action is returned.
        """
        if reward is None:
            if self.known_states:
                self._seed_known_states(next_board_state)

            next_action = self.softmax(next_board_state)

            # Store a copy: the board's grid is mutated in place by later
            # moves, and the update needs the state the action was taken in.
            self.last_board_state = next_board_state.grid.copy()
            self.last_action = next_action
            return self.last_action

        last_key = grid_to_key(self.last_board_state)

        if reward == 50:
            # Terminal update: there is no successor state to bootstrap from.
            delta = reward - self.value_table[last_key][self.last_action]
            self.value_table[last_key][self.last_action] += self.learning_rate * delta

            return self.last_action

        next_key = grid_to_key(next_board_state.grid)

        # VDBE-Softmax policy. If draw < epsilon, perform Softmax. Else do best action.
        if np.random.uniform() < self.epsilon:
            next_action = self.softmax(next_board_state)
        else:
            next_action = np.argmax(self.value_table[next_key])

        # Update value function.
        delta = reward + self.discount_factor * np.amax(self.value_table[next_key]) \
            - self.value_table[last_key][self.last_action]
        self.value_table[last_key][self.last_action] += self.learning_rate * delta

        # Update eligibility traces (Watkins's Q(lambda))
        self.e[last_key][self.last_action] += 1

        # Eligibility traces
        # Note that here we do not implement classic eligibility traces, which iterate over all state, action pairs
        # Instead we consider all next possible board states and update those (for easier computation)
        for col in next_board_state.next_possible_moves():
            successor = deepcopy(next_board_state)
            successor.move(col, self.player)
            successor_key = grid_to_key(successor.grid)

            for action in successor.next_possible_moves():
                self.value_table[successor_key][action] += self.learning_rate * delta \
                    * self.e[successor_key][action]
                if self.last_action == action:
                    self.e[successor_key][action] = self.discount_factor * self.trace_size \
                        * self.e[successor_key][action]
                else:
                    self.e[successor_key][action] = 0

        self.last_board_state = next_board_state.grid.copy()
        self.last_action = next_action

        # simulate_move returns 1 when the chosen column is full: resample once.
        if next_board_state.simulate_move(self.last_action, self.player) == 1:
            self.last_action = self.softmax(next_board_state)

        return self.last_action


In [7]:
# Module-level 7x7 board requiring 4 in a row; run_trial plays on this global `task`.
task = ConnectN(7, 4)
# Q-learner starting from a new value table (None) with heuristic seeding enabled (known_states=True).
agent = Q_Learner(task, None, True)

In [8]:
# Play until at least 10 iterations and at least 1 episode are done, placing token 1.
run_trial(agent, 10, 1, 1)

NameError: name 'run_trial' is not defined

In [9]:
def play_game(board, p1, p2, q=False):
    """
    Runs Connect 4 game given simulator object and two agents (players),
    printing every move, every move result and the final board.

    board - simulator providing move(col, player) and print_grid()
    p1 - first player (token 1). With q=True its move comes from
         p1.interact(reward, board), otherwise from p1.calc_next_move()
    p2 - second player (token -1); always asked via p2.calc_next_move()
    q - pass True when p1 is a Q learner

    Returns 1 as soon as either player wins, and -1 if an agent returns no
    move or the board rejects a move.
    """
    reward = None
    q_mode = (q == True)

    # The two modes report problems with different messages.
    if q_mode:
        p1_no_move_msg = "error player 1 a"
        p1_rejected_msg = "error player 1 b"
        p2_error_msg = "error player 2"
    else:
        p1_no_move_msg = p1_rejected_msg = p2_error_msg = "error"

    while True:
        print("p1")
        if q_mode:
            p1move = p1.interact(reward, board)
        else:
            p1move = p1.calc_next_move()
        print(p1move)
        if p1move is None:
            board.print_grid()
            print(p1_no_move_msg)
            return -1
        p1result = board.move(p1move, 1)
        # The reward of this move is handed to the Q learner on its next turn.
        p1status, reward = p1result
        # Q mode shows only the status code, the other mode the whole tuple.
        print(p1status if q_mode else p1result)
        if p1status == 1:
            print("player 1")
            board.print_grid()
            return 1
        elif p1status == -1:
            board.print_grid()
            print(p1_rejected_msg)
            return -1

        print("p2")
        p2move = p2.calc_next_move()
        print(p2move)
        if p2move is None:
            board.print_grid()
            print(p2_error_msg)
            return -1
        p2result = board.move(p2move, -1)
        print(p2result)
        if p2result[0] == 1:
            print("player 2")
            board.print_grid()
            return 1
        elif p2result[0] == -1:
            board.print_grid()
            print(p2_error_msg)
            return -1

In [10]:
class Random_Learner(object):
    """
    Implementation of Connect 4 agent that takes random moves at each action step
    """

    def __init__(self, board):
        self.board = board

    def calc_next_move(self):
        """
        Returns a uniformly random column out of board.next_possible_moves(),
        or None when no column is available (callers treat None as "no move").
        """
        moves = self.board.next_possible_moves()
        if not moves:
            return None
        return random.choice(moves)
        

In [11]:
def play_game_no_output(board, p1, p2, q=False):
    """
    Runs Connect 4 game given simulator object and two agents (players)
    without printing anything.

    board - simulator providing move(col, player)
    p1 - first player (token 1). With q=True its move comes from
         p1.interact(reward, board), otherwise from p1.calc_next_move()
    p2 - second player (token -1); always asked via p2.calc_next_move()
    q - pass True when p1 is a Q learner

    Returns:
     1 if player 1 wins (with q=True, p1.interact is first called once more
       with the winning reward so it can learn from it)
     2 if player 2 wins and q is True
     1 if player 2 wins and q is not True (historical return code, kept for
       existing callers)
    -1 if an agent returns no move or the board rejects a move
    """
    reward = None
    q_mode = (q == True)
    p2_win_code = 2 if q_mode else 1

    while True:
        if q_mode:
            p1move = p1.interact(reward, board)
        else:
            p1move = p1.calc_next_move()
        if p1move is None:
            return -1
        p1status, reward = board.move(p1move, 1)
        if p1status == 1:
            if q_mode:
                p1.interact(reward, board)
            return 1
        if p1status == -1:
            return -1

        p2move = p2.calc_next_move()
        if p2move is None:
            return -1
        p2status = board.move(p2move, -1)[0]
        if p2status == 1:
            return p2_win_code
        if p2status == -1:
            return -1

In [56]:
def train_Q_learner(num_trials = 1000, k=2, n=4, grid_size = 7):
    """
    Trains the Q Learner (playing as player 1) against a Random_Learner opponent

    Inputs:
    num_trials - number of games to play
    k - accepted for compatibility; not used by this function
    n - N tokens to connect
    grid_size - games are played on a grid_size x grid_size board

    Outputs:
    Q Learner value table after training (None if num_trials is less than 1)
    """
    value_table = None

    for _ in range(num_trials):
        # Each game gets a new board and learner; only the value table is
        # carried over from game to game.
        board = ConnectN(grid_size, n)
        learner = Q_Learner(board, value_table, None, player=1)
        opponent = Random_Learner(board)
        play_game_no_output(board, learner, opponent, True)
        value_table = learner.value_table

    return value_table


In [60]:
# Build a position by hand on a 7x7, connect-4 board; each call is move(column, player).
x = ConnectN(7,4)
x.move(0,1)
x.move(1,-1)
x.move(1,1)
x.move(2,-1)
x.move(2,-1)
# Learner with heuristic seeding enabled (known_states=True); reward=None takes
# the first-move branch of interact, which returns the chosen column.
ql = Q_Learner(x, None, True)
ql.interact(None, x)

2

In [61]:
# Display the learner's value table after that single interact call.
ql.value_table

{'0000000000000000000000000000000000001-100001-1-10000': array([  0.,   0.,  15.,   0.,   0.,   0.,   0.])}

In [59]:
# Train for 2 games and display the returned value table.
train_Q_learner(num_trials=2)

{'000000000000000000000-100000010000001-1000001-1001-10': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '000000000000000000000000000000000000-1000001-100100': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '0000000000000000000000000000000000000000000-100100': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '0000000000000000000000000000000000000000000000000': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '000000000000000000000000000000000000000000000100-1': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '0000000000000000000000000000000000000000001-100100': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '0000000000000000000000000000000000000000001001-10-1': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '000000000000000000000000000000000000000000100100-1': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '000000000000000000000000000000000001-1000001-1001-10': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '000000000000000000000000000000000001-1000001-100100': array([ 0.,  0.,  0.,  0.,  0.,  0.,  0.]),
 '0