In [4]:
from kaggle_environments import evaluate, make, utils
from random import choice
import random
import numpy as np

In [5]:
def block_check_agent(observation, configuration):

    ROWS = configuration.rows
    COLS = configuration.columns
    INAROW = configuration.inarow
    board = observation.board
    player = observation.mark

    def get_next_open_row(col):
        for row in reversed(range(ROWS)):
            idx = row * COLS + col
            if board[idx] == 0:
                return idx
        return None

    def drop_piece(b, col, player_mark):
        b_copy = b.copy()
        row_idx = get_next_open_row(col)
        if row_idx is not None:
            b_copy[row_idx] = player_mark
            return b_copy
        return None

    def is_winning_move(b, mark):
        # Check all directions for win
        for row in range(ROWS):
            for col in range(COLS):
                idx = row * COLS + col
                # Horizontal
                if col + INAROW <= COLS:
                    if all(b[row * COLS + col + i] == mark for i in range(INAROW)):
                        return True
                # Vertical
                if row + INAROW <= ROWS:
                    if all(b[(row + i) * COLS + col] == mark for i in range(INAROW)):
                        return True
                # Diagonal /
                if row - INAROW + 1 >= 0 and col + INAROW <= COLS:
                    if all(b[(row - i) * COLS + col + i] == mark for i in range(INAROW)):
                        return True
                # Diagonal \
                if row + INAROW <= ROWS and col + INAROW <= COLS:
                    if all(b[(row + i) * COLS + col + i] == mark for i in range(INAROW)):
                        return True
        return False

    # Try to win
    for col in range(COLS):
        if get_next_open_row(col) is not None:
            new_board = drop_piece(board, col, player)
            if new_board and is_winning_move(new_board, player):
                return col

    # Try to block opponent
    opponent = 1 if player == 2 else 2
    for col in range(COLS):
        if get_next_open_row(col) is not None:
            new_board = drop_piece(board, col, opponent)
            if new_board and is_winning_move(new_board, opponent):
                return col

    # Otherwise pick random valid column
    valid_columns = [col for col in range(COLS) if get_next_open_row(col) is not None]
    return random.choice(valid_columns)


In [3]:
env = make("connectx", debug=True)
env.render()

In [4]:
env.reset()
env.render(mode="ipython", width=500, height=450)

In [6]:
def mean_reward(rewards):
    return sum(r[0] for r in rewards) / float(len(rewards))


In [6]:
print("Block check agent vs Random Agent:", mean_reward(evaluate("connectx", [block_check_agent, "random"], num_episodes=100)))
print("Block check agent vs Negamax Agent:", mean_reward(evaluate("connectx", [block_check_agent, "negamax"], num_episodes=100)))


Block check agent vs Random Agent: 0.94
Block check agent vs Negamax Agent: -0.27


In [7]:
import torch
import torch.nn as nn
import torch.nn.functional as F
from collections import deque
import random
import gym
from Alphabeta_agent import alphabeta_agent

In [8]:
class ConnectFourGym(gym.Env):
    def __init__(self, agent2=alphabeta_agent):
        super(ConnectFourGym, self).__init__()
        
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        self.inarow = ks_env.configuration.inarow

        # Action and observation space
        self.action_space = gym.spaces.Discrete(self.columns)
        self.observation_space = gym.spaces.Box(low=0, high=2, 
                                                shape=(1, self.rows, self.columns), dtype=np.int32)

        self.reward_range = (-10, 1)
        self.spec = None
        self.metadata = None

    def reset(self):
        self.obs = self.env.reset()
        board = np.array(self.obs['board']).reshape(1, self.rows, self.columns)
        return board.astype(np.float32)  

    def step(self, action):
        # Check if the move is valid 
        is_valid = self.obs['board'][action] == 0
        if is_valid:
            self.obs, old_reward, done, _ = self.env.step(action)
            reward = self._custom_reward(old_reward, done)
        else:
            # Penalize invalid moves
            reward, done, _ = -10, True, {}

        board = np.array(self.obs['board']).reshape(1, self.rows, self.columns).astype(np.float32)
        return board, reward, done, _

    def _custom_reward(self, old_reward, done):
        if old_reward == 1:      # Win
            return 1.0
        elif done:               # Loss
            return -1.0
        else:                    # Neutral move
            return 1.0 / (self.rows * self.columns)

