In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import pickle

from tqdm.notebook import tqdm

In [None]:
# Define the neural network model for Minesweeper
class MinesweeperCNN(nn.Module):
    """Fully convolutional network that outputs one probability per board cell.

    Four identical stages (3x3 conv -> batch norm -> ReLU -> 2D dropout) with
    channel widths 1 -> 32 -> 64 -> 128 -> 64 are followed by a 1x1 convolution
    and a sigmoid. Padding of 1 on every 3x3 convolution keeps the spatial
    size of the input unchanged.
    """

    def __init__(self):
        super().__init__()
        # Layers are registered as conv1, bn1, conv2, bn2, ... in that order.
        widths = (1, 32, 64, 128, 64)
        for stage, (c_in, c_out) in enumerate(zip(widths, widths[1:]), start=1):
            setattr(self, f'conv{stage}', nn.Conv2d(c_in, c_out, kernel_size=3, padding=1))
            setattr(self, f'bn{stage}', nn.BatchNorm2d(c_out))
        self.dropout = nn.Dropout2d(0.2)
        self.output_conv = nn.Conv2d(64, 1, kernel_size=1)

    def forward(self, x):
        """Map a single-channel board tensor to per-cell probabilities.

        Args:
          x: input with one channel, expected as (batch_size, 1, height, width)

        Returns:
          tensor of sigmoid outputs with the channel axis squeezed away,
          i.e. shape (batch_size, height, width)
        """
        relu = nn.functional.relu
        stages = (
            (self.conv1, self.bn1),
            (self.conv2, self.bn2),
            (self.conv3, self.bn3),
            (self.conv4, self.bn4),
        )
        for conv, bn in stages:
            x = self.dropout(relu(bn(conv(x))))
        cell_probs = torch.sigmoid(self.output_conv(x))
        return cell_probs.squeeze(1)
    

In [None]:
class MinesweeperStepDataset(Dataset):
    """Per-step Minesweeper training samples loaded from a pickle file."""

    # Order in which the target maps are stacked along the channel axis.
    TARGET_KEYS = ('step_allowed', 'bombs', 'revealed')

    def __init__(self, data_path):
        """
        Args:
          data_path: path to a pickle file containing an indexable collection
            of dicts, each dict with keys:
            - 'observable_state': array (H,W) with the currently visible game state (model input)
            - 'step_allowed': array (H,W) with 1 where safe hidden cells exist (target)
            - 'bombs': array (H,W) with 1 where bombs exist (target)
            - 'revealed': array (H,W) with 1 where cells are revealed (target)
        """
        # SECURITY: pickle.load can execute arbitrary code embedded in the file.
        # Only point this at dataset files you generated or otherwise trust.
        with open(data_path, 'rb') as f:
            self.data = pickle.load(f)

    def __len__(self):
        """Number of samples stored in the loaded collection."""
        return len(self.data)

    def __getitem__(self, idx):
        """Return (game_state, targets) for sample `idx` as float32 tensors.

        Returns:
          game_state: tensor of shape (1,H,W) - the observable state with a
            leading channel axis added
          targets: tensor of shape (3,H,W) stacked in TARGET_KEYS order
            (step_allowed, bombs, revealed)
        """
        sample = self.data[idx]

        # torch.tensor with an explicit dtype converts integer or float64
        # arrays alike and always copies, so the stored sample is never aliased.
        game_state = torch.tensor(sample['observable_state'], dtype=torch.float32).unsqueeze(0)

        targets = torch.stack(
            [torch.tensor(sample[key], dtype=torch.float32) for key in self.TARGET_KEYS],
            dim=0,
        )

        return game_state, targets

In [None]:
# Compute device for the notebook. Leave `device` as None to auto-detect
# (CUDA when available, CPU otherwise) or assign a torch.device to force one.
device = None

if device is None:
    device = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

In [None]:
# Pickled train / validation splits, resolved relative to the working directory.
train_dataset_path = '../data_generation/data/train_begginner.pkl'
val_dataset_path = '../data_generation/data/test_begginner.pkl'

