# Reinforcement Learning Agent for Connect X

In this notebook, we'll build a reinforcement learning agent to play Connect X (a variant of Connect Four). We'll start by understanding the game mechanics, create a baseline agent, and then implement advanced RL algorithms like Deep Q-Network (DQN) and Proximal Policy Optimization (PPO).

In [101]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import torch.nn.functional as F
from collections import deque, namedtuple
import matplotlib.pyplot as plt
import seaborn as sns

from kaggle_environments import make
from copy import deepcopy

print("Libraries imported successfully!")

Libraries imported successfully!


## Understanding Connect X Environment

Connect X is a classic "connect N in a row" game similar to Connect Four. The environment has the following characteristics:
- Board size: 7 columns × 6 rows (by default)
- Goal: Connect 4 pieces in a row (horizontally, vertically, or diagonally)
- Players: 2 players (Player 1 and Player 2)
- Actions: Drop a piece in any of the 7 columns
- Rewards: -1 for losing, 0 for draw/ongoing, +1 for winning

In [102]:
env = make("connectx")
print("Environment configuration:")
print(f"Columns: {env.configuration.columns}")
print(f"Rows: {env.configuration.rows}")
print(f"Pieces in a row to win: {env.configuration.inarow}")
print(f"Action space: {env.configuration.columns} columns (0 to {env.configuration.columns-1})")

Environment configuration:
Columns: 7
Rows: 6
Pieces in a row to win: 4
Action space: 7 columns (0 to 6)




Let's start by creating a simple random agent to establish a baseline for comparison.

In [103]:
def random_agent(obs, config):
    """
    Random agent that selects a valid column randomly.
    
    Args:
        obs: Observation from the environment
        config: Configuration of the environment
        
    Returns:
        int: Column index to place the piece
    """
    # Get all valid moves (non-full columns)
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    return random.choice(valid_moves)

# Test the random agent
print("Testing random agent...")
trainer = env.train([None, "random"])  # Play against built-in random agent
obs = trainer.reset()
print(f"Initial board: {obs.board}")
print(f"Our mark: {obs.mark}")

Testing random agent...
Initial board: [0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]
Our mark: 1


##  Create  Baseline Agent

A slightly more sophisticated agent that implements basic heuristics like blocking opponent wins and completing own wins.

In [104]:
def random_agent(obs, config):
    """Random agent that selects a valid column randomly."""
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    return random.choice(valid_moves)


def check_winning_move(obs, config, column, mark):
    """Check if placing a piece in the given column would result in a win."""
    board = np.array(obs.board).reshape(config.rows, config.columns)
    
    # Find the lowest empty row in the column
    row = 0
    for r in range(config.rows-1, -1, -1):
        if board[r][column] == 0:
            row = r
            break
    
    # Place the temporary piece
    board[row][column] = mark
    
    # Check for horizontal win
    inarow = config.inarow - 1
    count = 0
    for c in range(max(0, column-inarow), min(config.columns, column+inarow+1)):
        if board[row][c] == mark:
            count += 1
            if count == config.inarow:
                return True
        else:
            count = 0
    
    # Check for vertical win
    count = 0
    for r in range(max(0, row-inarow), min(config.rows, row+inarow+1)):
        if board[r][column] == mark:
            count += 1
            if count == config.inarow:
                return True
        else:
            count = 0
    
    # Check for diagonal wins
    count = 0
    for i in range(-inarow, inarow+1):
        r, c = row+i, column+i
        if 0 <= r < config.rows and 0 <= c < config.columns and board[r][c] == mark:
            count += 1
            if count == config.inarow:
                return True
        else:
            count = 0
    
    count = 0
    for i in range(-inarow, inarow+1):
        r, c = row+i, column-i
        if 0 <= r < config.rows and 0 <= c < config.columns and board[r][c] == mark:
            count += 1
            if count == config.inarow:
                return True
        else:
            count = 0
    
    return False