In [9]:
class DQNCNN(nn.Module):
    def __init__(self, input_shape, n_actions):
        super(DQNCNN, self).__init__()
        c, h, w = input_shape  # shape: (1, 6, 7)
        
        self.conv1 = nn.Conv2d(c, 64, kernel_size=4, stride=1)
        self.conv2 = nn.Conv2d(64, 128, kernel_size=2, stride=1)
        self.fc1 = nn.Linear(self._get_conv_output_size(c, h, w), 128)
        self.fc2 = nn.Linear(128, n_actions)

    def _get_conv_output_size(self, c, h, w):
        x = torch.zeros(1, c, h, w)
        x = self.conv1(x)
        x = self.conv2(x)
        return int(np.prod(x.size()))

    def forward(self, x):
        x = F.relu(self.conv1(x))      # [B, 64, H-3, W-3]
        x = F.relu(self.conv2(x))      # [B, 128, H-4, W-4]
        x = x.view(x.size(0), -1)      # flatten
        x = F.relu(self.fc1(x))
        return self.fc2(x)    

In [10]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)

    def push(self, state, action, reward, next_state, done):
        self.buffer.append((state, action, reward, next_state, done))

    def sample(self, batch_size):
        transitions = random.sample(self.buffer, batch_size)
        states, actions, rewards, next_states, dones = zip(*transitions)
        return (np.array(states), actions, rewards, np.array(next_states), dones)

    def __len__(self):
        return len(self.buffer)

In [23]:
import torch.optim as optim

def train_dql(env, episodes=10000, gamma=0.99, epsilon=1.0, epsilon_decay=0.9999, 
              min_epsilon=0.001, batch_size=64, buffer_size=10_000, target_update=10):

    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    input_shape = (1, env.rows, env.columns)
    n_actions = env.columns

    policy_net = DQNCNN(input_shape, n_actions).to(device)
    target_net = DQNCNN(input_shape, n_actions).to(device)
    target_net.load_state_dict(policy_net.state_dict())
    target_net.eval()

    optimizer = optim.Adam(policy_net.parameters(), lr=0.0005)
    replay_buffer = ReplayBuffer(buffer_size)

    for episode in range(episodes):
        state = env.reset()
        total_reward = 0
        done = False

        while not done:
            state_tensor = torch.tensor(state, dtype=torch.float32).unsqueeze(0).to(device)

            # ε-greedy action selection
            if random.random() < epsilon:
                action = random.randint(0, n_actions - 1)
            else:
                with torch.no_grad():
                    q_values = policy_net(state_tensor)
                    action = q_values.argmax().item()

            next_state, reward, done, _ = env.step(action)
            total_reward += reward
            replay_buffer.push(state, action, reward, next_state, done)
            state = next_state

            # Sample and train
            if len(replay_buffer) >= batch_size:
                states, actions, rewards, next_states, dones = replay_buffer.sample(batch_size)

                states_tensor = torch.tensor(states, dtype=torch.float32).to(device)
                actions_tensor = torch.tensor(actions).unsqueeze(1).to(device)
                rewards_tensor = torch.tensor(rewards, dtype=torch.float32).unsqueeze(1).to(device)
                next_states_tensor = torch.tensor(next_states, dtype=torch.float32).to(device)
                dones_tensor = torch.tensor(dones, dtype=torch.float32).unsqueeze(1).to(device)

                q_values = policy_net(states_tensor).gather(1, actions_tensor)
                next_q_values = target_net(next_states_tensor).max(1)[0].detach().unsqueeze(1)
                target = rewards_tensor + gamma * next_q_values * (1 - dones_tensor)

                loss = F.mse_loss(q_values, target)
                optimizer.zero_grad()
                loss.backward()
                optimizer.step()

        # Decay epsilon
        epsilon = max(min_epsilon, epsilon * epsilon_decay)

        # Update target network
        if episode % target_update == 0:
            target_net.load_state_dict(policy_net.state_dict())

        print(f"Episode {episode+1}, Total reward: {total_reward}, Epsilon: {epsilon:.4f}")

    # Save trained policy
    torch.save(policy_net.state_dict(), "dql_minimax.pt")


In [17]:
def create_dql_agent(model_path, input_shape=(1, 6, 7), n_actions=7):
    model = DQNCNN(input_shape, n_actions)
    model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))
    model.eval()

    def agent(obs, config):
        board = np.array(obs['board']).reshape(1, config.rows, config.columns)
        board_tensor = torch.tensor(board, dtype=torch.float32).unsqueeze(0)

        with torch.no_grad():
            q_values = model(board_tensor)
            action = q_values.argmax().item()

        valid_moves = [c for c in range(config.columns) if obs['board'][c] == 0]
        if action not in valid_moves:
            action = random.choice(valid_moves)

        return action

    return agent