# Load both splits with the same dataset class.
train_dataset, val_dataset = (
    MinesweeperStepDataset(split_path)
    for split_path in (train_dataset_path, val_dataset_path)
)

# Only the training loader shuffles; validation keeps the stored order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=32)
val_loader = DataLoader(val_dataset, batch_size=32)

In [None]:
class MinesweeperPenaltyLoss(nn.Module):
    """Weighted sum of three mean-reduced BCE terms computed on one prediction map."""

    def __init__(self, bomb_penalty=10.0, revealed_penalty=1.0, safe_penalty=1.0):
        """
        bomb_penalty: multiplier of the bombs term
        revealed_penalty: multiplier of the revealed-cell term
        safe_penalty: multiplier of the step_allowed term
        """
        super().__init__()
        self.bceloss = nn.BCELoss(reduction='mean')
        self.safe_penalty = safe_penalty
        self.revealed_penalty = revealed_penalty
        self.bomb_penalty = bomb_penalty

    def forward(self, y_pred, y_target):
        """
        y_pred: model output, shape (batch, H, W), probabilities (0-1)
        y_target: tensor indexed along dim 1 as
                  0 -> step_allowed, 1 -> bombs, 2 -> revealed,
                  each slice of shape (batch, H, W)

        Returns the scalar
            safe_penalty * BCE(y_pred, step_allowed)
          + revealed_penalty * BCE(y_pred, 1 - revealed)
          + bomb_penalty * BCE(y_pred, 1 - bombs)
        """
        step_allowed = y_target[:, 0]
        bombs = y_target[:, 1]
        revealed = y_target[:, 2]

        # Term 1: predictions should match the safe-to-step mask directly.
        weighted_safe = self.safe_penalty * self.bceloss(y_pred, step_allowed)

        # Terms 2 and 3 use complemented masks, so the target is 0 on
        # revealed / bomb cells and 1 on every other cell.
        # NOTE(review): the complement also rewards predicting 1 on all
        # non-revealed (resp. non-bomb) cells - confirm this is intended.
        weighted_revealed = self.revealed_penalty * self.bceloss(y_pred, 1.0 - revealed)
        weighted_bombs = self.bomb_penalty * self.bceloss(y_pred, 1.0 - bombs)

        return weighted_safe + weighted_revealed + weighted_bombs

In [None]:
# Pull one batch and show the shapes of its inputs (index 0) and targets (index 1).
batch = next(iter(train_loader))

print(batch[0].size(), batch[1].size(), sep='\n')

In [None]:
# Smoke test: push one training batch through a freshly initialised model
# and evaluate the penalty loss on it.
model = MinesweeperCNN()
model.to(device)
model.eval()

X_random, y_random = next(iter(train_loader))
# The batch has to live on the same device as the model; without this the
# forward pass raises a device-mismatch error whenever CUDA is selected.
X_random = X_random.to(device)
y_random = y_random.to(device)

# Inference only, so skip building the autograd graph.
with torch.no_grad():
    y_pred = model(X_random)

loss = MinesweeperPenaltyLoss(bomb_penalty=10.0, revealed_penalty=1.0, safe_penalty=1.0)
loss(y_pred, y_random)

In [None]:
# Define the neural network model for Minesweeper
class MinesweeperCNN(nn.Module):
    """Shallower variant: only the first two conv stages feed the output head.

    conv3/bn3 and conv4/bn4 are still constructed, so they remain part of
    parameters() and state_dict(), but forward() does not call them.
    """

    def __init__(self):
        super().__init__()
        conv3x3 = dict(kernel_size=3, padding=1)  # padding keeps H x W unchanged
        self.conv1 = nn.Conv2d(1, 32, **conv3x3)
        self.bn1 = nn.BatchNorm2d(32)
        self.conv2 = nn.Conv2d(32, 64, **conv3x3)
        self.bn2 = nn.BatchNorm2d(64)
        self.conv3 = nn.Conv2d(64, 128, **conv3x3)
        self.bn3 = nn.BatchNorm2d(128)
        self.conv4 = nn.Conv2d(128, 64, **conv3x3)
        self.bn4 = nn.BatchNorm2d(64)
        self.dropout = nn.Dropout2d(p=0.2)
        self.output_conv = nn.Conv2d(64, 1, kernel_size=1)

    def _stage(self, x, conv, bn):
        """Apply conv -> batch norm -> ReLU -> dropout to `x`."""
        return self.dropout(nn.functional.relu(bn(conv(x))))

    def forward(self, x):
        """Return per-cell sigmoid outputs of shape (batch_size, height, width)."""
        x = self._stage(x, self.conv1, self.bn1)
        x = self._stage(x, self.conv2, self.bn2)
        # Stages 3 and 4 (conv3/bn3, conv4/bn4) are deliberately bypassed here;
        # conv2 already yields the 64 channels output_conv expects.
        cell_probs = torch.sigmoid(self.output_conv(x))
        return cell_probs.squeeze(1)
    

