In [None]:
import gymnasium as gym
from gymnasium import spaces
import numpy as np
import itertools

from stable_baselines3 import PPO
from stable_baselines3.common.env_util import make_vec_env
from stable_baselines3.common.utils import set_random_seed
from stable_baselines3.common.callbacks import BaseCallback, CheckpointCallback
import copy
import random, math
import os
import torch as th
from torch import nn
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

In [None]:
import math
import numpy as np
import pexpect
import ctypes

class Connect4Solver:
    !pip install stable_baselines3
    !git clone https://github.com/TonyCongqianWang/connect4_solver_fork.git && cd connect4_solver_fork && make
    !curl -L https://github.com/PascalPons/connect4/releases/download/book/7x6.book --output 7x6.book
    solver_path='./connect4_solver_fork/c4solver_c_interface.so'
    solver_lib = ctypes.CDLL(solver_path)
            
    solver_lib.solver_init.argtypes = [ctypes.c_char_p]
    solver_lib.solver_init.restype = ctypes.POINTER(ctypes.c_void_p)
    
    solver_lib.solver_delete.argtypes = [ctypes.POINTER(ctypes.c_void_p)]
    solver_lib.solver_delete.restype = None
    
    solver_lib.solver_solve.argtypes = [ctypes.POINTER(ctypes.c_void_p), ctypes.c_char_p, ctypes.c_bool, ctypes.c_bool, ctypes.c_char_p, ctypes.c_size_t]
    solver_lib.solver_solve.restype = ctypes.c_char_p
    def __init__(self):
        """
        Initializes the Connect4Solver with the path to the solver executable.

        Args:
            solver_path (str): Path to the Connect4 solver executable.
        """
        self.MAX_SCORE = 24
        self.handle = Connect4Solver.solver_lib.solver_init(None)
        self.result_buffer = ctypes.create_string_buffer(256)

    def __del__(self):
        """
        Destructor that sends EOF to the solver process.
        """
        if hasattr(self, 'child') and self.child is not None:
            try:
                self.child.sendeof()
            except:
                pass

    def _process_output(self, prompt_str, answer_str):
        """
        Processes the output from the solver.

        Args:
            prompt_str (str): The prompt string.
            answer_str (str): The answer string.

        Returns:
            list: List of floats representing the processed output.
        """
        if answer_str.startswith(prompt_str):
            answer_str = answer_str[len(prompt_str):].strip()
            
        answer_list = [float(x) for x in answer_str.split()]
        return answer_list

    def _softmax(self, x, temperature=1.0):
        """
        Calculates a modified softmax that approaches argmax for small temperatures.

        For very small temperatures, indices with the maximum value will receive
        equal probability, and the rest will receive 0.

        Args:
            x (list): List of values.
            temperature (float): Temperature parameter for softmax.

        Returns:
            list: List of probabilities.
        """
        if temperature <= 1e-5:  # Consider a very small temperature as argmax
            max_val = max(x)
            max_indices = [i for i, val in enumerate(x) if val == max_val]
            probabilities = [0.0] * len(x)
            prob = 1.0 / len(max_indices)
            for i in max_indices:
                probabilities[i] = prob
            return probabilities
        else:
            e_x = []
            for i in x:
                # Clipping to prevent overflow for large positive values
                exponent = i / temperature
                if exponent > 100:  # Or a suitable large value
                    e_x.append(float('inf'))
                elif exponent < -100:
                    e_x.append(0.0)
                else:
                    e_x.append(math.exp(exponent))

            sum_e_x = sum(e_x)
            if sum_e_x == 0:
                return ([1.0] * len(x)) / len(x)
            return [e / sum_e_x for e in e_x]

    def _transform_and_softmax(self, data, score_offset, temperature):
        """
        Transforms and calculates the softmax of the data.

        Args:
            data (list): List of data values.
            temperature (float): Temperature parameter for softmax.

        Returns:
            list: List of softmax probabilities.
        """
        transformed_data = []
        for x in data:
            sign = 1 if x > 0 else -1 if x < 0 else 0
            if x > -1000:
                transformed_x = sign * ((abs(x) + score_offset) / self.MAX_SCORE * 5)
            else:
                transformed_x = -1000
            transformed_data.append(transformed_x)
        #print(transformed_data)
        return self._softmax(transformed_data, temperature)

    def _random_index(self, softmax_probs):
        """
        Selects a random index based on softmax probabilities.

        Args:
            softmax_probs (list): List of softmax probabilities.

        Returns:
            int: Selected index.
        """
        selected_index = np.random.choice(len(softmax_probs), p=softmax_probs)
        return selected_index

    def get_solver_move(self, move_str, temperature=1.0):
        """
        Gets a move from the solver.

        Args:
            move_str (str): Move string to send to the solver.
            temperature (float): Temperature parameter for softmax.

        Returns:
            int: Selected move index.
        """
        try:
            result = Connect4Solver.solver_lib.solver_solve(self.handle, move_str.encode("utf-8"), False, True, self.result_buffer, 256)
            answer = result.decode()
            score_offset = math.floor(len(move_str) / 2)
            probas = self._transform_and_softmax(self._process_output(move_str, answer), score_offset, temperature)
            #print(f"{answer}")
            #print(probas)
            return self._random_index(probas)
        except Exception as e:
            print(f"{e}")
            print(f"{answer}")
        return 0

