# Install kaggle-environments

In [3]:
# 1. Enable Internet in the Kernel (Settings side pane); the installs below need network access.

# 2. The package index cache may need to be purged if v0.1.6 cannot be found (uncomment if needed).
# !curl -X PURGE https://pypi.org/simple/kaggle-environments

# ConnectX environment was defined in v0.1.6
%pip install -q "kaggle-environments>=0.1.6"
%pip install -q numpy
%pip install -q tqdm
%pip install -q stable-baselines3
# NOTE(review): this combined install repeats packages already installed by the lines above
# (kaggle-environments, numpy, tqdm, stable-baselines3); apart from the lower bound on
# kaggle-environments, no version is pinned anywhere in this cell.
%pip install -q kaggle-environments stable-baselines3 gymnasium numpy torch tqdm matplotlib
%pip install -q sb3-contrib
%pip install -q lmdb
# NOTE(review): both `gymnasium` (above) and `gym` (here) get installed; the wrapper cell
# imports `gym` -- confirm that this is the API the installed stable-baselines3 expects.
%pip install -q gym tensorboard



Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.
Note: you may need to restart the kernel to use updated packages.


# Create ConnectX Environment

In [4]:
from kaggle_environments import evaluate, make, utils

# Build a ConnectX environment (debug=True is forwarded to kaggle_environments.make)
# and draw its current board; in the recorded run this prints an all-zero 6x7 grid.
env = make("connectx", debug=True)
env.render()

+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+
| 0 | 0 | 0 | 0 | 0 | 0 | 0 |
+---+---+---+---+---+---+---+



# Environment wrapper

To create the submission, an agent function should be fully encapsulated (no external dependencies).  

When your agent is being evaluated against others, it will not have access to the Kaggle docker image.  Only the following can be imported: Python Standard Library Modules, gym, numpy, scipy, pytorch (1.3.1, cpu only), and more may be added later.



In [5]:
import gym
import numpy as np
from gym import spaces
from kaggle_environments import make



class ConnectXGym(gym.Env):
    """
    A Gym-compatible wrapper for Kaggle's ConnectX environment.
    Allows training with Stable Baselines 3's algorithms.

    The learning agent always takes the first seat; the second seat is played
    by ``opponent`` through the Kaggle training helper, which handles the
    opponent's moves.

    Args:
        opponent: Agent specification forwarded to ``env.train`` for the second
            seat (this notebook uses ``"random"`` and ``"negamax"``).
    """

    # Reward handed back when the agent picks a column that cannot be played.
    INVALID_ACTION_REWARD = -1.0

    def __init__(self, opponent="random"):
        super().__init__()
        # Initialize the Kaggle ConnectX environment
        self.env = make("connectx", debug=True)
        # Create a training helper that automatically handles the opponent's moves
        self.trainer = self.env.train([None, opponent])

        # Save the environment configuration
        self.config = self.env.configuration

        # Define Gym spaces: one action per column, one observation entry per
        # board cell with values in {0, 1, 2}.
        self.action_space = spaces.Discrete(self.config.columns)
        self.observation_space = spaces.Box(
            low=0,
            high=2,
            shape=(self.config.rows * self.config.columns,),
            dtype=np.int8
        )

    def reset(self, seed=None, options=None):
        """
        Resets the environment to an initial state and returns an initial observation.

        ``seed`` and ``options`` are accepted for API compatibility but are not
        used by this wrapper.

        Returns:
            tuple: ``(obs, info)`` where ``obs`` is the flat int8 board and
            ``info`` is an empty dict.
        """
        obs_dict = self.trainer.reset()
        obs = np.array(obs_dict["board"], dtype=np.int8)
        return obs, {}

    def step(self, action: int):
        """
        Plays ``action`` (a column index) for the agent and returns the result.

        An unplayable action (column index outside the board, or a column whose
        top cell is already occupied) is not forwarded to Kaggle: the episode
        ends immediately with ``INVALID_ACTION_REWARD`` and the unchanged board.

        Returns:
            tuple: ``(obs, reward, terminated, truncated, info)``;
            ``truncated`` is always ``False`` and a ``None`` reward from the
            trainer is reported as ``0.0``.
        """
        # Normalise once so that NumPy integers coming from a policy behave
        # like plain ints for both the bounds check and the Kaggle call.
        action = int(action)

        # Flat board as seen by the first seat; entry `c` (for c < columns) is
        # the cell this wrapper treats as the top of column c.
        board = self.env.state[0]["observation"]["board"]

        column_exists = 0 <= action < self.config.columns
        if not column_exists or board[action] != 0:
            # Invalid move: penalize and end the episode without touching the game.
            obs = np.array(board, dtype=np.int8)
            return obs, self.INVALID_ACTION_REWARD, True, False, {"invalid_action": True}

        # Valid move: let the trainer play it (and the opponent's reply).
        obs_dict, reward, done, info = self.trainer.step(action)
        reward = 0.0 if reward is None else float(reward)

        obs = np.array(obs_dict["board"], dtype=np.int8)
        return obs, reward, done, False, info


