In [None]:
# Connect 4 Self-Play Arena
# Two Q-Networks battle each other for continuous improvement

import torch
import torch.nn as nn
import numpy as np
import matplotlib.pyplot as plt
from board_processor import BoardProcessor
from feature_generator import FeatureGenerator
from typing import Optional, List, Dict, Tuple
import pickle
from dataclasses import dataclass
import time

In [None]:

@dataclass
class SelfPlayConfig:
    """Settings for a self-play session between the "alpha" and "bravo" agents.

    Both model paths default to the same checkpoint, so the default session
    pits a network against a copy of itself.
    """
    # Checkpoint files; SelfPlayArena loads both as soon as it is constructed.
    model_path_alpha: str = "qnet_mc_pretrained.pth"
    model_path_bravo: str = "qnet_mc_pretrained.pth"
    # Probability that an agent picks a random valid column instead of the
    # column with the highest Q-value (epsilon-greedy, see get_ai_move).
    epsilon_alpha: float = 0.1
    epsilon_bravo: float = 0.1
    # Whether Alpha makes the first move. SelfPlayArena.run_tournament toggles
    # this flag while a tournament is running.
    alpha_plays_first: bool = True
    # Number of games played by one run_tournament call.
    num_games: int = 100
    # When True, play_single_game prints every move and the game outcome.
    verbose: bool = True
    # When True, run_tournament pickles its results to game_save_path.
    save_games: bool = True
    game_save_path: str = "selfplay_games.pkl"

In [None]:

@dataclass
class GameResult:
    """Record of one finished game, as produced by SelfPlayArena.play_single_game."""
    # Columns chosen, in the order they were played.
    moves: List[int]
    winner: int  # 1 for alpha, -1 for bravo, 0 for draw
    # Number of moves played (len(moves) at the time the game ended).
    game_length: int
    # String returned by BoardProcessor.moves_code() for the finished game.
    game_code: str
    # Value of config.alpha_plays_first when the result was recorded.
    alpha_first: bool
    # Exploration rates that were configured for this game.
    epsilon_alpha: float
    epsilon_bravo: float

In [None]:

class QNetwork(nn.Module):
    """Fully connected Tanh network mapping a feature vector to one score.

    Every hidden layer is ``Linear -> Tanh``; the output head is a single
    ``Linear`` unit followed by ``Tanh``, so scores lie in [-1, 1].

    Args:
        input_dim: Length of the input feature vector.
        hidden_sizes: Widths of the hidden layers, in order.
    """

    def __init__(self, input_dim, hidden_sizes=(256, 128, 64, 32, 16, 8)):
        super().__init__()
        widths = [input_dim, *hidden_sizes]

        stack = []
        # Consecutive width pairs give the (in, out) sizes of each hidden layer.
        for fan_in, fan_out in zip(widths[:-1], widths[1:]):
            stack += [nn.Linear(fan_in, fan_out), nn.Tanh()]
        # Output head: one unit squashed by Tanh.
        stack += [nn.Linear(widths[-1], 1), nn.Tanh()]

        self.net = nn.Sequential(*stack)

    def forward(self, x):
        """Run ``x`` through the network and drop the trailing size-1 axis."""
        scores = self.net(x)
        return scores.squeeze(-1)

In [None]:

class SelfPlayArena:
    """Manages self-play between two Q-Networks"""

    def __init__(self, config: SelfPlayConfig):
        self.config = config
        self.feature_gen = FeatureGenerator()
        self.device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Calculate feature dimensions
        _, dummy_features = self.feature_gen.convolution_feature_gen([[] for _ in range(7)])
        self.feature_dim = len(dummy_features) * 2  # State-action pairs

        # Load models and scalers
        self.model_alpha, self.scaler_alpha = self._load_model(config.model_path_alpha)
        self.model_bravo, self.scaler_bravo = self._load_model(config.model_path_bravo)

        # Game statistics
        self.reset_stats()

        print(f"Arena initialized! Using {self.device}")
        print(f"Feature dimension: {self.feature_dim}")
        print(f"Alpha model: {config.model_path_alpha}")
        print(f"Bravo model: {config.model_path_bravo}")

    def _load_model(self, model_path: str) -> Tuple[QNetwork, object]:
        """Load a model and its scaler"""
        model = QNetwork(input_dim=self.feature_dim).to(self.device)
        checkpoint = torch.load(model_path, map_location=self.device)
        model.load_state_dict(checkpoint['model_state_dict'])
        model.eval()
        scaler = checkpoint['scaler']
        return model, scaler

    def reset_stats(self):
        """Reset game statistics"""
        self.stats = {
            'alpha_wins': 0,
            'bravo_wins': 0,
            'draws': 0,
            'total_games': 0,
            'avg_game_length': 0,
            'game_results': []
        }

    def get_q_value(self, state_list: List[List[int]], move: int, player: int,
                   model: QNetwork, scaler) -> float:
        """Get Q-value for a state-action pair"""
        _, curr_feats = self.feature_gen.convolution_feature_gen(state_list)
        next_state = [col[:] for col in state_list]
        next_state[move].append(player)
        _, next_feats = self.feature_gen.convolution_feature_gen(next_state)

        features = np.concatenate([curr_feats, next_feats])
        scaled = scaler.transform([features])

        with torch.no_grad():
            return model(torch.FloatTensor(scaled).to(self.device)).item()

    def get_ai_move(self, state_list: List[List[int]], player: int, epsilon: float,
                   model: QNetwork, scaler) -> Tuple[int, float, Dict[int, float]]:
        """Select AI move using epsilon-greedy strategy"""
        valid = [c for c in range(7) if len(state_list[c]) < 6]
        q_values = {}

        # Calculate Q-values for all valid moves
        for col in valid:
            q = self.get_q_value(state_list, col, player, model, scaler) * player
            q_values[col] = q

        # Epsilon-greedy selection
        if np.random.random() < epsilon:
            selected_col = int(np.random.choice(valid))
        else:
            selected_col = max(q_values.keys(), key=lambda k: q_values[k])

        return selected_col, q_values[selected_col], q_values

    def check_win(self, state_list: List[List[int]]) -> int:
        """Check for win using convolution features. Returns 1, -1, or 0"""
        _, features = self.feature_gen.convolution_feature_gen(state_list)
        if 4 in features:
            return 1
        elif -4 in features:
            return -1
        return 0

    def play_single_game(self, game_num: int = 0) -> GameResult:
        """Play a single game between Alpha and Bravo"""
        board = BoardProcessor()
        moves = []

        # Determine who plays first and assign player values
        if self.config.alpha_plays_first:
            alpha_player, bravo_player = 1, -1
            current_is_alpha = True
        else:
            alpha_player, bravo_player = -1, 1
            current_is_alpha = False

        if self.config.verbose:
            starter = "Alpha" if current_is_alpha else "Bravo"
            print(f"\nGame {game_num + 1}: {starter} plays first")

        # Game loop
        while True:
            # Determine current player and model
            if current_is_alpha:
                player_value = alpha_player
                model, scaler = self.model_alpha, self.scaler_alpha
                epsilon = self.config.epsilon_alpha
                player_name = "Alpha"
            else:
                player_value = bravo_player
                model, scaler = self.model_bravo, self.scaler_bravo
                epsilon = self.config.epsilon_bravo
                player_name = "Bravo"

            # Get move
            col, q_val, q_values = self.get_ai_move(
                board.state_list, player_value, epsilon, model, scaler
            )

            moves.append(col)
            board.generate_state_list(moves)

            if self.config.verbose:
                print(f"{player_name} plays column {col} (Q={q_val:.3f})")

            # Check for game end
            winner = self.check_win(board.state_list)
            if winner != 0:
                # Convert winner to Alpha/Bravo perspective
                if winner == alpha_player:
                    result_winner = 1  # Alpha wins
                    winner_name = "Alpha"
                else:
                    result_winner = -1  # Bravo wins
                    winner_name = "Bravo"

                if self.config.verbose:
                    print(f"{winner_name} wins in {len(moves)} moves! Code: {board.moves_code()}")
                break

            if len(moves) >= 42:
                result_winner = 0
                if self.config.verbose:
                    print(f"Draw in {len(moves)} moves! Code: {board.moves_code()}")
                break

            # Switch players
            current_is_alpha = not current_is_alpha

        return GameResult(
            moves=moves,
            winner=result_winner,
            game_length=len(moves),
            game_code=board.moves_code(),
            alpha_first=self.config.alpha_plays_first,
            epsilon_alpha=self.config.epsilon_alpha,
            epsilon_bravo=self.config.epsilon_bravo
        )

    def run_tournament(self) -> Dict:
        """Run a tournament of multiple games"""
        print(f"\n=== Starting tournament: {self.config.num_games} games ===")
        print(f"Alpha ε={self.config.epsilon_alpha}, Bravo ε={self.config.epsilon_bravo}")

        start_time = time.time()

        for game_num in range(self.config.num_games):
            result = self.play_single_game(game_num)

            # Update statistics
            if result.winner == 1:
                self.stats['alpha_wins'] += 1
            elif result.winner == -1:
                self.stats['bravo_wins'] += 1
            else:
                self.stats['draws'] += 1

            self.stats['total_games'] += 1
            self.stats['game_results'].append(result)

            # Alternate who plays first (optional)
            if (game_num + 1) % 2 == 0:
                self.config.alpha_plays_first = not self.config.alpha_plays_first

        # Calculate final statistics
        total_length = sum(r.game_length for r in self.stats['game_results'])
        self.stats['avg_game_length'] = total_length / self.config.num_games
        elapsed = time.time() - start_time

        # Print summary
        self._print_tournament_summary(elapsed)

        # Find and display most common game
        self._display_most_common_game()

        # Save results if requested
        if self.config.save_games:
            self._save_results()

        return self.stats

    def _print_tournament_summary(self, elapsed_time: float):
        """Print tournament results"""
        print(f"\n=== Tournament Results ===")
        print(f"Games played: {self.stats['total_games']}")
        print(f"Alpha wins: {self.stats['alpha_wins']} ({self.stats['alpha_wins']/self.stats['total_games']*100:.1f}%)")
        print(f"Bravo wins: {self.stats['bravo_wins']} ({self.stats['bravo_wins']/self.stats['total_games']*100:.1f}%)")
        print(f"Draws: {self.stats['draws']} ({self.stats['draws']/self.stats['total_games']*100:.1f}%)")
        print(f"Average game length: {self.stats['avg_game_length']:.1f} moves")
        print(f"Time elapsed: {elapsed_time:.1f} seconds")
        print(f"Games per second: {self.stats['total_games']/elapsed_time:.1f}")

    def _display_most_common_game(self):
        """Find and display the most common game pattern"""
        if not self.stats['game_results']:
            print("No games to analyze!")
            return

        # Count game codes
        from collections import Counter
        game_codes = [result.game_code for result in self.stats['game_results']]
        code_counts = Counter(game_codes)

        if not code_counts:
            print("No game codes found!")
            return

        # Find most common
        most_common_code, count = code_counts.most_common(1)[0]

        print(f"\n=== MOST COMMON GAME PATTERN ===")
        print(f"Game code: {most_common_code}")
        print(f"Occurred {count} times out of {len(game_codes)} games ({count/len(game_codes)*100:.1f}%)")

        # Recreate and display the game
        try:
            board = BoardProcessor()
            moves = board.decode_moves_code(most_common_code)
            board.generate_state_list(moves)

            print(f"Move sequence: {moves}")
            print(f"Game length: {len(moves)} moves")
            print(f"Final board position:")
            board.display_board()

            # Check winner
            winner = self.check_win(board.state_list)
            if winner == 1:
                print("Winner: Player 1 (X)")
            elif winner == -1:
                print("Winner: Player -1 (O)")
            else:
                print("Result: Draw")

        except Exception as e:
            print(f"Error decoding game: {e}")

    def _save_results(self):
        """Save tournament results to file"""
        save_data = {
            'config': self.config,
            'stats': self.stats,
            'timestamp': time.time()
        }

        with open(self.config.game_save_path, 'wb') as f:
            pickle.dump(save_data, f)

        print(f"Results saved to {self.config.game_save_path}")

    def plot_results(self):
        """Plot tournament results"""
        if not self.stats['game_results']:
            print("No games played yet!")
            return

        fig, ((ax1, ax2), (ax3, ax4)) = plt.subplots(2, 2, figsize=(12, 8))

        # Win percentages pie chart
        labels = ['Alpha Wins', 'Bravo Wins', 'Draws']
        sizes = [self.stats['alpha_wins'], self.stats['bravo_wins'], self.stats['draws']]
        colors = ['lightblue', 'lightcoral', 'lightgray']
        ax1.pie(sizes, labels=labels, colors=colors, autopct='%1.1f%%', startangle=90)
        ax1.set_title('Win Distribution')

        # Game length histogram
        lengths = [r.game_length for r in self.stats['game_results']]
        ax2.hist(lengths, bins=20, alpha=0.7, color='skyblue', edgecolor='black')
        ax2.set_xlabel('Game Length (moves)')
        ax2.set_ylabel('Frequency')
        ax2.set_title('Game Length Distribution')
        ax2.axvline(np.mean(lengths), color='red', linestyle='--', label=f'Mean: {np.mean(lengths):.1f}')
        ax2.legend()

        # Running win rate
        alpha_wins = []
        running_alpha = 0
        for i, result in enumerate(self.stats['game_results']):
            if result.winner == 1:
                running_alpha += 1
            alpha_wins.append(running_alpha / (i + 1))

        ax3.plot(alpha_wins, label='Alpha Win Rate', color='blue')
        ax3.axhline(0.5, color='gray', linestyle='--', alpha=0.5)
        ax3.set_xlabel('Game Number')
        ax3.set_ylabel('Win Rate')
        ax3.set_title('Running Win Rate')
        ax3.legend()
        ax3.grid(True, alpha=0.3)

        # Win rate by starting player
        alpha_first_wins = sum(1 for r in self.stats['game_results']
                              if r.alpha_first and r.winner == 1)
        alpha_first_total = sum(1 for r in self.stats['game_results'] if r.alpha_first)

        bravo_first_wins = sum(1 for r in self.stats['game_results']
                              if not r.alpha_first and r.winner == -1)
        bravo_first_total = sum(1 for r in self.stats['game_results'] if not r.alpha_first)

        if alpha_first_total > 0 and bravo_first_total > 0:
            first_player_advantage = [
                alpha_first_wins / alpha_first_total,
                bravo_first_wins / bravo_first_total
            ]
            ax4.bar(['Alpha plays first', 'Bravo plays first'], first_player_advantage,
                   color=['lightblue', 'lightcoral'])
            ax4.set_ylabel('Win Rate when playing first')
            ax4.set_title('First Player Advantage')
            ax4.set_ylim(0, 1)
        else:
            ax4.text(0.5, 0.5, 'Not enough alternating games', ha='center', va='center')
            ax4.set_title('First Player Advantage (insufficient data)')

        plt.tight_layout()
        plt.show()