In [None]:
# Rebuild the datasets and loaders used by the training loop.
train_dataset_path = '../data_generation/data/train_begginner.pkl'
val_dataset_path = '../data_generation/data/test_begginner.pkl'

batch_size = 32  # shared by the training and validation loaders

train_dataset = MinesweeperStepDataset(train_dataset_path)
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=batch_size)

val_dataset = MinesweeperStepDataset(val_dataset_path)
val_loader = DataLoader(val_dataset, batch_size=batch_size)

In [None]:
import torch
from tqdm import tqdm
import matplotlib.pyplot as plt
import IPython.display as display

# --- Training configuration --------------------------------------------------
num_epochs = 100
VAL_EVERY_N_BATCHES = 200  # run validation and refresh the live plot this often


def compute_validation_loss(model, loader, loss_fn, device):
    """Return the mean per-batch loss of `model` over `loader`.

    The model is put in eval mode for the pass and returned to its previous
    train/eval mode afterwards. Returns NaN when the loader yields no batches.
    """
    was_training = model.training
    model.eval()
    total_loss = 0.0
    num_batches = 0
    with torch.no_grad():
        for X_val, y_val in loader:
            X_val = X_val.to(device)
            y_val = y_val.to(device)
            total_loss += loss_fn(model(X_val), y_val).item()
            num_batches += 1
    model.train(was_training)
    return total_loss / num_batches if num_batches else float('nan')


def plot_loss_curves(train_losses, val_losses, val_every, epoch):
    """Redraw the live plot of per-batch train loss and periodic validation loss."""
    display.clear_output(wait=True)
    fig, ax = plt.subplots(figsize=(12, 6))
    ax.plot(train_losses, label='Train Loss per Batch')
    # Train losses are plotted at 0-based batch indices. The k-th validation
    # run (0-based) happens right after batch (k + 1) * val_every, i.e. at
    # index (k + 1) * val_every - 1.
    val_x = [(k + 1) * val_every - 1 for k in range(len(val_losses))]
    ax.plot(val_x, val_losses, label=f'Validation Loss (every {val_every} batches)')
    ax.set_xlabel('Batch Number')
    ax.set_ylabel('Loss')
    ax.set_title(f'Training and Validation Loss - Epoch {epoch+1}')
    ax.legend()
    ax.grid(True)
    plt.show()
    # Release the figure; a new one is created at every refresh.
    plt.close(fig)


# --- Model, loss, optimizer --------------------------------------------------
model = MinesweeperCNN()
loss_fn = MinesweeperPenaltyLoss(bomb_penalty=10.0, revealed_penalty=1.0, safe_penalty=1.0)
optimizer = torch.optim.Adam(model.parameters(), lr=1e-3)

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = model.to(device)
loss_fn = loss_fn.to(device)

train_losses = []  # one entry per training batch, across all epochs
val_losses = []    # one entry per validation run

plt.ion()  # Enable interactive mode for live plotting