In [None]:
import zipfile

def zip_directories(directory_paths, working_dir):
    """
    Zips the given directories into their parent directory.

    Args:
        directory_paths (list): A list of paths to directories.

    Returns:
        list: A list of paths to the created zip files.
    """
    os.makedirs(working_dir, exist_ok=True)
    
    zip_file_paths = []
    for dir_path in directory_paths:
        if not os.path.isdir(dir_path):
            print(f"Warning: {dir_path} is not a directory. Skipping.")
            continue

        parent_dir = working_dir
        dir_name = os.path.basename(dir_path)
        zip_file_path = os.path.join(parent_dir, f"{dir_name}.zip")

        try:
            with zipfile.ZipFile(zip_file_path, 'w', zipfile.ZIP_DEFLATED) as zipf:
                for root, _, files in os.walk(dir_path):
                    for file in files:
                        file_path = os.path.join(root, file)
                        relative_path = os.path.relpath(file_path, dir_path)
                        zipf.write(file_path, relative_path)
            zip_file_paths.append(zip_file_path)
        except Exception as e:
            print(f"Error zipping {dir_path}: {e}")

    return zip_file_paths

In [None]:
class ConnectFourEnv(gym.Env):
    metadata = {"render_modes": ["human", "ansi", "rgb_array"], "render_fps": 1}
    def __init__(self, render_mode=None, board_rows=6, board_cols=7):
        super(ConnectFourEnv, self).__init__()
        self.board_rows = board_rows
        self.board_cols = board_cols
        self.action_space = spaces.Discrete(self.board_cols)  # Columns to drop a piece
        self.observation_space = spaces.Box(low=0, high=255, shape=(2, self.board_rows, self.board_cols), dtype=np.uint8)  # two binary matrices. one for each players stones
        self.render_mode = render_mode
        self.move_history = ""
        
        self.reset()

    def reset(self, seed=None, options=None):
        super().reset(seed=seed, options=options)
        self.board = np.zeros((self.board_rows, self.board_cols), dtype=np.int8)
        self.player = 1  # Player 1 starts
        self.done = False
        self.winner = None
        self.turns = 0
        self.move_history_str = ""
        info = {}
        return self._get_observation(), info

    def step(self, action):
        if self.done:
            return self._get_observation(), 0, True, False, {}

        if not self._is_valid_move(action):
            return self._get_observation(), -50, False, False, {}

        self._drop_piece(action)
        self.move_history_str += str(action + 1)
        self.turns += 1

        if self._check_win():
            self.done = True
            self.winner = self.player
            reward = 80 + 20 * (len(self.board.flatten()) - self.turns) / len(self.board.flatten())
        elif self._check_draw():
            self.done = True
            reward = 0
        else:
            reward = 0
        self.player *= -1  # Switch players
        return  self._get_observation(), reward, self.done, False, {}

    def get_valid_moves(self):
        valid_moves = []
        for col in range(self.board_cols):
            if self._is_valid_move(col):
                valid_moves.append(col)
        return valid_moves

    def _get_observation(self):
        m, n = self.board.shape
        player_perspective = self.board * self.player
        new_array = np.zeros((2, m, n), dtype=np.uint8)
        new_array[0, :, :] = 255 * (player_perspective == 1).astype(np.uint8)
        new_array[1, :, :] = 255 * (player_perspective == -1).astype(np.uint8)
        return new_array

    def _is_valid_move(self, col):
        return self.board[0, col] == 0

    def _drop_piece(self, col):
        for row in range(self.board_rows - 1, -1, -1):
            if self.board[row, col] == 0:
                self.board[row, col] = self.player
                return

    def _check_win(self):
        # Check horizontal, vertical, and diagonal wins
        for r in range(self.board_rows):
            for c in range(self.board_cols - 3):
                if (
                    self.board[r, c] == self.board[r, c + 1] == self.board[r, c + 2] == self.board[r, c + 3] != 0
                ):
                    return True

        for c in range(self.board_cols):
            for r in range(self.board_rows - 3):
                if (
                    self.board[r, c] == self.board[r + 1, c] == self.board[r + 2, c] == self.board[r + 3, c] != 0
                ):
                    return True

        for r in range(self.board_rows - 3):
            for c in range(self.board_cols - 3):
                if (
                    self.board[r, c] == self.board[r + 1, c + 1] == self.board[r + 2, c + 2] == self.board[r + 3, c + 3] != 0
                ):
                    return True

        for r in range(3, self.board_rows):
            for c in range(self.board_cols - 3):
                if (
                    self.board[r, c] == self.board[r - 1, c + 1] == self.board[r - 2, c + 2] == self.board[r - 3, c + 3] != 0
                ):
                    return True
        return False

    def _check_draw(self):
        return np.all(self.board != 0)

    def render(self):
        board_str = ""
        board_str += "-" * (self.board_cols * 2 + 3) + "\n"
        for row in self.board:
            board_str += "| "
            for cell in row:
                if cell == 1:
                    board_str += "x "
                elif cell == -1:
                    board_str += "o "
                else:
                    board_str += "  "
            board_str += "|\n"
        board_str += "-" * (self.board_cols * 2 + 3)
        print(board_str)

