# 04 — Infer Videos → PGN (Submission)

Use board warp + per‑cell CNN + rule engine to output PGN per video.

- Reads videos from `data/public/videos/*.mp4` (local) or a Kaggle input path you specify.
- Loads the model from `models/cell_cnn.h5` and the class order from `models/classes.json`.
- Writes `submissions/submission.csv` with `row_id,output`.


In [None]:
# %%capture
# !pip install --quiet opencv-python python-chess tqdm


In [1]:
# =========================
# 04 — Setup & Bootstrap
# =========================

# --- stdlib / third-party ---
import os, sys, json, csv
from pathlib import Path
from collections import deque
import numpy as np
import cv2
from tqdm import tqdm

# --- TF / Keras ---
import tensorflow as tf
from tensorflow.keras.applications.mobilenet_v2 import preprocess_input # type: ignore

# --- env: local vs Kaggle ---
ON_KAGGLE = Path("/kaggle").exists()
if ON_KAGGLE:
    ROOT = Path("/kaggle/working")
else:
    ROOT = Path("..").resolve()

# Make project code importable both as a package (from ROOT) and from src/.
# ROOT ends up first on sys.path and ROOT/src second.
sys.path[:0] = [str(ROOT), str(ROOT / "src")]

# --- project modules ---
from Chess_Detection_Competition.utils import load_config
from Chess_Detection_Competition.board import warp_board, split_grid
from Chess_Detection_Competition.model import load_model as load_cell_model
from Chess_Detection_Competition.pgn import diff_to_move, san_list_to_pgn, labels_to_board

# --- paths (all anchored at ROOT) ---
VIDEOS_DIR = ROOT / "data" / "public" / "videos"     # input videos
MODEL_PATH = ROOT / "models" / "cell_cnn.h5"         # trained model (.h5)
CLASSES_JSON = MODEL_PATH.with_name("classes.json")  # class order saved at training time
TRAIN_DIR = ROOT / "data" / "final" / "train"        # fallback only (avoid if classes.json exists)
SUBMIT_CSV = ROOT / "submissions" / "submission.csv"
OUT_DIR = SUBMIT_CSV.parent
# Create the output folder up front so the final CSV write cannot fail on it.
OUT_DIR.mkdir(parents=True, exist_ok=True)

# --- load config (board/cell/inference params) ---
# `cfg` is bound before the try so that later cells can always call
# cfg.get(...), even when loading fails and the fallback defaults are used.
cfg = {}
try:
    cfg = load_config()  # configs/parameters.yaml
    BOARD_CFG   = cfg["board"]
    IMG_SIZE    = int(cfg["cells"]["img_size"])
    SMOOTH_K    = int(cfg["inference"]["smooth_k"])
    SAMPLE_STEP = int(cfg["inference"]["sample_step"])
    cfg_source  = "configs/parameters.yaml"
except Exception as e:
    # Best-effort: any failure (missing file, missing key, bad value) switches
    # every parameter to the built-in defaults; the reason is kept in
    # cfg_source so it shows up in the environment summary.
    BOARD_CFG = {
        "warp_size": 800,
        "canny_low": 60,
        "canny_high": 180,
        "hough_threshold": 120,
        "min_line_length": 120,
        "max_line_gap": 10,
    }
    IMG_SIZE, SMOOTH_K, SAMPLE_STEP = 96, 5, 3
    cfg_source = f"[fallback defaults] ({e})"

# --- load class order (MUST match training) ---
if not CLASSES_JSON.exists():
    # Fallback (not recommended): guess the classes from the train sub-folder
    # names, sorted alphabetically (intended to follow Keras' ordering).
    subdirs = []
    if TRAIN_DIR.exists():
        subdirs = [entry.name for entry in TRAIN_DIR.iterdir() if entry.is_dir()]
    CLASSES = sorted(subdirs)
    print("[WARN] classes.json missing. Inferred classes:", CLASSES)
else:
    CLASSES = json.loads(CLASSES_JSON.read_text(encoding="utf-8"))
    print("Loaded classes from classes.json:", CLASSES)