for epoch in range(num_epochs):
    model.train()
    batch_train_losses = []

    for X, y in tqdm(train_loader):
        X = X.to(device)
        y = y.to(device)
        optimizer.zero_grad()
        y_pred = model(X)
        loss = loss_fn(y_pred, y)
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        batch_train_losses.append(batch_loss)
        train_losses.append(batch_loss)

        # Validate and refresh the live plot every VAL_EVERY_N_BATCHES batches
        if len(train_losses) % VAL_EVERY_N_BATCHES == 0:
            val_loss = compute_validation_loss(model, val_loader, loss_fn, device)
            val_losses.append(val_loss)
            plot_loss_curves(train_losses, val_losses, VAL_EVERY_N_BATCHES, epoch)

    # Epoch-level summary print (NaN if the training loader yielded no batches)
    if batch_train_losses:
        epoch_train_loss = sum(batch_train_losses) / len(batch_train_losses)
    else:
        epoch_train_loss = float('nan')
    if val_losses:
        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {epoch_train_loss:.4f} | Val Loss: {val_losses[-1]:.4f}")
    else:
        print(f"Epoch {epoch+1}/{num_epochs}: Train Loss: {epoch_train_loss:.4f}")

plt.ioff()  # Disable interactive mode after training

# Save model after training
torch.save(model.state_dict(), 'minesweeper_cnn.pth')


In [None]:
# Sanity check after training: one training batch through the model in eval mode.
model.eval()

X_random, y_random = next(iter(train_loader))
# Move the batch to the model's device; the model was trained on `device`,
# so CPU tensors would raise a device-mismatch error when CUDA is in use.
X_random = X_random.to(device)
y_random = y_random.to(device)

# Inference only, so skip building the autograd graph.
with torch.no_grad():
    y_pred = model(X_random)

loss = MinesweeperPenaltyLoss(bomb_penalty=10.0, revealed_penalty=1.0, safe_penalty=1.0)
loss(y_pred, y_random)

In [None]:
# Model input of the sample at index 1 in the batch drawn from train_loader.
X_random[1]

In [None]:
# Stacked targets (step_allowed, bombs, revealed) of the sample at index 1.
y_random[1]

In [None]:
# Model output for the sample at index 1 of the same batch.
y_pred[1]

In [None]:
import os
import sys

# The `minesweeper` package lives in the parent of the working directory.
# Put that directory on sys.path instead of chdir-ing back and forth: the
# working directory is never changed (so nothing is left in the wrong place
# if the import fails) and no directory name has to be hard-coded.
_PROJECT_ROOT = os.path.abspath('..')
if _PROJECT_ROOT not in sys.path:
    sys.path.insert(0, _PROJECT_ROOT)

from minesweeper.game import MinesweeperGame, GameState, CellState

class GamePlayer:
    """Plays a single MinesweeperGame by repeatedly asking a model for a move."""

    def __init__(self, model, game, max_steps=100):
        """
        Args:
          model: object implementing predict_move(board_array) -> (row, col) or None
          game: MinesweeperGame instance to be played
          max_steps: upper bound on moves made by one play() call, to avoid infinite loops
        """
        self.model, self.game, self.max_steps = model, game, max_steps
        self.steps = []

    def play(self, verbose=False, stop_after_invalid_move=False):
        """Run the game until it ends, the model gives up, or max_steps is hit.

        Args:
          verbose: print a message when the model returns no move or picks a
            cell that is not hidden
          stop_after_invalid_move: stop (without playing the move) as soon as
            the model picks a non-hidden cell; when False the move is still
            passed to the game

        Returns:
          (final game state, list of {'move': (row, col), 'result': state} dicts)
        """
        moves_played = 0
        while self.game.state == GameState.IN_PROGRESS and moves_played < self.max_steps:
            observation = self.game.board.to_numpy_bombs_safe()
            move = self.model.predict_move(observation)
            if move is None:
                if verbose:
                    print("Model did not provide a move. Stopping.")
                break

            row, col = move
            target_cell = self.game.board.grid[row][col]
            if target_cell.state != CellState.HIDDEN:
                if verbose:
                    print(f"Model selected invalid cell ({row},{col}) which is not hidden.")
                if stop_after_invalid_move:
                    break

            outcome = self.game.play_move(row, col)
            self.steps.append({'move': (row, col), 'result': outcome})
            moves_played += 1

            if outcome == GameState.WON or outcome == GameState.LOST:
                break

        return self.game.state, self.steps
    
