In [2]:
import numpy as np
import pickle
import random
import math

# Constants for Connect4
BOARD_ROWS = 6
BOARD_COLS = 7
WIN_COUNT = 4
EMPTY = 0
PLAYER_PIECE = 1
AI_PIECE = -1

# ENVIRONMENT FOR CONNECT4
class Connect4State:
    def __init__(self, p1, p2):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.p1 = p1
        self.p2 = p2
        self.isEnd = False
        self.boardHash = None
        self.playerSymbol = PLAYER_PIECE

    def getHash(self):
        self.boardHash = str(self.board.reshape(BOARD_COLS * BOARD_ROWS))
        return self.boardHash

    def winner(self):
        # Check horizontal locations for win
        for r in range(BOARD_ROWS):
            for c in range(BOARD_COLS - WIN_COUNT + 1):
                if abs(sum(self.board[r, c:c + WIN_COUNT])) == WIN_COUNT:
                    self.isEnd = True
                    return np.sign(sum(self.board[r, c:c + WIN_COUNT]))

        # Check vertical locations for win
        for r in range(BOARD_ROWS - WIN_COUNT + 1):
            for c in range(BOARD_COLS):
                if abs(sum(self.board[r:r + WIN_COUNT, c])) == WIN_COUNT:
                    self.isEnd = True
                    return np.sign(sum(self.board[r:r + WIN_COUNT, c]))

        # Check positively sloped diagonals
        for r in range(BOARD_ROWS - WIN_COUNT + 1):
            for c in range(BOARD_COLS - WIN_COUNT + 1):
                if abs(sum([self.board[r + i, c + i] for i in range(WIN_COUNT)])) == WIN_COUNT:
                    self.isEnd = True
                    return np.sign(sum([self.board[r + i, c + i] for i in range(WIN_COUNT)]))

        # Check negatively sloped diagonals
        for r in range(WIN_COUNT - 1, BOARD_ROWS):
            for c in range(BOARD_COLS - WIN_COUNT + 1):
                if abs(sum([self.board[r - i, c + i] for i in range(WIN_COUNT)])) == WIN_COUNT:
                    self.isEnd = True
                    return np.sign(sum([self.board[r - i, c + i] for i in range(WIN_COUNT)]))

        # Check for tie
        if len(self.get_valid_location()) == 0:
            self.isEnd = True
            return 0

        # No winner yet
        self.isEnd = False
        return None

    def get_valid_location(self):
        valid_locations = []
        for col in range(BOARD_COLS):
            if self.board[0][col] == EMPTY:
                valid_locations.append(col)
        return valid_locations

    def updateState(self, action):
        for r in range(BOARD_ROWS-1, -1, -1):
            if self.board[r][action] == EMPTY:
                self.board[r][action] = self.playerSymbol
                break

        self.playerSymbol = -self.playerSymbol

    def reset(self):
        self.board = np.zeros((BOARD_ROWS, BOARD_COLS))
        self.isEnd = False
        self.boardHash = None
        self.playerSymbol = PLAYER_PIECE

    def printBoard(self):
        for r in range(BOARD_ROWS):
            print('------------------------------------')
            out = '| '
            for c in range(BOARD_COLS):
                token = ' '
                if self.board[r][c] == PLAYER_PIECE:
                    token = 'x'
                elif self.board[r][c] == AI_PIECE:
                    token = 'o'

                out += token + ' | '
            print(out)
        print('-------------------------------------')

