Getting Started

# Connect 4 using Minimax Algorithm By Group 7 for Reinforcement Learning Final Project

<img src="https://i.imgur.com/PFz0byC.jpg" 
     align="justify" 
     width="400" />

## Team
This is a collaborative effort of Group 7 (Kritagya Kumra, Aakash Chavan, Daniel Macias, Janvi Navdiya, and Jaswant Reddy Bota).

## Objective
The aim of this notebook is to implement and train an agent for the game of Connect 4 using the Minimax algorithm. We explore how well Minimax can adapt and optimize its gameplay in the context of Connect 4.

## Methodology
1. **Game engine**: Implement a Connect 4 game engine to facilitate the interaction of the agent with the game environment.
2. **Neural network**: Design a deep neural network model to approximate both policy (best moves) and value (winning chances) functions.
3. **Self-play**: Utilize Minimax guided by the neural network to generate self-play data. The agent plays against itself to explore the game tree.
4. **Training**: Update the neural network weights based on the outcomes and search probabilities obtained during self-play.
5. **Evaluation**: Evaluate the trained neural network's ability to recognize critical game states, such as immediate wins or blocks as a performance heuristic.

## Imports and Configuration

In [1]:
import numpy as np
import random

In [2]:
from kaggle_environments import make, evaluate

No pygame installed, ignoring import


In [3]:
env = make("connectx", debug=True)
print(list(env.agents))

['random', 'negamax']


In [4]:
env.run(["random", "random"])
env.render(mode="ipython")

## Defining agents
We'll start with a few examples to provide some context. In the code cell below:

* agent_leftmost selects the leftmost valid column.
* agent_random behaves identically to the "random" agent above.
* agent_middle always selects the middle column, whether it's valid or not! Note that if any agent selects an invalid move, it loses the game.

In [5]:
def agent_leftmost(obs, config):
    """
    Selects leftmost valid column
    """
    valid_moves = []
    for col in range(config.columns):
        if obs.board[col] == 0:
            valid_moves.append(col)
    return valid_moves[0]
    
def agent_random(obs, config):
    valid_moves = []
    for col in range(config.columns):
        if obs.board[col] == 0:
            valid_moves.append(col)
    return random.choice(valid_moves)

def agent_middle(obs, config):
    return config.columns // 2
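
The validity check `obs.board[col] == 0` used by these agents relies on the board layout. The sketch below assumes, as the later `reshape(config.rows, config.columns)` calls in this notebook suggest, that the board is a flat, row-major list whose first `columns` entries are the top row. The position and the names `ROWS`, `COLUMNS`, and `grid` are made up for illustration.

```python
ROWS, COLUMNS = 6, 7  # default Connect Four dimensions

# Hypothetical position: column 3 is completely full, everything else is empty.
board = [0] * (ROWS * COLUMNS)
for row in range(ROWS):
    board[row * COLUMNS + 3] = 1 + row % 2  # alternate marks 1 and 2

# Rebuild the 2D view; grid[0] is the top row, grid[ROWS - 1] the bottom row.
grid = [board[r * COLUMNS:(r + 1) * COLUMNS] for r in range(ROWS)]

# The first COLUMNS entries of the flat board are the top row,
# so board[col] == 0 means column `col` still has room.
valid_moves = [col for col in range(COLUMNS) if board[col] == 0]
print(valid_moves)  # [0, 1, 2, 4, 5, 6]
```

Because agent_middle skips this check, it keeps choosing column 3 even after that column is full, which is what produces invalid plays.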

## Evaluating agents

The outcome of a single game is usually not enough information to figure out how well our agents are likely to perform. To get a better idea, we'll calculate the win percentages for each agent, averaged over multiple games. For fairness, each agent goes first half of the time.

To do this, we'll use the get_win_percentages() function, which is defined in code cell In [7] below.
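
The tallying step inside get_win_percentages() can be sketched without running any games. The outcome lists below are invented for illustration, and `summarize` is a hypothetical helper, not part of the notebook. The sketch only shows how results from the games where agent 2 moves first are flipped so that every pair reads [agent 1 reward, agent 2 reward].

```python
def summarize(outcomes):
    """Tally win rates and invalid plays from [agent1_reward, agent2_reward] pairs."""
    n = len(outcomes)
    return {
        "agent1_win_rate": round(outcomes.count([1, -1]) / n, 2),
        "agent2_win_rate": round(outcomes.count([-1, 1]) / n, 2),
        "agent1_invalid": outcomes.count([None, 0]),
        "agent2_invalid": outcomes.count([0, None]),
    }

# Hypothetical results, listed in playing order (first mover first).
agent1_first = [[1, -1], [1, -1], [-1, 1]]
agent2_first = [[-1, 1], [1, -1], [0, None]]  # here the pairs read [agent2, agent1]

# Flip the second batch so all pairs read [agent1, agent2].
outcomes = agent1_first + [[b, a] for a, b in agent2_first]
summary = summarize(outcomes)
print(summary)
```

In this made-up sample, agent 1 wins three of six games, agent 2 wins two, and one game ends with an invalid play by agent 1.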



In [6]:
env.run([agent_leftmost, agent_random])
env.render(mode="ipython")

In [7]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    config = {
        'rows': 6,
        'columns': 7,
        'inarow': 4
    }
    outcomes = evaluate(
        "connectx", [agent1, agent2], config, [], n_rounds // 2
    )
    outcomes += [[b, a] for [a, b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds - n_rounds // 2)]
    print("Agent 1 Win Percentage: ", np.round(outcomes.count([1, -1]) / len(outcomes), 2))
    print("Agent 2 Win Percentage: ", np.round(outcomes.count([-1, 1]) / len(outcomes), 2))
    print("Number of Invalid Plays by Agent 1: ", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2: ", outcomes.count([0, None]))

In [8]:
get_win_percentages(agent1=agent_middle, agent2=agent_random)

Agent 1 Win Percentage:  0.66
Agent 2 Win Percentage:  0.01
Number of Invalid Plays by Agent 1:  33
Number of Invalid Plays by Agent 2:  0


In [9]:
get_win_percentages(agent1=agent_leftmost, agent2=agent_random)

Agent 1 Win Percentage:  0.82
Agent 2 Win Percentage:  0.18
Number of Invalid Plays by Agent 1:  0
Number of Invalid Plays by Agent 2:  0


Comparing which agent plays better against the random agent, the one that always plays in the middle **(agent_middle)** or the one that chooses the leftmost valid column **(agent_leftmost)**, **it looks like the agent that chooses the leftmost valid column performs best!**

## A Smarter Agent

In [10]:
import numpy as np

# Gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, piece, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = piece
    return next_grid

# Returns True if dropping piece in column results in game win
def check_winning_move(obs, config, col, piece):
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    next_grid = drop_piece(grid, col, piece, config)
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(next_grid[row,col:col+config.inarow])
            if window.count(piece) == config.inarow:
                return True
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(next_grid[row:row+config.inarow,col])
            if window.count(piece) == config.inarow:
                return True
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(next_grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if window.count(piece) == config.inarow:
                return True
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(next_grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if window.count(piece) == config.inarow:
                return True
    return False

In [11]:
def agent_q1(obs, config):
    valid_moves = []
    for col in range(config.columns):
        if obs.board[col] == 0:
            valid_moves.append(col)
    
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    return random.choice(valid_moves)

In [12]:
def agent_q2(obs, config):
    valid_moves = []
    for col in range(config.columns):
        if obs.board[col] == 0:
            valid_moves.append(col)
    
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark % 2 + 1):
            return col
    
    return random.choice(valid_moves)

In [13]:
env.run([agent_q1, agent_q2])
env.render(mode="ipython")

# One-Step Lookahead using simple heuristic

## Game Trees

As a human player, how do you think about how to play the game? How do you weigh alternative moves?

You likely do a bit of forecasting. For each potential move, you predict what your opponent is likely to do in response, along with how you'd then respond, and what the opponent is likely to do then, and so on. Then, you choose the move that you think is most likely to result in a win.

We can formalize this idea and represent all possible outcomes in a (complete) game tree.
<img src="https://storage.googleapis.com/kaggle-media/learn/images/EZKHxyy.png" 
     align="justify" 
     width="800" />


The game tree represents each possible move (by agent and opponent), starting with an empty board. The first row shows all possible moves the agent (red player) can make. Next, we record each move the opponent (yellow player) can make in response, and so on, until each branch reaches the end of the game. (The game tree for Connect Four is quite large, so we show only a small preview in the image above.)
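
To get a feel for how quickly the tree grows, here is a minimal sketch that counts the move sequences of a given length from an empty board. It tracks only column heights and ignores wins, which is a safe simplification for the short depths shown because nobody can have four discs before the seventh move. The function name `count_sequences` is hypothetical.

```python
ROWS, COLUMNS = 6, 7

def count_sequences(heights, depth):
    """Count move sequences of length `depth` from a position given by column heights."""
    if depth == 0:
        return 1
    total = 0
    for col in range(COLUMNS):
        if heights[col] < ROWS:  # column is not full yet
            heights[col] += 1
            total += count_sequences(heights, depth - 1)
            heights[col] -= 1    # undo the move before trying the next column
    return total

sizes = {depth: count_sequences([0] * COLUMNS, depth) for depth in range(1, 5)}
print(sizes)  # {1: 7, 2: 49, 3: 343, 4: 2401}
```

The count multiplies by seven with every extra move at these depths, which is why a complete tree is out of reach and the agents in this notebook only look a few moves ahead.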

## Heuristics
The complete game tree for Connect Four has over 4 trillion different boards! So in practice, our agent only works with a small subset when planning a move.

To make sure the incomplete tree is still useful to the agent, we will use a heuristic (or heuristic function). The heuristic assigns scores to different game boards, where we estimate that boards with higher scores are more likely to result in the agent winning the game. You will design the heuristic based on your knowledge of the game.

For instance, one heuristic that might work reasonably well for Connect Four looks at each group of four adjacent locations in a (horizontal, vertical, or diagonal) line and assigns:

1. 1000000 (1e6) points if the agent has four discs in a row (the agent won),
2. 1 point if the agent filled three spots, and the remaining spot is empty (the agent wins if it fills in the empty spot), and
3. -100 points if the opponent filled three spots, and the remaining spot is empty (the opponent wins by filling in the empty spot).
<img src="https://storage.googleapis.com/kaggle-media/learn/images/vzQa4ML.png" 
     align="justify" 
     width="800" />
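
As a small worked example, the sketch below applies this scoring to four hand-written windows instead of a full board. It follows the sign used in the notebook's code, where an open three for the opponent subtracts 100 points. The windows and the helper `score_windows` are made up for illustration.

```python
INAROW = 4

def check_window(window, num_discs, piece):
    """True if the window holds exactly num_discs of piece and is otherwise empty."""
    return window.count(piece) == num_discs and window.count(0) == INAROW - num_discs

def score_windows(windows, mark):
    """Score a list of four-cell windows from the point of view of `mark`."""
    opponent = mark % 2 + 1
    num_threes = sum(check_window(w, 3, mark) for w in windows)
    num_fours = sum(check_window(w, 4, mark) for w in windows)
    num_threes_opp = sum(check_window(w, 3, opponent) for w in windows)
    return num_threes - 1e2 * num_threes_opp + 1e6 * num_fours

windows = [
    [1, 1, 1, 0],  # agent has three plus an empty spot: +1
    [2, 2, 0, 2],  # opponent has three plus an empty spot: -100
    [1, 1, 2, 1],  # blocked window: contributes nothing
    [0, 0, 1, 0],  # a single disc: contributes nothing
]
score = score_windows(windows, mark=1)
print(score)  # -99.0
```

A single open three for the opponent outweighs the agent's own open three, so the agent is pushed toward blocking first.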

In [14]:
# Calculates score if agent drops piece in selected column
def score_move(grid, col, mark, config):
    next_grid = drop_piece(grid, col, mark, config)
    score = get_heuristic(next_grid, mark, config)
    return score

# Helper function for score_move: gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = mark
    return next_grid

# Helper function for score_move: calculates value of heuristic for grid
def get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    score = num_threes - 1e2*num_threes_opp + 1e6*num_fours
    return score

# Helper function for get_heuristic: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)
    
# Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

In [15]:
# The agent is always implemented as a Python function that accepts two arguments: obs and config
def agent(obs, config):
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next turn
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [16]:
# Game round against a random agent
env.run([agent, "random"])
env.render(mode="ipython")

In [17]:
# Game round against other agent
env.run([agent, agent_q2])
env.render(mode="ipython")

In [18]:
get_win_percentages(agent1=agent, agent2="random")

Agent 1 Win Percentage:  0.98
Agent 2 Win Percentage:  0.02
Number of Invalid Plays by Agent 1:  0
Number of Invalid Plays by Agent 2:  0


In [19]:
get_win_percentages(agent1=agent, agent2=agent_q2)

Agent 1 Win Percentage:  0.74
Agent 2 Win Percentage:  0.25
Number of Invalid Plays by Agent 1:  0
Number of Invalid Plays by Agent 2:  0


## More Complex Heuristic

In [20]:
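# Heuristic weights used by get_heuristic_q1
A = 1e10   # agent has four in a row
B = 1e4    # agent has three discs in a window, remaining spot empty
C = 1e2    # agent has two discs in a window, remaining spots empty
D = -1     # opponent has two discs in a window, remaining spots empty
E = -1e6   # opponent has three discs in a window, remaining spot empty

# Calculates score if agent drops piece in selected column
def score_move(grid, col, mark, config):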
    next_grid = drop_piece(grid, col, mark, config)
    score = get_heuristic_q1(next_grid, col, mark, config)
    return score

# Helper function for score_move: gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = mark
    return next_grid

# Helper function for score_move: calculates value of heuristic for grid
def get_heuristic_q1(grid, col, mark, config):
    num_twos = count_windows(grid, 2, mark, config)
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_twos_opp = count_windows(grid, 2, mark%2+1, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    score = A*num_fours + B*num_threes + C*num_twos + D*num_twos_opp + E*num_threes_opp
    return score

# Helper function for get_heuristic: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)
    
# Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

In [21]:
# The agent is always implemented as a Python function that accepts two arguments: obs and config
def agent(obs, config):
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next turn
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [22]:
# Game round against a random agent
env.run([agent, "random"])
env.render(mode="ipython")

In [23]:
# Game round against other agent
env.run([agent, agent_q2])
env.render(mode="ipython")

In [24]:
get_win_percentages(agent1=agent, agent2="random")

Agent 1 Win Percentage:  1.0
Agent 2 Win Percentage:  0.0
Number of Invalid Plays by Agent 1:  0
Number of Invalid Plays by Agent 2:  0


In [25]:
get_win_percentages(agent1=agent, agent2=agent_q2)

Agent 1 Win Percentage:  0.81
Agent 2 Win Percentage:  0.19
Number of Invalid Plays by Agent 1:  0
Number of Invalid Plays by Agent 2:  0


# N-Step Lookahead 

## Introduction
So far, we have built an agent with one-step lookahead. This agent performs reasonably well, but definitely still has room for improvement! For instance, consider the potential moves in the figure below. (Note that we use zero-based numbering for the columns, so the leftmost column corresponds to col=0, the next column corresponds to col=1, and so on.)
<img src="https://storage.googleapis.com/kaggle-media/learn/images/aAYyy2I.png" 
     align="Justify" 
     width="800" />
     <br>
With one-step lookahead, the red player picks column 5 or column 6, each with 50% probability, because the heuristic gives both the same top score. But column 5 is clearly a bad move, as it lets the opponent win the game in only one more turn. Unfortunately, the agent doesn't know this, because it can only look one move into the future.
To overcome this issue, our next step was to use the minimax algorithm, which lets the agent look farther into the future and make better-informed decisions.


## Minimax Algorithm
Assume we're working with a depth of 3. This way, when deciding on a move, the agent examines all conceivable game boards that could result from the agent's move, the opponent's move, and the agent's subsequent move. For simplicity, we assume that at each turn, both the agent and opponent have only two possible moves. Each of the blue rectangles in the figure below corresponds to a different game board.

<img src="https://storage.googleapis.com/kaggle-media/learn/images/BrRe7Bu.png" 
     align="Justify" 
     width="800" />
But the agent does not have complete control over its score: reaching a high score depends on the opponent's moves, and the opponent's choice can prove disastrous for the agent. For instance, if the agent picks the left branch, the opponent can force a score of -1.
The right branch is the better choice for the agent because it is less risky: it gives up the large score (+40) that is only reachable on the left, but it guarantees at least +10 points.
This is the main idea behind the minimax algorithm: the agent chooses moves to get the highest score possible, assuming the opponent will counteract by choosing moves that push the score as low as possible.
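
The reasoning above can be replayed on a toy tree. The sketch below is not the notebook's board-based implementation: it runs minimax over nested lists, and the leaf scores are hypothetical values chosen only to match the numbers quoted in the text (-1 forced on the left, +40 reachable on the left, at least +10 on the right).

```python
def minimax(node, maximizing):
    """node is either a leaf score (a number) or a list of child nodes."""
    if not isinstance(node, list):
        return node
    values = [minimax(child, not maximizing) for child in node]
    return max(values) if maximizing else min(values)

# Depth 3: agent move, opponent move, agent move. Leaf scores are hypothetical.
tree = [
    [[40, 0], [-1, -100]],  # left branch
    [[10, 1], [20, 5]],     # right branch
]

# After the agent picks a branch, it is the opponent's (minimizing) turn.
left_value = minimax(tree[0], maximizing=False)
right_value = minimax(tree[1], maximizing=False)
best_value = minimax(tree, maximizing=True)
print(left_value, right_value, best_value)  # -1 10 10
```

The +40 leaf never matters, because the opponent will steer the left branch toward the -1 outcome instead.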


In [26]:
import random
import numpy as np

# Gets board at next step if agent drops piece in selected column
def drop_piece(grid, col, mark, config):
    next_grid = grid.copy()
    for row in range(config.rows-1, -1, -1):
        if next_grid[row][col] == 0:
            break
    next_grid[row][col] = mark
    return next_grid

# Helper function for get_heuristic: checks if window satisfies heuristic conditions
def check_window(window, num_discs, piece, config):
    return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)
    
# Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
def count_windows(grid, num_discs, piece, config):
    num_windows = 0
    # horizontal
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if check_window(window, num_discs, piece, config):
                num_windows += 1
    return num_windows

## Transition into Minimax Approach using Minimax Heuristics

We'll need to slightly modify the heuristic from the previous section, since the opponent's moves now also change the game board within the lookahead.
In particular, we need to check whether the opponent has won the game by playing a disc. The new heuristic looks at each group of four adjacent locations in a (horizontal, vertical, or diagonal) line and assigns:
<img src="https://storage.googleapis.com/kaggle-media/learn/images/vQ8b1aX.png" 
     align="Justify" 
     width="800" />
     <br>
1000000 (1e6) points if the agent has four discs in a row (the agent won),
<br>
1 point if the agent filled three spots, and the remaining spot is empty (the agent wins if it fills in the empty spot),<br>
-100 points if the opponent filled three spots, and the remaining spot is empty (the opponent wins by filling in the empty spot), and<br>
-10000 (-1e4) points if the opponent has four discs in a row (the opponent won).
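
To make the weights concrete, here is a short arithmetic sketch that combines window counts into a score using the four values listed above. The counts passed in are invented, and `heuristic_score` is a hypothetical helper that takes the counts directly instead of scanning a board.

```python
def heuristic_score(num_threes, num_fours, num_threes_opp, num_fours_opp):
    """Combine window counts using the weights +1, +1e6, -1e2 and -1e4."""
    return num_threes - 1e2 * num_threes_opp - 1e4 * num_fours_opp + 1e6 * num_fours

# Two open threes for the agent, one open three for the opponent.
print(heuristic_score(2, 0, 1, 0))   # -98.0
# The opponent has already connected four.
print(heuristic_score(0, 0, 0, 1))   # -10000.0
# The agent has connected four and also holds one open three.
print(heuristic_score(1, 1, 0, 0))   # 1000001.0
```

With these weights, a win for the agent counts a hundred times more than a loss costs, so a board containing both is still scored as strongly positive.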


In [27]:
# Helper function for minimax: calculates value of heuristic for grid
def get_heuristic(grid, mark, config):
    num_threes = count_windows(grid, 3, mark, config)
    num_fours = count_windows(grid, 4, mark, config)
    num_threes_opp = count_windows(grid, 3, mark%2+1, config)
    num_fours_opp = count_windows(grid, 4, mark%2+1, config)
    score = num_threes - 1e2*num_threes_opp - 1e4*num_fours_opp + 1e6*num_fours
    return score

In [28]:
# Uses minimax to calculate value of dropping piece in selected column
def score_move(grid, col, mark, config, nsteps):
    next_grid = drop_piece(grid, col, mark, config)
    score = minimax(next_grid, nsteps-1, False, mark, config)
    return score

# Helper function for minimax: checks if agent or opponent has four in a row in the window
def is_terminal_window(window, config):
    return window.count(1) == config.inarow or window.count(2) == config.inarow

# Helper function for minimax: checks if game has ended
def is_terminal_node(grid, config):
    # Check for draw 
    if list(grid[0, :]).count(0) == 0:
        return True
    # Check for win: horizontal, vertical, or diagonal
    # horizontal 
    for row in range(config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[row, col:col+config.inarow])
            if is_terminal_window(window, config):
                return True
    # vertical
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns):
            window = list(grid[row:row+config.inarow, col])
            if is_terminal_window(window, config):
                return True
    # positive diagonal
    for row in range(config.rows-(config.inarow-1)):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    # negative diagonal
    for row in range(config.inarow-1, config.rows):
        for col in range(config.columns-(config.inarow-1)):
            window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
            if is_terminal_window(window, config):
                return True
    return False

# Minimax implementation
def minimax(node, depth, maximizingPlayer, mark, config):
    is_terminal = is_terminal_node(node, config)
    valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
    if depth == 0 or is_terminal:
        return get_heuristic(node, mark, config)
    if maximizingPlayer:
        value = -np.inf  # np.Inf was removed in NumPy 2.0
        for col in valid_moves:
            child = drop_piece(node, col, mark, config)
            value = max(value, minimax(child, depth-1, False, mark, config))
        return value
    else:
        value = np.inf  # np.Inf was removed in NumPy 2.0
        for col in valid_moves:
            child = drop_piece(node, col, mark%2+1, config)
            value = min(value, minimax(child, depth-1, True, mark, config))
        return value

In [29]:
# How deep to make the game tree: higher values take longer to run!
N_STEPS = 3

def agent(obs, config):
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

## Testing MiniMax with the random agent

The agent scores each valid move by running minimax to a depth of N_STEPS and evaluating the resulting boards with the heuristic. It then keeps the moves with the highest score and picks one of them at random.
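
The final selection step can be shown in isolation. In this sketch the `scores` dictionary stands in for the per-column minimax values; the numbers are made up and no board is evaluated.

```python
import random

# Hypothetical minimax values for the currently valid columns.
scores = {0: -98.0, 3: 1.0, 5: 1.0, 6: -100.0}

# Keep every column that reaches the best score, then break the tie at random.
best_score = max(scores.values())
max_cols = [col for col, score in scores.items() if score == best_score]
choice = random.choice(max_cols)
print(max_cols)  # [3, 5]
```

Breaking ties at random means repeated games against the same opponent do not all follow the same line of play.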


In [30]:
env.run([agent, "random"])
env.render(mode="ipython")

In [31]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    # Agent 1 goes first (roughly) half the time          
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    # Agent 2 goes first (roughly) half the time      
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]
    print("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/len(outcomes), 2))
    print("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/len(outcomes), 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

In [32]:
get_win_percentages(agent1=agent, agent2="random", n_rounds=50)

Agent 1 Win Percentage: 0.98
Agent 2 Win Percentage: 0.02
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [33]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import gym
from gym import spaces

In [34]:
class ConnectFourGym(gym.Env):
    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2, 
                                            shape=(1,self.rows,self.columns), dtype=int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 1)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None

    def reset(self):
        self.obs = self.env.reset()
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns)
    
    def change_reward(self, old_reward, done):
        if old_reward == 1: # The agent won the game
            return 1
        elif done: # The opponent won the game
            return -1
        else: # Reward 1/42
            return 1/(self.rows*self.columns)
    
    def step(self, action):
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        if is_valid: # Play the move
            self.obs, old_reward, done, _ = self.env.step(int(action))
            reward = self.change_reward(old_reward, done)
        else: # End the game and penalize agent
            reward, done, _ = -10, True, {}
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns), reward, done, _

In [35]:
env = ConnectFourGym(agent2="random")

In [36]:
import torch as th
import torch.nn as nn

In [37]:
from stable_baselines3 import PPO 
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

2024-05-07 01:25:04.233465: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-05-07 01:25:04.233633: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-05-07 01:25:04.390753: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [38]:
class CustomCNN(BaseFeaturesExtractor):
    
    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=128):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        # CxHxW images (channels first)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.cnn(
                th.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.linear(self.cnn(observations))

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
)
        
# Initialize agent
model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=0)



In [39]:
# Train agent
model.learn(total_timesteps=60000)

<stable_baselines3.ppo.ppo.PPO at 0x7d9008d47eb0>

In [40]:
def agent1(obs, config):
    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, 6,7))
    # Check if selected column is valid
    is_valid = (obs['board'][int(col)] == 0)
    # If not valid, select random move. 
    if is_valid:
        return int(col)
    else:
        return random.choice([col for col in range(config.columns) if obs.board[int(col)] == 0])

In [41]:
env = make("connectx")
env.run([agent1, agent_q2])
env.render(mode="ipython")

In [42]:
get_win_percentages(agent1=agent1, agent2=agent_q2)

Agent 1 Win Percentage: 0.11
Agent 2 Win Percentage: 0.89
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0


In [43]:
import torch
import torch.nn as nn

class CustomCNN(BaseFeaturesExtractor):
    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=128):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(input_channels, 32, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.ReLU(),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Flatten(),
        )

        # Forward a dummy input through the CNN to calculate output size
        with torch.no_grad():
            # dummy_input = torch.zeros((1, input_channels, 64, 64))  # Adjust size as per your input dimensions
            output_features = self.cnn(
                torch.as_tensor(
                    observation_space.sample()[None]
                ).float()
            )
            n_flatten = output_features.shape[1]

        self.linear = nn.Sequential(
            nn.Linear(n_flatten, features_dim),
            nn.ReLU()
        )

    def forward(self, x):
        return self.linear(self.cnn(x))


In [44]:
# Create ConnectFour environment 
env = ConnectFourGym(agent2="random")

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
)
        
# Initialize agent
model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=1)

Using cpu device
Wrapping the env with a `Monitor` wrapper
Wrapping the env in a DummyVecEnv.


In [45]:
model.learn(total_timesteps=60000)

---------------------------------
| rollout/           |          |
|    ep_len_mean     | 10.6     |
|    ep_rew_mean     | -1.66    |
| time/              |          |
|    fps             | 87       |
|    iterations      | 1        |
|    time_elapsed    | 23       |
|    total_timesteps | 2048     |
---------------------------------
-----------------------------------------
| rollout/                |             |
|    ep_len_mean          | 9.77        |
|    ep_rew_mean          | -1.17       |
| time/                   |             |
|    fps                  | 81          |
|    iterations           | 2           |
|    time_elapsed         | 50          |
|    total_timesteps      | 4096        |
| train/                  |             |
|    approx_kl            | 0.009826029 |
|    clip_fraction        | 0.0713      |
|    clip_range           | 0.2         |
|    entropy_loss         | -1.94       |
|    explained_variance   | 0.00728     |
|    learning_rate        | 0.

<stable_baselines3.ppo.ppo.PPO at 0x7d9008c30b20>

In [46]:
def agent1(obs, config):
    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, 6,7))
    # Check if selected column is valid
    is_valid = (obs['board'][int(col)] == 0)
    # If not valid, select random move. 
    if is_valid:
        return int(col)
    else:
        return random.choice([col for col in range(config.columns) if obs.board[int(col)] == 0])

In [47]:
# Create the game environment
env = make("connectx")

# The trained PPO agent plays one game round against agent_q2
env.run([agent1, agent_q2])

# Show the game
env.render(mode="ipython")

In [48]:
get_win_percentages(agent1=agent1, agent2=agent_q2)

Agent 1 Win Percentage: 0.15
Agent 2 Win Percentage: 0.84
Number of Invalid Plays by Agent 1: 0
Number of Invalid Plays by Agent 2: 0