# --- load trained cell-CNN ---
model = load_cell_model(str(MODEL_PATH))

# Sanity check: the size of the model's last output axis should equal the
# number of class names. Kept best-effort (warn, never raise), but written as
# an explicit comparison rather than `assert`, which `python -O` strips.
try:
    num_out = model.output_shape[-1]
except Exception as e:
    print("[WARN] cannot verify model output vs classes:", e)
else:
    if num_out == len(CLASSES):
        print(f"OK: model outputs {num_out} classes.")
    else:
        print(
            "[WARN] cannot verify model output vs classes:",
            f"Model output ({num_out}) != num classes ({len(CLASSES)}) "
            f"→ ใช้ classes.json ที่ได้จากรันเทรนเดียวกัน",
        )

# --- debug helpers ---
def _dbg(s):
    """Print *s* to stdout prefixed with the ``[DBG]`` tag."""
    print("[DBG]", s)

def _mask_changes(a, b):
    """Return an 8x8 boolean array that is True where grids *a* and *b* differ.

    Both arguments are indexed as ``grid[r][c]`` for ``r, c`` in ``0..7``.
    """
    changed = np.zeros((8, 8), dtype=bool)
    for r in range(8):
        for c in range(8):
            changed[r, c] = a[r][c] != b[r][c]
    return changed

def _diff_preview(prev_labels, now_labels, mask=None, max_list=8):
    """Log a visual diff of two 8x8 label grids through ``_dbg``.

    Eight rows of ``X`` (changed) / ``.`` (unchanged) marks are logged first,
    followed by up to *max_list* individual changes as ``(r,c): prev → now``.

    Args:
        prev_labels: Grid of labels before, indexed ``[r][c]``.
        now_labels: Grid of labels after, indexed ``[r][c]``.
        mask: Optional array indexed ``[r, c]``; when given, a differing cell
            only counts as changed if its mask entry is truthy.
        max_list: Maximum number of individual changes to list.
    """
    grid_lines = []
    coords = []
    for r in range(8):
        marks = []
        for c in range(8):
            before, after = prev_labels[r][c], now_labels[r][c]
            changed = before != after
            # The mask is only consulted for cells that actually differ.
            if changed and mask is not None:
                changed = bool(mask[r, c])
            if changed:
                marks.append("X")
                coords.append((r, c, before, after))
            else:
                marks.append(".")
        grid_lines.append(" ".join(marks))

    for line in grid_lines:
        _dbg(line)

    if not coords:
        return
    _dbg("changes (r,c: prev→now):")
    for r, c, before, after in coords[:max_list]:
        _dbg(f"  ({r},{c}): {before} → {after}")
    extra = len(coords) - max_list
    if extra > 0:
        _dbg(f"  ... (+{extra} more)")

# --- Homography bootstrap (FORCE_H0) ---
# Both stay None unless run_video_with_homography() pins a homography for the
# duration of one decode; FORCE_WARP_SIZE is the (W, H) warp output size.
FORCE_H0, FORCE_WARP_SIZE = None, None

def scan_for_H0(video_path: Path, max_scan=120, step=2):
    """Scan the start of a video for the first frame that yields a homography.

    Frames are grabbed one by one; every *step*-th frame is decoded and passed
    to ``warp_board``. The scan stops at the first frame for which the warp is
    non-empty and the returned aux dict carries a non-None ``"H"`` entry.

    Args:
        video_path: Video file to scan.
        max_scan: Maximum number of frames to grab.
        step: Decode every *step*-th grabbed frame (values below 1 are
            treated as 1).

    Returns:
        ``(H0, wh)`` where ``H0`` is ``aux["H"]`` and ``wh`` is the warped
        image size as ``(width, height)``; ``(None, None)`` if nothing was
        found.
    """
    step = max(1, int(step))  # guard against a zero/negative modulus
    cap = cv2.VideoCapture(str(video_path))
    H0, wh = None, None
    f = 0
    try:
        while f < max_scan:
            if not cap.grab():
                break
            f += 1
            if f % step:
                continue
            ok, frame = cap.retrieve()
            if not ok or frame is None:
                continue
            warped, aux = warp_board(frame, {"board": BOARD_CFG})
            has_warp = warped is not None and getattr(warped, "size", 0) > 0
            if has_warp and isinstance(aux, dict) and aux.get("H") is not None:
                H0 = aux["H"]
                wh = (warped.shape[1], warped.shape[0])
                print(f"[HBOOT] Found H at frame {f}: size={wh}")
                break
    finally:
        # Release the capture even if warp_board/retrieve raises.
        cap.release()
    if H0 is None:
        print(f"[HBOOT] No H found in first ~{max_scan} frames")
    return H0, wh