# Player class
class Player:
    def __init__(self, name, exp_rate=0.3):
        self.name = name
        self.states = [] # record all positions taken
        self.lr = 0.2
        self.exp_rate = exp_rate
        self.decay_gamma = 0.9
        self.states_value = {} # state -> value

    def getHash(self, board):
        boardHash = str(board.reshape(BOARD_COLS * BOARD_ROWS))
        return boardHash

    def chooseAction(self, positions, current_board, symbol):
        if np.random.uniform(0, 1) <= self.exp_rate:
            # choose random action
            idx = np.random.choice(len(positions))
            action = positions[idx]
        else:
            # choose best action
            value_max = -999
            for p in positions:
                next_board = current_board.copy()
                for r in range(BOARD_ROWS - 1, -1, -1):
                    if next_board[r, p] == EMPTY:
                        next_board[r, p] = symbol
                        break

                next_boardHash = self.getHash(next_board)
                value = 0 if self.states_value.get(next_boardHash) is None else self.states_value.get(next_boardHash)
                if value >= value_max:
                    value_max = value
                    action = p
        return action

    def addState(self, state):
        self.states.append(state)

    def feedReward(self, reward):
        for st in reversed(self.states):
            if self.states_value.get(st) is None:
                self.states_value[st] = 0
            self.states_value[st] += self.lr * (self.decay_gamma * reward - self.states_value[st])
            reward = self.states_value[st]

    def reset(self):
        self.states = []

    def savePolicy(self):
        with open('policy_' + self.name, 'wb') as fw:
            pickle.dump(self.states_value, fw)

    def loadPolicy(self, file):
        with open(file, 'rb') as fr:
            self.states_value = pickle.load(fr)

# Human player class
class HumanPlayer:
    def __init__(self, name):
        self.name = name

    def chooseAction(self, positions, current_board, symbol):
        action = None
        while action not in positions:
            try:
                action = int(input(f"Player {symbol}, enter your move (0-{BOARD_COLS-1}): "))
            except ValueError:
                print("Invalid input. Please enter a valid column number.")
        return action

    def addState(self, state):
        pass

    def feedReward(self, reward):
        pass

    def reset(self):
        pass


In [3]:
def evalute_window(window, piece):
    score = 0
    opp_piece = PLAYER_PIECE if piece == AI_PIECE else AI_PIECE

    if window.count(piece) == 4:
        score += 100
    elif window.count(piece) == 3 and window.count(EMPTY) == 1:
        score += 5
    elif window.count(piece) == 2 and window.count(EMPTY) == 2:
        score += 2

    if window.count(opp_piece) == 3 and window.count(EMPTY) == 1:
        score -= 4

    return score

