In [1]:
# !pip3 install kaggle-environments
# !pip3 install stable-baselines3
# !pip3 install numpy
# !pip3 install torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118
import torch as th
import torch.nn as nn

from stable_baselines3 import PPO
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor
from stable_baselines3.common.evaluation import evaluate_policy
from stable_baselines3.common.env_checker import check_env
import time

In [34]:
from kaggle_environments import evaluate, make
def my_agent(obs, config):
    import random
    import numpy as np

    # Helper function for score_move: gets board at next step if agent drops piece in selected column
    def drop_piece(grid, col, mark, self):
        next_grid = grid.copy()
        for row in range(self.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    # Helper function for score_move: calculates value of heuristic for grid
    def get_heuristic(grid, mark, config):
        list_my_pos = [2,3,4]
        my_pos_weights = [1,2,10000]
        list_opp_pos = [2,3,4]
        opp_pos_weights = [-1, -100, -1000]
        num_my_pos, num_opp_pos = count_windows(grid, list_my_pos, list_opp_pos, mark, config)
        score = np.sum(np.array(num_my_pos) * np.array(my_pos_weights)) + np.sum(np.array(num_opp_pos) * np.array(opp_pos_weights))
        return score

    # Helper function for get_heuristic: checks if window satisfies heuristic conditions
    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)
    
    # Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
    def count_windows(grid, list_my_pos, list_opp_pos, piece, config):
        num_windows = [0] * len(list_my_pos)
        num_opp_windows = [0] * len(list_opp_pos)
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                for i,setup in enumerate(list_my_pos):
                    if check_window(window, setup, piece, config):
                        num_windows[i] += 1
                for i,setup in enumerate(list_opp_pos):
                    if check_window(window, setup, piece%2+1, config):
                        num_opp_windows[i] += 1
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                for i,setup in enumerate(list_my_pos):
                    if check_window(window, setup, piece, config):
                        num_windows[i] += 1
                for i,setup in enumerate(list_opp_pos):
                    if check_window(window, setup, piece%2+1, config):
                        num_opp_windows[i] += 1
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                for i,setup in enumerate(list_my_pos):
                    if check_window(window, setup, piece, config):
                        num_windows[i] += 1
                for i,setup in enumerate(list_opp_pos):
                    if check_window(window, setup, piece%2+1, config):
                        num_opp_windows[i] += 1
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                for i,setup in enumerate(list_my_pos):
                    if check_window(window, setup, piece, config):
                        num_windows[i] += 1
                for i,setup in enumerate(list_opp_pos):
                    if check_window(window, setup, piece%2+1, config):
                        num_opp_windows[i] += 1
        return num_windows, num_opp_windows
    
    # Uses minimax to calculate value of dropping piece in selected column
    def score_move(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        score = minimax(next_grid, nsteps-1, False, mark, config, -1000, 10000)
        return score

    # Helper function for minimax: checks if agent or opponent has four in a row in the window
    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    # Helper function for minimax: checks if game has ended
    def is_terminal_node(grid, config):
        # Check for draw 
        if list(grid[0, :]).count(0) == 0:
            return True
        # Check for win: horizontal, vertical, or diagonal
        # horizontal 
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if is_terminal_window(window, config):
                    return True
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if is_terminal_window(window, config):
                    return True
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if is_terminal_window(window, config):
                    return True
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if is_terminal_window(window, config):
                    return True
        return False

    # Minimax implementation
    def minimax(node, depth, maximizingPlayer, mark, config, alpha, beta):
        is_terminal = is_terminal_node(node, config)
        valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
        if depth == 0 or is_terminal:
            return get_heuristic(node, mark, config)
        if maximizingPlayer:
            value = -np.Inf
            for col in valid_moves:
                child = drop_piece(node, col, mark, config)
                value = max(value, minimax(child, depth-1, False, mark, config, alpha, beta))
#                 print("player move",value, beta, value > beta, "depth", depth)
                if value >= beta:
                    alpha = max(value, alpha)
#                     print(alpha, beta, value)
                    break
            return value
        else:
            value = np.Inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1, config)
                value = min(value, minimax(child, depth-1, True, mark, config, alpha, beta))
#                 print("opp move",value, alpha, value < alpha, "depth", depth)
                if value <= alpha:
                    beta = min(beta, value)
                    break
            return value
    # Get list of valid moves
    N_STEPS = 2
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = dict(zip(valid_moves, [score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves]))
    # Get a list of columns (moves) that maximize the heuristic
    max_cols = [key for key in scores.keys() if scores[key] == max(scores.values())]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [4]:
