# Notebook 04 — Task 3 (Thinking Deeper)

In this notebook, I implement **Task 3** for **all three difficulties**:

- Easy (22×22, 50 mines)
- Medium (22×22, 80 mines)
- Hard (22×22, 100 mines)

My core idea is to treat “thinking longer” as **running the same reasoning block more times** (a sequential computation / unrolling). Concretely, I reuse a shared Transformer-style block, and I let the model take more “internal steps” before it outputs a mine-probability map.

What I’m trying to prove (per the PDF):
- As I increase thinking steps, **the prediction loss goes down**.
- As I increase thinking steps, **the gameplay performance goes up**.
- The predicted mine heatmap changes in an interpretable way as I let the model think longer.

Assumption: I already unzipped my project so the repo lives at `/content/repo/`.


In [None]:
# I install the small deps I need for this notebook.
# I avoid re-installing torch in Colab because checkpoint loading can get flaky
# if the runtime's torch version changes mid-session.
# NOTE: torch is therefore expected to be preinstalled by the runtime; the next
# cell imports it without installing it here.
%pip install -q numpy tqdm matplotlib


In [None]:
# I locate my repo root so this notebook can import my code.
import sys
from pathlib import Path

def _is_repo_root(path: Path) -> bool:
    """Return True when *path* contains both packages this notebook imports from."""
    return (path / 'minesweeper').exists() and (path / 'models').exists()


# Default location of the unzipped project in Colab.
repo_root = Path('/content/repo')

# If the default is not the project root itself, accept a single nested
# directory instead (e.g. /content/repo/<project>/).
# The is_dir() guard keeps a missing /content/repo from raising a raw OS error
# out of iterdir(); that case now reaches the explicit error below.
if not _is_repo_root(repo_root) and repo_root.is_dir():
    kids = [p for p in repo_root.iterdir() if p.is_dir()]
    if len(kids) == 1:
        repo_root = kids[0]

if not _is_repo_root(repo_root):
    raise FileNotFoundError(f'Bad repo_root: {repo_root}')

# The repo goes first on sys.path so `minesweeper` and `models` resolve to it.
sys.path.insert(0, str(repo_root))
print('Repo root:', repo_root)

import torch

# Pick the GPU when the runtime exposes one; every later cell reads `device`.
_cuda_ok = torch.cuda.is_available()
device = torch.device('cuda') if _cuda_ok else torch.device('cpu')
print('CUDA available:', _cuda_ok)
if _cuda_ok:
    print('GPU:', torch.cuda.get_device_name(0))


In [None]:
# Step 1 — Load the Task 1 datasets (I reuse them for Task 3)
import json
from pathlib import Path

from models.dataset_cache import dataset_dir_for_task
from models.task1 import load_task1_npz

# Task 3 consumes the same `.npz` format as Task 1, so I load those files as-is
# rather than generating a second dataset that I would have to keep in sync.
DATA_DIR = dataset_dir_for_task(repo_root=repo_root, task='task1')

# Board geometry and mine count for each difficulty.
DIFFICULTIES = dict(
    easy=dict(height=22, width=22, num_mines=50),
    medium=dict(height=22, width=22, num_mines=80),
    hard=dict(height=22, width=22, num_mines=100),
)

# One loaded dataset per difficulty, keyed by difficulty name.
datasets = {}
for name in DIFFICULTIES:
    out_npz = DATA_DIR / f'task1_{name}_teacher_logic.npz'
    if not out_npz.exists():
        # Fail early with a pointer to the notebook that produces these files.
        raise FileNotFoundError(
            f"Missing dataset: {out_npz}\n"
            "Run Notebook 02 first to generate the Task 1 datasets."
        )

    npz = load_task1_npz(out_npz)
    datasets[name] = npz

    # Quick sanity line: sample count plus the game count stored in the metadata.
    meta = json.loads(npz.meta_json)
    print(f"{name}: samples={npz.x_visible.shape[0]} games={meta.get('num_games')}")


In [None]:
# Step 2 — Train the "thinking" mine predictor
from dataclasses import asdict

import numpy as np
import torch
from torch.utils.data import DataLoader, Subset

from models.metrics import (
    masked_bce_with_logits,
    masked_binary_confusion_from_logits,
    binary_metrics_from_confusion,
    pos_weight_from_targets,
)
from models.task1.dataset import Task1Dataset
from models.task3.model import ThinkingMinePredictor, ThinkingMinePredictorConfig


def _add_conf(dst: dict, src: dict) -> None:
    """Accumulate the confusion counts of *src* into *dst*, in place.

    Only the keys 'tp', 'fp', 'tn', 'fn' and 'n' are touched. A key that is
    missing or falsy (e.g. None) on either side counts as 0, and each total
    is stored back into *dst* as an int.
    """
    for key in ('tp', 'fp', 'tn', 'fn', 'n'):
        running = dst.get(key, 0) or 0
        extra = src.get(key, 0) or 0
        dst[key] = int(running) + int(extra)