def run_video_with_homography(video_path: Path):
    """Decode *video_path* with a homography pinned from its first frames.

    ``scan_for_H0`` is run first and its result is stored in the module-level
    ``FORCE_H0`` / ``FORCE_WARP_SIZE``, which ``decode_video_to_pgn`` reads.
    Both globals are reset to None afterwards, including when decoding raises
    or is interrupted, so a stale homography cannot leak into a later decode.

    Args:
        video_path: Video file to decode.

    Returns:
        Whatever ``decode_video_to_pgn(video_path)`` returns.
    """
    global FORCE_H0, FORCE_WARP_SIZE
    FORCE_H0, FORCE_WARP_SIZE = scan_for_H0(video_path, max_scan=120, step=2)
    try:
        if FORCE_H0 is None:
            print("[HBOOT] Proceeding WITHOUT fixed H (results may be noisy)")
        else:
            print("[HBOOT] Using fixed H for all frames")
        return decode_video_to_pgn(video_path)  # type: ignore[name-defined]
    finally:
        FORCE_H0, FORCE_WARP_SIZE = None, None  # reset

# --- environment summary ---
video_files = sorted(VIDEOS_DIR.glob("*.mp4"))
# One print call, one summary line per argument.
print(
    "=== Environment ===",
    f"Kaggle     : {ON_KAGGLE}",
    f"ROOT       : {ROOT}",
    f"Config     : {cfg_source}",
    f"Model path : {MODEL_PATH} exists? {MODEL_PATH.exists()}",
    f"IMG_SIZE   : {IMG_SIZE}",
    f"Videos dir : {VIDEOS_DIR} | found: {len(video_files)}",
    sep="\n",
)


Loaded classes from classes.json: ['BB', 'BK', 'BN', 'BP', 'BQ', 'BR', 'Empty', 'WB', 'WK', 'WN', 'WP', 'WQ', 'WR']




OK: model outputs 13 classes.
=== Environment ===
Kaggle     : False
ROOT       : C:\Users\worap\Downloads\image_processing_term_orject\Chess_Detection_Competition
Config     : configs/parameters.yaml
Model path : C:\Users\worap\Downloads\image_processing_term_orject\Chess_Detection_Competition\models\cell_cnn.h5 exists? True
IMG_SIZE   : 96
Videos dir : C:\Users\worap\Downloads\image_processing_term_orject\Chess_Detection_Competition\data\public\videos | found: 5


In [2]:
# =========================
# Inference helpers & tunables
# =========================
import chess
from copy import deepcopy

# ---- Tunables ----
TAU = 0.45                 # per-cell confidence threshold
SMOOTH_K = int(SMOOTH_K)   # per-cell moving-average window ...
if SMOOTH_K < 9:
    SMOOTH_K = 9           # ... never shorter than 9 frames
SETTLE = 15                # warm-up frames
SAMPLE_STEP = 1            # read every frame
MIN_CH = 2                 # fewest cells expected to change in one move
MAX_CH = 8                 # most cells expected to change in one move
PENDING_HORIZON = 2
ENFORCE_LEGAL = True       # apply real chess rules
REQUIRE_STABLE_FRAMES = 2  # frames a new label must persist before it counts as a change