In [None]:

# Example configurations for different experiments.
# Fields that are not passed keep their SelfPlayConfig defaults; only the
# config objects are created here, no models are loaded yet.

# Basic self-play: both agents use the same checkpoint and exploration rate
config_basic = SelfPlayConfig(
    model_path_alpha="qnet_mc_pretrained.pth",
    model_path_bravo="qnet_mc_pretrained.pth",
    epsilon_alpha=0.1,
    epsilon_bravo=0.1,
    num_games=50,
    verbose=False
)

# Exploration vs exploitation: same checkpoint, different epsilon per agent
config_explore_exploit = SelfPlayConfig(
    model_path_alpha="qnet_mc_pretrained.pth",
    model_path_bravo="qnet_mc_pretrained.pth",
    epsilon_alpha=0.3,  # High exploration
    epsilon_bravo=0.05, # Low exploration
    num_games=100,
    verbose=False
)

# Different models: SelfPlayArena loads both checkpoints on construction, so
# "qnet_improved.pth" must exist before an arena is built from this config
config_different_models = SelfPlayConfig(
    model_path_alpha="qnet_mc_pretrained.pth",
    model_path_bravo="qnet_improved.pth",  # When you create an improved model
    epsilon_alpha=0.1,
    epsilon_bravo=0.1,
    num_games=200,
    verbose=False
)