def train_task3(
    *,
    npz,
    cfg: ThinkingMinePredictorConfig,
    run_name: str = "",
    steps_train: int = 4,
    epochs: int = 15,
    batch_size: int = 64,
    lr: float = 3e-4,
    weight_decay: float = 1e-2,
    val_frac: float = 0.1,
    seed: int = 0,
    threshold: float = 0.5,
    use_pos_weight: bool = True,
    early_stop_patience: int = 4,
    early_stop_min_delta: float = 1e-4,
):
    """Train a ThinkingMinePredictor on one dataset and return the trained model.

    The dataset is split into train/validation with a seeded permutation. Each
    training batch is supervised at every internal think-step (later steps
    weighted more); validation scores only the final output. Training stops
    early when validation F1 stops improving, and the weights of the best
    validation-F1 epoch are loaded back before returning.

    Reads the module-level `device`. When CUDA is available it also switches
    on TF32 for matmul/cuDNN, which is a process-wide setting. Prints one
    progress line per epoch.

    Args:
        npz: Loaded dataset object; it is wrapped in `Task1Dataset`.
        cfg: Model configuration used to build the `ThinkingMinePredictor`.
        run_name: Optional label prefixed (in brackets) to the progress lines.
        steps_train: Number of think-steps passed to the model for both
            training and validation.
        epochs: Maximum number of epochs.
        batch_size: Requested batch size. If the GPU name contains 'A100' the
            effective batch size is raised to at least 96.
        lr: AdamW learning rate.
        weight_decay: AdamW weight decay.
        val_frac: Fraction of samples held out for validation.
        seed: Seed for the train/validation permutation. Nothing else in this
            function is seeded.
        threshold: Forwarded to `masked_binary_confusion_from_logits` when
            computing the confusion counts.
        use_pos_weight: When true, a positive-class weight is computed per
            batch with `pos_weight_from_targets` and passed to the loss.
        early_stop_patience: Number of consecutive epochs without a validation
            F1 improvement after which training stops.
        early_stop_min_delta: Minimum F1 gain over the best so far that counts
            as an improvement.

    Returns:
        The model on `device`. If at least one epoch ran, it carries the
        weights of the epoch with the best validation F1.
    """
    # On A100/H100 (Ampere+), TF32 can speed up matmuls/conv a lot.
    # PyTorch is deprecating the old allow_tf32 flags, so I use the new API with a fallback.
    if torch.cuda.is_available():
        try:
            torch.backends.cuda.matmul.fp32_precision = 'tf32'
        except Exception:
            torch.backends.cuda.matmul.allow_tf32 = True
        try:
            torch.backends.cudnn.conv.fp32_precision = 'tf32'
        except Exception:
            torch.backends.cudnn.allow_tf32 = True

    ds = Task1Dataset(npz)

    tag = (f"[{str(run_name)}] " if str(run_name) else "")
    print(f"{tag}train_task3: samples={len(ds)} steps_train={int(steps_train)} epochs={int(epochs)} batch_size={int(batch_size)} seed={int(seed)}")

    # Seeded permutation: the first n_val indices become the validation split,
    # the remainder the training split.
    g = torch.Generator().manual_seed(int(seed))
    perm = torch.randperm(len(ds), generator=g)
    n_val = int(len(ds) * float(val_frac))
    val_idx = perm[:n_val].tolist()
    train_idx = perm[n_val:].tolist()

    use_cuda = (device.type == 'cuda')

    # On A100, I can usually push batch_size a bit higher, but Task 3 is heavier than Task 1.
    # I bump it modestly to speed things up without risking OOM.
    bs = int(batch_size)
    if use_cuda:
        try:
            if 'A100' in torch.cuda.get_device_name(0):
                bs = max(bs, 96)
        except Exception:
            pass

    # In Colab, DataLoader workers occasionally hang on me, so I keep `num_workers` small.
    # If I want more throughput later, this is the first knob I try.
    num_workers = 2 if use_cuda else 0
    dl_common = dict(num_workers=int(num_workers), pin_memory=bool(use_cuda))
    if int(num_workers) > 0:
        dl_common['persistent_workers'] = True

    train_loader = DataLoader(Subset(ds, train_idx), batch_size=int(bs), shuffle=True, **dl_common)
    val_loader = DataLoader(Subset(ds, val_idx), batch_size=int(bs), shuffle=False, **dl_common)

    model = ThinkingMinePredictor(cfg).to(device)

    # I intentionally skip `torch.compile` here: on Colab I kept running into Dynamo/FakeTensor crashes
    # (`DataDependentOutputException`). I'd rather have a boring run that finishes.

    opt = torch.optim.AdamW(model.parameters(), lr=float(lr), weight_decay=float(weight_decay))

    # Mixed precision (autocast + gradient scaling) is used only on CUDA.
    use_amp = bool(use_cuda)
    scaler = (torch.amp.GradScaler('cuda') if use_amp else None)

    # Early-stopping state: best validation F1 so far, a copy of the weights
    # that produced it, and the number of epochs since the last improvement.
    best_f1 = -1.0
    best_state = None
    patience = 0

    for epoch in range(1, int(epochs) + 1):
        # ---- training pass ----
        model.train()
        tr = 0.0
        tr_n = 0
        tr_conf = {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0, 'n': 0}
        for batch in train_loader:
            x = batch['x'].to(device, non_blocking=use_cuda)
            y = batch['y'].to(device, non_blocking=use_cuda)
            m = batch['mask'].to(device, non_blocking=use_cuda)

            opt.zero_grad(set_to_none=True)
            with torch.amp.autocast(device_type=device.type, enabled=use_amp):
                # With return_all=True the model returns the final logits plus
                # the sequence of per-step logits.
                logits, per_step = model(x, steps=int(steps_train), return_all=True)

                # Deep supervision: I compute loss at every internal think-step, and I weight later steps more.
                # I normalize the weights so the total loss scale doesn't change with steps_train.
                pw = pos_weight_from_targets(y, m) if bool(use_pos_weight) else None
                denom = float(sum(range(1, int(len(per_step)) + 1)))
                loss = torch.zeros((), device=device)
                for i, li in enumerate(per_step, start=1):
                    w = float(i) / denom
                    loss = loss + w * masked_bce_with_logits(li, y, m, pos_weight=pw)

            if scaler is not None:
                # AMP path: scale the loss before backward, then let the scaler
                # drive the optimizer step and update its scale factor.
                scaler.scale(loss).backward()
                scaler.step(opt)
                scaler.update()
            else:
                loss.backward()
                opt.step()

            # The reported train loss is the mean of per-batch losses; the train
            # metrics are computed from the final-step logits only.
            tr += float(loss.item())
            tr_n += 1
            _add_conf(tr_conf, masked_binary_confusion_from_logits(logits.detach(), y, m, threshold=float(threshold)))

        # ---- validation pass (final-step output only, no deep supervision) ----
        model.eval()
        va = 0.0
        va_n = 0
        va_conf = {'tp': 0, 'fp': 0, 'tn': 0, 'fn': 0, 'n': 0}
        with torch.no_grad():
            for batch in val_loader:
                x = batch['x'].to(device, non_blocking=use_cuda)
                y = batch['y'].to(device, non_blocking=use_cuda)
                m = batch['mask'].to(device, non_blocking=use_cuda)
                with torch.amp.autocast(device_type=device.type, enabled=use_amp):
                    logits = model(x, steps=int(steps_train))
                    pw = pos_weight_from_targets(y, m) if bool(use_pos_weight) else None
                    vloss = masked_bce_with_logits(logits, y, m, pos_weight=pw)

                va += float(vloss.item())
                va_n += 1
                _add_conf(va_conf, masked_binary_confusion_from_logits(logits, y, m, threshold=float(threshold)))

        # Metrics come from confusion counts summed over the whole epoch.
        tr_m = binary_metrics_from_confusion(tr_conf['tp'], tr_conf['fp'], tr_conf['tn'], tr_conf['fn'])
        va_m = binary_metrics_from_confusion(va_conf['tp'], va_conf['fp'], va_conf['tn'], va_conf['fn'])

        print(
            f"{tag}epoch {epoch}/{epochs} | "
            f"train loss {tr/max(1,tr_n):.4f} acc {tr_m['acc']:.3f} prec {tr_m['precision']:.3f} rec {tr_m['recall']:.3f} f1 {tr_m['f1']:.3f} | "
            f"val loss {va/max(1,va_n):.4f} acc {va_m['acc']:.3f} prec {va_m['precision']:.3f} rec {va_m['recall']:.3f} f1 {va_m['f1']:.3f}"
        )

        # Early stopping tracks validation F1 (not validation loss). A missing
        # or None F1 is treated as 0.0.
        cur_f1 = float(va_m.get('f1', 0.0) or 0.0)
        if cur_f1 > (best_f1 + float(early_stop_min_delta)):
            best_f1 = cur_f1
            # Clone the tensors so later optimizer steps cannot overwrite the snapshot.
            best_state = {k: v.detach().clone() for k, v in model.state_dict().items()}
            patience = 0
        else:
            patience += 1
            if patience >= int(early_stop_patience):
                print(f'early stop: no val f1 improvement for {early_stop_patience} epoch(s). best_f1={best_f1:.4f}')
                break

    # Restore the best snapshot; best_state stays None only if no epoch ran.
    if best_state is not None:
        model.load_state_dict(best_state)

    return model