# PPO Agent training

In [None]:
import torch
from stable_baselines3 import PPO
from stable_baselines3.common.vec_env import SubprocVecEnv  # or DummyVecEnv
from stable_baselines3 import DQN

# 1) Create environment functions
def make_env_random():
    """Return a zero-argument factory that builds a ConnectXGym facing the "random" opponent."""
    # The environment is only constructed when the returned factory is called.
    return lambda: ConnectXGym(opponent="random")

def make_env_negamax():
    """Return a zero-argument factory that builds a ConnectXGym facing the "negamax" opponent."""
    # The environment is only constructed when the returned factory is called.
    return lambda: ConnectXGym(opponent="negamax")

# 2) 2 envs with random, 2 envs with negamax => total n_envs=4
env_fns = [make_env_random(), make_env_random(), make_env_negamax(), make_env_negamax()]

# One worker process per factory; DummyVecEnv(env_fns) is the in-process alternative.
vec_env = SubprocVecEnv(env_fns)  # or DummyVecEnv(env_fns)

# 3) Instantiate the PPO model

model = PPO(
    "MlpPolicy",
    vec_env,
    verbose=0,             # No console logging (TensorBoard logging below stays enabled)
    tensorboard_log="./ppo_tensorboard/",
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
    gamma=0.99,           # Discount factor (same value as the SB3 default)
    ent_coef=0.01,         # Entropy bonus for more exploration (SB3 default is 0.0)
    learning_rate=3e-4,   # Keep default for now
    n_steps=2048,         # Keep default (steps collected per env before each update)
)


print("🚀 Starting PPO training...")

# 4) Manual Loop: train in intervals of 10k steps, then evaluate
from kaggle_environments import evaluate
import numpy as np

def ppo_agent(obs, config):
    """Kaggle agent callable backed by the notebook-level ``model``.

    ``obs["board"]`` is turned into an int8 array and passed to
    ``model.predict`` in deterministic mode; ``config`` is not used.
    Returns the predicted action as a built-in ``int``.
    """
    flat_board = np.array(obs["board"], dtype=np.int8)
    prediction = model.predict(flat_board, deterministic=True)
    chosen_column, _state = prediction
    return int(chosen_column)

def mean_reward(rewards):
    """Average the first entry of each per-episode reward pair.

    Falsy entries (``None`` or ``0``) count as ``0.0``. Every episode must
    provide at least two entries, but only the first contributes to the mean.
    """
    first_seat = []
    for episode in rewards:
        own, _other = episode[0], episode[1]
        first_seat.append(own if own else 0.0)
    return sum(first_seat) / float(len(first_seat))

TOTAL_STEPS = 250000
EVAL_INTERVAL = 10000
current_steps = 0

# Train in chunks and evaluate after each one. There is deliberately no
# separate up-front `model.learn(TOTAL_STEPS)` call: that would spend the whole
# budget before the loop starts, double the training and make the reported
# step counts wrong.
try:
    while current_steps < TOTAL_STEPS:
        model.learn(total_timesteps=EVAL_INTERVAL, reset_num_timesteps=False)
        # PPO only stops after complete rollouts (n_steps * n_envs steps), so a
        # chunk usually overshoots EVAL_INTERVAL; read the real counter instead
        # of assuming exactly EVAL_INTERVAL steps were taken.
        current_steps = model.num_timesteps

        # Evaluate vs random
        eval_rand = evaluate("connectx", [ppo_agent, "random"], num_episodes=500)
        mr_rand = mean_reward(eval_rand)

        # Evaluate vs negamax
        eval_nega = evaluate("connectx", [ppo_agent, "negamax"], num_episodes=500)
        mr_nega = mean_reward(eval_nega)

        print(f"Steps={current_steps}: vs. Random={mr_rand:.5f}, vs. Negamax={mr_nega:.5f}")

    model.save("ppo_model")
finally:
    # Shut down the SubprocVecEnv worker processes even if training is
    # interrupted; the saved/in-memory model can still predict without them.
    vec_env.close()