In [13]:
dql_agent_3 = create_dql_agent("dql_alphabeta_model_3.pt")
dql_agent_2 = create_dql_agent("dql_cnn_connectx_model_2.pt")
dql_agent_1 = create_dql_agent("dql_cnn_connectx_model_1.pt")
dql_agent_b = create_dql_agent("dql_blockcheck.pt")
dql_agent_m = create_dql_agent("dql_minimax.pt")

  model.load_state_dict(torch.load(model_path, map_location=torch.device('cpu')))


In [24]:
env = ConnectFourGym(agent2=block_check_agent)
train_dql(env, episodes=30000)


Episode 1, Total reward: -0.7857142857142857, Epsilon: 0.9999
Episode 2, Total reward: -0.6904761904761905, Epsilon: 0.9998
Episode 3, Total reward: -0.7380952380952381, Epsilon: 0.9997
Episode 4, Total reward: -0.7857142857142857, Epsilon: 0.9996
Episode 5, Total reward: -0.9047619047619048, Epsilon: 0.9995
Episode 6, Total reward: -0.8333333333333334, Epsilon: 0.9994
Episode 7, Total reward: -0.8095238095238095, Epsilon: 0.9993
Episode 8, Total reward: -0.7380952380952381, Epsilon: 0.9992
Episode 9, Total reward: -0.8809523809523809, Epsilon: 0.9991
Episode 10, Total reward: -9.666666666666666, Epsilon: 0.9990
Episode 11, Total reward: -0.7380952380952381, Epsilon: 0.9989
Episode 12, Total reward: -0.8333333333333334, Epsilon: 0.9988
Episode 13, Total reward: -0.7619047619047619, Epsilon: 0.9987
Episode 14, Total reward: -0.7857142857142857, Epsilon: 0.9986
Episode 15, Total reward: -0.9285714285714286, Epsilon: 0.9985
Episode 16, Total reward: -0.8571428571428572, Epsilon: 0.9984
Ep

In [17]:
trained_policy = DQNCNN((1, 6, 7), 7)
trained_policy.load_state_dict(torch.load("dql_cnn_connectx_model_2.pt", map_location=torch.device('cpu')))
trained_policy.eval()

DQNCNN(
  (conv1): Conv2d(1, 64, kernel_size=(4, 4), stride=(1, 1))
  (conv2): Conv2d(64, 128, kernel_size=(2, 2), stride=(1, 1))
  (fc1): Linear(in_features=768, out_features=128, bias=True)
  (fc2): Linear(in_features=128, out_features=7, bias=True)
)

In [None]:
env = make("connectx", debug=True)
env.reset()

# Play one episode: DQL agent goes first, Negamax goes second
env.run([dql_agent_1, "negamax"])

env.render(mode="ipython")

In [14]:
def get_win_percentages(agent1, agent2, n_rounds=100):
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    
    # Agent 1 goes first half the time
    outcomes = evaluate("connectx", [agent1, agent2], config, [], n_rounds//2)
    
    # Agent 2 goes first the other half 
    outcomes += [[b,a] for [a,b] in evaluate("connectx", [agent2, agent1], config, [], n_rounds-n_rounds//2)]

    total = len(outcomes)
    agent1_wins = outcomes.count([1, -1])
    agent2_wins = outcomes.count([-1, 1])
    ties = outcomes.count([0, 0])
    invalid_1 = outcomes.count([None, 0])
    invalid_2 = outcomes.count([0, None])

    print(f"Total games: {total}")
    print(f"Agent 1 Win Percentage: {agent1_wins / total:.2%}")
    print(f"Agent 2 Win Percentage: {agent2_wins / total:.2%}")
    print(f"Tie Percentage: {ties / total:.2%}")
    print(f"Invalid plays by Agent 1: {invalid_1}")
    print(f"Invalid plays by Agent 2: {invalid_2}")


In [26]:
get_win_percentages(dql_agent_b, dql_agent_2, n_rounds=200)

Total games: 200
Agent 1 Win Percentage: 100.00%
Agent 2 Win Percentage: 0.00%
Tie Percentage: 0.00%
Invalid plays by Agent 1: 0
Invalid plays by Agent 2: 0