# I train + save one model per difficulty.
CKPT_DIR = Path(repo_root) / 'models' / 'task3' / 'checkpoints'
CKPT_DIR.mkdir(parents=True, exist_ok=True)

# I append a version tag to my Task 3 checkpoints the same way I did for Task 2 runs.
# I keep the tag short so I can store multiple experiments side-by-side.
CKPT_TAG = 'v2_longer_train_ci'  # longer training + thinking-time eval + CIs + figure export

# I keep an explicit overwrite flag so reruns don't silently destroy checkpoints.
OVERWRITE = False

# In Colab, TorchDynamo/FakeTensor can crash on me even when I don't need compilation.
# I hard-disable Dynamo so training runs eagerly and reliably.
# This stays best-effort (e.g. a torch build without `torch._dynamo`), but I report
# the failure instead of hiding it so a later Dynamo crash is easier to explain.
try:
    import torch._dynamo as _dynamo

    _dynamo.config.disable = True
    _dynamo.config.suppress_errors = True
except Exception as exc:
    print(f'Note: could not disable TorchDynamo ({exc!r}); continuing anyway.')

EPOCHS_BY_DIFF = {
    # Easy is already strong, but in my run the val metrics were still improving at epoch 15, so I let it go longer.
    'easy': 25,
    # Medium/hard usually need more optimization steps to converge.
    'medium': 35,
    'hard': 40,
}