def score_position(board, piece):
    score = 0

    # Score center column
    center_array = [int(i) for i in list(board[:, BOARD_COLS//2])]
    center_count = center_array.count(piece)
    score += center_count * 3

    # Score horizontal
    for r in range(BOARD_ROWS):
        row_array = [int(i) for i in list(board[r, :])]
        for c in range(BOARD_COLS-3):
            window = row_array[c:c+4]
            score += evalute_window(window, piece)

    # Score vertical
    for c in range(BOARD_COLS):
        col_array = [int(i) for i in list(board[:, c])]
        for r in range(BOARD_ROWS-3):
            window = col_array[r:r+4]
            score += evalute_window(window, piece)

    # Score positively sloped diagonals
    for r in range(BOARD_ROWS-3):
        for c in range(BOARD_COLS-3):
            window = [board[r+i][c+i] for i in range(4)]
            score += evalute_window(window, piece)

    # Score negatively sloped diagonals
    for r in range(3, BOARD_ROWS):
        for c in range(BOARD_COLS-3):
            window = [board[r-i][c+i] for i in range(4)]
            score += evalute_window(window, piece)

    return score

def terminal_node(board):
    return winning_move(board, PLAYER_PIECE) or winning_move(board, AI_PIECE) or len(get_valid_location(board)) == 0

def get_valid_location(board):
    valid_locations = []
    for col in range(BOARD_COLS):
        if board[0][col] == EMPTY:
            valid_locations.append(col)
    return valid_locations

def winning_move(board, piece):
    # Check horizontal locations for win
    for c in range(BOARD_COLS-3):
        for r in range(BOARD_ROWS):
            if board[r][c] == piece and board[r][c+1] == piece and board[r][c+2] == piece and board[r][c+3] == piece:
                return True

    # Check vertical locations for win
    for r in range(BOARD_ROWS-3):
        for c in range(BOARD_COLS):
            if board[r][c] == piece and board[r+1][c] == piece and board[r+2][c] == piece and board[r+3][c] == piece:
                return True

    # Check positively sloped diagonals
    for r in range(BOARD_ROWS-3):
        for c in range(BOARD_COLS-3):
            if board[r][c] == piece and board[r+1][c+1] == piece and board[r+2][c+2] == piece and board[r+3][c+3] == piece:
                return True

    # Check negatively sloped diagonals
    for r in range(3, BOARD_ROWS):
        for c in range(BOARD_COLS-3):
            if board[r][c] == piece and board[r-1][c+1] == piece and board[r-2][c+2] == piece and board[r-3][c+3] == piece:
                return True

    return False

def available_row(board, col):
    for r in range(BOARD_ROWS-1, -1, -1):
        if board[r][col] == EMPTY:
            return r

def drop_pieces(board, row, col, piece):
    board[row][col] = piece

def minimax(board, depth, alpha, beta, maximizingPlayer):
    valid_locations = get_valid_location(board)
    is_terminal = terminal_node(board)
    if depth == 0 or is_terminal:
        if is_terminal:
            if winning_move(board, AI_PIECE):
                return (None, 100000)
            elif winning_move(board, PLAYER_PIECE):
                return (None, -100000)
            else: # Game is over and no valid move is available
                return (None, 0)
        else:
            return (None, score_position(board, AI_PIECE))

    if maximizingPlayer:
        value = -math.inf
        column = random.choice(valid_locations)
        for col in valid_locations:
            row = available_row(board, col)
            b_copy = board.copy()
            drop_pieces(b_copy, row, col, AI_PIECE)
            new_score = minimax(b_copy, depth-1, alpha, beta, False)[1]
            if new_score > value:
                value = new_score
                column = col
            alpha = max(alpha, value)
            if alpha >= beta:
                break
        return column, value

    else: # Minimizing Player
        value = math.inf
        column = random.choice(valid_locations)
        for col in valid_locations:
            row = available_row(board, col)
            b_copy = board.copy()
            drop_pieces(b_copy, row, col, PLAYER_PIECE)
            new_score = minimax(b_copy, depth-1, alpha, beta, True)[1]
            if new_score < value:
                value = new_score
                column = col
            beta = min(beta, value)
            if alpha >= beta:
                break
        return column, value

def best_move(board, piece):
    valid_locations = get_valid_location(board)
    best_score = -10000
    best_col = random.choice(valid_locations)
    for col in valid_locations:
        row = available_row(board, col)
        temp_board = board.copy()
        drop_pieces(temp_board, row, col, piece)
        score = score_position(temp_board, piece)
        if score > best_score:
            best_score = score
            best_col = col
    return best_col


In [10]:
# Training function
def play_games(p1, p2, state, rounds=10000, print_board=False):
    for i in range(rounds):
        if i % 100 == 0:
            print(f"Rounds {i}")
        # Print the current round
        #print(f"Playing round {i+1}")

        state.reset()
        while not state.isEnd:
            if print_board:
                state.printBoard()
            positions = state.get_valid_location()
            p1_action = p1.chooseAction(positions, state.board, state.playerSymbol)
            state.updateState(p1_action)
            if print_board:
                state.printBoard()
            state_hash = state.getHash()
            p1.addState(state_hash)

            win = state.winner()
            if win is not None:
                if win == PLAYER_PIECE:
                    p1.feedReward(1)
                    p2.feedReward(0)
                elif win == AI_PIECE:
                    p1.feedReward(0)
                    p2.feedReward(1)
                else:
                    p1.feedReward(0.5)
                    p2.feedReward(0.5)
                break
            else:
                # Minimax player move
                col, minimax_score = minimax(state.board, 4, -math.inf, math.inf, True)
                state.updateState(col)
                if print_board:
                    state.printBoard()
                state_hash = state.getHash()
                p2.addState(state_hash)

                win = state.winner()
                if win is not None:
                    if win == PLAYER_PIECE:
                        p1.feedReward(1)
                        p2.feedReward(0)
                    elif win == AI_PIECE:
                        p1.feedReward(0)
                        p2.feedReward(1)
                    else:
                        p1.feedReward(0.5)
                        p2.feedReward(0.5)
                    break

# Example of training the agent
p1 = Player("p1")
p2 = Player("p2")  # Placeholder for minimax player
state = Connect4State(p1, p2)
print("Training against Minimax AI...")
play_games(p1, p2, state, rounds=10000, print_board=False)

Training against Minimax AI...
Rounds 0
Rounds 100
Rounds 200
Rounds 300
Rounds 400
Rounds 500
Rounds 600
Rounds 700
Rounds 800
Rounds 900
Rounds 1000
Rounds 1100
Rounds 1200
Rounds 1300
Rounds 1400
Rounds 1500
Rounds 1600
Rounds 1700
Rounds 1800
Rounds 1900
Rounds 2000
Rounds 2100
Rounds 2200
Rounds 2300
Rounds 2400
Rounds 2500
Rounds 2600
Rounds 2700
Rounds 2800
Rounds 2900
Rounds 3000
Rounds 3100
Rounds 3200
Rounds 3300
Rounds 3400
Rounds 3500
Rounds 3600
Rounds 3700
Rounds 3800
Rounds 3900
Rounds 4000
Rounds 4100
Rounds 4200
Rounds 4300
Rounds 4400
Rounds 4500
Rounds 4600
Rounds 4700
Rounds 4800
Rounds 4900
Rounds 5000
Rounds 5100
Rounds 5200
Rounds 5300
Rounds 5400
Rounds 5500
Rounds 5600
Rounds 5700
Rounds 5800
Rounds 5900
Rounds 6000
Rounds 6100
Rounds 6200
Rounds 6300
Rounds 6400
Rounds 6500
Rounds 6600
Rounds 6700
Rounds 6800
Rounds 6900
Rounds 7000
Rounds 7100
Rounds 7200
Rounds 7300
Rounds 7400
Rounds 7500
Rounds 7600
Rounds 7700
Rounds 7800
Rounds 7900
Rounds 8000
Rounds 81

In [11]:
p1.savePolicy()
p2.savePolicy()

In [12]:
# Example of playing against a human
human = HumanPlayer("Human")
print("Playing against a human...")
play_games(human, p1, state, rounds=1, print_board=True)


Playing against a human...
Rounds 0
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
-------------------------------------
Player 1, enter your move (0-6): 3
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   |   |   |   |   | 
------------------------------------
|   |   |   | x |   |   |   | 
-------------------------------------
------------------------------------


In [18]:
# Save the trained policy
p1.savePolicy()
print("Policy saved as policy_p1.pkl.")

# Verify file existence
!ls # List files in the current directory to confirm if policy_p1.pkl exists

# Download the policy file
from google.colab import files
files.download('policy_p1.pkl')

Policy saved as policy_p1.pkl.
policy_p1  policy_p2  sample_data


FileNotFoundError: Cannot find file: policy_p1.pkl

In [20]:
# Save the trained policy
p1.savePolicy()
print("Policy saved as policy_p1.pkl.")

# Download the policy file using wget (corrected)
!wget ./policy_p1.pkl # Add ./ to indicate current directory

Policy saved as policy_p1.pkl.
--2024-07-26 22:41:23--  http://./policy_p1.pkl
Resolving . (.)... failed: No address associated with hostname.
wget: unable to resolve host address ‘.’