In [None]:

if __name__ == "__main__":
    # Print a short usage reminder, one line per argument.
    print(
        "Self-Play Arena ready!",
        "Example usage:",
        "arena = SelfPlayArena(config_basic)",
        "results = arena.run_tournament()",
        sep="\n",
    )

    # Uncomment to run a basic tournament
    # arena = SelfPlayArena(config_basic)
    # results = arena.run_tournament()

In [None]:
# Tournament run: 1000 games with both agents at epsilon = 0.5.
# Note: this changes the shared `config_basic` object in place, so any later
# use of `config_basic` sees these values instead of the ones defined above.
config_basic.num_games = 1000
config_basic.epsilon_alpha = 0.5
config_basic.epsilon_bravo = 0.5
arena = SelfPlayArena(config_basic)
results = arena.run_tournament()
arena.plot_results()

In [None]:
import numpy as np
import matplotlib.pyplot as plt
from tqdm import tqdm
import pickle
import time

def run_epsilon_sweep(arena, epsilon_values=(0.0, 0.05, 0.1, 0.2, 0.3, 0.5, 0.7, 1.0),
                      games_per_epsilon=50, opponent_epsilon=0.1):
    """
    Run epsilon sweep using existing SelfPlayArena

    For every epsilon the arena's stats are reset and one tournament is played
    with Alpha at that epsilon and Bravo at ``opponent_epsilon``. The config
    fields changed for the sweep (``epsilon_alpha``, ``epsilon_bravo``,
    ``num_games``, ``verbose``, ``save_games``) are restored before returning,
    also when a tournament raises.

    Args:
        arena: Initialized SelfPlayArena instance
        epsilon_values: Iterable of epsilon values to test
        games_per_epsilon: Number of games per epsilon value
        opponent_epsilon: Fixed epsilon for opponent

    Returns:
        Dictionary mapping each epsilon to a record with the keys 'epsilon',
        'wins', 'losses', 'draws', 'win_rate', 'avg_game_length', 'games',
        'avg_win_length' and 'avg_loss_length'. 'win_rate' is computed over
        the games actually recorded and is 0.0 when there are none.
    """
    # Materialise once so any iterable (including a generator) can be counted
    # for the banner and then iterated.
    epsilon_values = list(epsilon_values)
    results = {}

    print(f"Starting epsilon sweep: {len(epsilon_values)} values, {games_per_epsilon} games each")
    print(f"Opponent epsilon fixed at {opponent_epsilon}")
    print("-" * 60)

    config = arena.config
    swept_fields = ('epsilon_alpha', 'epsilon_bravo', 'num_games', 'verbose', 'save_games')
    original_values = {name: getattr(config, name) for name in swept_fields}

    try:
        for eps in tqdm(epsilon_values, desc="Epsilon values"):
            # Configure arena for this epsilon
            config.epsilon_alpha = eps
            config.epsilon_bravo = opponent_epsilon
            config.num_games = games_per_epsilon
            config.verbose = False
            config.save_games = False

            # Reset stats and run games
            arena.reset_stats()
            arena.run_tournament()

            games = arena.stats['game_results']
            wins = arena.stats['alpha_wins']
            win_lengths = [g.game_length for g in games if g.winner == 1]
            loss_lengths = [g.game_length for g in games if g.winner == -1]

            # Store results
            results[eps] = {
                'epsilon': eps,
                'wins': wins,
                'losses': arena.stats['bravo_wins'],
                'draws': arena.stats['draws'],
                'win_rate': wins / len(games) if games else 0.0,
                'avg_game_length': arena.stats['avg_game_length'],
                'games': games,
                'avg_win_length': np.mean(win_lengths) if win_lengths else 0,
                'avg_loss_length': np.mean(loss_lengths) if loss_lengths else 0,
            }

            print(f"ε={eps:.2f}: Win rate={results[eps]['win_rate']:.3f}, "
                  f"Avg game={results[eps]['avg_game_length']:.1f} moves")
    finally:
        # Hand the arena back with the settings the caller configured.
        for name, value in original_values.items():
            setattr(config, name, value)

    return results