TRAINING_CFG = dict(
    # I set epochs per difficulty via EPOCHS_BY_DIFF.
    batch_size=64,
    lr=3e-4,
    weight_decay=1e-2,
    val_frac=0.1,
    seed=0,
    early_stop_patience=6,
)

# Maps difficulty name -> path of its tagged checkpoint (as a string).
trained = {}
for name, diff in DIFFICULTIES.items():
    ckpt_path = CKPT_DIR / f'task3_{name}_{CKPT_TAG}.pt'

    # An existing tagged checkpoint is reused unless OVERWRITE is set.
    if (not OVERWRITE) and ckpt_path.exists():
        print(f"Skipping {name}: checkpoint already exists -> {ckpt_path}")
        trained[name] = str(ckpt_path)
        continue

    npz = datasets[name]
    cfg = ThinkingMinePredictorConfig(height=int(diff['height']), width=int(diff['width']), default_steps=4)
    print(f"\nTraining Task 3 model for {name}...")
    epochs_here = int(EPOCHS_BY_DIFF.get(str(name), 25))
    model = train_task3(npz=npz, cfg=cfg, run_name=str(name), steps_train=4, epochs=epochs_here, **TRAINING_CFG)

    meta = json.loads(npz.meta_json)

    # The payload is built once and written to both the tagged and the unversioned
    # file, so the two copies cannot drift apart.
    payload = {
        'task': 'task3_thinking_deeper',
        'difficulty': name,
        'difficulty_cfg': diff,
        'checkpoint_tag': str(CKPT_TAG),
        'dataset_meta': meta,
        'model_cfg': asdict(cfg),
        'state_dict': model.state_dict(),
    }
    torch.save(payload, ckpt_path)

    # I also save a convenience copy for the GUI (unversioned name).
    # If I want to keep multiple versions side-by-side, I load the tagged file.
    # A failure here must not abort the run (the tagged file is already on disk),
    # but I print it so a stale GUI checkpoint does not go unnoticed.
    gui_path = CKPT_DIR / f'task3_{name}.pt'
    try:
        torch.save(payload, gui_path)
    except Exception as exc:
        print(f"Warning: could not write convenience copy {gui_path}: {exc!r}")

    print(f"Saved {name} -> {ckpt_path}")
    trained[name] = str(ckpt_path)

trained


In [None]:
# Step 3 — I show that more thinking helps (loss vs steps)
import matplotlib.pyplot as plt
from pathlib import Path

import torch
from torch.utils.data import DataLoader, Subset

from models.metrics import masked_bce_with_logits
from models.task1.dataset import Task1Dataset
from models.task3.model import ThinkingMinePredictor, ThinkingMinePredictorConfig

# Which difficulty the remaining cells evaluate.
EVAL_NAME = 'medium'  # 'easy' | 'medium' | 'hard'
EVAL_CFG = DIFFICULTIES[EVAL_NAME]
EVAL_NPZ = datasets[EVAL_NAME]

