$$\text{Minesweeper Environment}$$

Required modules: NumPy

In [None]:
import numpy as np

# The MinesweeperEnv Class allows the user to instantiate a game board with the parameters [rows, columns, mines]. You can reset the game board using the method game.reset(), 
# or play in the existing game with game.step(move) where move is an integer, representing the flattened index of the cell you want to reveal.
# This version is configured for play by algorithms that interact directly with the game board, so the visual representations are not ideal for humans.

# Major features: flood filling, so if you reveal a square with adjacent "0s" (no neighboring mines), those will all be revealed along with their neighbors. 
# Mask: self.board is the completed/full board, but the player/agent interacts with self.mask, which marks which squares are still covered. The initial state is a sparse
# matrix where -1 implies mine. _neighbor_counts sums the adjacent mines for every square (vectorized over the whole board) to obtain a familiar minesweeper board look;
# _neighbors yields the in-bounds adjacent cells of one square and is used by the flood fill.
# _make_obs converts the visible counts into a tensor of {0,1} matrices. Matrix N has {1 if the cell is revealed and its count == N, 0 otherwise}, so covered squares
# are all zeros.
# Finally, _place_mines_after_first_click ensures the user's first click is safe, a feature from standard Minesweeper. Nice since it symmetrically helps all model success rates, so 
# it is easier to compare.