# hyper parameters 
learning_rate=3e-4
batch_size=128
device="cuda" if th.cuda.is_available() else "cpu"
stop = False
total_timesteps = 100000
outfile = "PPO_progress.txt"
no_imporvement_count = 0
max_loops = 100
starting_loops = 1
n_envs = 6
loops = starting_loops
current_env_name = ["random", "my_agent"]
env_state = ["random", my_agent]
state = 0


In [5]:
import numpy as np
import pandas as pd
import gymnasium as gym
import matplotlib.pyplot as plt
%matplotlib inline
from kaggle_environments import make, evaluate
from stable_baselines3.common.vec_env.subproc_vec_env import SubprocVecEnv
from gymnasium import spaces
    

def get_win_percentages(agent1, agent2, n_rounds=150):
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4} 
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    win = np.round(outcomes.count([1,-1])/len(outcomes), 2)
    return win, outcomes.count([None, 0])


class ConnectFourGym(gym.Env):
    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        self.inarow = ks_env.configuration.inarow
        self.offset = 1
        self.wins_blocked_row = []
        self.wins_blocked_col = []
        self.wins_blocked_pos_diag = []
        self.wins_blocked_neg_diag = []
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2, 
                                            shape=(1,self.rows,self.columns), dtype=int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-50, 10)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None
        self.last_action = -1
        
    def drop_piece(self, grid, action, piece):
        next_grid = grid.copy()
        for row in range(self.rows-1, -1, -1):
            if next_grid[row][action] == 0:
                break
        next_grid[row][action] = piece
        return next_grid, row + 1
    
    def blocked_winning_move(self, grid, action_col, piece):
        # Convert the board to a 2D grid
        next_grid, action_row = self.drop_piece(grid, action_col, piece)
        # horizontal
        if action_row not in self.wins_blocked_row:
            for col in range(self.columns-(self.inarow-1)):
                window = list(next_grid[action_row,col:col+self.inarow])
                if window.count(piece) == self.inarow - self.offset and window.count(piece%2+1) == self.offset:
                    self.wins_blocked_row.append(action_row)
                    return True
        # vertical
        if action_col not in self.wins_blocked_col:
            for row in range(self.rows-(self.inarow-1)):
                window = list(next_grid[row:row+self.inarow,action_col])
                if window.count(piece) == self.inarow - self.offset and window.count(piece%2+1) == self.offset:
                    self.wins_blocked_col.append(action_col)
                    return True
        # positive diagonal
        positive_diag, start_row, start_col = self.find_positive_diagonal(grid, action_row, action_col, piece)
        if positive_diag:
            self.wins_blocked_pos_diag.append((start_row, start_col))
            return positive_diag
    
        negative_diag, start_row, start_col = self.find_negative_diagonal(grid, action_row, action_col, piece)
        if negative_diag:
                self.wins_blocked_neg_diag.append((start_row, start_col))
                return negative_diag
        
        return False
    
    def find_positive_diagonal(self, grid, action_row, action_col, piece):
        min_pos = min(action_col, action_row)
        start_row = action_row - min_pos
        start_col = action_col - min_pos
        if (start_row,start_col) in self.wins_blocked_pos_diag:
            return False, -1, -1 
        steps = self.rows - start_row 
        steps = 0
        temp_row = start_row
        temp_col = start_col
        diagonal_values = []
        while temp_col < self.columns and temp_row < self.rows:
            diagonal_values.append(grid[temp_row, temp_col])
            if len(diagonal_values) == self.inarow:
                if diagonal_values.count(piece) == self.inarow - self.offset and diagonal_values.count(piece%2+1) == self.offset:
                    return True, start_row, start_col
                diagonal_values.pop(0)
            steps += 1
            temp_col +=1 
            temp_row += 1
        return False, -1, -1 
                
    def find_negative_diagonal(self, grid, action_row, action_col, piece):
        min_pos = min(action_col, self.rows - action_row -1)
        start_row = action_row + min_pos
        start_col = action_col - min_pos
        if (start_row,start_col) in self.wins_blocked_neg_diag:
            return False, -1, -1 
        steps = 0
        temp_row = start_row
        temp_col = start_col
        diagonal_values = []
        while temp_col < self.columns and temp_row >= 0:
            diagonal_values.append(grid[temp_row, temp_col])
            if len(diagonal_values) == self.inarow:
                if diagonal_values.count(piece) == self.inarow - self.offset and diagonal_values.count(piece%2+1) == self.offset:
                    return True, start_row, start_col
                diagonal_values.pop(0)
            steps += 1
            temp_col +=1 
            temp_row -= 1
        return False, -1, -1
        
    def reset(self, seed=None):
        super().reset(seed=seed)
        self.obs = self.env.reset()
        self.wins_blocked_row = []
        self.wins_blocked_col = []
        self.wins_blocked_pos_diag = []
        self.wins_blocked_neg_diag = []
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns), self.obs
    
    def change_reward(self, old_reward, done, action, board):
        grid = np.asarray(board).reshape(6, 7)
        blocked = self.blocked_winning_move(grid, action, self.obs.mark%2 + 1)
        if old_reward == 1: # The agent won the game
            return 10
        elif done: # The opponent won the game
            return -10
        elif blocked:
            return 0.5
        else: 
            return 0
           
        
    def step(self, action):
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        old_reward = None
        if is_valid: # Play the move
            self.obs, old_reward, done, _ = self.env.step(int(action))
            reward = self.change_reward(old_reward, done, int(action), self.obs['board'])
        else: # End the game and penalize agent
            reward, done, _ = -50, True, {}
        self.last_action = int(action)
        return np.array(self.obs['board']).reshape(1,self.rows,self.columns), reward, done, False, self.obs
    