# I always evaluate the tagged checkpoint, never the unversioned GUI copy.
ckpt_path = Path(repo_root).joinpath('models', 'task3', 'checkpoints', f'task3_{EVAL_NAME}_{CKPT_TAG}.pt')
if not ckpt_path.exists():
    raise FileNotFoundError(
        f"Missing checkpoint: {ckpt_path}\n"
        "Run Step 2 first to train the v2 checkpoints (or change CKPT_TAG)."
    )

ckpt = torch.load(ckpt_path, map_location=device)

# Prefer the model config stored in the checkpoint; if it is absent or empty,
# rebuild one from the board size of the chosen difficulty.
mcfg = ckpt.get('model_cfg')
if not mcfg:
    mcfg = {'height': int(EVAL_CFG['height']), 'width': int(EVAL_CFG['width']), 'default_steps': 4}

model = ThinkingMinePredictor(ThinkingMinePredictorConfig(**mcfg))
model = model.to(device)
model.load_state_dict(ckpt['state_dict'])
model.eval()

# I measure validation loss at different thinking depths.
# I treat this as a quick sanity check (not a careful statistical estimate).
# I keep it deterministic so when I re-run later, I'm comparing apples to apples.

def eval_loss_for_steps(steps: int, n_batches: int = 20, seed: int = 0) -> float:
    """Return the mean masked-BCE loss of the loaded `model` at a given thinking depth.

    Uses the module-level `model`, `device` and `EVAL_NPZ`. A seeded subset of
    `64 * n_batches` samples is scored in batches of 64, and the result is the
    mean of the per-batch losses (no positive-class weighting). The same
    `seed` always selects the same samples, so different `steps` values are
    compared on identical boards.

    NOTE(review): this draws its permutation the same way `train_task3` draws
    its split, so with matching seeds the leading indices presumably land in
    the training run's validation split - confirm before calling this a
    held-out loss.
    """
    dataset = Task1Dataset(EVAL_NPZ)

    # Fixed, seeded subset so repeated runs score exactly the same samples.
    gen = torch.Generator().manual_seed(int(seed))
    chosen = torch.randperm(len(dataset), generator=gen)[: int(64 * n_batches)].tolist()

    on_gpu = (device.type == 'cuda')
    loader = DataLoader(
        Subset(dataset, chosen),
        batch_size=64,
        shuffle=False,
        num_workers=0,
        pin_memory=bool(on_gpu),
    )

    running = 0.0
    seen = 0
    with torch.no_grad():
        for batch in loader:
            x, y, m = (batch[key].to(device, non_blocking=on_gpu) for key in ('x', 'y', 'mask'))
            # Mixed precision only on CUDA.
            with torch.amp.autocast(device_type=device.type, enabled=on_gpu):
                logits = model(x, steps=int(steps))
                batch_loss = masked_bce_with_logits(logits, y, m)
            running += float(batch_loss.item())
            seen += 1
            if seen >= int(n_batches):
                break

    # max(1, ...) avoids a division by zero when no batch was scored.
    return running / max(1, seen)

# Thinking depths to compare (the model was trained with 4 steps).
steps_list = [1, 2, 4, 6, 8]
losses = list(map(eval_loss_for_steps, steps_list))

# Loss curve: one point per thinking depth.
plt.figure(figsize=(6,4))
plt.plot(steps_list, losses, marker='o')
plt.title(f'Task 3 ({EVAL_NAME}): loss vs thinking steps')
plt.xlabel('thinking steps')
plt.ylabel('masked BCE loss (approx)')
plt.grid(True)
plt.show()

# The same numbers as text, so they survive in the cell output.
print('loss vs steps:')
for s, loss_value in zip(steps_list, losses):
    print(f'  steps={s:>2d}  loss={loss_value:.6f}')


In [None]:
# Step 4 — I evaluate gameplay: does the bot improve when I let it think longer?
import numpy as np

from minesweeper.game import MinesweeperGame, GameState
from models.task3.policy import select_safest_unrevealed_thinking