class MinesweeperEnv:
    """
    Minesweeper environment intended for play by algorithms.

    Grid: (r x c) with m mines. Mines are placed on the first click so that
    the first clicked cell is never a mine.
    Observation: (H, W, 9) one-hot of VISIBLE counts 0..8; covered cells are all zeros.
    Actions: flat index [0..H*W-1] = click cell (i,j). Invalid to click already
    revealed: the board is left unchanged and the reward is -0.05.
    Rewards: +0.1 safe, +1 win, -1 lose. No extra reward for flood size.
    """
    def __init__(self, rows=9, cols=9, mines=10, seed=None):
        self.r, self.c, self.m = rows, cols, mines
        self.rng = np.random.default_rng(seed)
        self.board = np.zeros((rows, cols), dtype=np.int8)   # -1 = mine, 0 = empty
        self.mask  = np.ones((rows, cols), dtype=np.int8)    # 1 = covered, 0 = revealed
        # Adjacent-mine counts. Created here (not only in reset) so that
        # _make_obs() also works on a freshly constructed environment.
        self._counts = np.zeros((rows, cols), dtype=np.int8)
        self._generated = False   # True once mines have been placed
        self.score = 0            # number of revealed cells after the last safe move
        self.explosion = False    # True once a mine has been clicked

    # ---------- helpers ----------
    def _idx(self, a):
        """Convert a flat action index into (row, col)."""
        return divmod(a, self.c)

    def _flat(self, i, j):
        """Convert (row, col) into a flat action index."""
        return i*self.c + j

    def _neighbors(self, i, j):
        """Yield the in-bounds cells among the 8 neighbors of (i, j)."""
        for di, dj in ((1,0),(0,1),(1,1),(-1,0),(0,-1),(-1,-1),(1,-1),(-1,1)):
            x, y = i+di, j+dj
            if 0 <= x < self.r and 0 <= y < self.c:
                yield x, y

    def _neighbor_counts(self):
        """Return an (r, c) int8 array with the number of mines adjacent to each cell."""
        mines = (self.board == -1).astype(np.int8)
        # Zero-pad by one cell so the eight shifted views below all have shape (r, c).
        p = np.pad(mines, 1)
        return (
            p[0:-2,0:-2] + p[0:-2,1:-1] + p[0:-2,2:] +
            p[1:-1,0:-2]                + p[1:-1,2:] +
            p[2:  ,0:-2] + p[2:  ,1:-1] + p[2:  ,2:]
        )

    def _place_mines_after_first_click(self, safe_i, safe_j):
        """
        Place self.m mines uniformly at random on every cell except (safe_i, safe_j).

        Raises ValueError unless self.m < r*c - 1.
        """
        n = self.r * self.c
        if not self.m < n - 1:
            raise ValueError(
                f"mines={self.m} does not fit a {self.r}x{self.c} board; need mines < {n - 1}"
            )
        # All flat indices except the first-clicked cell.
        candidates = np.delete(np.arange(n, dtype=np.int32), safe_i*self.c + safe_j)
        mine_idx = self.rng.choice(candidates, self.m, replace=False)
        self.board[:] = 0
        self.board.flat[mine_idx] = -1
        self._generated = True

    def _flood_reveal(self, i, j):
        """
        Reveal (i, j); if its count is 0, keep revealing outward through every
        connected zero-count cell and the numbered cells bordering them.
        """
        stack = [(i, j)]
        while stack:
            x, y = stack.pop()
            # Skip cells already handled (they may be pushed more than once) and mines.
            if self.mask[x, y] == 0 or self.board[x, y] == -1:
                continue
            self.mask[x, y] = 0  # reveal current
            if self._counts[x, y] == 0:
                # Neighbors are revealed when they are popped, not here: marking
                # them revealed before pushing would make the guard above skip
                # them and stop the flood after a single ring.
                for nx, ny in self._neighbors(x, y):
                    if self.mask[nx, ny] == 1:
                        stack.append((nx, ny))


    @property
    def action_space_n(self):  # number of actions
        return self.r * self.c

    def legal_action_mask(self):
        """True for legal (covered) cells; False for revealed cells."""
        return (self.mask == 1).reshape(-1)

    def _make_obs(self):
        """
        Return (H, W, 9) float32 one-hot of VISIBLE counts 0..8.
        Covered cells contribute zeros across all 9 channels.
        """
        revealed = (self.mask == 0)
        # Channel k is 1 where the cell is revealed and its count equals k.
        one_hot = (self._counts[..., None] == np.arange(9)) & revealed[..., None]
        return one_hot.astype(np.float32)

    def reset(self):
        """Cover every cell, clear the mines and counters, and return the empty observation."""
        self.board.fill(0)
        self.mask.fill(1)
        self._generated = False
        self.score = 0
        self.explosion = False
        self._counts = np.zeros_like(self.board, dtype=np.int8)
        return self._make_obs()

    def step(self, action):
        """
        action: flat index 0..(H*W-1)
        returns: next_obs, reward, done, info

        info is {"illegal": True} for a click on a revealed cell,
        {"result": "lose"} / {"result": "win"} when the game ends, else {}.
        """
        i, j = self._idx(action)

        # Mines are generated lazily so the very first click is always safe.
        if not self._generated:
            self._place_mines_after_first_click(i, j)
            self._counts = self._neighbor_counts()

        if self.mask[i, j] == 0:
            return self._make_obs(), -0.05, False, {"illegal": True}

        if self.board[i, j] == -1:
            self.mask[i, j] = 0
            self.explosion = True
            return self._make_obs(), -1.0, True, {"result": "lose"}

        self._flood_reveal(i, j)
        self.score = int((self.mask == 0).sum())

        # Win once every non-mine cell is revealed.
        if np.all(self.mask[self.board != -1] == 0):
            return self._make_obs(), +1.0, True, {"result": "win"}

        return self._make_obs(), +0.1, False, {}


$$\text{Helpers and Analytics}$$

Required modules: NumPy, Tensorflow
And, must import mixed_precision from tf.keras

In [None]:
import tensorflow as tf
from tensorflow.keras import mixed_precision

# Mixed precision allows the model to train faster than in the default 32-bit float.
# LossLogger is a minimal class built on the TensorFlow/Keras "Callback" feature, which lets the script pull values at stages of model training. This implementation just
# prints the NN's loss at the end of every epoch in training.
# CSVLogger dumps other analytics into a csv file.


# Set the global Keras dtype policy to "mixed_float16".
mixed_precision.set_global_policy("mixed_float16")
# Turn on TensorFlow's JIT compilation option (XLA).
tf.config.optimizer.set_jit(True)  

class LossLogger(tf.keras.callbacks.Callback):
    """Keras callback that prints the training loss at the end of every epoch."""

    def on_epoch_end(self, epoch, logs=None):
        """
        Print "[epoch N] loss=..." where N is the 1-based epoch number.

        epoch: 0-based epoch index supplied by Keras.
        logs: metrics dict supplied by Keras; may be None or lack "loss".
        """
        loss = (logs or {}).get('loss')
        if loss is None:
            # Formatting None with ":.6f" would raise TypeError mid-training.
            print(f"[epoch {epoch+1}] loss=n/a")
            return
        print(f"[epoch {epoch+1}] loss={loss:.6f}")