def predict_labels8x8_with_conf(warped_bgr, buffers):
    """Classify every board cell and return smoothed labels and confidences.

    The warped board is cut into cells with ``split_grid``; each patch is
    converted BGR→RGB, preprocessed, and the batch is run through ``model``.
    The k-th prediction is assigned to cell ``(k // 8, k % 8)`` and appended to
    that cell's buffer; the label is the argmax of the buffer's mean.

    Args:
        warped_bgr: Warped board image in BGR channel order.
        buffers: 8x8 nested list of per-cell probability histories; each one
            is appended to in place.

    Returns:
        ``(labels, confs)``: an 8x8 list of class names and an 8x8 float32
        array holding the averaged probability of the chosen class.
    """
    batch = np.asarray(
        [
            preprocess_input(cv2.cvtColor(patch, cv2.COLOR_BGR2RGB).astype(np.float32))
            for _, patch in split_grid(warped_bgr, IMG_SIZE)
        ],
        dtype=np.float32,
    )
    probs = model.predict(batch, verbose=0)  # expected shape: (64, C)

    labels = [[None] * 8 for _ in range(8)]
    confs = np.zeros((8, 8), dtype=np.float32)
    for k in range(64):
        r, c = divmod(k, 8)
        history = buffers[r][c]
        history.append(probs[k])
        avg = np.mean(np.stack(history, axis=0), axis=0)
        best = int(np.argmax(avg))
        labels[r][c] = CLASSES[best]
        confs[r, c] = float(avg[best])
    return labels, confs

# ---- legal-move resolver ----
def rc_to_square(r, c):
    """Map grid cell ``(r, c)`` to a python-chess square index.

    The column is used as the file and ``7 - r`` as the rank, so grid row 0
    maps to rank index 7.
    """
    return chess.square(c, 7 - r)

def find_from_to_pair(prev_labels, now_labels, eff_mask):
    """Pick the closest (vacated cell, newly filled cell) pair.

    Only cells whose ``eff_mask[r, c]`` is truthy are considered. A cell is a
    source if it went from a piece to ``"Empty"`` and a destination if it went
    from ``"Empty"`` to a piece. The pair with the smallest Manhattan distance
    wins; on ties the first pair in row-major source-then-destination order is
    kept.

    Returns:
        ``((rs, cs, prev_label), (rd, cd, now_label))``, or ``None`` when there
        is no source or no destination.
    """
    vacated, filled = [], []
    for r in range(8):
        for c in range(8):
            if not eff_mask[r, c]:
                continue
            before, after = prev_labels[r][c], now_labels[r][c]
            if before != "Empty" and after == "Empty":
                vacated.append((r, c, before))
            elif before == "Empty" and after != "Empty":
                filled.append((r, c, after))

    if not (vacated and filled):
        return None

    # min() keeps the first minimal element, matching a strict "<" scan.
    return min(
        ((src, dst) for src in vacated for dst in filled),
        key=lambda pair: abs(pair[0][0] - pair[1][0]) + abs(pair[0][1] - pair[1][1]),
    )

def make_move_candidates(rs, cs, rd, cd, try_promos=("Q","R","B","N")):
    """Build candidate moves from grid cell ``(rs, cs)`` to ``(rd, cd)``.

    The plain move comes first, followed by one promotion move per symbol in
    *try_promos*, in the given order. Symbols other than ``Q``, ``R``, ``B``
    and ``N`` raise ``KeyError``.
    """
    piece_types = {"Q": chess.QUEEN, "R": chess.ROOK, "B": chess.BISHOP, "N": chess.KNIGHT}
    from_sq = rc_to_square(rs, cs)
    to_sq = rc_to_square(rd, cd)
    plain = chess.Move(from_sq, to_sq)
    promotions = [
        chess.Move(from_sq, to_sq, promotion=piece_types[sym]) for sym in try_promos
    ]
    return [plain, *promotions]