def play_one_game(*, diff: dict, model: ThinkingMinePredictor, steps: int, seed: int, max_steps: int = 512) -> dict:
    """Play one Minesweeper game with the model choosing every move after the first.

    The first click is drawn from a NumPy generator seeded with *seed*, and the
    game itself is created with the same *seed*, so a given seed yields the
    same opening regardless of *steps*. After that, each move is whatever
    `select_safest_unrevealed_thinking` returns for the current visible board,
    until the game leaves the in-progress state, the policy returns None, or
    *max_steps* model moves have been made. Uses the module-level `device`.

    Returns a dict with:
        'won': True only if the game reports a win and no mine was triggered.
        'cells_opened': value reported by the game statistics.
        'mines_triggered': value reported by the game statistics.
    """
    height = int(diff['height'])
    width = int(diff['width'])

    # Deterministic first click per seed, so step counts are compared fairly.
    rng = np.random.default_rng(int(seed))
    first_r = int(rng.integers(0, height))
    first_c = int(rng.integers(0, width))

    game = MinesweeperGame(height=height, width=width, num_mines=int(diff['num_mines']), seed=int(seed))
    # player_clicks takes a set as its third argument; a fresh one is passed per click.
    game.player_clicks(first_r, first_c, set())

    move_limit = int(max_steps)
    moves = 0
    while game.get_game_state() == GameState.PROG and moves < move_limit:
        action = select_safest_unrevealed_thinking(
            model,
            game.get_visible_board(),
            device=device,
            steps=int(steps),
            temperature=1.0,
        )
        if action is None:
            # The policy has nothing left to click.
            break
        row, col = int(action[0]), int(action[1])
        game.player_clicks(row, col, set())
        moves += 1

    stats = game.get_statistics()
    triggered = int(stats['mines_triggered'])
    return {
        'won': bool(stats['game_won']) and (triggered == 0),
        'cells_opened': int(stats['cells_opened']),
        'mines_triggered': triggered,
    }


def eval_play_vs_steps(*, diff: dict, model: ThinkingMinePredictor, steps_list: list[int], n_games: int = 60, seed0: int = 0) -> dict:
    """Play *n_games* games at each thinking depth and return the averages.

    Every depth in *steps_list* is evaluated on the same seeds
    (`seed0, seed0 + 1, ...`). The result maps each depth (as int) to a dict
    with 'n', 'win_rate', 'avg_mines_triggered' and 'avg_cells_opened'.
    """
    seeds = [int(seed0) + offset for offset in range(int(n_games))]
    # max(1, ...) keeps the averages defined when no games are requested.
    denom = float(max(1, n_games))

    summary = {}
    for s in steps_list:
        results = [
            play_one_game(diff=diff, model=model, steps=int(s), seed=int(seed))
            for seed in seeds
        ]
        wins = sum(int(r['won']) for r in results)
        mines = sum(int(r['mines_triggered']) for r in results)
        opened = sum(int(r['cells_opened']) for r in results)

        summary[int(s)] = {
            'n': int(n_games),
            'win_rate': float(wins) / denom,
            'avg_mines_triggered': float(mines) / denom,
            'avg_cells_opened': float(opened) / denom,
        }

    return summary


# Bootstrap helper for the confidence interval reported below.
# I evaluate all step counts on the same seeds so the comparison is fair, and
# I keep this lightweight by only bootstrapping a CI for the win rate.

def bootstrap_mean_ci(x: np.ndarray, *, n_boot: int = 2000, alpha: float = 0.05, seed: int = 0) -> tuple[float, float, float]:
    """Return `(mean, lo, hi)`: the sample mean of *x* and a bootstrap interval for it.

    The interval is the `alpha / 2` and `1 - alpha / 2` quantiles of the means
    of *n_boot* resamples drawn with replacement (each the size of *x*), using
    a generator seeded with *seed*. An empty *x* yields three NaNs.
    """
    sample = np.asarray(x, dtype=np.float64)
    if sample.size == 0:
        nan = float('nan')
        return (nan, nan, nan)

    n = int(sample.size)
    rng = np.random.default_rng(int(seed))

    # One resample (with replacement) per iteration; keep only its mean.
    boot_means = np.asarray(
        [float(np.mean(sample[rng.integers(0, n, size=n)])) for _ in range(int(n_boot))],
        dtype=np.float64,
    )

    tail = float(alpha) / 2.0
    lo = float(np.quantile(boot_means, tail))
    hi = float(np.quantile(boot_means, 1.0 - tail))
    return (float(np.mean(sample)), lo, hi)


def eval_play_vs_steps_with_ci(*, diff: dict, model: ThinkingMinePredictor, steps_list: list[int], n_games: int = 60, seed0: int = 0) -> dict:
    """Play *n_games* games per thinking depth and add a bootstrap CI on the win rate.

    Every depth in *steps_list* is evaluated on the same seeds
    (`seed0, seed0 + 1, ...`). The result maps each depth (as int) to a dict
    with 'n', 'win_rate', 'win_ci_lo', 'win_ci_hi', 'avg_mines_triggered' and
    'avg_cells_opened'. The bootstrap seed is derived from *seed0* and the
    depth, so each depth gets its own reproducible resampling.
    """
    seeds = [int(seed0) + offset for offset in range(int(n_games))]

    summary = {}
    for s in steps_list:
        games = [
            play_one_game(diff=diff, model=model, steps=int(s), seed=int(seed))
            for seed in seeds
        ]

        # Per-game outcomes as float arrays (a win is 1.0, a loss 0.0).
        wins = np.asarray([int(r['won']) for r in games], dtype=np.float64)
        mines = np.asarray([int(r['mines_triggered']) for r in games], dtype=np.float64)
        opened = np.asarray([int(r['cells_opened']) for r in games], dtype=np.float64)

        win_rate, ci_lo, ci_hi = bootstrap_mean_ci(wins, seed=int(seed0) + 999 + int(s))

        summary[int(s)] = {
            'n': int(n_games),
            'win_rate': float(win_rate),
            'win_ci_lo': float(ci_lo),
            'win_ci_hi': float(ci_hi),
            'avg_mines_triggered': float(np.mean(mines)),
            'avg_cells_opened': float(np.mean(opened)),
        }

    return summary