# Callback instances handed to train_gnn, which forwards them to model.fit.
loss_logger = LossLogger()
# Logs per-epoch metrics to 'train_log.csv'; append=False means the file is not appended to.
csv_logger  = tf.keras.callbacks.CSVLogger('train_log.csv', append=False)

# D4_map takes advantage of symmetries in a Minesweeper board. Each cell cares about its 8 neighbors -- value in the cell is invariant under any D4 
# mapping of the 8 neighbors. Also, a square Minesweeper board has the same symmetries, although most (e.g. single 90 degree rotation) do NOT apply to rectangular non-square boards.

def d4_map(x, y, w):
    """
    Apply one random dihedral symmetry to a (features, labels, weights) triple.

    The same rotation (a multiple of 90 degrees) and the same optional
    left-right flip are applied to all three tensors so they stay aligned.

    x, y, w: image-like tensors, (H, W, C) or (N, H, W, C).
    Returns the transformed (x, y, w).

    When the static shape of x is known and H != W, only the 0 and 180 degree
    rotations are used: an odd quarter turn would swap H and W, so elements of
    one dataset would no longer share a shape and could not be batched.
    """
    static = tf.TensorShape(x.shape)
    non_square = False
    if static.rank is not None and static.rank >= 3:
        dims = static.as_list()
        height, width = dims[-3], dims[-2]
        non_square = height is not None and width is not None and height != width

    if non_square:
        # k in {0, 2}: shape-preserving rotations only.
        k = 2 * tf.random.uniform([], 0, 2, dtype=tf.int32)
    else:
        # k in {0, 1, 2, 3}: any quarter turn.
        k = tf.random.uniform([], 0, 4, dtype=tf.int32)
    x = tf.image.rot90(x, k)
    y = tf.image.rot90(y, k)
    w = tf.image.rot90(w, k)

    # One coin flip shared by all three tensors.
    do_flip = tf.random.uniform([], 0, 1) < 0.5
    x = tf.cond(do_flip, lambda: tf.image.flip_left_right(x), lambda: x)
    y = tf.cond(do_flip, lambda: tf.image.flip_left_right(y), lambda: y)
    w = tf.cond(do_flip, lambda: tf.image.flip_left_right(w), lambda: w)
    return x, y, w

# Generates a new board with a given seed and makes a (free) first move if not otherwise specified. This gets called in training since the current loop
# gets unique training boards by generating from a list (i.e. 10000 boards in training could be "for board_idx in [0, 10000]").
    