def resolve_move_by_legality(prev_labels, now_labels, eff_mask, try_promos=("Q","R","B","N")):
    """Return the first candidate move that is legal on the previous board.

    The board is built from *prev_labels* with ``labels_to_board``; the
    from/to cells come from ``find_from_to_pair`` and the candidates (plain
    move first, then promotions) from ``make_move_candidates``.

    Returns:
        The first legal candidate move, or ``None`` when no from/to pair is
        found or no candidate is legal.
    """
    b_prev = labels_to_board(prev_labels)
    pair = find_from_to_pair(prev_labels, now_labels, eff_mask)
    if pair is None:
        return None

    (rs, cs, _), (rd, cd, _) = pair
    candidates = make_move_candidates(rs, cs, rd, cd, try_promos=try_promos)
    legal = set(b_prev.legal_moves)
    return next((mv for mv in candidates if mv in legal), None)


In [3]:
# =========================
# Main inference: decode_video_to_pgn
# =========================
from copy import deepcopy

def decode_video_to_pgn(video_path: Path) -> str:
    """Decode one chess video into a PGN string.

    The first frame is warped to a board image (with ``FORCE_H0`` /
    ``FORCE_WARP_SIZE`` when both are set, otherwise via ``warp_board``) and
    classified to seed the reference board ``stable``. After ``SETTLE``
    warm-up frames that only feed the smoothing buffers, every
    ``SAMPLE_STEP``-th frame is classified and compared against ``stable``.
    When the number of confidently changed cells lies in ``[MIN_CH, MAX_CH]``
    and a move can be derived, its SAN is recorded and ``stable`` is replaced
    by the current labels.

    Args:
        video_path: Video file to decode.

    Returns:
        The result of ``san_list_to_pgn`` over the detected SAN moves; the SAN
        moves joined by spaces if that call raises; or ``""`` when the first
        frame cannot be read or warped.
    """
    # `cfg` is only bound when load_config() succeeded in the setup cell; look
    # it up defensively so the fallback-defaults path does not hit a NameError.
    cfg_obj = globals().get("cfg")
    pgn_cfg = (cfg_obj.get("pgn") or {}) if hasattr(cfg_obj, "get") else {}
    try_promos = tuple(pgn_cfg.get("try_promotions", ["Q", "R", "B", "N"]))

    cap = cv2.VideoCapture(str(video_path))
    try:
        ok, frame0 = cap.read()
        if not ok or frame0 is None:
            _dbg(f"read fail: {video_path.name}")
            return ""

        # Use FORCE_H0 when one has been pinned.
        if FORCE_H0 is not None and FORCE_WARP_SIZE is not None:
            warped0 = cv2.warpPerspective(frame0, FORCE_H0, FORCE_WARP_SIZE)
            aux0 = {"H": FORCE_H0}
            _dbg("[HUSE] using FORCE_H0 on frame0")
        else:
            warped0, aux0 = warp_board(frame0, {"board": BOARD_CFG})

        if warped0 is None or getattr(warped0, "size", 0) == 0:
            _dbg(f"warp fail: {video_path.name}")
            return ""

        H0 = aux0.get("H") if isinstance(aux0, dict) else None
        _dbg(f"frame0 shape={frame0.shape}, warped shape={warped0.shape}, have H? {H0 is not None}")
        warp_size = (warped0.shape[1], warped0.shape[0])

        def warp(frame):
            """Warp *frame*: pinned H, else frame-0 H, else per-frame detection."""
            if FORCE_H0 is not None and FORCE_WARP_SIZE is not None:
                return cv2.warpPerspective(frame, FORCE_H0, FORCE_WARP_SIZE)
            if H0 is not None:
                return cv2.warpPerspective(frame, H0, warp_size)
            return warp_board(frame, {"board": BOARD_CFG})[0]

        # buffers & state
        buffers = [[deque(maxlen=SMOOTH_K) for _ in range(8)] for __ in range(8)]
        first_labels, prev_confs = predict_labels8x8_with_conf(warped0, buffers)
        stable = [list(row) for row in first_labels]
        steady = [[REQUIRE_STABLE_FRAMES] * 8 for _ in range(8)]
        _dbg(f"first labels sample: {first_labels[0][:4]} ...")

        # warm-up: only feeds the smoothing buffers
        for _ in range(SETTLE):
            ok, fr = cap.read()
            if not ok or fr is None:
                break
            warped = warp(fr)
            if warped is None or getattr(warped, "size", 0) == 0:
                continue
            predict_labels8x8_with_conf(warped, buffers)

        sans = []
        frame_id = SETTLE
        pending = None

        while True:
            ok, frame = cap.read()
            if not ok or frame is None:
                break
            frame_id += 1
            if frame_id % SAMPLE_STEP:
                continue

            warped = warp(frame)
            if warped is None or getattr(warped, "size", 0) == 0:
                _dbg(f"warp fail midstream @ f{frame_id}")
                continue

            now_labels_raw, now_confs = predict_labels8x8_with_conf(warped, buffers)

            # Sticky labels: below TAU a cell keeps its last stable label.
            sticky = [[now_labels_raw[r][c] if now_confs[r, c] >= TAU else stable[r][c]
                       for c in range(8)] for r in range(8)]

            # per-cell stability
            # NOTE(review): a differing cell resets its counter to 1, so with
            # REQUIRE_STABLE_FRAMES >= 2 this loop never changes `stable`;
            # `stable` then only moves on a committed move. Confirm intent.
            for r in range(8):
                for c in range(8):
                    if sticky[r][c] == stable[r][c]:
                        steady[r][c] = min(steady[r][c] + 1, REQUIRE_STABLE_FRAMES)
                    else:
                        steady[r][c] = 1
                    if steady[r][c] >= REQUIRE_STABLE_FRAMES:
                        stable[r][c] = sticky[r][c]

            # diff + gating: a change counts only if the cell was confident at
            # the last commit or is confident now.
            conf_or = (prev_confs >= TAU) | (now_confs >= TAU)
            diff_raw = _mask_changes(stable, sticky)
            eff_mask = conf_or & diff_raw
            changes_eff = int(eff_mask.sum())
            if frame_id % (5 * max(1, SAMPLE_STEP)) == 0:
                _dbg(f"f{frame_id}: changes_eff={changes_eff}")

            committed = False
            if MIN_CH <= changes_eff <= MAX_CH:
                mv = diff_to_move(stable, sticky)
                if ENFORCE_LEGAL and mv is None:
                    mv = resolve_move_by_legality(stable, sticky, eff_mask, try_promos=try_promos)

                if mv is not None:
                    b_prev = labels_to_board(stable)
                    try:
                        san = b_prev.san(mv)
                    except Exception as e:
                        _dbg(f"SAN fail @ f{frame_id}: {e}")
                        _diff_preview(stable, sticky, eff_mask)
                    else:
                        sans.append(san)
                        _dbg(f"f{frame_id}: SAN={san} (eff={changes_eff})")
                        prev_confs = now_confs.copy()
                        stable = deepcopy(sticky)
                        steady = [[REQUIRE_STABLE_FRAMES] * 8 for _ in range(8)]
                        pending = None
                        committed = True
                else:
                    _dbg(f"f{frame_id}: no {'legal ' if ENFORCE_LEGAL else ''}move matched (eff={changes_eff})")
                    _diff_preview(stable, sticky, eff_mask)
                    pending = {"frame": frame_id, "mask": eff_mask.copy()}

            # pending combine: retry with the union of the pending mask and
            # the current mask while the pending entry is still recent.
            if not committed and pending is not None and (frame_id - pending["frame"]) <= PENDING_HORIZON:
                combined_mask = pending["mask"] | eff_mask
                combined_changes = int(combined_mask.sum())
                mv2 = diff_to_move(stable, sticky)
                if ENFORCE_LEGAL and mv2 is None:
                    mv2 = resolve_move_by_legality(stable, sticky, combined_mask, try_promos=try_promos)
                if mv2 is not None:
                    b_prev = labels_to_board(stable)
                    try:
                        san = b_prev.san(mv2)
                    except Exception as e:
                        _dbg(f"SAN fail (pending) @ f{frame_id}: {e}")
                        _diff_preview(stable, sticky, combined_mask)
                    else:
                        sans.append(san)
                        _dbg(f"f{frame_id}: SAN={san} (pending, combined={combined_changes})")
                        prev_confs = now_confs.copy()
                        stable = deepcopy(sticky)
                        steady = [[REQUIRE_STABLE_FRAMES] * 8 for _ in range(8)]
                        pending = None

            if pending is not None and (frame_id - pending["frame"]) > PENDING_HORIZON:
                pending = None
    finally:
        # Always release the capture, including on errors and interrupts.
        cap.release()

    _dbg(f"done. total SAN moves = {len(sans)}")
    if sans:
        _dbg(f"first SAN moves = {sans[:10]}")
    try:
        return san_list_to_pgn(sans)
    except Exception as e:
        _dbg(f"san_list_to_pgn error: {e}")
        return " ".join(sans)