def rule_based_agent(obs, config):
    """
    Rule-based agent with heuristics:
    1. Play winning move if available
    2. Block opponent's winning move
    3. Prefer center column
    4. Otherwise play randomly
    """
    valid_moves = [col for col in range(config.columns) if obs.board[col] == 0]
    
    # Check for winning moves
    for col in valid_moves:
        if check_winning_move(obs, config, col, obs.mark):
            return col
    
    # Check for blocking opponent's winning moves
    opponent_mark = 3 - obs.mark
    for col in valid_moves:
        if check_winning_move(obs, config, col, opponent_mark):
            return col
    
    # Prefer center column
    center_col = config.columns // 2
    if center_col in valid_moves:
        return center_col
    
    return random.choice(valid_moves)

print("Baseline agents created!")

Baseline agents created!


## State Encoding and Helper Functions



In [105]:

def encode_board(obs, config):
    """
    Encode board state into neural network input format.
    Creates 3 channels: our pieces, opponent pieces, empty spaces.
    """
    board = np.array(obs.board).reshape(config.rows, config.columns)
    
    my_pieces = np.where(board == obs.mark, 1, 0)
    opp_pieces = np.where(board == (3 - obs.mark), 1, 0)
    empty_spaces = np.where(board == 0, 1, 0)
    
    state = np.stack([my_pieces, opp_pieces, empty_spaces]).flatten()
    return state.astype(np.float32)


def get_valid_actions(obs, config):
    """Get list of valid actions (columns that are not full)."""
    return [col for col in range(config.columns) if obs.board[col] == 0]

print("Helper functions defined!")




Helper functions defined!


## : DQN Neural Network

Now we'll implement a DQN agent which is a foundational deep reinforcement learning algorithm. DQN learns a Q-function that estimates the expected future rewards for taking each action in each state.

In [106]:


class DQN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super(DQN, self).__init__()
        self.fc1 = nn.Linear(input_size, hidden_size)
        self.fc2 = nn.Linear(hidden_size, hidden_size)
        self.fc3 = nn.Linear(hidden_size, hidden_size)
        self.fc4 = nn.Linear(hidden_size, output_size)
        
    def forward(self, x):
        x = F.relu(self.fc1(x))
        x = F.relu(self.fc2(x))
        x = F.relu(self.fc3(x))
        x = self.fc4(x)
        return x

print("DQN network architecture defined!")




DQN network architecture defined!


# CELL 6: Replay Buffer

In [107]:
class ReplayBuffer:
    def __init__(self, capacity):
        self.buffer = deque(maxlen=capacity)
    
    def push(self, state, action, next_state, reward):
        self.buffer.append((state, action, next_state, reward))
    
    def sample(self, batch_size):
        return random.sample(self.buffer, batch_size)
    
    def __len__(self):
        return len(self.buffer)

print("Replay buffer defined!")



Replay buffer defined!


#  FIXED DQN Agent