def board_from_seed(env, seed, first_click=None):
    """
    Re-seed env, reset it, and play the opening click.

    env: environment exposing r, c, rng, reset() and step(action).
    seed: seed for the fresh NumPy generator assigned to env.rng.
    first_click: (row, col) of the opening move; defaults to the board center.
    Returns the observation produced by the opening click.
    """
    env.rng = np.random.default_rng(seed)
    env.reset()

    click = (env.r // 2, env.c // 2) if first_click is None else first_click
    action = click[0] * env.c + click[1]

    # Only the observation is handed back; reward/done/info are dropped.
    obs, _reward, _done, _info = env.step(action)
    return obs

# Sums the 8 neighbors of every cell of a boolean matrix. For example, passing (board == mine) gives the number of mines adjacent to each square.
def _nb_sum_uint8(mat_bool: np.ndarray) -> np.ndarray:
    p = np.pad(mat_bool.astype(np.uint8), 1, mode="constant")
    s = (
        p[0:-2, 0:-2] + p[0:-2, 1:-1] + p[0:-2, 2:] +
        p[1:-1, 0:-2] +                     p[1:-1, 2:] +
        p[2:  , 0:-2] + p[2:  , 1:-1] + p[2:  , 2:]
    )
    return s

# Quickly finds the "frontier": covered squares that have at least one revealed neighbor, i.e. the squares the player has some information about.
def _frontier_mask_fast(covered: np.ndarray, revealed: np.ndarray) -> np.ndarray:
    """Return the mask of covered cells that have at least one revealed neighbor."""
    touches_revealed = _nb_sum_uint8(revealed) > 0
    return covered & touches_revealed

# This function builds a dataset based on the environment.
# Take g = MinesweeperEnv(9, 9, 10, seed): 
#           - Generated boards have 9 rows, 9 columns, 10 mines
#           - Seeds are from a list of seeds via board_from_seed
# Generates user specified number of board states:
#           - A random generator picks non-mine squares in the frontier (feasible good moves for an agent) and reveals one
#           - For each game, get N of these random guaranteed safe moves. Each time, re-initialize the game tensor.
# Looping over this function gives a set of sets of tensors to train the model on.
# The outputs are X (the boardstate tensors), Y (labels: 1 where a frontier square is NOT a mine, 0 elsewhere), M (the frontier masks for the tensors)

def collect_safety_dataset(env, seeds, states_per_board=10, random_safe_steps=3):
    """
    Build a supervised dataset of board states by playing random safe moves.

    For every seed a board is generated with board_from_seed. Then, repeated
    states_per_board times: the current state is recorded and up to
    random_safe_steps randomly chosen covered non-mine cells are clicked.

    env: MinesweeperEnv-like environment (uses r, c, mask, board, _make_obs, step).
    seeds: sized iterable of board seeds.
    Returns (X, Y, M), all float32 with len(seeds) * states_per_board rows:
        X: (N, H, W, 9) observations.
        Y: (N, H, W, 1) 1 where a frontier cell is NOT a mine, else 0.
        M: (N, H, W, 1) 1 on frontier cells, else 0.

    Once a board has no covered safe cell left, the remaining states recorded
    for it repeat the final position. On return, env holds a fresh board for
    the last seed.
    """
    H, W = env.r, env.c
    C = 9
    total = len(seeds) * states_per_board
    X = np.empty((total, H, W, C), dtype=np.float32)
    Y = np.empty((total, H, W, 1), dtype=np.float32)
    M = np.empty((total, H, W, 1), dtype=np.float32)

    # Fixed seed: the sequence of random safe moves is reproducible.
    rng = np.random.default_rng(0)
    write_i = 0
    any_seed = False
    last_seed = None
    for sd in seeds:
        any_seed, last_seed = True, sd
        board_from_seed(env, sd)
        for _k in range(states_per_board):
            covered = (env.mask == 1)
            frontier = _frontier_mask_fast(covered, ~covered)

            X[write_i] = env._make_obs()
            Y[write_i, ..., 0] = (env.board != -1) & frontier
            M[write_i, ..., 0] = frontier
            write_i += 1

            for _step in range(random_safe_steps):
                safe_flat = np.flatnonzero((env.mask == 1) & (env.board != -1))
                if safe_flat.size == 0:
                    break
                # safe_flat already holds flat action indices, so no
                # divmod/re-flatten round trip is needed.
                env.step(int(safe_flat[rng.integers(len(safe_flat))]))

    # Regenerate the board once, after the loop. Doing it after every seed was
    # wasted work because the next iteration regenerates from its own seed.
    if any_seed:
        board_from_seed(env, last_seed)
    return X, Y, M

# Last function combines the outputs from collect_safety_datasets into objects with tensorflow dataset type, then shuffles these. Finally, it applies the d4 mapping
# to quickly add symmetric boardstates to the dataset. Saves time by doing this instead of generating more boards, and uses tf to speed up further with AUTOTUNE,
# which lets tf decide how to optimize performance.

def make_dataset(X, Y, W, batch=32, shuffle=True):
    """
    Wrap (X, Y, W) arrays in a tf.data pipeline with D4 augmentation.

    X, Y, W: arrays sliced along their first axis into (features, labels, weights) elements.
    batch: batch size; the final partial batch is kept.
    shuffle: when True, shuffle with a buffer of min(len(X), 10000) elements,
        reshuffled on every iteration.
    Returns the batched, prefetched tf.data.Dataset.
    """
    pipeline = tf.data.Dataset.from_tensor_slices((X, Y, W))

    if shuffle:
        buffer = min(len(X), 10000)
        pipeline = pipeline.shuffle(buffer_size=buffer, reshuffle_each_iteration=True)

    # Augment per element (before batching), letting tf.data pick the parallelism.
    augmented = pipeline.map(d4_map, num_parallel_calls=tf.data.AUTOTUNE)
    batched = augmented.batch(batch, drop_remainder=False)
    return batched.prefetch(tf.data.AUTOTUNE)
        

$$\text{Graph Neural Network Model}$$

Required modules: tensorflow
\
From tf.keras: layers, models

In [None]:
# Neural Network class; methods explained inline

import tensorflow as tf
from tensorflow.keras import layers, models

class NeuralNetwork:
    """
    GNN implementation using Keras, via different CNN layers.
    - Fully convolutional; supports variable kernel sizes and board sizes.
    - Residual message passing using DepthwiseConv2D(3x3) + Conv2D(1x1).
    """
    def __init__(self, input_channels: int = 9, hidden: int = 64, depth: int = 5):
        """
        input_channels: channels of the input tensor (9 by default, since inputs are 9-channel one-hot boards).
        hidden: number of feature channels used throughout the network.
        depth: number of residual message-passing blocks.
        """
        self.c = input_channels
        self.hidden = hidden
        self.depth = depth
        self.model = None   # set by build()

    def build(self, h=None, w=None):
        """
        Builds a variable-size (None,None) model that outputs per-cell logits (H,W,1).
        Specifying h, w makes the model only take specific board sizes.

        Returns the Keras model, which is also stored in self.model.
        """
        inp = layers.Input(shape=(h, w, self.c))

        # Input stem: a 1x1 convolution mixes the one-hot channels of each cell
        # into `hidden` features, followed by batch normalization and ReLU.
        x = layers.Conv2D(self.hidden, 1, use_bias=False)(inp)
        x = layers.BatchNormalization()(x)
        x = layers.ReLU()(x)

        # Named `feat` rather than `h` so the board-height parameter is not shadowed.
        feat = x
        for _ in range(self.depth):
            # Depthwise 3x3 convolution: each channel aggregates over a cell and
            # its 8 neighbors (the "message passing" step).
            y = layers.DepthwiseConv2D(3, padding="same", use_bias=False)(feat)
            y = layers.BatchNormalization()(y)
            y = layers.ReLU()(y)
            # Pointwise 1x1 convolution mixes the aggregated channels per cell.
            y = layers.Conv2D(self.hidden, 1, use_bias=False)(y)
            y = layers.BatchNormalization()(y)
            y = layers.ReLU()(y)
            # Residual connection.
            feat = layers.Add()([feat, y])

        # Final 1x1 convolution reduces to one logit per cell. No activation is
        # applied here; callers turn logits into probabilities with a sigmoid.
        # The layer is pinned to float32 so the logits keep full precision even
        # when a mixed-precision global policy is active.
        out = layers.Conv2D(1, 1, padding="same", activation=None, dtype="float32")(feat)
        self.model = models.Model(inp, out)
        return self.model

class WeightedBCEFromLogits(tf.keras.losses.Loss):
    """
    Element-wise sigmoid cross-entropy computed from logits, with the
    positive-label term scaled by pos_weight.

    Built on tf.nn.weighted_cross_entropy_with_logits, which takes logits
    directly (the sigmoid is folded into the loss). Reduction is NONE, so
    call() returns a loss per element rather than a scalar.
    """
    def __init__(self, pos_weight=3.0):
        super().__init__(reduction=tf.keras.losses.Reduction.NONE)
        self.pos_weight = tf.cast(pos_weight, tf.float32)

    def call(self, y_true, y_pred):
        """Return the weighted cross-entropy of logits y_pred against labels y_true."""
        # Both operands are cast to float32 before the loss is evaluated.
        labels = tf.cast(y_true, tf.float32)
        logits = tf.cast(y_pred, tf.float32)
        return tf.nn.weighted_cross_entropy_with_logits(
            labels=labels,
            logits=logits,
            pos_weight=self.pos_weight,
        )

def compile_gnn(model, pos_weight=3.0, lr=3e-4, use_xla=True, steps_per_execution=128):
    """
    Compile model with Adam and the weighted BCE-from-logits loss.

    model: Keras model producing per-cell logits.
    pos_weight: weight passed to WeightedBCEFromLogits.
    lr: Adam learning rate.
    use_xla: forwarded to compile as jit_compile.
    steps_per_execution: forwarded to compile; previously hard-coded to 128,
        which remains the default.
    Returns the same model, compiled.
    """
    loss = WeightedBCEFromLogits(pos_weight)
    opt = tf.keras.optimizers.Adam(lr)
    model.compile(
        optimizer=opt,
        loss=loss,
        jit_compile=bool(use_xla),
        steps_per_execution=steps_per_execution,
        run_eagerly=False,
    )
    return model

def train_gnn(env, seeds, model, *,
                           epochs=35, batch=128, states_per_board=6, steps_per_state=4,
                           pos_weight=3.0, lr=3e-4, use_xla=True,
                           loss_logger=None, csv_logger=None):
    """
    Generate a dataset from env and seeds, compile model, and fit it.

    states_per_board / steps_per_state are forwarded to collect_safety_dataset
    (as states_per_board / random_safe_steps); pos_weight, lr and use_xla go to
    compile_gnn. loss_logger and csv_logger are optional Keras callbacks.
    Returns the compiled, fitted model.
    """
    features, labels, weights = collect_safety_dataset(
        env, seeds, states_per_board, steps_per_state
    )
    # The arrays are stored as float16 before being wrapped in a dataset.
    features, labels, weights = (
        arr.astype("float16") for arr in (features, labels, weights)
    )
    dataset = make_dataset(features, labels, weights, batch=batch, shuffle=True)

    model = compile_gnn(model, pos_weight=pos_weight, lr=lr, use_xla=use_xla)

    # Keep only the callbacks that were actually supplied, loss logger first.
    callbacks = [cb for cb in (loss_logger, csv_logger) if cb is not None]
    model.fit(dataset, epochs=epochs, verbose=1, callbacks=callbacks)
    return model

# Uses sigmoid to convert model outputs (logits) to probabilities. These represent P[Cell != Mine] for each cell in the board.
def predict(model, X, tta=0):
    """
    Run model on X in inference mode and return sigmoid(logits).

    model: callable Keras model returning per-cell logits.
    X: batch of board tensors; assumed (N, H, W, C) when tta is used.
    tta: number of extra symmetric views for test-time augmentation.
        0 (default) is a single forward pass, exactly as before this
        parameter existed. With tta > 0 the prediction is averaged over the
        original input plus up to `tta` rotated/flipped copies, each mapped
        back to the original orientation; that average is float32.
        Views that swap H and W are skipped when X is known to be non-square.

    The parameter exists because greedy_gnn_algo calls predict(..., tta=tta).
    """
    if not tta:
        logits = model(X, training=False)
        return tf.math.sigmoid(logits)

    static = tf.TensorShape(X.shape)
    square = True
    if static.rank == 4:
        _, height, width, _ = static.as_list()
        square = height is None or width is None or height == width

    rotations = (0, 1, 2, 3) if square else (0, 2)
    # Identity view first, then the other symmetries in a fixed order.
    views = [(k, flip) for flip in (False, True) for k in rotations]
    views = views[: 1 + int(tta)]

    total = None
    for k, flip in views:
        view = tf.image.rot90(X, k=k) if k else X
        if flip:
            view = tf.image.flip_left_right(view)
        probs = tf.math.sigmoid(tf.cast(model(view, training=False), tf.float32))
        # Undo the view in reverse order: flip back first, then rotate back.
        if flip:
            probs = tf.image.flip_left_right(probs)
        if k:
            probs = tf.image.rot90(probs, k=4 - k)
        total = probs if total is None else total + probs
    return total / float(len(views))

$$\text{Testing loop}$$

Required modules: NumPy

In [None]:
# This function uses the neural network outputs -- which are per-cell probabilities -- to predict where mines are.
# The algo pulls all probabilities of each cell not being a mine, then gets the highest, i.e. the safest cell, and digs it. Then, it 
# re-computes all of the logits and probabilities in the new board state. 
# Note: should be careful to avoid seeds the model trained on -- evaluating on those says nothing about generalization and gives weird behavior.
def greedy_gnn_algo(env, model, seeds, max_moves=None, tta=0, verbose=False):
    """
    Play one greedy game per seed and return the fraction of games won.

    On every move the model scores each cell, and the covered frontier cell
    with the highest predicted probability of being safe is clicked. If there
    is no frontier cell, any covered cell may be chosen.

    env: MinesweeperEnv-like environment.
    model: model accepted by predict().
    seeds: sized collection of board seeds.
    max_moves: optional cap on moves per game; a capped game is not a win.
    tta: forwarded to predict() only when non-zero.
    verbose: print each game's result.
    Returns wins / len(seeds), or 0.0 when seeds is empty.
    """
    if len(seeds) == 0:
        return 0.0

    wins = 0
    W = env.c
    # Plays for each seed given
    for sd in seeds:
        board_from_seed(env, sd)
        moves = 0
        info = {}
        # board_from_seed drops the `done` flag of the opening click, so detect
        # a board already cleared by it instead of clicking on into a mine.
        done = not ((env.mask == 1) & (env.board != -1)).any()
        if done:
            info = {"result": "win"}

        while not done:
            # Gets board state
            s = env._make_obs()[None, ...].astype("float32")
            # NN predictions on board state. tta is only passed when requested,
            # so the default path works with a predict(model, X) signature.
            p_tf = predict(model, s, tta=tta) if tta else predict(model, s)
            p = np.asarray(p_tf[0, ..., 0], dtype=np.float32)
            # Only considers non-guesses/adjacent squares. Downside: rarely the
            # only available frontier educated guess might be worse than a pure guess.
            covered = (env.mask == 1)
            cand = _frontier_mask_fast(covered, ~covered)
            if not cand.any():
                cand = covered
            # -inf keeps non-candidates out of the argmax for any score dtype.
            scores = np.where(cand, p, -np.inf)
            # Gets highest probability of nonmine cell
            i, j = np.unravel_index(np.argmax(scores), scores.shape)
            _, _, done, info = env.step(i * W + j)
            moves += 1
            if max_moves and moves >= max_moves:
                break
        if verbose:
            print(info.get("result"))
        wins += 1 if info.get("result") == "win" else 0
    # Outputs the proportion of wins to games played
    return wins / len(seeds)


$$\text{Run the script}$$

In [None]:
# Create the model; input channels should be 9, hidden and depth up to user

from calendar import c


nn = NeuralNetwork(input_channels=9, hidden=64, depth=5)

# Builds the model; h, w = None lets the model train on any board size. Setting these --> easier to separate trained models.
model = nn.build(h=None, w=None)
# Train on 10x10 boards: these make it easy to compute mine densities, and are relatively not many computations. Large boards
# increase work, and make the loss surface take longer to traverse --> much longer training. Impact on winrate for models ~E[Guesses]
# so larger boards on similar densities should monotonically decrease winrates for deterministic and stochastic models.
# To play with variable or different boardstates, would recommend adding other tuples to the list: (16, 16, [40]) is intermediate mode, for example.
# The inner list is the set of mine counts the model trains on.

train_schedules = [(10,10,[10,15,20,25])]
# Built once and reused for every mine count.
train_seeds = list(range(0,25000))
# NOTE(review): csv_logger is created with append=False and reused for every train_gnn call below,
# so each call rewrites train_log.csv -- confirm that keeping only the last run's log is intended.
for (rows, cols, mine_list) in train_schedules:
    for mines in mine_list:
        env = MinesweeperEnv(rows=rows, cols=cols, mines=mines, seed=0)
        train_gnn(env, train_seeds, model,
                               epochs=10, batch=128,
                               states_per_board=7, steps_per_state=6,
                               pos_weight=3.0, lr=1e-3, use_xla=True,
                               loss_logger=loss_logger, csv_logger=csv_logger)


# Evaluation seeds are kept outside the training range 0..24999. The previous range 20000..20099
# sat inside it, and the mine counts 20 and 25 are used for both training and evaluation, so those
# evaluation boards had been trained on.
eval_seeds = list(range(25000,25100))

rang = np.arange(20, 60)
winrates = []
rows = 10
cols = 10

# Plays Minesweeper over a range of mine counts and records the winrate at each one
for m in rang:
    env = MinesweeperEnv(rows=rows, cols=cols, mines=m, seed=123)
    wr_gnn = greedy_gnn_algo(env, model, eval_seeds)
    winrates.append(wr_gnn)
    print(f"{rows}x{cols}, mines={m}: winrate={wr_gnn:.3f}")