# Number of games played per thinking depth.
PLAY_N = 60
play = eval_play_vs_steps_with_ci(diff=EVAL_CFG, model=model, steps_list=steps_list, n_games=int(PLAY_N), seed0=123)

print('\nTask 3 gameplay vs thinking steps (same seeds; deterministic first click per seed)')
for s in steps_list:
    d = play[int(s)]
    print(
        f"  steps={s:>2d} | win_rate={d['win_rate']:.3f} "
        f"(95% CI {d['win_ci_lo']:.3f}..{d['win_ci_hi']:.3f}) | "
        f"avg_mines={d['avg_mines_triggered']:.3f} | avg_opened={d['avg_cells_opened']:.1f}"
    )

# errorbar() wants distances from the point, not absolute bounds, so I convert
# the CI endpoints into lower/upper offsets around the win rate.
x = np.asarray(steps_list, dtype=np.float64)
y = np.asarray([play[int(s)]['win_rate'] for s in steps_list], dtype=np.float64)
ci_lo = np.asarray([play[int(s)]['win_ci_lo'] for s in steps_list], dtype=np.float64)
ci_hi = np.asarray([play[int(s)]['win_ci_hi'] for s in steps_list], dtype=np.float64)
ylo = y - ci_lo
yhi = ci_hi - y

plt.figure(figsize=(6,4))
plt.errorbar(x, y, yerr=[ylo, yhi], marker='o', capsize=3)
plt.title(f'Task 3 ({EVAL_NAME}): win rate vs thinking steps')
plt.xlabel('thinking steps')
plt.ylabel('perfect win rate (clear with 0 mines)')
plt.grid(True)
plt.show()


In [None]:
# Step 5 — I plot heatmap evolution (required figure)
# Here I take one fixed partially-revealed board and show how mine probabilities change as I let the model think longer.

# I pick one fixed sample board so the per-step heatmaps are directly comparable.
sample = Task1Dataset(EVAL_NPZ)[0]
x0 = sample['x'].unsqueeze(0).to(device)

# This is pure inference, so I run it under no_grad: otherwise the autograd graph
# of all 8 unrolled steps stays alive for as long as `per_step` does (and the
# export cell below keeps using it).
with torch.no_grad():
    final_logits, per_step = model(x0, steps=8, return_all=True)

# One panel per thinking step, all on a shared 0..1 probability scale.
fig, axes = plt.subplots(2, 4, figsize=(14, 7))
axes = axes.reshape(-1)
for i in range(8):
    probs = torch.sigmoid(per_step[i]).squeeze(0).detach().cpu().numpy()
    ax = axes[i]
    im = ax.imshow(probs, vmin=0.0, vmax=1.0, cmap='viridis')
    ax.set_title(f'step {i+1}')
    ax.axis('off')
# Every panel uses the same vmin/vmax, so one colorbar (from the last image) serves all.
fig.colorbar(im, ax=axes.tolist(), fraction=0.02, pad=0.02)
plt.suptitle(f'Mine probability heatmap as I think longer ({EVAL_NAME})')
plt.show()


In [None]:
# Step 6 — I save the figures + JSON summaries into docs/figures
# I export JSON so I can re-plot in Notebook 05 without copy/pasting logs, and I export PNGs
# so I can drop the figures directly into my writeup.

import json

# All exported artifacts live under docs/figures inside the repo.
fig_dir = Path(repo_root).joinpath('docs', 'figures')
fig_dir.mkdir(parents=True, exist_ok=True)
print('Figure dir:', fig_dir)

# Loss vs steps: the raw numbers go to JSON, the curve to PNG.
loss_json = fig_dir / f'task3_{EVAL_NAME}_loss_vs_steps.json'
loss_png = fig_dir / f'task3_{EVAL_NAME}_loss_vs_steps.png'
out_loss = {
    'difficulty': str(EVAL_NAME),
    'steps_list': list(map(int, steps_list)),
    'losses': list(map(float, losses)),
}