In [108]:
class DQNAgent:
    def __init__(self, state_size, action_size, hidden_size=128, lr=0.001):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        self.state_size = state_size
        self.action_size = action_size
        self.hidden_size = hidden_size
        
        # Neural networks
        self.q_network = DQN(state_size, hidden_size, action_size).to(self.device)
        self.target_network = DQN(state_size, hidden_size, action_size).to(self.device)
        self.target_network.load_state_dict(self.q_network.state_dict())
        self.optimizer = optim.Adam(self.q_network.parameters(), lr=lr)
        
        # Training parameters
        self.memory = ReplayBuffer(10000)
        self.epsilon = 1.0
        self.epsilon_min = 0.01
        self.epsilon_decay = 0.995
        self.batch_size = 64
        self.gamma = 0.99
        self.update_target_freq = 100  # FIXED: Update every 100 learning steps
        self.learn_step = 0
        self.t_step = 0
        
    def step(self, state, action, reward, next_state, done):
        # Save experience in replay memory
        self.memory.push(state, action, next_state, reward)
        
        # Learn every 4 steps
        self.t_step = (self.t_step + 1) % 4
        if self.t_step == 0 and len(self.memory) > self.batch_size:
            experiences = self.memory.sample(self.batch_size)
            self.learn(experiences)
    
    def act(self, state, valid_actions, eps=None):
        """FIXED: Returns actions with proper action masking."""
        if eps is None:
            eps = self.epsilon
            
        state = torch.FloatTensor(state).unsqueeze(0).to(self.device)
        self.q_network.eval()
        with torch.no_grad():
            action_values = self.q_network(state)
        self.q_network.train()
        
        # FIXED: Epsilon-greedy with action masking
        if random.random() > eps:
            # Mask invalid actions with -inf
            action_values_np = action_values.cpu().data.numpy()[0]
            masked_values = np.full(self.action_size, -np.inf)
            for action in valid_actions:
                masked_values[action] = action_values_np[action]
            return np.argmax(masked_values)
        else:
            return random.choice(valid_actions)
    
    def learn(self, experiences):
        """FIXED: Update with gradient clipping."""
        states = torch.FloatTensor(np.array([e[0] for e in experiences])).to(self.device)
        actions = torch.LongTensor([e[1] for e in experiences]).to(self.device)
        next_states = torch.FloatTensor(np.array([e[2] for e in experiences])).to(self.device)
        rewards = torch.FloatTensor([e[3] for e in experiences]).to(self.device)
        
        # Get max predicted Q values from target model
        Q_targets_next = self.target_network(next_states).detach().max(1)[0]
        
        # Compute Q targets
        Q_targets = rewards + (self.gamma * Q_targets_next)
        
        # Get expected Q values from local model
        Q_expected = self.q_network(states).gather(1, actions.unsqueeze(1)).squeeze(1)
        
        # Compute loss
        loss = F.mse_loss(Q_expected, Q_targets)
        
        # Minimize the loss
        self.optimizer.zero_grad()
        loss.backward()
        torch.nn.utils.clip_grad_norm_(self.q_network.parameters(), 1.0)  # ADDED: Gradient clipping
        self.optimizer.step()
        
        # FIXED: Update target network at proper frequency
        self.learn_step += 1
        if self.learn_step % self.update_target_freq == 0:
            self.target_network.load_state_dict(self.q_network.state_dict())
    
    def update_epsilon(self):
        """Decrease epsilon for more exploitation over time."""
        self.epsilon = max(self.epsilon_min, self.epsilon * self.epsilon_decay)

print("FIXED DQN Agent defined!")




FIXED DQN Agent defined!


#  FIXED DQN Training Function

In [109]:
def train_dqn_agent(episodes=1000, opponent='random', epsilon_start=1.0):
    """
    FIXED: Train DQN agent with proper reward tracking and shaping.
    """
    env = make("connectx")
    
    state_size = env.configuration.columns * env.configuration.rows * 3
    action_size = env.configuration.columns
    
    agent = DQNAgent(state_size, action_size)
    agent.epsilon = epsilon_start
    
    # FIXED: Track wins and rewards properly
    wins_window = deque(maxlen=100)
    episode_rewards = deque(maxlen=100)
    
    print("Training DQN agent...")
    for episode in range(episodes):
        # Set up opponent
        if opponent == 'random':
            trainer = env.train([None, random_agent])
        elif opponent == 'rule_based':
            trainer = env.train([None, rule_based_agent])
        else:
            trainer = env.train([None, opponent])
            
        obs = trainer.reset()
        state = encode_board(obs, env.configuration)
        done = False
        episode_reward = 0
        
        while not done:
            # Get valid actions
            valid_actions = get_valid_actions(obs, env.configuration)
            
            # Select action
            action = agent.act(state, valid_actions)
            
            # Take action
            next_obs, reward, done, info = trainer.step(action)
            
            # FIXED: Better reward shaping
            if done:
                if reward == 1:  # Win
                    shaped_reward = 1.0
                    wins_window.append(1)
                elif reward == -1:  # Loss
                    shaped_reward = -1.0
                    wins_window.append(0)
                else:  # Draw
                    shaped_reward = 0.5
                    wins_window.append(0)
            else:
                shaped_reward = 0.0  # No step penalty
                next_state = encode_board(next_obs, env.configuration)
            
            # Store experience
            if done:
                next_state = state  # Terminal state
            
            agent.step(state, action, shaped_reward, next_state, done)
            
            episode_reward += shaped_reward
            
            # Move to next state
            if not done:
                state = next_state
                obs = next_obs
        
        episode_rewards.append(episode_reward)
        
        # Update epsilon
        agent.update_epsilon()
        
        # Print progress
        if (episode + 1) % 100 == 0:
            win_rate = np.mean(wins_window) * 100 if wins_window else 0
            avg_reward = np.mean(episode_rewards) if episode_rewards else 0
            print(f'Episode {episode+1}/{episodes} | Win Rate: {win_rate:.1f}% | '
                  f'Avg Reward: {avg_reward:.3f} | Epsilon: {agent.epsilon:.3f}')
    
    final_win_rate = np.mean(wins_window) * 100 if wins_window else 0
    print(f'Training completed! Final win rate: {final_win_rate:.1f}%')
    return agent