In [None]:
from collections import defaultdict
import pandas as pd

def calculate_elo(rating_a, rating_b, score_a):
    """Calculates the new Elo ratings for two players."""
    expected_a = 1 / (1 + 10**((rating_b - rating_a) / 400))
    expected_b = 1 / (1 + 10**((rating_a - rating_b) / 400))
    k = 32
    new_rating_a = rating_a + k * (score_a - expected_a)
    new_rating_b = rating_b + k * ((1 - score_a) - expected_b)
    return new_rating_a, new_rating_b

def evaluate_agents(agent_paths, num_episodes=100):
    """Evaluates multiple trained PPO agents and calculates Elo scores."""
    env = ConnectFourEnv()
    print(f"{agent_paths=}")
    agents = [PPO.load(path) for path in agent_paths]
    solver = Connect4Solver()
    solver_temps = [0.0, 0.25, 0.5, 1.0]
    agents += solver_temps
    agent_names = [path.split("/")[-1] for path in agent_paths] + [f"Solver_{t}" for t in solver_temps]
    num_agents = len(agents)

    results = defaultdict(lambda: defaultdict(lambda: {"wins_first": 0, "draws_first": 0, "loses_first": 0}))
    elo_ratings = [1200] * num_agents

    # Generate all possible matchups
    matchups = [(i, j) for i in range(num_agents) for j in range(num_agents)]  # Add self-play

    for episode in range(num_episodes):
        random.shuffle(matchups) #shuffle matchups for each episode.
        for agent1_index, agent2_index in matchups:
            agent1 = agents[agent1_index]
            agent2 = agents[agent2_index]

            agent1_wins_first = 0
            agent1_draws_first = 0
            agent1_loses_first = 0

            obs, _ = env.reset()
            done = False

            while not done:
                current_player = env.player
                if current_player == 1:
                    if type(agent1) == float:
                        action = solver.get_solver_move(env.move_history_str, agent1)
                    else:
                        action, _ = agent1.predict(obs, deterministic=False)
                else:
                    if type(agent2) == float:
                        action = solver.get_solver_move(env.move_history_str, agent2)
                    else:
                        action, _ = agent2.predict(obs, deterministic=False)

                valid_moves = env.get_valid_moves()
                if action not in valid_moves:
                    action = random.choice(valid_moves)
                obs, reward, done, truncated, _ = env.step(action)

                if done or truncated:
                    if reward > 0:
                        if current_player == 1:
                            agent1_wins_first = 1
                        else:
                            agent1_loses_first = 1
                    else:
                        agent1_draws_first = 1
                    break

            # Calculate Elo. Don't change Elo for self-play
            score_agent1 = (agent1_wins_first + 0.5 * agent1_draws_first)
            if agent1_index != agent2_index:
                elo_ratings[agent1_index], elo_ratings[agent2_index] = calculate_elo(
                    elo_ratings[agent1_index], elo_ratings[agent2_index], score_agent1
                )

            # Update results
            results[agent1_index][agent2_index]["wins_first"] += agent1_wins_first
            results[agent1_index][agent2_index]["draws_first"] += agent1_draws_first
            results[agent1_index][agent2_index]["loses_first"] += agent1_loses_first
        print(f"Episode {episode + 1} / 100 done.")

    env.close()

    sorted_agents = sorted(range(num_agents), key=lambda i: elo_ratings[i], reverse=True)

    columns = ["Agent", "Elo"] + [f"Agent {i}" for i in range(num_agents)]
    data = []

    for agent1_index in sorted_agents:
        agent_name = agent_names[agent1_index]
        row = [agent_name, f"{elo_ratings[agent1_index]:.2f}"]
        for agent2_index in sorted_agents:
            data_dict = results[agent1_index][agent2_index]
            row.append(f"{data_dict['wins_first']} / {data_dict['draws_first']} / {data_dict['loses_first']}")
        data.append(row)

    df = pd.DataFrame(data, columns=columns)
    return results, df

In [None]:
try:
    agent_dir = "/kaggle/input/connect-4-agents/"
    agent_files = [f for f in os.listdir(agent_dir)]
    agent_paths = [os.path.join(agent_dir, f) for f in agent_files]
    agent_paths = zip_directories(agent_paths, "/kaggle/working/agents")
except:
    agent_paths = []

In [None]:
table, df = evaluate_agents(agent_paths, num_episodes=100)

In [None]:
df