class Evaluator:
    """Runs a model over a set of games and aggregates win / step statistics."""

    def __init__(self, model, max_steps=100):
        """
        Args:
          model: object with method predict_move(board_array) -> (row, col) or None
          max_steps: maximum moves allowed per game
        """
        self.model = model
        self.max_steps = max_steps
        self.results = []

    def evaluate(self, games):
        """
        Runs the model-controlled play once per entry of `games` and collects statistics.

        Each entry is used only as a template: a new MinesweeperGame is built
        from its board's rows, cols and num_mines, and that new game is the
        one that gets played.

        Args:
          games: iterable of initialized MinesweeperGame instances (may be empty)

        Returns:
          dict with 'win_rate', 'avg_steps', and 'games_results'; both rates
          are 0.0 when `games` is empty
        """
        self.results = []

        for template in games:
            board = template.board
            game = MinesweeperGame(board.rows, board.cols, board.num_mines)
            player = GamePlayer(self.model, game, self.max_steps)
            final_state, steps = player.play()

            self.results.append({
                'final_state': final_state,
                'steps_count': len(steps),
                'moves': steps
            })

        # Count what was actually played so `games` can be any iterable, and
        # guard the averages against division by zero on empty input.
        num_games = len(self.results)
        if num_games == 0:
            return {'win_rate': 0.0, 'avg_steps': 0.0, 'games_results': self.results}

        wins = sum(1 for result in self.results if result['final_state'] == GameState.WON)
        total_steps = sum(result['steps_count'] for result in self.results)
        return {
            'win_rate': wins / num_games,
            'avg_steps': total_steps / num_games,
            'games_results': self.results
        }

In [None]:
class CNNSolver:
    """Adapter that turns a per-cell probability model into a move picker."""

    def __init__(self, model):
        """
        Args:
          model: torch nn.Module mapping a (1, 1, H, W) tensor to per-cell
            scores of shape (1, H, W)
        """
        self.model = model

    def predict_move(self, board_array):
        """
        Picks the cell with the highest model output.

        The board is converted to a float32 tensor on the model's device and
        evaluated without gradient tracking. The model is switched to eval
        mode for the call and returned to its previous mode afterwards. The
        chosen cell is not checked against the game, so it may already be
        revealed.

        Args:
          board_array: 2D array-like (H, W) encoding the visible board

        Returns:
          (row, col) of the highest-scoring cell
        """
        board = torch.as_tensor(board_array, dtype=torch.float32)
        # Follow the model's device; a model without parameters stays as is.
        first_param = next(self.model.parameters(), None)
        if first_param is not None:
            board = board.to(first_param.device)
        board = board.unsqueeze(0).unsqueeze(0)  # (H, W) -> (1, 1, H, W)

        was_training = self.model.training
        self.model.eval()  # disable dropout / batch-stat updates for inference
        try:
            with torch.no_grad():
                prob = self.model(board)
        finally:
            self.model.train(was_training)

        cell_scores = prob[0]  # drop the batch axis -> (H, W)

        # argmax works on the flattened (H, W) map, so the flat index is
        # row * W + col: divide by the number of COLUMNS (last dimension).
        num_cols = cell_scores.size(-1)
        row, col = divmod(int(torch.argmax(cell_scores)), num_cols)

        return (row, col)

In [None]:
# Demo: let the trained model play one 9x9 game with 20 mines, capped at 10 moves.
game = MinesweeperGame(9, 9, 20)
solver_model = CNNSolver(model)
player = GamePlayer(model=solver_model, game=game, max_steps=10)

# verbose=True reports missing or invalid moves; invalid moves do not stop play.
final_state, moves_made = player.play(stop_after_invalid_move=False, verbose=True)

print("Game ended with state:", final_state)
print("Moves made:")
for step in moves_made:
    print(step)

In [None]:
# Evaluate the trained model over 100 games on 9x9 boards with 10 mines.
solver_model = CNNSolver(model)
evaluator = Evaluator(model=solver_model)

games = [MinesweeperGame(9, 9, 10) for _ in range(100)]
results = evaluator.evaluate(games=games)

print("Win Rate:", results['win_rate'])
print("Average Steps:", results['avg_steps'])