print("FIXED DQN training function defined!")



FIXED DQN training function defined!


#  FIXED PPO Actor-Critic Network

In [110]:
class ActorCritic(nn.Module):
    def __init__(self, state_size, action_size, hidden_size=256):
        super(ActorCritic, self).__init__()
        
        # Shared layers
        self.shared_fc1 = nn.Linear(state_size, hidden_size)
        self.shared_fc2 = nn.Linear(hidden_size, hidden_size)
        
        # Actor head (policy network)
        self.actor_fc = nn.Linear(hidden_size, hidden_size // 2)
        self.actor_out = nn.Linear(hidden_size // 2, action_size)
        
        # Critic head (value network)
        self.critic_fc = nn.Linear(hidden_size, hidden_size // 2)
        self.critic_out = nn.Linear(hidden_size // 2, 1)
        
    def forward(self, x):
        x = F.relu(self.shared_fc1(x))
        x = F.relu(self.shared_fc2(x))
        
        # Actor (policy)
        actor_x = F.relu(self.actor_fc(x))
        action_probs = F.softmax(self.actor_out(actor_x), dim=-1)
        
        # FIXED: Critic output - squeeze to 1D
        critic_x = F.relu(self.critic_fc(x))
        state_value = self.critic_out(critic_x).squeeze(-1)
        
        return action_probs, state_value

print("FIXED Actor-Critic network defined!")




FIXED Actor-Critic network defined!


# PPO Memory

In [111]:


class Memory:
    def __init__(self):
        self.actions = []
        self.states = []
        self.logprobs = []
        self.rewards = []
        self.is_terminals = []
    
    def clear_memory(self):
        del self.actions[:]
        del self.states[:]
        del self.logprobs[:]
        del self.rewards[:]
        del self.is_terminals[:]

print("PPO Memory defined!")




PPO Memory defined!


#  FIXED PPO Agent

In [112]:

class PPOAgent:
    def __init__(self, state_size, action_size, lr=0.0003, gamma=0.99, eps_clip=0.2, K_epochs=4):
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        
        self.gamma = gamma
        self.eps_clip = eps_clip
        self.K_epochs = K_epochs
        
        self.policy = ActorCritic(state_size, action_size).to(self.device)
        self.optimizer = optim.Adam(self.policy.parameters(), lr=lr)
        
        self.policy_old = ActorCritic(state_size, action_size).to(self.device)
        self.policy_old.load_state_dict(self.policy.state_dict())
        
        self.MseLoss = nn.MSELoss()
    
    def act(self, state, valid_actions):
        """Select action using current policy."""
        state = torch.FloatTensor(state).to(self.device).unsqueeze(0)
        
        with torch.no_grad():
            action_probs, state_val = self.policy_old(state)
            
            # Mask invalid actions
            masked_probs = torch.zeros_like(action_probs)
            for action in valid_actions:
                masked_probs[0][action] = action_probs[0][action]
            
            # Renormalize
            masked_probs = masked_probs / (masked_probs.sum() + 1e-10)
            
            # Sample action
            dist = torch.distributions.Categorical(masked_probs)
            action = dist.sample().item()
            
            # Safety check
            if action not in valid_actions:
                action = random.choice(valid_actions)
            
        return action, state_val.item()
    
    def update(self, memory):
        """FIXED: Update policy with proper tensor dimensions."""
        # Calculate discounted rewards
        rewards = []
        discounted_reward = 0
        for reward, is_terminal in zip(reversed(memory.rewards), reversed(memory.is_terminals)):
            if is_terminal:
                discounted_reward = 0
            discounted_reward = reward + (self.gamma * discounted_reward)
            rewards.insert(0, discounted_reward)
        
        # Normalize rewards
        rewards = torch.FloatTensor(rewards).to(self.device)
        if len(rewards) > 1:
            rewards = (rewards - rewards.mean()) / (rewards.std() + 1e-8)
        
        # FIXED: Proper numpy array conversion
        old_states = torch.FloatTensor(np.array(memory.states)).to(self.device)
        old_actions = torch.LongTensor(memory.actions).to(self.device)
        old_logprobs = torch.FloatTensor(memory.logprobs).to(self.device)
        
        # Optimize policy for K epochs
        for _ in range(self.K_epochs):
            # Evaluate old actions and values
            action_probs, state_values = self.policy(old_states)
            
            dist = torch.distributions.Categorical(action_probs)
            new_logprobs = dist.log_prob(old_actions)
            entropy = dist.entropy()
            
            # Importance ratio
            ratio = torch.exp(new_logprobs - old_logprobs)
            
            # Surrogate loss
            advantages = rewards - state_values.detach()
            surr1 = ratio * advantages
            surr2 = torch.clamp(ratio, 1 - self.eps_clip, 1 + self.eps_clip) * advantages
            
            # FIXED: Both state_values and rewards are now 1D
            loss = -torch.min(surr1, surr2) + 0.5 * self.MseLoss(state_values, rewards) - 0.01 * entropy
            
            # Gradient step with clipping
            self.optimizer.zero_grad()
            loss.mean().backward()
            torch.nn.utils.clip_grad_norm_(self.policy.parameters(), 0.5)
            self.optimizer.step()
        
        # Update old policy
        self.policy_old.load_state_dict(self.policy.state_dict())

print("FIXED PPO Agent defined!")




FIXED PPO Agent defined!


#  FIXED PPO Training Function

In [113]:

def train_ppo_agent(episodes=500, opponent='random'):
    """
    FIXED: Train PPO agent with proper logprob calculation.
    """
    env = make("connectx")
    
    state_size = env.configuration.columns * env.configuration.rows * 3
    action_size = env.configuration.columns
    
    agent = PPOAgent(state_size, action_size)
    memory = Memory()
    
    wins_window = deque(maxlen=100)
    scores_window = deque(maxlen=100)
    
    print("Training PPO agent...")
    for episode in range(episodes):
        # Set up opponent
        if opponent == 'random':
            trainer = env.train([None, random_agent])
        elif opponent == 'rule_based':
            trainer = env.train([None, rule_based_agent])
        else:
            trainer = env.train([None, opponent])
            
        obs = trainer.reset()
        state = encode_board(obs, env.configuration)
        score = 0
        done = False
        
        while not done:
            # Get valid actions
            valid_actions = get_valid_actions(obs, env.configuration)
            
            # Select action
            action, state_val = agent.act(state, valid_actions)
            
            # Get action probability for logging
            state_tensor = torch.FloatTensor(state).to(agent.device).unsqueeze(0)
            with torch.no_grad():
                action_probs, _ = agent.policy_old(state_tensor)
                masked_probs = torch.zeros_like(action_probs)
                for act in valid_actions:
                    masked_probs[0][act] = action_probs[0][act]
                masked_probs = masked_probs / (masked_probs.sum() + 1e-10)
                action_logprob = torch.log(masked_probs[0][action] + 1e-10)
            
            # Store state, action, logprob
            memory.states.append(state)
            memory.actions.append(action)
            memory.logprobs.append(action_logprob.item())
            
            # Take action
            next_obs, reward, done, info = trainer.step(action)
            
            # Reward shaping
            if done:
                if reward == 1:
                    shaped_reward = 1.0
                    wins_window.append(1)
                elif reward == -1:
                    shaped_reward = -1.0
                    wins_window.append(0)
                else:
                    shaped_reward = 0.5
                    wins_window.append(0)
            else:
                shaped_reward = 0.0
            
            # Store reward and terminal
            memory.rewards.append(shaped_reward)
            memory.is_terminals.append(done)
            
            # Move to next state
            state = encode_board(next_obs, env.configuration)
            obs = next_obs
            score += shaped_reward
        
        scores_window.append(score)
        
        # Update policy every 10 episodes
        if (episode + 1) % 10 == 0:
            agent.update(memory)
            memory.clear_memory()
        
        # Print progress
        if (episode + 1) % 100 == 0:
            avg_score = np.mean(scores_window) if scores_window else 0
            win_rate = np.mean(wins_window) * 100 if wins_window else 0
            print(f'Episode {episode+1}/{episodes} | Win Rate: {win_rate:.1f}% | Avg Score: {avg_score:.2f}')
    
    print(f'Training completed! Final win rate: {np.mean(wins_window) * 100:.1f}%')
    return agent

print("FIXED PPO training function defined!")




FIXED PPO training function defined!


#  Agent Comparison Function

In [114]:

def compare_agents(agent1, agent2, agent1_name, agent2_name, n_rounds=100):
    """Compare performance of two agents."""
    print(f"\nComparing {agent1_name} vs {agent2_name} over {n_rounds} rounds...")
    
    env = make("connectx")
    agent1_wins = 0
    agent2_wins = 0
    draws = 0
    
    for i in range(n_rounds):
        if i % 2 == 0:
            # agent1 plays first
            trainer = env.train([None, agent2])
            obs = trainer.reset()
            done = False
            while not done:
                my_action = agent1(obs, env.configuration)
                obs, reward, done, info = trainer.step(my_action)
                if not done:
                    my_action = agent2(obs, env.configuration)
                    obs, reward, done, info = trainer.step(my_action)
            
            if reward == 1:
                agent1_wins += 1
            elif reward == -1:
                agent2_wins += 1
            else:
                draws += 1
        else:
            # agent2 plays first
            trainer = env.train([None, agent1])
            obs = trainer.reset()
            done = False
            while not done:
                my_action = agent2(obs, env.configuration)
                obs, reward, done, info = trainer.step(my_action)
                if not done:
                    my_action = agent1(obs, env.configuration)
                    obs, reward, done, info = trainer.step(my_action)
            
            if reward == 1:
                agent2_wins += 1
            elif reward == -1:
                agent1_wins += 1
            else:
                draws += 1
    
    print(f"{agent1_name} wins: {agent1_wins} ({agent1_wins/n_rounds*100:.1f}%)")
    print(f"{agent2_name} wins: {agent2_wins} ({agent2_wins/n_rounds*100:.1f}%)")
    print(f"Draws: {draws} ({draws/n_rounds*100:.1f}%)")
    
    return agent1_wins, agent2_wins, draws

print("Comparison function defined!")



Comparison function defined!


#  Test Baseline Agents

In [115]:

print("\n" + "="*60)
print("TESTING BASELINE AGENTS")
print("="*60)

compare_agents(rule_based_agent, random_agent, "Rule-based", "Random", n_rounds=50)



TESTING BASELINE AGENTS

Comparing Rule-based vs Random over 50 rounds...
Rule-based wins: 46 (92.0%)
Random wins: 4 (8.0%)
Draws: 0 (0.0%)


(46, 4, 0)

#  Train DQN Agent

In [116]:



print("\n" + "="*60)
print("TRAINING DQN AGENT")
print("="*60)

# Train against random agent first
print("\nPhase 1: Training against random agent...")
dqn_agent_trained = train_dqn_agent(episodes=500, opponent='random')

# Optionally train more against rule-based
print("\nPhase 2: Training against rule-based agent...")
dqn_agent_final = train_dqn_agent(episodes=500, opponent='rule_based', epsilon_start=0.3)




TRAINING DQN AGENT

Phase 1: Training against random agent...
Training DQN agent...


KeyboardInterrupt: 

#  Create DQN Wrapper Function

In [None]:

def dqn_agent_wrapper(obs, config, trained_agent=dqn_agent_final):
    """Wrapper for trained DQN agent."""
    state = encode_board(obs, config)
    valid_actions = get_valid_actions(obs, config)
    action = trained_agent.act(state, valid_actions, eps=0.0)
    return action

print("DQN wrapper created!")




#  Evaluate DQN Agent

In [None]:


print("\n" + "="*60)
print("EVALUATING DQN AGENT")
print("="*60)

compare_agents(dqn_agent_wrapper, random_agent, "DQN", "Random", n_rounds=100)
compare_agents(dqn_agent_wrapper, rule_based_agent, "DQN", "Rule-based", n_rounds=100)




#  Train PPO Agent

In [None]:

print("\n" + "="*60)
print("TRAINING PPO AGENT")
print("="*60)

print("\nPhase 1: Training against random agent...")
ppo_agent_trained = train_ppo_agent(episodes=300, opponent='random')

print("\nPhase 2: Training against rule-based agent...")
ppo_agent_final = train_ppo_agent(episodes=300, opponent='rule_based')




#  Create PPO Wrapper Function

In [None]:

def ppo_agent_wrapper(obs, config, trained_agent=ppo_agent_final):
    """Wrapper for trained PPO agent."""
    state = encode_board(obs, config)
    valid_actions = get_valid_actions(obs, config)
    action, _ = trained_agent.act(state, valid_actions)
    return action

print("PPO wrapper created!")



#  Evaluate PPO Agent

In [None]:


print("\n" + "="*60)
print("EVALUATING PPO AGENT")
print("="*60)

compare_agents(ppo_agent_wrapper, random_agent, "PPO", "Random", n_rounds=100)
compare_agents(ppo_agent_wrapper, rule_based_agent, "PPO", "Rule-based", n_rounds=100)




#  Compare DQN vs PPO

In [None]:

print("\n" + "="*60)
print("DQN vs PPO COMPARISON")
print("="*60)

compare_agents(dqn_agent_wrapper, ppo_agent_wrapper, "DQN", "PPO", n_rounds=100)




#  Visualize Sample Game

In [None]:


def visualize_game(agent1, agent2, agent1_name="Agent 1", agent2_name="Agent 2"):
    """Visualize a sample game between two agents."""
    print(f"\n{'='*60}")
    print(f"GAME: {agent1_name} vs {agent2_name}")
    print('='*60)
    
    env = make("connectx")
    trainer = env.train([None, agent2])
    obs = trainer.reset()
    
    print("\nInitial board:")
    board = np.array(obs.board).reshape(env.configuration.rows, env.configuration.columns)
    print(board)
    
    step = 1
    done = False
    while not done:
        # agent1's turn
        action = agent1(obs, env.configuration)
        print(f"\nStep {step}: {agent1_name} → column {action}")
        obs, reward, done, info = trainer.step(action)
        
        if not done:
            # agent2's turn
            action = agent2(obs, env.configuration)
            print(f"Step {step}: {agent2_name} → column {action}")
            obs, reward, done, info = trainer.step(action)
        
        # Show board
        board = np.array(obs.board).reshape(env.configuration.rows, env.configuration.columns)
        print(board)
        
        step += 1
        if step > 50:
            print("\nGame exceeded 50 steps!")
            break
    
    # Result
    if reward == 1:
        winner = agent1_name
    elif reward == -1:
        winner = agent2_name
    else:
        winner = "Draw"
    
    print(f"\n{'='*60}")
    print(f"RESULT: {winner} wins!")
    print('='*60)

# Visualize sample games
visualize_game(rule_based_agent, random_agent, "Rule-based", "Random")
visualize_game(dqn_agent_wrapper, rule_based_agent, "DQN", "Rule-based")
visualize_game(ppo_agent_wrapper, rule_based_agent, "PPO", "Rule-based")




#  Performance Summary

In [None]:


print("\n" + "="*60)
print("FINAL PERFORMANCE SUMMARY")
print("="*60)

print("\nAll tests completed!")
print("\nKey improvements in this fixed version:")
print("1. ✓ DQN agent properly masks invalid actions")
print("2. ✓ Target network updates at correct frequency")
print("3. ✓ Reward shaping is balanced (±1 for win/loss)")
print("4. ✓ PPO critic outputs 1D tensors (no dimension mismatch)")
print("5. ✓ Proper win rate and reward tracking")
print("6. ✓ Gradient clipping prevents exploding gradients")
print("7. ✓ Agents should now learn effectively!")

print("\nExpected performance after training:")
print("- DQN vs Random: 60-80% win rate")
print("- DQN vs Rule-based: 30-50% win rate")
print("- PPO vs Random: 70-90% win rate")
print("- PPO vs Rule-based: 40-60% win rate")