print("✅ PPO Training complete!")


# DQN Agent training

In [None]:
import torch
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import SubprocVecEnv  # or DummyVecEnv

# 1) Create environment functions
def make_env_random():
    """Return a zero-argument factory that builds a ConnectXGym facing the "random" opponent."""
    # Construction is deferred until the returned factory is invoked.
    return lambda: ConnectXGym(opponent="random")

def make_env_negamax():
    """Return a zero-argument factory that builds a ConnectXGym facing the "negamax" opponent."""
    # Construction is deferred until the returned factory is invoked.
    return lambda: ConnectXGym(opponent="negamax")

# 2) 2 envs with random, 2 envs with negamax => total n_envs=4
env_fns = [make_env_random(), make_env_random(), make_env_negamax(), make_env_negamax()]
# One worker process per factory; DummyVecEnv(env_fns) is the in-process alternative.
vec_env = SubprocVecEnv(env_fns)  # or DummyVecEnv(env_fns)

# 3) Instantiate the DQN model
# NOTE(review): this rebinds the notebook-level name `model`, which the PPO cell's
# `ppo_agent` also reads at call time.
model = DQN(
    "MlpPolicy",
    vec_env,
    learning_rate=1e-3,
    buffer_size=100_000,            # replay buffer capacity
    batch_size=64,
    gamma=0.95,                     # discount factor
    exploration_fraction=0.2,       # share of a learn() call's horizon used to anneal epsilon
    exploration_final_eps=0.01,     # epsilon value once annealing is finished
    target_update_interval=500,
    train_freq=4,
    verbose=0,  # Disable logging
    device=torch.device("cuda" if torch.cuda.is_available() else "cpu"),
)

print("🚀 Starting DQN training...")

# 4) Manual Loop: train in intervals of 10k steps, then evaluate
from kaggle_environments import evaluate
import numpy as np

def dqn_agent(obs, config):
    """Kaggle agent callable backed by the notebook-level ``model``.

    ``obs["board"]`` is turned into an int8 array and passed to
    ``model.predict`` in deterministic mode; ``config`` is not used.
    Returns the predicted action as a built-in ``int``.
    """
    flat_board = np.array(obs["board"], dtype=np.int8)
    prediction = model.predict(flat_board, deterministic=True)
    chosen_column, _state = prediction
    return int(chosen_column)

def mean_reward(rewards):
    """Average the first entry of each per-episode reward pair.

    Falsy entries (``None`` or ``0``) count as ``0.0``. Every episode must
    provide at least two entries, but only the first contributes to the mean.
    """
    first_seat = []
    for episode in rewards:
        own, _other = episode[0], episode[1]
        first_seat.append(own if own else 0.0)
    return sum(first_seat) / float(len(first_seat))

from stable_baselines3.common.callbacks import BaseCallback

TOTAL_STEPS = 250000
EVAL_INTERVAL = 10000
current_steps = 0