def plot_epsilon_sweep(results):
    """Create comprehensive visualization of epsilon sweep results

    Draws a 2x2 figure (win rate, outcome counts, average game length, win
    rate with 95% confidence intervals) and prints a short text summary.

    Args:
        results: Mapping ``epsilon -> record`` as returned by
            ``run_epsilon_sweep``. Each record must provide 'win_rate',
            'wins', 'losses', 'draws', 'avg_game_length' and 'games'.

    Returns:
        The epsilon with the highest win rate (the smallest one on ties).

    Raises:
        ValueError: If ``results`` is empty.
    """
    if not results:
        raise ValueError("plot_epsilon_sweep needs at least one epsilon in `results`")

    epsilons = sorted(results.keys())
    win_rates = [results[e]['win_rate'] for e in epsilons]
    avg_lengths = [results[e]['avg_game_length'] for e in epsilons]
    games_played = [len(results[e]['games']) for e in epsilons]

    fig, axes = plt.subplots(2, 2, figsize=(12, 10))

    # Win rate vs epsilon
    ax = axes[0, 0]
    ax.plot(epsilons, win_rates, 'b-o', linewidth=2, markersize=8)
    ax.fill_between(epsilons, win_rates, alpha=0.3)
    ax.set_xlabel('Epsilon (ε)', fontsize=12)
    ax.set_ylabel('Win Rate', fontsize=12)
    ax.set_title('Win Rate vs Exploration Rate', fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3)
    ax.set_ylim([0, 1])

    # Optimal epsilon marker (argmax returns the first maximum)
    best_index = int(np.argmax(win_rates))
    best_eps = epsilons[best_index]
    best_rate = win_rates[best_index]
    ax.plot(best_eps, best_rate, 'r*', markersize=15, label=f'Best: ε={best_eps:.2f}')
    ax.legend()

    # Win/Loss/Draw distribution
    ax = axes[0, 1]
    width = 0.25
    x = np.arange(len(epsilons))
    wins = [results[e]['wins'] for e in epsilons]
    losses = [results[e]['losses'] for e in epsilons]
    draws = [results[e]['draws'] for e in epsilons]

    ax.bar(x - width, wins, width, label='Wins', color='green', alpha=0.7)
    ax.bar(x, losses, width, label='Losses', color='red', alpha=0.7)
    ax.bar(x + width, draws, width, label='Draws', color='gray', alpha=0.7)
    ax.set_xlabel('Epsilon (ε)', fontsize=12)
    ax.set_ylabel('Number of Games', fontsize=12)
    ax.set_title('Game Outcomes Distribution', fontsize=14, fontweight='bold')
    ax.set_xticks(x)
    ax.set_xticklabels([f'{e:.2f}' for e in epsilons], rotation=45)
    ax.legend()
    ax.grid(True, alpha=0.3, axis='y')

    # Average game length
    ax = axes[1, 0]
    ax.plot(epsilons, avg_lengths, 'g-s', linewidth=2, markersize=8)
    ax.set_xlabel('Epsilon (ε)', fontsize=12)
    ax.set_ylabel('Average Game Length (moves)', fontsize=12)
    ax.set_title('Game Length vs Exploration Rate', fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3)

    # Win rate with 95% confidence intervals (normal approximation). Each
    # epsilon uses its own number of games; with no games the interval is 0.
    ax = axes[1, 1]
    conf_intervals = [
        1.96 * np.sqrt(wr * (1 - wr) / n) if n else 0.0
        for wr, n in zip(win_rates, games_played)
    ]

    ax.errorbar(epsilons, win_rates, yerr=conf_intervals,
                fmt='o-', capsize=5, capthick=2, linewidth=2)
    ax.fill_between(epsilons,
                     [m - c for m, c in zip(win_rates, conf_intervals)],
                     [m + c for m, c in zip(win_rates, conf_intervals)],
                     alpha=0.2, label='95% CI')
    ax.set_xlabel('Epsilon (ε)', fontsize=12)
    ax.set_ylabel('Win Rate', fontsize=12)
    ax.set_title('Win Rate with Confidence Intervals', fontsize=14, fontweight='bold')
    ax.grid(True, alpha=0.3)
    ax.legend()

    # The title reports the game count of the smallest epsilon.
    n_games = games_played[0]
    plt.suptitle(f'Epsilon Sweep Analysis ({n_games} games per ε)',
                 fontsize=16, fontweight='bold', y=1.02)
    plt.tight_layout()
    plt.show()

    # Print summary
    print("\n" + "="*60)
    print("EPSILON SWEEP SUMMARY")
    print("="*60)
    print(f"Best epsilon: {best_eps:.2f} (win rate: {best_rate:.3f})")
    # The two extremes are reported only if the sweep actually included them.
    for label, extreme in (("Pure exploitation (ε=0.0)", 0.0),
                           ("Pure exploration (ε=1.0)", 1.0)):
        if extreme in results:
            print(f"{label}: {results[extreme]['win_rate']:.3f}")

    # Find sweet spot range (always contains best_eps itself)
    threshold = best_rate - 0.05
    good_epsilons = [e for e in epsilons if results[e]['win_rate'] >= threshold]
    print(f"Good epsilon range (within 5% of best): [{min(good_epsilons):.2f}, {max(good_epsilons):.2f}]")

    return best_eps