with open(loss_json, 'w') as fh:
    json.dump(out_loss, fh, indent=2)
print('wrote', loss_json)

# Same plot as in Step 3, but saved to disk and closed instead of shown.
loss_fig, loss_ax = plt.subplots(figsize=(6,4))
loss_ax.plot(steps_list, losses, marker='o')
loss_ax.set_xlabel('thinking steps')
loss_ax.set_ylabel('masked BCE loss (approx)')
loss_ax.set_title(f'Task 3 ({EVAL_NAME}): loss vs thinking steps')
loss_ax.grid(True)
loss_fig.tight_layout()
loss_fig.savefig(loss_png, dpi=200)
plt.close(loss_fig)
print('wrote', loss_png)

# Gameplay vs steps: the per-depth summary goes to JSON, the win-rate curve to PNG.
play_json = fig_dir / f'task3_{EVAL_NAME}_play_vs_steps.json'
play_png = fig_dir / f'task3_{EVAL_NAME}_win_rate_vs_steps.png'

with open(play_json, 'w') as fh:
    json.dump({'difficulty': str(EVAL_NAME), 'by_steps': play}, fh, indent=2)
print('wrote', play_json)

# errorbar() takes distances from the point, so the CI bounds become offsets.
x = np.asarray(steps_list, dtype=np.float64)
y = np.asarray([play[int(s)]['win_rate'] for s in steps_list], dtype=np.float64)
ci_lo = np.asarray([play[int(s)]['win_ci_lo'] for s in steps_list], dtype=np.float64)
ci_hi = np.asarray([play[int(s)]['win_ci_hi'] for s in steps_list], dtype=np.float64)
ylo = y - ci_lo
yhi = ci_hi - y

# Same plot as in Step 4, but saved to disk and closed instead of shown.
play_fig, play_ax = plt.subplots(figsize=(6,4))
play_ax.errorbar(x, y, yerr=[ylo, yhi], marker='o', capsize=3)
play_ax.set_xlabel('thinking steps')
play_ax.set_ylabel('perfect win rate (clear with 0 mines)')
play_ax.set_title(f'Task 3 ({EVAL_NAME}): win rate vs thinking steps')
play_ax.grid(True)
play_fig.tight_layout()
play_fig.savefig(play_png, dpi=200)
plt.close(play_fig)
print('wrote', play_png)

# Heatmap grid: re-draw the 8 per-step probability maps from Step 5 and save them.
heat_png = fig_dir / f'task3_{EVAL_NAME}_heatmaps_steps_1_to_8.png'

fig, axes = plt.subplots(2, 4, figsize=(14, 7))
axes = axes.reshape(-1)
for i, ax in enumerate(axes):
    step_logits = per_step[i]
    probs = torch.sigmoid(step_logits).squeeze(0).detach().cpu().numpy()
    # Fixed 0..1 colour scale so the panels are directly comparable.
    im = ax.imshow(probs, vmin=0.0, vmax=1.0, cmap='viridis')
    ax.set_title(f'step {i+1}')
    ax.axis('off')
fig.colorbar(im, ax=axes.tolist(), fraction=0.02, pad=0.02)
fig.suptitle(f'Task 3 ({EVAL_NAME}): heatmaps as I let the model think longer')
# Leave the top 5% of the canvas free for the suptitle.
fig.tight_layout(rect=[0, 0, 1, 0.95])
fig.savefig(heat_png, dpi=200)
plt.close(fig)
print('wrote', heat_png)


In [None]:
# (Optional) After training, I immediately export a zip so I don't lose checkpoints if Colab disconnects.
# I run this right after Step 2 finishes.

from pathlib import Path
import zipfile

# The archive is written next to the repo so it shows up at the top of the Files panel.
bundle_dir = Path('/content')
zip_out = bundle_dir / 'task3_artifacts.zip'

# Directories whose files go into the archive.
paths_to_bundle = [
    Path(repo_root).joinpath('models', 'task3', 'checkpoints'),
    Path(repo_root).joinpath('docs', 'figures'),
]

# Start from a clean file so a rerun never leaves stale entries behind.
if zip_out.exists():
    zip_out.unlink()

with zipfile.ZipFile(zip_out, 'w', compression=zipfile.ZIP_DEFLATED) as z:
    for root in paths_to_bundle:
        if root.exists():
            for p in root.rglob('*'):
                if not p.is_dir():
                    # Archive names are relative to the repo root, so the zip
                    # unpacks back into the same layout.
                    z.write(p, arcname=str(p.relative_to(Path(repo_root))))

mb = zip_out.stat().st_size / (1024 * 1024)
print(f'Wrote: {zip_out} ({mb:.2f} MB)')
print('In Colab: use the Files panel to download task3_artifacts.zip')