class PeriodicEvalCallback(BaseCallback):
    """Evaluate ``dqn_agent`` against "random" and "negamax" every ``eval_interval`` steps.

    Evaluation runs inside a single ``learn`` call instead of between many
    short ones. DQN anneals epsilon over ``exploration_fraction`` of the horizon
    of each ``learn`` call, so chunking training into 10k-step calls would
    finish the annealing after a fraction of the first chunk rather than over
    the intended share of TOTAL_STEPS.
    """

    def __init__(self, eval_interval, num_episodes=500):
        super().__init__(verbose=0)
        self.eval_interval = eval_interval
        self.num_episodes = num_episodes
        self.next_eval_at = eval_interval

    def _on_step(self) -> bool:
        # Called after every vectorized env step; only act once a threshold is crossed.
        if self.num_timesteps < self.next_eval_at:
            return True

        eval_rand = evaluate("connectx", [dqn_agent, "random"], num_episodes=self.num_episodes)
        mr_rand = mean_reward(eval_rand)

        eval_nega = evaluate("connectx", [dqn_agent, "negamax"], num_episodes=self.num_episodes)
        mr_nega = mean_reward(eval_nega)

        print(f"Steps={self.num_timesteps}: vs. Random={mr_rand:.5f}, vs. Negamax={mr_nega:.5f}")

        # Schedule the next evaluation at the following multiple of the interval.
        self.next_eval_at = (self.num_timesteps // self.eval_interval + 1) * self.eval_interval
        return True  # returning False would abort training


try:
    model.learn(total_timesteps=TOTAL_STEPS, callback=PeriodicEvalCallback(EVAL_INTERVAL))
    current_steps = model.num_timesteps

    # Save the DQN model
    model.save("dqn_model")
finally:
    # Shut down the SubprocVecEnv worker processes even if training is interrupted.
    vec_env.close()

print("✅ DQN Training complete!")


# Q Learning

In [None]:
# import os

# # Define path to main.py
# main_script = os.path.join(os.getcwd(), "main.py")

# # Run the script to train Q-learning
# print("🚀 Running Q-learning training script...")
# os.system(f"python {main_script}")  # This executes main.py like a command-line script
# print("✅ Training completed!")


Q-Learning Training Results:

```
Episode 20000 - Random: 0.551, Negamax: 0.02
Episode 30000 - Random: 0.564, Negamax: 0.017
Episode 40000 - Random: 0.589, Negamax: 0.022
Episode 50000 - Random: 0.579, Negamax: 0.033
Episode 60000 - Random: 0.576, Negamax: 0.023
Episode 70000 - Random: 0.559, Negamax: 0.023
Episode 80000 - Random: 0.577, Negamax: 0.022
Episode 90000 - Random: 0.58, Negamax: 0.02
Episode 100000 - Random: 0.55, Negamax: 0.016
Episode 110000 - Random: 0.553, Negamax: 0.015
Episode 120000 - Random: 0.589, Negamax: 0.018
Episode 130000 - Random: 0.56, Negamax: 0.02
Episode 140000 - Random: 0.571, Negamax: 0.02
Episode 150000 - Random: 0.546, Negamax: 0.013
Episode 160000 - Random: 0.568, Negamax: 0.026
Episode 170000 - Random: 0.586, Negamax: 0.031
Episode 180000 - Random: 0.563, Negamax: 0.027
Episode 190000 - Random: 0.575, Negamax: 0.021
Episode 200000 - Random: 0.545, Negamax: 0.024
Episode 210000 - Random: 0.589, Negamax: 0.023
Episode 220000 - Random: 0.565, Negamax: 0.019
Episode 230000 - Random: 0.531, Negamax: 0.017
Episode 240000 - Random: 0.569, Negamax: 0.025
Episode 250000 - Random: 0.535, Negamax: 0.027
Q-table size: 176014
```

In [2]:
import sys
import os
import numpy as np
from kaggle_environments import evaluate

# Add src/ directory to Python path
sys.path.append(os.path.join(os.getcwd(), "src"))

# Import Q-learning components
from qtable import QTable
from connectx import ConnectX
import zipfile

# Initialize ConnectX environment
# NOTE(review): this is the project's `connectx.ConnectX` class (imported from src/), and it
# rebinds the notebook-level name `env` that an earlier cell assigned from kaggle_environments.make.
env = ConnectX()

# Define the path to the zip file and the extraction directory
zip_file_path = "qtable_backup.zip"
extract_dir = "qtable.lmdb"

# Check if the extraction directory already exists
if not os.path.exists(extract_dir):
    # If not, unzip the file into the directory
    # (raises if qtable_backup.zip is missing from the working directory)
    with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
        zip_ref.extractall(extract_dir)
    print(f"✅ Extracted {zip_file_path} into {extract_dir}")
else:
    print(f"ℹ️ Directory {extract_dir} already exists. Skipping extraction.")

# No path is passed here; presumably QTable locates the extracted data itself -- TODO confirm
# against src/qtable.py.
q_table = QTable(action_space=env.action_space.n)

# Printed unconditionally once QTable() returns; it does not verify the table contents.
print("✅ Q-learning model loaded successfully!")


✅ Extracted qtable_backup.zip into qtable.lmdb
✅ Q-learning model loaded successfully!


# Models battle

In [3]:
from random import choice
from kaggle_environments import evaluate, make
from stable_baselines3 import PPO, DQN

def mean_reward(rewards):
    """Average the first entry of each per-episode reward pair.

    Falsy entries (``None`` or ``0``) count as ``0.0``. Every episode must
    provide at least two entries, but only the first contributes to the mean.
    """
    first_seat = []
    for episode in rewards:
        own, _other = episode[0], episode[1]
        first_seat.append(own if own else 0.0)
    return sum(first_seat) / float(len(first_seat))

# Restore the trained policies; "ppo_model" / "dqn_model" are the names the training
# cells pass to model.save().
ppo_model = PPO.load("ppo_model")
dqn_model = DQN.load("dqn_model")
# `q_table` comes from the Q-learning loading cell; the returned object is used by
# q_agent through `.get(key, default)`.
q_model = q_table.get_table()

# Define agent functions using loaded models
def ppo_agent(obs, config):
    """Kaggle agent callable backed by the loaded ``ppo_model``.

    ``obs["board"]`` is turned into an int8 array and passed to
    ``ppo_model.predict`` in deterministic mode; ``config`` is not used.
    Returns the predicted action as a built-in ``int``.
    """
    flat_board = np.array(obs["board"], dtype=np.int8)
    prediction = ppo_model.predict(flat_board, deterministic=True)
    chosen_column, _state = prediction
    return int(chosen_column)

def dqn_agent(obs, config):
    """Kaggle agent callable backed by the loaded ``dqn_model``.

    ``obs["board"]`` is turned into an int8 array and passed to
    ``dqn_model.predict`` in deterministic mode; ``config`` is not used.
    Returns the predicted action as a built-in ``int``.
    """
    flat_board = np.array(obs["board"], dtype=np.int8)
    prediction = dqn_model.predict(flat_board, deterministic=True)
    chosen_column, _state = prediction
    return int(chosen_column)

def q_agent(obs, conf):
    """Kaggle agent callable that looks its move up in ``q_model``.

    The state key is built from the board cells followed by the player's mark,
    read as one base-3 number and written in hex without the ``0x`` prefix.
    If the table has no entry for that key, a random column whose first-row
    cell is empty is chosen instead.

    Args:
        obs: Observation exposing ``board`` (flat list of cells) and ``mark``.
        conf: Configuration exposing ``columns``.

    Returns:
        The stored action for the state, or a random open column on a miss.
    """
    digits = ''.join(map(str, obs.board + [obs.mark]))
    state_key = hex(int(digits, 3))[2:]  # convert the state to a unique key

    action = q_model.get(state_key, None)  # get the action from the Q-table
    if action is None:
        # Build the random fallback only on a miss: as an eager `.get` default it
        # was evaluated on every call and raised when no column was open, even
        # though the table already had an answer.
        open_columns = [c for c in range(conf.columns) if obs.board[c] == 0]
        action = choice(open_columns)
    return action

# Number of games played for every (first seat, second seat) pairing.
NUM_EVAL_EPISODES = 500


def _first_seat_mean_reward(first_agent, second_agent, num_episodes=NUM_EVAL_EPISODES):
    """Play ``num_episodes`` ConnectX games and return the first-listed agent's mean reward."""
    episode_rewards = evaluate("connectx", [first_agent, second_agent], num_episodes=num_episodes)
    return mean_reward(episode_rewards)


# Every pairing is played twice with the seats swapped. Each score is always the
# mean reward of the agent named first, i.e. the one in the first seat.

# Evaluate PPO vs DQN
mr_dqn_vs_ppo = _first_seat_mean_reward(dqn_agent, ppo_agent)
mr_ppo_vs_dqn = _first_seat_mean_reward(ppo_agent, dqn_agent)

# Evaluate Q-Learning vs PPO
mr_qlearn_vs_ppo = _first_seat_mean_reward(q_agent, ppo_agent)
mr_ppo_vs_qlearn = _first_seat_mean_reward(ppo_agent, q_agent)

# Evaluate Q-Learning vs DQN
mr_dqn_vs_qlearn = _first_seat_mean_reward(dqn_agent, q_agent)
mr_qlearn_vs_dqn = _first_seat_mean_reward(q_agent, dqn_agent)

print(f"🔵 DQN vs PPO Mean Reward: {mr_dqn_vs_ppo:.5f}")
print(f"🔵 PPO vs DQN Mean Reward: {mr_ppo_vs_dqn:.5f}")
print(f"🔵 Q-Learning vs PPO Mean Reward: {mr_qlearn_vs_ppo:.5f}")
print(f"🔵 PPO vs Q-Learning Mean Reward: {mr_ppo_vs_qlearn:.5f}")
print(f"🔵 DQN vs Q-Learning Mean Reward: {mr_dqn_vs_qlearn:.5f}")
print(f"🔵 Q-Learning vs DQN Mean Reward: {mr_qlearn_vs_dqn:.5f}")

🔵 DQN vs PPO Mean Reward: 1.00000
🔵 PPO vs DQN Mean Reward: 1.00000
🔵 Q-Learning vs PPO Mean Reward: 0.14800
🔵 PPO vs Q-Learning Mean Reward: 0.96400
🔵 DQN vs Q-Learning Mean Reward: 0.97600
🔵 Q-Learning vs DQN Mean Reward: 0.41700