def save_sweep_results(results, filename='epsilon_sweep_results.pkl'):
    """Save sweep results to file

    The pickle holds the raw ``results``, a timestamp and an ``'analysis'``
    entry with the best epsilon and its win rate. For empty ``results`` both
    analysis values are ``None``.

    Args:
        results: Mapping ``epsilon -> record``; every record needs 'win_rate'.
        filename: Path of the pickle file to write.
    """
    if results:
        # On ties the epsilon that comes first in `results` is kept.
        best_epsilon = max(results, key=lambda e: results[e]['win_rate'])
        best_win_rate = results[best_epsilon]['win_rate']
    else:
        best_epsilon = best_win_rate = None

    save_data = {
        'results': results,
        'timestamp': time.time(),
        'analysis': {
            'best_epsilon': best_epsilon,
            'best_win_rate': best_win_rate
        }
    }
    with open(filename, 'wb') as f:
        pickle.dump(save_data, f)
    print(f"\nResults saved to {filename}")


# Usage example:
if __name__ == "__main__":
    # One argument per output line; `sep` supplies the line breaks.
    print(
        "Epsilon Sweep Analysis",
        "Usage:",
        "  from SPlay import SelfPlayArena, SelfPlayConfig",
        "  config = SelfPlayConfig()",
        "  arena = SelfPlayArena(config)",
        "  results = run_epsilon_sweep(arena)",
        "  plot_epsilon_sweep(results)",
        "  save_sweep_results(results)",
        sep="\n",
    )