In [4]:
# =========================
# Quick test one video (uses fixed H0)
# =========================
video_files = sorted(VIDEOS_DIR.glob("*.mp4"))
print("videos:", len(video_files))

if video_files:
    p = video_files[0]
    print("TEST:", p.name)
    # Use unsmoothed, every-frame settings so the logs are easy to read during
    # this test, then restore the tuned values (even on interrupt) so the
    # submission cell does not silently inherit the debug settings.
    _saved_tunables = (SMOOTH_K, SAMPLE_STEP)
    SMOOTH_K = 1
    SAMPLE_STEP = 1
    try:
        pgn = run_video_with_homography(p)
        print("\nPGN =", pgn if pgn else "[EMPTY]")
    finally:
        SMOOTH_K, SAMPLE_STEP = _saved_tunables
else:
    print("[WARN] no videos found")


videos: 5
TEST: 2_Move_rotate_student.mp4
[HBOOT] No H found in first ~120 frames
[HBOOT] Proceeding WITHOUT fixed H (results may be noisy)
[DBG] frame0 shape=(1920, 1080, 3), warped shape=(800, 800, 3), have H? False
[DBG] first labels sample: ['WN', 'BN', 'BB', 'BQ'] ...
[DBG] f20: changes_eff=10
[DBG] f25: changes_eff=12
[DBG] f30: changes_eff=11
[DBG] f35: changes_eff=8
[DBG] f35: no legal move matched (eff=8)
[DBG] X . . . . . . .
[DBG] . . X . . . X .
[DBG] . . . . . . . .
[DBG] X X . . . . X .
[DBG] . . . . . . . .
[DBG] . . . . . . . .
[DBG] . X . . . . . .
[DBG] . . . . X . . .
[DBG] changes (r,c: prev→now):
[DBG]   (0,0): WN → WR
[DBG]   (1,2): WP → BP
[DBG]   (1,6): BP → Empty
[DBG]   (3,0): BP → Empty
[DBG]   (3,1): Empty → BP
[DBG]   (3,6): Empty → WP
[DBG]   (6,1): Empty → BP
[DBG]   (7,4): WP → Empty
[DBG] f40: changes_eff=11
[DBG] f45: changes_eff=15
[DBG] f50: changes_eff=12
[DBG] f55: changes_eff=13
[DBG] f60: changes_eff=18
[DBG] f65: changes_eff=14


KeyboardInterrupt: 

In [None]:
# Decode every video and write the submission CSV (row_id = file stem).
rows = []
video_files = sorted(VIDEOS_DIR.glob("*.mp4"))
if not video_files:
    print(f"[warn] no videos found in: {VIDEOS_DIR}")

for v in tqdm(video_files, desc="Decoding videos"):
    row_id = v.stem
    try:
        pgn = decode_video_to_pgn(v)
    except Exception as e:
        # One bad video must not discard the results of all the others: log
        # the failure and still emit a row for it, with an empty output.
        print(f"[error] {row_id}: decoding failed ({e!r}); writing empty output")
        pgn = ""
    rows.append((row_id, pgn))
    print(f"{row_id} -> {pgn}")

with open(SUBMIT_CSV, "w", newline="", encoding="utf-8") as f:
    w = csv.writer(f)
    w.writerow(["row_id", "output"])
    w.writerows(rows)

print("✅ submission saved to:", SUBMIT_CSV.resolve())
