In [1]:
import random
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
%matplotlib inline

import gym
from kaggle_environments import make, evaluate
from gym import spaces

import torch
import torch as th
import torch.nn as nn

#!pip install "stable-baselines3"
from stable_baselines3 import PPO 
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

  from pkg_resources import resource_stream, resource_exists
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(pkg)
Implementing implicit namespace packages (as specified in PEP 420) is preferred to `pkg_resources.declare_namespace`. See https://setuptools.pypa.io/en/latest/references/keywords.html#keyword-namespace-packages
  declare_namespace(

In [None]:
#https://medium.com/@chen-yu/building-a-customized-residual-cnn-with-pytorch-471810e894ed

# ✅ Define Residual Block
class ResidualBlock(nn.Module):
    """Residual unit: conv3x3 -> BN -> ReLU -> conv3x3 -> BN, plus a skip path.

    Both 3x3 convolutions use ``padding=1`` so the spatial size of the input
    is preserved. When ``in_channels`` differs from ``out_channels`` the skip
    path is projected with a 1x1 convolution followed by batch-norm so the two
    tensors can be added.

    Args:
        in_channels: number of channels of the input tensor.
        out_channels: number of channels produced by the block.
    """

    def __init__(self, in_channels: int, out_channels: int) -> None:
        super().__init__()
        # Main path (attribute names define the state-dict keys; keep them stable).
        self.conv1 = nn.Conv2d(in_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(out_channels)
        self.relu = nn.ReLU()
        self.conv2 = nn.Conv2d(out_channels, out_channels, kernel_size=3, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(out_channels)

        # Skip path: identity unless the channel count changes.
        if in_channels == out_channels:
            self.downsample = None
        else:
            self.downsample = nn.Sequential(
                nn.Conv2d(in_channels, out_channels, kernel_size=1, bias=False),
                nn.BatchNorm2d(out_channels)
            )

    def forward(self, x):
        """Return ``relu(main_path(x) + skip(x))``."""
        shortcut = x if self.downsample is None else self.downsample(x)
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))
        out = out + shortcut  # residual connection
        return self.relu(out)


# ✅ Define Custom Feature Extractor with Residual Blocks
class CustomResNetCNN(BaseFeaturesExtractor):
    """Feature extractor built from a stack of ``ResidualBlock`` layers.

    The observation is passed through seven residual blocks (channel widths
    32, 64, 64, 128, 128, 256, 256), flattened, and then reduced to
    ``features_dim`` values by a two-layer MLP with dropout.

    Args:
        observation_space: channels-first Box space; ``shape[0]`` is used as
            the number of input channels.
        features_dim: size of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 256):
        super().__init__(observation_space, features_dim)
        
        n_input_channels = observation_space.shape[0]  # channels-first layout: shape[0] is the channel count

        self.cnn = nn.Sequential(
            ResidualBlock(n_input_channels, 32),  # Expand to 32
            ResidualBlock(32, 64),  # Expand to 64
            ResidualBlock(64, 64),  # Keep 64
            ResidualBlock(64, 128),  # Expand to 128
            ResidualBlock(128, 128),  # Keep 128
            ResidualBlock(128, 256),  # Expand to 256
            ResidualBlock(256, 256),  # Keep 256
            nn.Flatten(),
        )

        # Compute the flattened size by pushing one sampled observation
        # (with a leading batch axis added by ``[None]``) through the CNN.
        with th.no_grad():
            n_flatten = self.cnn(th.as_tensor(observation_space.sample()[None]).float()).shape[1]

        # MLP head: n_flatten -> 512 -> features_dim, with dropout in between.
        self.linear = nn.Sequential(
            nn.Linear(n_flatten, 512),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(512, features_dim),
            nn.ReLU()
        )

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Return the feature vector for a batch of observations."""
        return self.linear(self.cnn(observations))

In [None]:
class CustomCNN(BaseFeaturesExtractor):
    """Small two-layer convolutional feature extractor.

    Two unpadded 3x3 convolutions (32 then 64 filters), each followed by ReLU,
    are flattened and projected to ``features_dim`` by a single linear layer.

    Args:
        observation_space: channels-first (CxHxW) Box space.
        features_dim: size of the feature vector returned by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=256):
        super().__init__(observation_space, features_dim)

        # Channels-first input: the first axis is the channel count.
        in_channels = observation_space.shape[0]
        conv_stack = [
            nn.Conv2d(in_channels, 32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_stack)

        # One dry-run forward pass on a sampled observation tells us how many
        # values come out of the flatten layer.
        with th.no_grad():
            probe = th.as_tensor(observation_space.sample()[None]).float()
            n_flatten = self.cnn(probe).shape[1]

        self.linear = nn.Sequential(nn.Linear(n_flatten, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Return the feature vector for a batch of observations."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)


In [None]:
def MinMaxAB_agent(obs, config): #MinMax with alpha-beta pruning
    """Pick a column using minimax search with alpha-beta pruning.

    Args:
        obs: observation exposing ``board`` (flat, row-major list of cell
            values where 0 is empty) and ``mark`` (this agent's piece, 1 or 2).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The chosen column index. Ties between equally scored columns are
        broken at random.

    Notes:
        ``N_STEPS`` is 1, so each candidate move is scored directly by the
        heuristic on the resulting board (``minimax`` is entered with depth 0).
    """
    ################################
    # Imports and helper functions #
    ################################

    # Imported locally so the function does not depend on module-level names.
    import numpy as np
    import random

    # Helper function for score_move: gets board at next step if agent drops piece in selected column
    def drop_piece(grid, col, mark, config):
        next_grid = grid.copy()
        # Scan from the bottom row upwards for the first empty cell.
        # Callers only pass columns whose top cell is empty.
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    # Yields every window of `config.inarow` consecutive cells on the grid, as a list
    def iter_windows(grid, config):
        span = config.inarow
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(span-1)):
                yield list(grid[row, col:col+span])
        # vertical
        for row in range(config.rows-(span-1)):
            for col in range(config.columns):
                yield list(grid[row:row+span, col])
        # positive diagonal
        for row in range(config.rows-(span-1)):
            for col in range(config.columns-(span-1)):
                yield list(grid[range(row, row+span), range(col, col+span)])
        # negative diagonal
        for row in range(span-1, config.rows):
            for col in range(config.columns-(span-1)):
                yield list(grid[range(row, row-span, -1), range(col, col+span)])

    # Helper function for get_heuristic: checks if window satisfies heuristic conditions
    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

    # Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
    def count_windows(grid, num_discs, piece, config):
        return sum(1 for window in iter_windows(grid, config)
                   if check_window(window, num_discs, piece, config))

    # Helper function for minimax: calculates value of heuristic for grid
    def get_heuristic(grid, mark, config):
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        num_fours_opp = count_windows(grid, 4, mark%2+1, config)
        score = num_threes - 1e2*num_threes_opp - 1e4*num_fours_opp + 1e6*num_fours
        return score

    # Uses minimax to calculate value of dropping piece in selected column
    def score_move(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        score = minimax(next_grid, nsteps-1, False, mark, config)
        return score

    # Helper function for minimax: checks if agent or opponent has four in a row in the window
    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    # Helper function for minimax: checks if game has ended
    def is_terminal_node(grid, config):
        # Check for draw: no empty cell left in the top row
        if list(grid[0, :]).count(0) == 0:
            return True
        # Check for win: horizontal, vertical, or diagonal
        return any(is_terminal_window(window, config) for window in iter_windows(grid, config))

    # Minimax implementation
    def minimax(node, depth, maximizingPlayer, mark, config, a=-np.inf, b=np.inf):
        is_terminal = is_terminal_node(node, config)
        valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
        if depth == 0 or is_terminal:
            return get_heuristic(node, mark, config)
        if maximizingPlayer:
            value = -np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark, config)
                value = max(value, minimax(child, depth-1, False, mark, config, a, b))
                # Raise alpha inside the loop so later siblings can be pruned.
                a = max(a, value)
                if a >= b:
                    break
            return value
        else:
            value = np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1, config)
                value = min(value, minimax(child, depth-1, True, mark, config, a, b))
                # Lower beta inside the loop so later siblings can be pruned.
                b = min(b, value)
                if a >= b:
                    break
            return value

    #########################
    # Agent makes selection #
    #########################

    # How deep to make the game tree: higher values take longer to run!
    N_STEPS = 1
    # Get list of valid moves (columns whose top cell is empty)
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step
    scores = {col: score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves}
    # Get a list of columns (moves) that maximize the heuristic
    best_score = max(scores.values(), default=None)
    max_cols = [col for col, score in scores.items() if score == best_score]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [None]:
class ConnectFourGym(gym.Env):
    """Gym wrapper around the Kaggle ``connectx`` environment with fixed rewards.

    The learning agent always occupies the first seat of
    ``ks_env.train([None, agent2])``. Observations are the board reshaped to
    ``(1, rows, columns)`` as float32, matching ``observation_space``.

    Rewards (see ``change_reward`` and ``step``):
        * ``winR``  when the underlying env reports reward 1;
        * ``lostR`` when the episode ends with any other reward;
        * ``1 / (rows * columns)`` for every non-terminal move;
        * ``drawR`` when the agent picks a full column (the episode is ended).

    Args:
        agent2: opponent passed to the Kaggle trainer (name or callable).
    """

    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)

        self.observation_space = spaces.Box(low=0, high=2,
                                    shape=(1, self.rows, self.columns),
                                    dtype=np.float32)

        # Reward constants used by change_reward / step.
        self.winR = 2
        self.lostR = -5
        self.drawR = -0.5

        # Tuple corresponding to the min and max possible rewards.
        # The upper bound follows winR so the range actually contains it.
        self.reward_range = (-10, self.winR)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None

        self.history = []  # Store (episode, wins, losses, draws)
        self.wins = 0
        self.losses = 0
        self.draws = 0
        self.timestep = 0  # Number of finished episodes (incremented when done)

    def _encode_board(self):
        """Return the current board as a float32 array of shape (1, rows, columns)."""
        board = np.array(self.obs['board']).reshape(1, self.rows, self.columns)
        return board.astype(np.float32)

    def print_board(self, board):
        """Print a flat, row-major ``board`` as a grid of '.', 'X' and 'O'."""
        symbols = {0: ".", 1: "X", 2: "O"}  # X for agent, O for opponent
        for r in range(self.rows):
            row = [symbols[board[r * self.columns + c]] for c in range(self.columns)]
            print(" ".join(row))
        print("-" * 10)  # Separator

    def reset(self):
        """Start a new game and return the initial observation."""
        self.episode_reward = 0  # Track total reward. Reset for every new game.

        self.obs = self.env.reset()
        return self._encode_board()

    def change_reward(self, old_reward, done):
        """Map the Kaggle reward to this wrapper's reward scheme.

        Any finished episode whose reward is not 1 is scored as ``lostR``.
        NOTE(review): that presumably includes true draws (full board) --
        confirm against the reward values the Kaggle env emits.
        """
        if old_reward == 1:  # Model wins
            return self.winR
        elif done:  # Episode ended without a win
            return self.lostR
        else:
            return 1/(self.rows*self.columns)

    def step(self, action):
        """Play ``action`` and return ``(observation, reward, done, info)``.

        Choosing a column whose top cell is occupied ends the episode with
        reward ``drawR``; the board is left unchanged in that case.
        """
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        if is_valid: # Play the move
            self.obs, old_reward, done, info = self.env.step(int(action))
            reward = self.change_reward(old_reward, done)
        else: # End the game and penalize agent
            reward, done, info = self.drawR, True, {}

        self.episode_reward += reward  # Track total episode reward

        if done:
            self.timestep += 1

            # Classify the finished episode by the reward it ended with;
            # everything that is neither winR nor lostR (i.e. an invalid
            # move, which yields drawR) is counted under `draws`.
            if reward == self.winR:
                self.wins += 1
            elif reward == self.lostR:
                self.losses += 1
            else:
                self.draws += 1

            # Store history
            self.history.append((len(self.history) + 1, self.wins, self.losses, self.draws))

            # Print a summary every 50 finished episodes
            if self.timestep % 50 == 0:
                print(f"[Step {self.timestep}] Total Wins: {self.wins}, Losses: {self.losses}, Draws: {self.draws}, Total Reward: {self.episode_reward}")

        # Same dtype/shape as reset() and observation_space.
        return self._encode_board(), reward, done, info

In [None]:
from stable_baselines3.common.callbacks import BaseCallback
import os
import time

class StopTrainingCallback(BaseCallback):
    """Checkpoint on improved win rate and stop once the loss rate is low.

    The callback reads the ``wins``, ``losses`` and ``draws`` counters of the
    first wrapped environment (``self.training_env.envs[0]``).

    Args:
        save_freq: stored on the instance; not used by ``_on_step``.
        threshold: training stops when the loss rate drops below this value.
        verbose: forwarded to ``BaseCallback``.
    """

    def __init__(self, save_freq=1000, threshold=0.05, verbose=1):
        super().__init__(verbose)
        self.save_freq = save_freq
        self.threshold = threshold
        self.bestWinRate = 0

    def _on_step(self) -> bool:
        """Run after each training step; return ``False`` to stop training."""
        env = self.training_env.envs[0]
        games_played = env.wins + env.losses + env.draws

        # The win rate is only trusted after more than 25 games; before that
        # a sentinel of -1 guarantees no checkpoint is written.
        win_rate = env.wins / games_played if games_played > 25 else -1
        if win_rate > self.bestWinRate:
            print(self.bestWinRate)
            self.bestWinRate = win_rate
            self.model.save(f"ppo_connect4_DynReward_WR_{win_rate:.2f}.pkl")  # Save updated model

        # NOTE(review): if the environment zeroes its counters periodically,
        # games_played may never exceed 100 and this check cannot fire -- confirm.
        if games_played > 100:
            loss_rate = env.losses / games_played
            if loss_rate < self.threshold:
                print(f"🚀 Stopping Training! Loss Rate = {loss_rate:.4f} < {self.threshold}")
                return False  # Returning False stops training
        return True  # Continue training

# Callback instance shared by the training cells
stop_callback = StopTrainingCallback(threshold=0.06, verbose=1)

In [None]:
class DymaicRewardConnectFour(gym.Env):
    """Gym wrapper around Kaggle ``connectx`` with configurable reward constants.

    Differences from a plain wrapper, as implemented below:
        * an invalid move (full column) is penalised with ``illM`` but does
          NOT end the episode; the same board is returned again;
        * every non-terminal valid move is rewarded with ``draw``;
        * ``wins`` / ``losses`` / ``draws`` are cumulative over the lifetime
          of the env (they feed ``history`` and any training callback), while
          the periodic print-out uses separate per-window counters.

    Args:
        agent2: opponent passed to the Kaggle trainer (name or callable).
    """

    # Number of finished episodes between two progress print-outs.
    REPORT_EVERY = 50

    def __init__(self, agent2="random"):
        ks_env = make("connectx", debug=True)
        self.env = ks_env.train([None, agent2])
        self.config = ks_env.configuration
        self.rows = ks_env.configuration.rows
        self.columns = ks_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        self.observation_space = spaces.Box(low=0, high=2,
                                            shape=(1,self.rows,self.columns), dtype=int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 10)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None

        # Reward constants; N is a common divisor for scaling all of them.
        self.N = 1
        self.winR = 8/self.N    # env reported reward 1
        self.lostR = -5/self.N  # episode ended with any other reward
        self.illM = -8/self.N   # agent chose a full column
        self.draw = 2/self.N    # returned for every non-terminal valid move

        # Cumulative outcome counters (never reset).
        self.wins = 0
        self.losses = 0
        self.draws = 0

        # Outcome counters for the current reporting window only.
        self._window_wins = 0
        self._window_losses = 0
        self._window_draws = 0

        self.timestep = 0  # number of finished episodes
        self.history = []  # (episode, wins, losses, draws), cumulative

        self.bestWinRate = 0

    # Helper function for get_heuristic: checks if window satisfies heuristic conditions
    def check_window(self, window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

    # Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
    def count_windows(self, grid, num_discs, piece, config):
        num_windows = 0
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[row, col:col+config.inarow])
                if self.check_window(window, num_discs, piece, config):
                    num_windows += 1
        # vertical
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns):
                window = list(grid[row:row+config.inarow, col])
                if self.check_window(window, num_discs, piece, config):
                    num_windows += 1
        # positive diagonal
        for row in range(config.rows-(config.inarow-1)):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row+config.inarow), range(col, col+config.inarow)])
                if self.check_window(window, num_discs, piece, config):
                    num_windows += 1
        # negative diagonal
        for row in range(config.inarow-1, config.rows):
            for col in range(config.columns-(config.inarow-1)):
                window = list(grid[range(row, row-config.inarow, -1), range(col, col+config.inarow)])
                if self.check_window(window, num_discs, piece, config):
                    num_windows += 1
        return num_windows

    # Board heuristic (currently not used by change_reward)
    def get_heuristic(self, grid, mark, config):
        num_threes = self.count_windows(grid, 3, mark, config)
        num_fours = self.count_windows(grid, 4, mark, config)
        num_threes_opp = self.count_windows(grid, 3, mark%2+1, config)
        num_fours_opp = self.count_windows(grid, 4, mark%2+1, config)
        score = num_threes - 1e2*num_threes_opp - 1e4*num_fours_opp + 1e6*num_fours
        return score

    def reset(self):
        """Start a new game and return the initial observation."""
        self.obs = self.env.reset()
        self.episode_reward = 0

        return np.array(self.obs['board']).reshape(1,self.rows,self.columns)

    def change_reward(self, old_reward, done, grid, mark, config):
        """Map the Kaggle reward to this wrapper's reward constants.

        ``grid``, ``mark`` and ``config`` are accepted for a heuristic-based
        variant but are not used by the current implementation.
        NOTE(review): a finished episode with a reward other than 1 is always
        scored as ``lostR``; that presumably includes true draws -- confirm.
        """
        if old_reward == 1:  # Agent won the game
            return self.winR
        elif done:  # Game ended without a win
            return self.lostR
        else:  # Game continues
            return self.draw

    def step(self, action):
        """Play ``action`` and return ``(observation, reward, done, info)``."""
        # Check if agent's move is valid
        is_valid = (self.obs['board'][int(action)] == 0)
        # Board before the move; only forwarded to change_reward.
        grid = np.asarray(self.obs.board).reshape(self.config.rows, self.config.columns)
        if is_valid: # Play the move
            self.obs, old_reward, done, info = self.env.step(int(action))
            reward = self.change_reward(old_reward, done, grid, self.obs.mark, self.config)
        else: # Penalize the agent but keep the episode running
            reward, done, info = self.illM, False, {}

        self.episode_reward += reward  # Track total episode reward

        if done:
            self.timestep += 1

            # Update both the cumulative and the per-window counters.
            if reward == self.winR:
                self.wins += 1
                self._window_wins += 1
            elif reward == self.lostR:
                self.losses += 1
                self._window_losses += 1
            else:
                self.draws += 1
                self._window_draws += 1

            # Store history (cumulative counts)
            self.history.append((len(self.history) + 1, self.wins, self.losses, self.draws))

            # Print the rates of the last window, then start a new window.
            # Only the window counters are cleared: zeroing the cumulative
            # counters would keep their total below REPORT_EVERY forever and
            # break consumers that wait for a minimum number of games.
            if self.timestep % self.REPORT_EVERY == 0:
                total = self._window_wins + self._window_losses + self._window_draws
                win_rate = self._window_wins / total
                loss_rate = self._window_losses / total
                draw_rate = self._window_draws / total
                print(f"[Step {self.timestep}] Win rate: {win_rate:.2f}, Loss rate: {loss_rate:.2f}, Draw rate: {draw_rate:.2f}, Total Reward: {self.episode_reward}")
                self._window_wins = 0
                self._window_losses = 0
                self._window_draws = 0

        return np.array(self.obs['board']).reshape(1,self.rows,self.columns), reward, done, info


In [None]:
import torch

# Report which device training will run on.
cuda_ready = torch.cuda.is_available()
if not cuda_ready:
    print("❌ GPU not available: Using CPU")
else:
    print("✅ GPU is available: Using", torch.cuda.get_device_name(0))


In [None]:
# Create ConnectFour environment 
#envTrain = ConnectFourGym(agent2="random")
#envTrain = DymaicRewardConnectFour(agent2="random")
#envTrain = ConnectFourGym(agent2=MinMaxAB_agent)
#policy_kwargs = dict(
   #features_extractor_class=CustomResNetCNN,
#)

In [None]:
from stable_baselines3 import DQN
from stable_baselines3.common.vec_env import DummyVecEnv

# Prefer the GPU, but fall back to the CPU so the cell also runs without CUDA.
device = "cuda" if torch.cuda.is_available() else "cpu"

# Create the (vectorised) environment; the opponent is the built-in random agent.
envTrain_new = DummyVecEnv([lambda: DymaicRewardConnectFour(agent2="random")])

policy_kwargs = dict(
    features_extractor_class=CustomResNetCNN,  # alternative: CustomCNN
    features_extractor_kwargs=dict(features_dim=256)  # Output feature size
)

#model = DQN.load("/kaggle/working/dqn_connect4.pkl", env=envTrain_new, device=device)

# Define the DQN model
new_model = DQN("CnnPolicy", envTrain_new,
            policy_kwargs=policy_kwargs,
            learning_rate=0.00015,
            buffer_size=200000,  # replay buffer capacity
            batch_size=128,
            gamma=0.98,  # discount factor
            exploration_fraction=0.08,
            exploration_final_eps=0.05,
            target_update_interval=5000,
            train_freq=8,
            verbose=0,
            device=device)  # use the device selected above instead of forcing "cuda"

# Alternative PPO configuration, kept for reference:
# new_model = PPO("CnnPolicy", envTrain_new,
#             policy_kwargs=policy_kwargs,
#             learning_rate=0.00015,
#             n_steps=2048,
#             batch_size=128,
#             n_epochs=10,
#             gamma=0.98,
#             gae_lambda=0.95,
#             clip_range=0.2,
#             ent_coef=0.01,
#             vf_coef=0.5,
#             max_grad_norm=0.5,
#             verbose=0,
#             device=device)

print(new_model.device)
# Load the trained parameters from the old model into the new one
#new_model.policy.load_state_dict(model.policy.state_dict())

# Train the model; stop_callback may end training early and writes checkpoints.
new_model.learn(total_timesteps=100000, callback=stop_callback)
new_model.save("dqn_connect4.pkl")

In [None]:
from IPython.display import Javascript
from IPython.display import FileLink, display

# Save the model first
#new_model.save("/kaggle/working/dqn_connect4.pkl")

# Auto-download (triggers without clicking)
#auto_download("dqn_connect4.pkl")

#model.save("/kaggle/working/dqn_connect4.pkl")

# A notebook cell only renders its LAST expression, so the link is shown
# explicitly with display() instead of relying on its position in the cell.
# NOTE(review): FileLink paths are normally given relative to the notebook
# directory -- confirm that this absolute path resolves on the target host.
display(FileLink("/kaggle/working/dqn_connect4.pkl"))  # Clickable download link

# Timing template (wrap the training call between the two timestamps):
# import time
#
# # Start timer
# start_time = time.time()
#
#
# end_time = time.time()
#
# # Print total training time
# print(f"Training completed in {end_time - start_time:.2f} seconds")

In [None]:
# Fine-tuning cell: reload a previously saved PPO model, copy its policy
# weights into a fresh PPO instance with different hyperparameters, and keep
# training against the minimax opponent.
# Use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"

#envTrain_new = ConnectFourGym(agent2="random")  # Change the opponent
# NOTE(review): this rebinds `envTrain_new`, which another cell also assigns
# (there as a DummyVecEnv) -- later cells see whichever cell ran last.
envTrain_new = ConnectFourGym(agent2=MinMaxAB_agent)  # Change the opponent

# Requires "ppo_connect4.pkl" to exist in the working directory.
model = PPO.load("ppo_connect4.pkl", env=envTrain_new, device=device)
#model = PPO.load("/kaggle/input/ppo_connect4/pytorch/default/1/ppo_connect4.pkl", env=envTrain_new, device=device)

# Create a new model with modified hyperparameters, using the old model's parameters
new_model = PPO("CnnPolicy", env=envTrain_new, policy_kwargs=model.policy_kwargs, 
                learning_rate=0.0003,  # Change learning rate here
                gamma=0.99,            # Change gamma here
                clip_range=0.1,       # Change clip range here
                verbose=1, device=device)

# Load the trained parameters from the old model into the new one
new_model.policy.load_state_dict(model.policy.state_dict())

# Continue training; the result overwrites the file that was loaded above.
new_model.learn(total_timesteps=50000)
new_model.save("ppo_connect4.pkl")

In [None]:
# Save the current `new_model` under a second file name.
new_model.save("ppo_connect42.pkl")

In [None]:
import matplotlib.pyplot as plt

# Split the (episode, wins, losses, draws) records into one list per column.
# NOTE(review): `envTrain_new` must be an env object exposing `.history`
# directly (not a vectorised wrapper) -- confirm which cell defined it.
episodes, wins, losses, draws = (
    [record[field] for record in envTrain_new.history] for field in range(4)
)

# One line per outcome type, all against the episode number.
plt.figure(figsize=(10, 5))
series_styles = (
    (wins, "Wins", "-", "o"),
    (losses, "Losses", "--", "x"),
    (draws, "Draws", ":", "s"),
)
for counts, label, linestyle, marker in series_styles:
    plt.plot(episodes, counts, label=label, linestyle=linestyle, marker=marker)

plt.xlabel("Episode")
plt.ylabel("Count")
plt.title("Connect Four Training Progress")
plt.legend()
plt.grid(True)
plt.show()


In [None]:
#model.save("ppo_connect4.pkl")
#model = PPO.load("/kaggle/input/ppo_connect4/pytorch/default/1/ppo_connect4.pkl")
# Load the saved PPO model; `agent1` below reads this module-level `model`.
model = PPO.load("ppo_connect4.pkl")

def agent1(obs, config):
    """Kaggle agent that plays the move predicted by the global ``model``.

    The flat board is reshaped to ``(1, config.rows, config.columns)`` before
    prediction, so the agent follows the board size given by ``config``.

    Args:
        obs: observation supporting ``obs['board']`` (flat, row-major board).
        config: configuration exposing ``rows`` and ``columns``.

    Returns:
        The predicted column if its top cell is empty, otherwise a random
        column whose top cell is empty.
    """
    board = obs['board']
    # Use the best model to select a column
    col, _ = model.predict(np.array(board).reshape(1, config.rows, config.columns))
    col = int(col)
    # Check if selected column is valid
    if board[col] == 0:
        return col
    # If not valid, select random move.
    return random.choice([c for c in range(config.columns) if board[c] == 0])
        
def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play ``n_rounds`` games between two agents and print summary statistics.

    Roughly half of the games are played with ``agent1`` moving first and the
    rest with ``agent2`` moving first; results of the second batch are swapped
    so every outcome reads ``[agent1_result, agent2_result]``.

    Args:
        agent1: first agent (name or callable).
        agent2: second agent (name or callable).
        n_rounds: total number of games to play.
    """
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    games_agent1_first = n_rounds//2

    # Agent 1 goes first (roughly) half the time
    outcomes = evaluate("connectx", [agent1, agent2], config, [], games_agent1_first)
    # Agent 2 goes first the rest of the time; flip each pair back to agent1-first order
    swapped = evaluate("connectx", [agent2, agent1], config, [], n_rounds-games_agent1_first)
    outcomes += [[b,a] for [a,b] in swapped]

    n_games = len(outcomes)
    report = (
        ("Agent 1 Win Percentage:", np.round(outcomes.count([1,-1])/n_games, 2)),
        ("Agent 2 Win Percentage:", np.round(outcomes.count([-1,1])/n_games, 2)),
        ("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0])),
        ("Number of Invalid Plays by Agent 2:", outcomes.count([0, None])),
    )
    for label, value in report:
        print(label, value)



# Evaluate the model-backed agent against the built-in random agent over 20 games.
get_win_percentages(agent1=agent1, agent2="random", n_rounds=20)

In [None]:
def agent1(obs, config):
    """Choose a column with the global PPO ``model``, falling back to a random open column.

    Args:
        obs: observation exposing the flat board both as ``obs['board']`` and ``obs.board``.
        config: configuration exposing ``columns``.

    Returns:
        The column index (int) to play.
    """
    # The policy is queried with the board reshaped to (1, 6, 7)
    policy_input = np.array(obs['board']).reshape(1, 6, 7)
    raw_action, _ = model.predict(policy_input)
    column = int(raw_action)
    # Index c of the flat board is the top cell of column c; 0 means the column still has room
    if obs['board'][column] == 0:
        return column
    # The policy picked a full column: play any open column instead
    playable = [c for c in range(config.columns) if obs.board[int(c)] == 0]
    return random.choice(playable)

In [None]:
# Create the game environment
env = make("connectx")

# Play one game: agent1 (the PPO-backed agent) against the "random" agent
env.run([agent1, "random"])

# Show the game
env.render(mode="ipython")

In [None]:
import random
import numpy as np
import pandas as pd
import gym
import matplotlib.pyplot as plt
%matplotlib inline

from kaggle_environments import make, evaluate
from gym import spaces

import torch
import torch as th
import torch.nn as nn

#!pip install "stable-baselines3"
from stable_baselines3 import PPO 
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor



# Report whether torch can see a CUDA device and, if so, the name of device 0
print("GPU Available:", torch.cuda.is_available())
print("Device:", torch.cuda.get_device_name(0) if torch.cuda.is_available() else "CPU")

In [None]:
def MinMaxAB_agent(obs, config): #MinMax with alpha-beta pruning
    """Choose a column by depth-limited minimax search with alpha-beta pruning.

    Args:
        obs: observation exposing ``board`` (flat row-major list of cells,
            0 = empty) and ``mark`` (this agent's piece, 1 or 2).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        A column index picked uniformly at random among the open columns that
        share the highest minimax score.

    Raises:
        IndexError: if no column is open (``random.choice`` on an empty list).
    """
    ################################
    # Imports and helper functions #
    ################################

    import numpy as np
    import random

    # Helper function for score_move: gets board at next step if agent drops piece in selected column
    def drop_piece(grid, col, mark, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = mark
        return next_grid

    # Yields every horizontal, vertical and diagonal window of length config.inarow as a list
    def iter_windows(grid, config):
        n = config.inarow
        # horizontal
        for row in range(config.rows):
            for col in range(config.columns-(n-1)):
                yield list(grid[row, col:col+n])
        # vertical
        for row in range(config.rows-(n-1)):
            for col in range(config.columns):
                yield list(grid[row:row+n, col])
        # positive diagonal
        for row in range(config.rows-(n-1)):
            for col in range(config.columns-(n-1)):
                yield list(grid[range(row, row+n), range(col, col+n)])
        # negative diagonal
        for row in range(n-1, config.rows):
            for col in range(config.columns-(n-1)):
                yield list(grid[range(row, row-n, -1), range(col, col+n)])

    # Helper function for get_heuristic: checks if window satisfies heuristic conditions
    def check_window(window, num_discs, piece, config):
        return (window.count(piece) == num_discs and window.count(0) == config.inarow-num_discs)

    # Helper function for get_heuristic: counts number of windows satisfying specified heuristic conditions
    def count_windows(grid, num_discs, piece, config):
        return sum(1 for window in iter_windows(grid, config)
                   if check_window(window, num_discs, piece, config))

    # Helper function for minimax: calculates value of heuristic for grid
    def get_heuristic(grid, mark, config):
        num_threes = count_windows(grid, 3, mark, config)
        num_fours = count_windows(grid, 4, mark, config)
        num_threes_opp = count_windows(grid, 3, mark%2+1, config)
        num_fours_opp = count_windows(grid, 4, mark%2+1, config)
        score = num_threes - 1e2*num_threes_opp - 1e4*num_fours_opp + 1e6*num_fours
        return score

    # Uses minimax to calculate value of dropping piece in selected column
    def score_move(grid, col, mark, config, nsteps):
        next_grid = drop_piece(grid, col, mark, config)
        score = minimax(next_grid, nsteps-1, False, mark, config)
        return score

    # Helper function for minimax: checks if agent or opponent has four in a row in the window
    def is_terminal_window(window, config):
        return window.count(1) == config.inarow or window.count(2) == config.inarow

    # Helper function for minimax: checks if game has ended
    def is_terminal_node(grid, config):
        # Check for draw: no empty cell left in the top row
        if list(grid[0, :]).count(0) == 0:
            return True
        # Check for win: horizontal, vertical, or diagonal
        return any(is_terminal_window(window, config) for window in iter_windows(grid, config))

    # Minimax implementation with alpha-beta bounds (a = alpha, b = beta).
    # np.inf is used because the np.Inf alias no longer exists in NumPy 2.x.
    def minimax(node, depth, maximizingPlayer, mark, config, a=-np.inf, b=np.inf):
        if depth == 0 or is_terminal_node(node, config):
            return get_heuristic(node, mark, config)
        valid_moves = [c for c in range(config.columns) if node[0][c] == 0]
        if maximizingPlayer:
            value = -np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark, config)
                value = max(value, minimax(child, depth-1, False, mark, config, a, b))
                if value > b:
                    break
                # Tighten the lower bound inside the loop so later siblings can be pruned
                a = max(a, value)
            return value
        else:
            value = np.inf
            for col in valid_moves:
                child = drop_piece(node, col, mark%2+1, config)
                value = min(value, minimax(child, depth-1, True, mark, config, a, b))
                if value < a:
                    break
                # Tighten the upper bound inside the loop so later siblings can be pruned
                b = min(b, value)
            return value

    #########################
    # Agent makes selection #
    #########################

    # How deep to make the game tree: higher values take longer to run!
    N_STEPS = 3
    # Get list of valid moves
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    # Convert the board to a 2D grid
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)
    # Use the heuristic to assign a score to each possible board in the next step.
    # Every root move is searched with a full (-inf, +inf) window, so its score is exact.
    scores = {col: score_move(grid, col, obs.mark, config, N_STEPS) for col in valid_moves}
    # Get a list of columns (moves) that maximize the heuristic
    best_score = max(scores.values()) if scores else None
    max_cols = [col for col, score in scores.items() if score == best_score]
    # Select at random from the maximizing columns
    return random.choice(max_cols)

In [None]:
def OneStep_agent(obs, config): #One-Step Look
    """Pick the column whose resulting board scores best under a one-move lookahead.

    Args:
        obs: observation exposing ``board`` (flat row-major list, 0 = empty)
            and ``mark`` (this agent's piece).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        A column chosen uniformly at random among the best-scoring open columns.
    """
    import numpy as np
    import random

    # Heuristic terms: (discs in the window, counted for the agent itself?, weight)
    weighted_terms = (
        (4, True, 1*10**9),
        (3, True, 1*10**6),
        (2, True, 1*10**2),
        (2, False, -1*10**6),
        (3, False, -1*10**9),
    )

    def windows_of(board2d):
        """Yield every inarow-long window (rows, columns, both diagonals) as a list."""
        span = config.inarow
        row_starts = range(config.rows - (span - 1))
        col_starts = range(config.columns - (span - 1))
        for r in range(config.rows):                 # horizontal
            for c in col_starts:
                yield list(board2d[r, c:c+span])
        for r in row_starts:                         # vertical
            for c in range(config.columns):
                yield list(board2d[r:r+span, c])
        for r in row_starts:                         # positive diagonal
            for c in col_starts:
                yield list(board2d[range(r, r+span), range(c, c+span)])
        for r in range(span - 1, config.rows):       # negative diagonal
            for c in col_starts:
                yield list(board2d[range(r, r-span, -1), range(c, c+span)])

    def count_windows(board2d, num_discs, piece):
        """Count windows holding exactly num_discs of piece with the rest empty."""
        empties_needed = config.inarow - num_discs
        matches = 0
        for window in windows_of(board2d):
            if window.count(piece) == num_discs and window.count(0) == empties_needed:
                matches += 1
        return matches

    def get_heuristic(board2d, mark):
        """Weighted sum of the agent's and the opponent's window counts."""
        rival = mark % 2 + 1
        total = 0
        for num_discs, is_own, weight in weighted_terms:
            total += weight * count_windows(board2d, num_discs, mark if is_own else rival)
        return total

    def drop_piece(board2d, col, mark):
        """Return a copy of the board with mark dropped into col."""
        result = board2d.copy()
        for row in reversed(range(config.rows)):
            if result[row][col] == 0:
                break
        result[row][col] = mark
        return result

    # Open columns are those whose top cell is still empty
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    grid = np.asarray(obs.board).reshape(config.rows, config.columns)

    scored = [(c, get_heuristic(drop_piece(grid, c, obs.mark), obs.mark)) for c in valid_moves]
    top_score = max(score for _, score in scored) if scored else None
    max_cols = [c for c, score in scored if score == top_score]

    return random.choice(max_cols)

In [None]:
def Simple_agent(obs, config): #Simple agent with basic rules: take a winning move, else block
    """Rule-based agent: win if possible, otherwise block, otherwise play randomly.

    Only open columns are evaluated. Simulating a drop into a full column
    overwrites that column's top cell, which could make a full (illegal)
    column look like a winning or blocking move.

    Args:
        obs: observation exposing ``board`` (flat row-major list, 0 = empty)
            and ``mark`` (this agent's piece).
        config: configuration exposing ``rows``, ``columns`` and ``inarow``.

    Returns:
        The first open column that wins immediately; otherwise the first open
        column in which the opponent would win; otherwise a random open column.

    Raises:
        IndexError: if no column is open (``random.choice`` on an empty list).
    """
    import numpy as np
    import random

    # Gets board at next step if agent drops piece in selected column
    def drop_piece(grid, col, piece, config):
        next_grid = grid.copy()
        for row in range(config.rows-1, -1, -1):
            if next_grid[row][col] == 0:
                break
        next_grid[row][col] = piece
        return next_grid

    # Returns True if dropping piece in column results in game win
    def check_winning_move(obs, config, col, piece):
        # Convert the board to a 2D grid
        grid = np.asarray(obs.board).reshape(config.rows, config.columns)
        next_grid = drop_piece(grid, col, piece, config)
        n = config.inarow
        # horizontal
        for r in range(config.rows):
            for c in range(config.columns-(n-1)):
                if list(next_grid[r, c:c+n]).count(piece) == n:
                    return True
        # vertical
        for r in range(config.rows-(n-1)):
            for c in range(config.columns):
                if list(next_grid[r:r+n, c]).count(piece) == n:
                    return True
        # positive diagonal
        for r in range(config.rows-(n-1)):
            for c in range(config.columns-(n-1)):
                if list(next_grid[range(r, r+n), range(c, c+n)]).count(piece) == n:
                    return True
        # negative diagonal
        for r in range(n-1, config.rows):
            for c in range(config.columns-(n-1)):
                if list(next_grid[range(r, r-n, -1), range(c, c+n)]).count(piece) == n:
                    return True
        return False

    opponent = 2 if obs.mark == 1 else 1

    # A column is playable only while its top cell is empty
    valid_moves = [c for c in range(config.columns) if obs.board[c] == 0]
    blocking = []

    for col in valid_moves:
        # Winning takes priority and returns immediately
        if check_winning_move(obs, config, col, obs.mark):
            return col
        if check_winning_move(obs, config, col, opponent):
            blocking.append(col)

    return blocking[0] if blocking else random.choice(valid_moves)


In [None]:
from kaggle_environments import make, evaluate

# Create the game environment
env = make("connectx")

# Play one game: MinMaxAB_agent against the "random" agent
env.run([MinMaxAB_agent, "random"]) #my_agent_Nab my_agent3

# Show the game
env.render(mode="ipython")

In [None]:
import random
import numpy as np
import pandas as pd
import gym
import matplotlib.pyplot as plt
%matplotlib inline

from kaggle_environments import make, evaluate
from gym import spaces

class ConnectFourGym(gym.Env):
    """Gym environment wrapping a Kaggle ConnectX game against a fixed opponent.

    Observations are the board reshaped to ``(1, rows, columns)``; actions are
    column indices.
    """

    def __init__(self, agent2="random"):
        """Create the ConnectX trainer with ``agent2`` as the opponent."""
        kaggle_env = make("connectx", debug=True)
        # The learning agent takes the `None` seat; agent2 plays the other one
        self.env = kaggle_env.train([None, agent2])
        self.rows = kaggle_env.configuration.rows
        self.columns = kaggle_env.configuration.columns
        # Learn about spaces here: http://gym.openai.com/docs/#spaces
        self.action_space = spaces.Discrete(self.columns)
        board_shape = (1, self.rows, self.columns)
        self.observation_space = spaces.Box(low=0, high=2, shape=board_shape, dtype=int)
        # Tuple corresponding to the min and max possible rewards
        self.reward_range = (-10, 1)
        # StableBaselines throws error if these are not defined
        self.spec = None
        self.metadata = None

    def _board_as_array(self):
        """Return the current board as a (1, rows, columns) array."""
        return np.array(self.obs['board']).reshape(1, self.rows, self.columns)

    def reset(self):
        """Start a new game and return the initial observation."""
        self.obs = self.env.reset()
        return self._board_as_array()

    def change_reward(self, old_reward, done):
        """Map the environment reward onto the training reward."""
        if old_reward == 1:
            # The agent won the game
            return 1
        if done:
            # The game ended without an agent win
            return -1
        # Game still running: small per-move reward (1/42 on the default board)
        return 1/(self.rows*self.columns)

    def step(self, action):
        """Play ``action``; an illegal move ends the episode with reward -10."""
        column = int(action)
        if self.obs['board'][column] != 0:
            # Column is full: end the game and penalize the agent
            return self._board_as_array(), -10, True, {}
        self.obs, raw_reward, done, info = self.env.step(column)
        reward = self.change_reward(raw_reward, done)
        return self._board_as_array(), reward, done, info
    
# Create the ConnectFour training environment with the "random" agent as opponent
envTrain = ConnectFourGym(agent2="random")
# Alternative: train against the minimax agent instead
#envTrain = ConnectFourGym(agent2=MinMaxAB_agent)

In [None]:
## Old Structure

import torch as th
import torch.nn as nn

#!pip install "stable-baselines3"
from stable_baselines3 import PPO 
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

# Neural network for predicting action values
class CustomCNN(BaseFeaturesExtractor):
    """Two-layer convolutional feature extractor followed by one linear layer.

    Args:
        observation_space: box space whose first dimension is the channel count.
        features_dim: size of the feature vector produced by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int=128):
        super().__init__(observation_space, features_dim)
        # CxHxW images (channels first)
        in_channels = observation_space.shape[0]
        conv_layers = [
            nn.Conv2d(in_channels, 32, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Conv2d(32, 64, kernel_size=3, stride=1, padding=0),
            nn.ReLU(),
            nn.Flatten(),
        ]
        self.cnn = nn.Sequential(*conv_layers)

        # Infer the flattened size by pushing one sampled observation through the conv stack
        with th.no_grad():
            probe = th.as_tensor(observation_space.sample()[None]).float()
            flattened_size = self.cnn(probe).shape[1]

        self.linear = nn.Sequential(nn.Linear(flattened_size, features_dim), nn.ReLU())

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Run the conv stack, then the linear projection."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)

# Tell the policy to use the custom extractor
policy_kwargs = {"features_extractor_class": CustomCNN}

In [None]:
## New Structure

import torch as th
import torch.nn as nn
import gym
from stable_baselines3.common.torch_layers import BaseFeaturesExtractor

class CustomCNN(BaseFeaturesExtractor):
    """Six-stage Conv-BatchNorm-ReLU feature extractor with a two-layer MLP head.

    Args:
        observation_space: box space whose first dimension is the channel count.
        features_dim: size of the feature vector produced by ``forward``.
    """

    def __init__(self, observation_space: gym.spaces.Box, features_dim: int = 128):
        super().__init__(observation_space, features_dim)

        # Channel widths of consecutive conv stages, starting from the input channels
        channel_plan = [observation_space.shape[0], 32, 64, 128, 256, 128, 64]

        # padding=1 with 3x3 kernels keeps the spatial size unchanged through every stage
        stages = []
        for width_in, width_out in zip(channel_plan, channel_plan[1:]):
            stages.append(nn.Conv2d(width_in, width_out, kernel_size=3, stride=1, padding=1))
            stages.append(nn.BatchNorm2d(width_out))
            stages.append(nn.ReLU())
        stages.append(nn.Flatten())
        self.cnn = nn.Sequential(*stages)

        # Infer the flattened size by pushing one sampled observation through the conv stack
        with th.no_grad():
            probe = th.as_tensor(observation_space.sample()[None]).float()
            flattened_size = self.cnn(probe).shape[1]

        # MLP head: hidden layer of 256 units with dropout, then projection to features_dim
        head_layers = [
            nn.Linear(flattened_size, 256),
            nn.ReLU(),
            nn.Dropout(0.2),
            nn.Linear(256, features_dim),
            nn.ReLU(),
        ]
        self.linear = nn.Sequential(*head_layers)

    def forward(self, observations: th.Tensor) -> th.Tensor:
        """Run the conv stack, then the MLP head."""
        conv_features = self.cnn(observations)
        return self.linear(conv_features)

# Tell the policy to use the custom extractor
policy_kwargs = {"features_extractor_class": CustomCNN}


In [None]:
from stable_baselines3.common.callbacks import BaseCallback
import time
import numpy as np

class RewardTrackingCallback(BaseCallback):
    """Track episode returns and win/loss counts during training.

    Episode ends are detected from the ``dones`` entry of ``self.locals`` and
    returns are accumulated from the ``rewards`` entry. ``self.locals`` has no
    top-level ``"episode"`` key, so checking for one never records anything.

    Args:
        check_freq: print a summary whenever ``num_timesteps`` is a multiple of this.
        verbose: verbosity level forwarded to ``BaseCallback``.
    """

    def __init__(self, check_freq=1000, verbose=1):
        super(RewardTrackingCallback, self).__init__(verbose)
        self.check_freq = check_freq
        self.episode_rewards = []  # Return (sum of rewards) of every finished episode
        self.episode_wins = 0  # Count wins
        self.episode_losses = 0  # Count losses
        self.episode_count = 0
        self.start_time = time.time()
        self._running_returns = None  # Per-environment return of the episode in progress

    def _on_step(self) -> bool:
        rewards = np.atleast_1d(np.asarray(self.locals.get("rewards", []), dtype=float))
        dones = np.atleast_1d(np.asarray(self.locals.get("dones", []), dtype=bool))

        # (Re)allocate the accumulators when the number of environments is first known
        if self._running_returns is None or len(self._running_returns) != len(rewards):
            self._running_returns = np.zeros(len(rewards), dtype=float)
        self._running_returns += rewards

        for env_idx in np.flatnonzero(dones[:len(rewards)]):
            self.episode_rewards.append(float(self._running_returns[env_idx]))
            self._running_returns[env_idx] = 0.0
            self.episode_count += 1

            # Classify by the reward of the final step, not the episode return:
            # 1 means a win, -1 means a loss. Any other final reward is counted as neither.
            final_reward = rewards[env_idx]
            if final_reward == 1:
                self.episode_wins += 1
            elif final_reward == -1:
                self.episode_losses += 1

        # Print reward stats every check_freq steps once at least one episode has finished
        if self.episode_count > 0 and self.num_timesteps % self.check_freq == 0:
            avg_reward = np.mean(self.episode_rewards[-10:])  # Average last 10 episodes
            win_rate = (self.episode_wins / self.episode_count) * 100
            elapsed_time = time.time() - self.start_time

            print(f"Timestep: {self.num_timesteps} | Avg Reward: {avg_reward:.2f} | Wins: {self.episode_wins} | Losses: {self.episode_losses} | Win Rate: {win_rate:.2f}% | Time: {elapsed_time:.2f}s")

        return True  # Continue training

# Create the callback that will be passed to model.learn below
callback = RewardTrackingCallback(check_freq=1)

# Choose the device here so this cell does not depend on a later cell having been run
device = "cuda" if torch.cuda.is_available() else "cpu"

# Create PPO model with GPU support
model = PPO("CnnPolicy", envTrain, policy_kwargs=policy_kwargs, verbose=1, device=device)

# Train the agent
model.learn(total_timesteps=50000, callback=callback)


In [None]:
from stable_baselines3.common.callbacks import BaseCallback
import time

class StepRewardTrackingCallback(BaseCallback):
    """Print the reward and done flag of every training step with the elapsed time."""

    def __init__(self, verbose=1):
        super().__init__(verbose)
        # Reference point for the elapsed-time column
        self.start_time = time.time()

    def _on_step(self) -> bool:
        """Log the current step and always let training continue."""
        step_locals = self.locals
        reward, done = step_locals["rewards"], step_locals["dones"]

        elapsed_time = time.time() - self.start_time
        print(f"Timestep: {self.num_timesteps} | Reward: {reward} | Done: {done} | Time: {elapsed_time:.2f}s")

        # Returning True keeps the training loop going
        return True

# Create the per-step logging callback (it only takes effect once passed to model.learn)
callback = StepRewardTrackingCallback()

# Disabled: create PPO model with GPU support
#model = PPO("CnnPolicy", envTrain, policy_kwargs=policy_kwargs, verbose=1, device=device)

# Disabled: train the agent
#model.learn(total_timesteps=50000, callback=callback)

# Use whichever CustomCNN class was defined most recently as the feature extractor
policy_kwargs = dict(
    features_extractor_class=CustomCNN,
)

# Initialize agent (no device argument, verbose output off)
model = PPO("CnnPolicy", envTrain, policy_kwargs=policy_kwargs, verbose=0)


In [None]:
# Train agent (total_timesteps=6 is a very small budget)
# NOTE(review): presumably a smoke test of the training pipeline; confirm intent
model.learn(total_timesteps=6)

In [None]:
import time

# Start timer (wall-clock time of model creation plus training)
start_time = time.time()

# Use GPU if available
device = "cuda" if torch.cuda.is_available() else "cpu"

# Create PPO model with GPU support
model = PPO("CnnPolicy", envTrain, policy_kwargs=policy_kwargs, verbose=1, device=device)

# Train the agent, logging through whichever `callback` was created last
model.learn(total_timesteps=50000, callback=callback)


###################
# Disabled alternatives kept for reference

# Initialize agent
#model = PPO("CnnPolicy", env, policy_kwargs=policy_kwargs, verbose=0)

# Train agent
#model.learn(total_timesteps=1) #50000

# Save the trained model
#model.save("/kaggle/working/ppo_model")

# End timer
end_time = time.time()

# Print total training time
print(f"Training completed in {end_time - start_time:.2f} seconds")

In [None]:
# Create a fresh PPO model with GPU support (replaces any previously trained `model`)
# Relies on `device`, `policy_kwargs`, `envTrain` and `callback` from earlier cells
model = PPO("CnnPolicy", envTrain, policy_kwargs=policy_kwargs, verbose=1, device=device)

# Train the agent
model.learn(total_timesteps=50000, callback=callback)


In [None]:
# Save the trained model; RL_agent below loads it from this same path
model.save("/kaggle/working/ppo_model_winMM")

In [None]:
def RL_agent(obs, config):
    """Choose a column with the saved PPO model, falling back to a random open column.

    The checkpoint is loaded once and cached on the function object instead of
    being re-read from disk on every move. The cache lives on the function
    itself so the agent stays self-contained when only its source is written
    to a submission file. Re-run this definition to pick up a re-saved checkpoint.

    Args:
        obs: observation exposing the flat board both as ``obs['board']`` and ``obs.board``.
        config: configuration exposing ``columns``.

    Returns:
        The column index (int) to play.
    """
    from stable_baselines3 import PPO
    import random
    import numpy as np

   # model = PPO.load("/kaggle/input/ppo_modelpkl/pytorch/default/1/ppo_model.pkl")
    model_path = "/kaggle/working/ppo_model_winMM"

    # Cache keyed by path so different checkpoints never get mixed up
    cache = getattr(RL_agent, "_model_cache", None)
    if cache is None:
        cache = RL_agent._model_cache = {}
    if model_path not in cache:
        cache[model_path] = PPO.load(model_path)
    model = cache[model_path]

    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, 6,7))
    chosen = int(col)
    # Check if selected column is valid (top cell of the column is empty)
    if obs['board'][chosen] == 0:
        return chosen
    # If not valid, select random move.
    return random.choice([c for c in range(config.columns) if obs.board[c] == 0])



In [None]:
from kaggle_environments import make, evaluate

# Create the game environment
env = make("connectx")

# Play one game: RL_agent against the "random" agent
# (disabled alternative: RL_agent against MinMaxAB_agent)
#env.run([RL_agent, MinMaxAB_agent])
env.run([RL_agent, 'random'])

# Show the game
env.render(mode="ipython")

In [None]:
import numpy as np

def get_win_percentages(agent1, agent2, n_rounds=100):
    """Play ``n_rounds`` ConnectX games between two agents and print a summary.

    Half of the games (rounded down) are played with ``agent1`` moving first and
    the remainder with ``agent2`` moving first; results of the second batch are
    flipped so every outcome pair reads ``[agent1_result, agent2_result]``.

    Args:
        agent1: first agent (callable or agent name accepted by ``evaluate``).
        agent2: second agent.
        n_rounds: total number of games to play.
    """
    # Use default Connect Four setup
    config = {'rows': 6, 'columns': 7, 'inarow': 4}
    rounds_first = n_rounds // 2
    rounds_second = n_rounds - rounds_first

    outcomes = evaluate("connectx", [agent1, agent2], config, [], rounds_first)
    reversed_seating = evaluate("connectx", [agent2, agent1], config, [], rounds_second)
    # Re-order the second batch so position 0 always refers to agent1
    outcomes += [[b, a] for [a, b] in reversed_seating]

    played = len(outcomes)
    wins_agent1 = outcomes.count([1, -1])
    wins_agent2 = outcomes.count([-1, 1])
    print("Agent 1 Win Percentage:", np.round(wins_agent1/played, 2))
    print("Agent 2 Win Percentage:", np.round(wins_agent2/played, 2))
    print("Number of Invalid Plays by Agent 1:", outcomes.count([None, 0]))
    print("Number of Invalid Plays by Agent 2:", outcomes.count([0, None]))

# Quick check: 10 games of RL_agent against the "random" agent
get_win_percentages(agent1=RL_agent, agent2='random', n_rounds=10)

In [None]:
import inspect
import os

def write_agent_to_file(function, file):
    """Append the source code of ``function`` to ``file`` and report it on stdout.

    The file is created when it does not exist yet; existing content is kept.
    """
    # Append mode creates a missing file, so it covers both the new and the existing case
    with open(file, "a") as target:
        source_text = inspect.getsource(function)
        target.write(source_text)
        print(function, "written to", file)

# Append RL_agent's source to submission.py (re-running this cell appends another copy)
write_agent_to_file(RL_agent, "submission.py") #my_agent my_agent_Nab

In [None]:
import sys
from kaggle_environments import agent
from kaggle_environments import utils

# Remember the real stdout so it can be restored after loading the submission
out = sys.stdout
submission = utils.read_file("/kaggle/working/submission.py")
# NOTE(review): `path` receives the file's text rather than its filename — confirm this is intended
# This rebinds the name `agent` from the imported module to the loaded callable
agent = agent.get_last_callable(submission, path=submission)
sys.stdout = out

# Self-play smoke test: the submission passes if both seats finish with status "DONE"
env = make("connectx", debug=True)
env.run([agent, agent])
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")

In [None]:
!ls -R /kaggle/working

<h1>Submission Version</h1>

In [None]:
def RL_agent(obs, config):
    """Choose a column with the saved PPO model, falling back to a random open column.

    The checkpoint is loaded once and cached on the function object instead of
    being re-read from disk on every move. The cache lives on the function
    itself so the agent stays self-contained when only its source is written
    to a submission file. Re-run this definition to pick up a re-saved checkpoint.

    Args:
        obs: observation exposing the flat board both as ``obs['board']`` and ``obs.board``.
        config: configuration exposing ``columns``.

    Returns:
        The column index (int) to play.
    """
    from stable_baselines3 import PPO
    import random
    import numpy as np

    model_path = "./ppo_model.pkl"

    # Cache keyed by path so different checkpoints never get mixed up
    cache = getattr(RL_agent, "_model_cache", None)
    if cache is None:
        cache = RL_agent._model_cache = {}
    if model_path not in cache:
        cache[model_path] = PPO.load(model_path)
    model = cache[model_path]

    # Use the best model to select a column
    col, _ = model.predict(np.array(obs['board']).reshape(1, 6,7))
    chosen = int(col)
    # Check if selected column is valid (top cell of the column is empty)
    if obs['board'][chosen] == 0:
        return chosen
    # If not valid, select random move.
    return random.choice([c for c in range(config.columns) if obs.board[c] == 0])


import inspect
import os

def write_agent_to_file(function, file):
    """Append the source code of ``function`` to ``file`` and report it on stdout.

    The file is created when it does not exist yet; existing content is kept.
    """
    # Append mode creates a missing file, so it covers both the new and the existing case
    with open(file, "a") as out_file:
        function_source = inspect.getsource(function)
        out_file.write(function_source)
        print(function, "written to", file)

# Append the submission version of RL_agent to submission.py; existing content is kept
write_agent_to_file(RL_agent, "submission.py") #my_agent my_agent_Nab


The submission approach is based on  
[NickMacd's PPO using SB3 Notebook](https://www.kaggle.com/code/nickmacd/ppo-using-sb3/notebook)  
and  
[Connect X Exercise: Deep RL Submission V1](https://www.kaggle.com/code/svendaj/connect-x-exercise-deep-rl-submission-v1/notebook).


In [None]:
# Load a previously trained agent from the attached Kaggle input dataset into `model`
model = PPO.load("/kaggle/input/ppo_modelpkl/pytorch/default/1/ppo_model.pkl")


In [None]:
# Save the agent under the name main.py loads ("ppo_connectx");
# the packaging cell below expects the resulting file to be ppo_connectx.zip
model.save('ppo_connectx')

In [None]:
%%writefile main.py
import os, sys, random
import numpy as np
from stable_baselines3 import PPO
import torch

# Directory the model file is looked up in: the Kaggle simulation agent directory
# when it exists, otherwise the current working directory (empty prefix)
cwd = '/kaggle_simulations/agent/'
if os.path.exists(cwd):
    sys.path.append(cwd)
else:
    cwd = ''

# Loaded lazily on the first call to agent() and then reused
model = None

def agent(obs, config):
    """Choose a column with the PPO model, falling back to a random open column.

    The model is loaded from ``cwd + "ppo_connectx"`` on the first call and kept
    in the module-level ``model`` for later calls.

    Args:
        obs: observation exposing the flat board both as ``obs['board']`` and ``obs.board``.
        config: configuration exposing ``columns``.

    Returns:
        The column index (int) to play.
    """
    global model
    # load the trained model (first call only)
    if model is None:
        model = PPO.load(cwd + "ppo_connectx")

    # An earlier experiment (no longer executed) re-encoded the board as floats,
    # own pieces = 0.5 and opponent pieces = 1.0, and predicted with deterministic=True.

    # Use the best model to select a column
    board_input = np.array(obs['board']).reshape(1, 6, 7)
    predicted, _ = model.predict(board_input)
    chosen = int(predicted)
    # A column is playable while its top cell is empty
    if obs['board'][chosen] == 0:
        return chosen
    # If not valid, select random move.
    open_columns = [c for c in range(config.columns) if obs.board[int(c)] == 0]
    return random.choice(open_columns)
    

In [None]:
# pack files used for submission
!tar cvfz submission.tar.gz main.py ppo_connectx.zip

In [None]:
import sys
from kaggle_environments import utils, agent

# Remember the real stdout so it can be restored after loading main.py
out = sys.stdout
submission = utils.read_file("main.py")
# NOTE(review): `path` receives the file's text rather than its filename — confirm this is intended
# This rebinds the name `agent` from the imported module to the loaded callable
agent = agent.get_last_callable(submission, path= submission)
sys.stdout = out

# Play the packaged agent against MinMaxAB_agent; success means both seats end with status "DONE"
env = make("connectx", debug=True)
env.run([agent, MinMaxAB_agent])
print("Success!" if env.state[0].status == env.state[1].status == "DONE" else "Failed...")
env.render(mode="ipython")