In [6]:
# Neural network for predicting action values
class CustomCNN(BaseFeaturesExtractor):
    
    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=2048):
        super(CustomCNN, self).__init__(observation_space, features_dim)
        # CxHxW images (channels first)
        n_input_channels = observation_space.shape[0]
        self.cnn = nn.Sequential(
            nn.Conv2d(n_input_channels, 32, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 128, kernel_size=3, stride=1, padding=0),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 256, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 512, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(512),
            nn.ReLU(),
            nn.Conv2d(512, 1024, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(1024),
            nn.ReLU(),
            nn.Conv2d(1024, 2048, kernel_size=3, stride=1, padding=1),
            nn.BatchNorm2d(2048),
            nn.ReLU(),
            nn.Flatten(),
        )

        # Compute shape by doing one forward pass
        with th.no_grad():
            n_flatten = self.cnn(
                th.as_tensor(observation_space.sample()[None]).float()
            ).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        return self.linear(self.cnn(observations))

policy_kwargs = dict(
    features_extractor_class=CustomCNN,
)


True

In [7]:
# setup envs
new_env_list = [lambda: ConnectFourGym(agent2=env_state[state])] * n_envs
new_env = SubprocVecEnv(new_env_list)
eval_env = ConnectFourGym(agent2=env_state[state])
current_env = current_env_name[state]

In [9]:
from IPython.display import clear_output
import sys
while not stop and loops < max_loops:
    clear_output(wait=True)
    start_time = time.time()
    
    if loops == starting_loops:
        model = PPO("CnnPolicy", new_env, policy_kwargs=policy_kwargs, verbose=0, device="cuda", learning_rate=learning_rate, batch_size=batch_size)

    model.learn(total_timesteps=total_timesteps,progress_bar=True)
    name = f"PPO_{current_env}_{loops*total_timesteps}_model"
    model.save(name)

    new_env.reset()
    eval_env.reset()
    with th.no_grad():
        with open(outfile,"a") as file1:
            mean_reward, std_reward = evaluate_policy(model, eval_env, n_eval_episodes=150, deterministic=True, warn=False)
            end_time = time.time()
            file1.write(f"\n\nIteration: {loops}\nModel name: {name}\nMean reward vs {current_env}: {mean_reward:.2f} +/- {std_reward}\nIteration duration {end_time-start_time}")
            if mean_reward >= 10.0 and state == 0:
                loops = 1
                current_env = "my_agent"
                file1.write(f"\nChanging opponent from random to min/max agent")
                state = 1
                new_env_list = [lambda: ConnectFourGym(agent2=env_state[state])] * n_envs
                new_env = SubprocVecEnv(new_env_list)
                model.set_env(new_env)
                eval_env = ConnectFourGym(agent2=env_state[state])
            elif mean_reward - std_reward >= 5.0 and state == 1:
                stop = True
    loops+=1
    

Output()