## testing for Transform Vit Swin

In [41]:
# =========================
# 0) Setup & configuration
# =========================
# pip install -U timm torch torchvision torchmetrics

import os, math, random, time
from pathlib import Path
import numpy as np
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import timm

# ---- Paths (edit to your dataset root) ----
PROC_DIR = Path(r"C:\Users\jians\Documents\GitHub\SMART-Barcode-Scanner-and-Face-Recognition\data\data_proc")
TRAIN_DIR = PROC_DIR/"train"
VAL_DIR   = PROC_DIR/"val"
TEST_DIR  = PROC_DIR/"test"

# ---- Reproducibility ----
SEED = 42
random.seed(SEED); np.random.seed(SEED); torch.manual_seed(SEED)

# ---- Device (CPU-only as requested) ----
device = torch.device("cpu")

# ---- Hyperparams (CPU-friendly) ----
IMSIZE   = 224          # internal resize
BATCH    = 32
EPOCHS   = 10           # you can bump to 25 if time allows
BASE_LR  = 3e-4
WEIGHT_DECAY = 1e-4

# ---- Data transforms ----
train_tf = transforms.Compose([
    transforms.Resize((IMSIZE, IMSIZE)),
    transforms.RandomHorizontalFlip(),
    transforms.ColorJitter(brightness=0.1, contrast=0.1),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),
])
eval_tf = transforms.Compose([
    transforms.Resize((IMSIZE, IMSIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.5]*3, [0.5]*3),
])

# ---- Datasets & loaders ----
train_set = datasets.ImageFolder(TRAIN_DIR, transform=train_tf)
val_set   = datasets.ImageFolder(VAL_DIR,   transform=eval_tf)
test_set  = datasets.ImageFolder(TEST_DIR,  transform=eval_tf)

NUM_CLASSES = len(train_set.classes)
print("Classes:", NUM_CLASSES)

train_loader = DataLoader(train_set, batch_size=BATCH, shuffle=True,  num_workers=2, pin_memory=False)
val_loader   = DataLoader(val_set,   batch_size=BATCH, shuffle=False, num_workers=2, pin_memory=False)
test_loader  = DataLoader(test_set,  batch_size=BATCH, shuffle=False, num_workers=2, pin_memory=False)


Classes: 31


In [42]:
# =========================
# 1) Model: Swin-T backbone
#    + 256-D embedding head
#    + ArcFace margin head
# =========================

class L2Norm(nn.Module):
    def __init__(self, eps=1e-6): 
        super().__init__(); self.eps = eps
    def forward(self, x): 
        return F.normalize(x, dim=1, eps=self.eps)

class ArcMarginProduct(nn.Module):
    """ArcFace head: additive angular margin."""
    def __init__(self, in_features, out_features, s=30.0, m=0.50, easy_margin=False):
        super().__init__()
        self.in_features = in_features
        self.out_features = out_features
        self.s = s
        self.m = m
        self.weight = nn.Parameter(torch.FloatTensor(out_features, in_features))
        nn.init.xavier_uniform_(self.weight)
        self.easy_margin = easy_margin
        self.cos_m = math.cos(m)
        self.sin_m = math.sin(m)
        self.th = math.cos(math.pi - m)
        self.mm = math.sin(math.pi - m) * m

    def forward(self, emb, labels):
        # emb: (B, D) L2-normalized
        # labels: (B,) long
        W = F.normalize(self.weight)
        cosine = F.linear(emb, W)  # (B, C)
        sine = torch.sqrt(torch.clamp(1.0 - cosine**2, min=1e-9))
        phi = cosine * self.cos_m - sine * self.sin_m
        if self.easy_margin:
            phi = torch.where(cosine > 0, phi, cosine)
        else:
            phi = torch.where(cosine > self.th, phi, cosine - self.mm)

        one_hot = torch.zeros_like(cosine)
        one_hot.scatter_(1, labels.view(-1,1), 1.0)
        logits = (one_hot * phi) + ((1.0 - one_hot) * cosine)
        logits *= self.s
        return logits  # (B, C)

class SwinTinyEmbedder(nn.Module):
    def __init__(self, embed_dim=256, pretrained=True):
        super().__init__()
        # Try to load pretrained; fall back gracefully
        try:
            self.backbone = timm.create_model(
                "swin_tiny_patch4_window7_224",
                pretrained=pretrained,
                num_classes=0,   # feature extractor
                global_pool="avg"
            )
            print("Loaded pretrained Swin-Tiny.")
        except Exception as e:
            print("Could not load pretrained weights, using random init:", e)
            self.backbone = timm.create_model(
                "swin_tiny_patch4_window7_224",
                pretrained=False,
                num_classes=0,
                global_pool="avg"
            )
        in_feats = self.backbone.num_features
        self.proj = nn.Linear(in_feats, embed_dim, bias=False)
        self.l2 = L2Norm()

    def forward(self, x):
        f = self.backbone(x)     # (B, F)
        e = self.proj(f)         # (B, embed_dim)
        e = self.l2(e)           # L2-normalized
        return e

embed_dim = 256
embedder = SwinTinyEmbedder(embed_dim=embed_dim, pretrained=True).to(device)
arcface  = ArcMarginProduct(in_features=embed_dim, out_features=NUM_CLASSES, s=30.0, m=0.5).to(device)

# Simple wrapper for training
class FRModel(nn.Module):
    def __init__(self, embedder, arcface):
        super().__init__()
        self.embedder = embedder
        self.arcface = arcface
    def forward(self, x, labels=None):
        emb = self.embedder(x)
        if labels is None:
            return emb
        logits = self.arcface(emb, labels)
        return logits, emb

model = FRModel(embedder, arcface).to(device)

# Optimizer & schedule
optimizer = torch.optim.AdamW(model.parameters(), lr=BASE_LR, weight_decay=WEIGHT_DECAY)
# Cosine decay with warmup
total_steps = max(len(train_loader)*EPOCHS, 1)
warmup_steps = int(0.05 * total_steps)
def lr_lambda(step):
    if step < warmup_steps:
        return float(step) / max(1, warmup_steps)
    p = (step - warmup_steps) / max(1, total_steps - warmup_steps)
    return 0.5 * (1.0 + math.cos(math.pi * p))
scheduler = torch.optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lr_lambda)

criterion = nn.CrossEntropyLoss()


Loaded pretrained Swin-Tiny.


In [45]:
# ===== Checkpoint utils =====
import os, torch

CKPT_DIR = "checkpoints_transformer"
os.makedirs(CKPT_DIR, exist_ok=True)

def _model_state(m):
    # handles DataParallel/DistributedDataParallel transparently
    return m.module.state_dict() if hasattr(m, "module") else m.state_dict()

def save_checkpoint(path, epoch, model, optimizer=None, scheduler=None, best_val_acc=None, extra=None):
    ckpt = {
        "epoch": epoch,
        "model_state_dict": _model_state(model),
        "best_val_acc": best_val_acc,
    }
    if optimizer is not None:
        ckpt["optimizer_state_dict"] = optimizer.state_dict()
    if scheduler is not None:
        ckpt["scheduler_state_dict"] = scheduler.state_dict()
    if extra is not None:
        ckpt["extra"] = extra
    torch.save(ckpt, path)

def load_checkpoint(path, model, optimizer=None, scheduler=None, map_location="cpu"):
    ckpt = torch.load(path, map_location=map_location)
    model.load_state_dict(ckpt["model_state_dict"])
    if optimizer is not None and "optimizer_state_dict" in ckpt:
        optimizer.load_state_dict(ckpt["optimizer_state_dict"])
    if scheduler is not None and "scheduler_state_dict" in ckpt:
        scheduler.load_state_dict(ckpt["scheduler_state_dict"])
    start_epoch = ckpt.get("epoch", 0) + 1
    best_val_acc = ckpt.get("best_val_acc", 0.0)
    return start_epoch, best_val_acc, ckpt.get("extra", None)


In [46]:
# =========================
# 2) Train (classification)
# =========================
def run_epoch(loader, train=True):
    model.train(train)
    running_loss, correct, count = 0.0, 0, 0
    for x, y in loader:
        x = x.to(device); y = y.to(device)
        if train:
            optimizer.zero_grad()
            logits, _ = model(x, y)
            loss = criterion(logits, y)
            loss.backward()
            optimizer.step()
            scheduler.step()  # <- you were stepping per-batch; keep if intended
        else:
            with torch.no_grad():
                logits, _ = model(x, y)
                loss = criterion(logits, y)
        running_loss += loss.item() * x.size(0)
        pred = logits.argmax(1)
        correct += (pred == y).sum().item()
        count += x.size(0)
    return running_loss / max(1, count), correct / max(1, count)

best_val_acc = 0.0
best_epoch = 0

for epoch in range(1, EPOCHS + 1):
    t0 = time.time()
    tr_loss, tr_acc = run_epoch(train_loader, train=True)
    val_loss, val_acc = run_epoch(val_loader,   train=False)
    dt = time.time() - t0

    print(f"Epoch {epoch:02d}/{EPOCHS} | "
          f"train_acc {tr_acc:.3f} loss {tr_loss:.3f} | "
          f"val_acc {val_acc:.3f} loss {val_loss:.3f} | {dt:.1f}s")

    # ---- Save "last" checkpoint every epoch ----
    save_checkpoint(
        path=os.path.join(CKPT_DIR, "last.pt"),
        epoch=epoch,
        model=model,
        optimizer=optimizer,
        scheduler=scheduler,
        best_val_acc=best_val_acc,
        extra={"val_acc": float(val_acc), "val_loss": float(val_loss)}
    )
    # Also keep a light weights-only file
    torch.save(_model_state(model), os.path.join(CKPT_DIR, "last_weights.pth"))

    # ---- Save "best" when validation improves ----
    if val_acc > best_val_acc:
        best_val_acc = val_acc
        best_epoch = epoch
        save_checkpoint(
            path=os.path.join(CKPT_DIR, "best.pt"),
            epoch=epoch,
            model=model,
            optimizer=optimizer,
            scheduler=scheduler,
            best_val_acc=best_val_acc,
            extra={"val_acc": float(val_acc), "val_loss": float(val_loss)}
        )
        torch.save(_model_state(model), os.path.join(CKPT_DIR, "best_weights.pth"))
        print(f"✅ New best @ epoch {epoch}: val_acc={val_acc:.4f} (saved)")

print(f"Training done. Best val_acc={best_val_acc:.4f} @ epoch {best_epoch}")

# Optionally load the best weights into model for downstream eval/inference:
start_epoch, best_val_acc_loaded, _ = load_checkpoint(
    os.path.join(CKPT_DIR, "best.pt"), model, optimizer=None, scheduler=None, map_location=device
)
print("Loaded best checkpoint for evaluation.")


Epoch 01/10 | train_acc 0.016 loss 15.908 | val_acc 0.073 loss 13.887 | 369.9s
✅ New best @ epoch 1: val_acc=0.0728 (saved)
Epoch 02/10 | train_acc 0.173 loss 11.191 | val_acc 0.328 loss 8.502 | 262.1s
✅ New best @ epoch 2: val_acc=0.3277 (saved)
Epoch 03/10 | train_acc 0.467 loss 5.837 | val_acc 0.557 loss 5.647 | 317.3s
✅ New best @ epoch 3: val_acc=0.5574 (saved)
Epoch 04/10 | train_acc 0.651 loss 3.542 | val_acc 0.588 loss 5.839 | 254.0s
✅ New best @ epoch 4: val_acc=0.5882 (saved)
Epoch 05/10 | train_acc 0.804 loss 1.596 | val_acc 0.745 loss 3.903 | 278.4s
✅ New best @ epoch 5: val_acc=0.7451 (saved)
Epoch 06/10 | train_acc 0.906 loss 0.689 | val_acc 0.804 loss 3.062 | 301.0s
✅ New best @ epoch 6: val_acc=0.8039 (saved)
Epoch 07/10 | train_acc 0.955 loss 0.226 | val_acc 0.824 loss 2.687 | 308.8s
✅ New best @ epoch 7: val_acc=0.8235 (saved)
Epoch 08/10 | train_acc 0.972 loss 0.150 | val_acc 0.840 loss 2.677 | 310.2s
✅ New best @ epoch 8: val_acc=0.8403 (saved)
Epoch 09/10 | train_a

In [47]:
# === Evaluation: load best.pt and compute metrics on test_loader ===
import os, json, numpy as np, torch
import torch.nn.functional as F
from pathlib import Path

from sklearn.metrics import (
    accuracy_score, precision_recall_fscore_support,
    classification_report, confusion_matrix, roc_auc_score
)

CKPT_DIR = "checkpoints_transformer"
OUT_DIR  = "artifacts"
Path(OUT_DIR).mkdir(parents=True, exist_ok=True)

# 1) Load best checkpoint into the existing model
_ = load_checkpoint(os.path.join(CKPT_DIR, "best.pt"), model, optimizer=None, scheduler=None, map_location=device)
model.to(device)
model.eval()

# 2) Run through test set and collect logits/probs/labels
y_true, y_pred, y_prob_list = [], [], []
running_loss, n_samples = 0.0, 0

with torch.no_grad():
    for xb, yb in test_loader:
        xb = xb.to(device)
        yb = yb.to(device)

        out = model(xb, yb)  # your forward accepts (x, y); adjust to model(xb) if not needed
        logits = out[0] if isinstance(out, (tuple, list)) else out

        loss = criterion(logits, yb)
        running_loss += float(loss.item()) * xb.size(0)
        n_samples += xb.size(0)

        probs = F.softmax(logits, dim=1)
        preds = torch.argmax(probs, dim=1)

        y_prob_list.append(probs.detach().cpu().numpy())
        y_true.append(yb.detach().cpu().numpy())
        y_pred.append(preds.detach().cpu().numpy())

test_loss = running_loss / max(1, n_samples)
y_prob = np.concatenate(y_prob_list, axis=0)
y_true = np.concatenate(y_true, axis=0).astype(int)
y_pred = np.concatenate(y_pred, axis=0).astype(int)
num_classes = y_prob.shape[1]

# Try to get class names from the dataset; else 0..C-1
try:
    CLASS_NAMES = list(getattr(test_loader.dataset, "classes"))
    if len(CLASS_NAMES) != num_classes:
        CLASS_NAMES = [str(i) for i in range(num_classes)]
except Exception:
    CLASS_NAMES = [str(i) for i in range(num_classes)]

# 3) Metrics
top1_acc = accuracy_score(y_true, y_pred)

k = min(5, num_classes)
if k > 1:
    topk_idx = np.argsort(y_prob, axis=1)[:, -k:]
    topk_acc = np.mean([y_true[i] in topk_idx[i] for i in range(len(y_true))])
else:
    topk_acc = top1_acc

prec_macro, rec_macro, f1_macro, _ = precision_recall_fscore_support(
    y_true, y_pred, average="macro", zero_division=0
)
prec_micro, rec_micro, f1_micro, _ = precision_recall_fscore_support(
    y_true, y_pred, average="micro", zero_division=0
)
prec_weighted, rec_weighted, f1_weighted, _ = precision_recall_fscore_support(
    y_true, y_pred, average="weighted", zero_division=0
)

cm = confusion_matrix(y_true, y_pred, labels=list(range(num_classes)))
per_class_acc = {}
for i, name in enumerate(CLASS_NAMES):
    row = cm[i].sum()
    per_class_acc[name] = float(cm[i, i] / row) if row > 0 else 0.0

# ROC-AUC (OvR macro). Needs one-hot labels
try:
    y_true_1h = np.eye(num_classes, dtype=np.float32)[y_true]
    roc_auc_macro = float(roc_auc_score(y_true_1h, y_prob, multi_class="ovr", average="macro"))
except Exception:
    roc_auc_macro = None

# 4) Print summary
print("\n=== Test Metrics (ViT) ===")
print(f"Loss        : {test_loss:.4f}")
print(f"Top-1 Acc   : {top1_acc:.4f}")
print(f"Top-{k} Acc : {topk_acc:.4f}")
print(f"Precision   : macro={prec_macro:.4f} | micro={prec_micro:.4f} | weighted={prec_weighted:.4f}")
print(f"Recall      : macro={rec_macro:.4f} | micro={rec_micro:.4f} | weighted={rec_weighted:.4f}")
print(f"F1-score    : macro={f1_macro:.4f}  | micro={f1_micro:.4f}  | weighted={f1_weighted:.4f}")
print(f"ROC-AUC(m)  : {roc_auc_macro if roc_auc_macro is not None else 'N/A'}")

print("\nPer-class accuracy:")
for name, acc in per_class_acc.items():
    print(f" - {name}: {acc:.4f}")

print("\nClassification report:")
print(classification_report(y_true, y_pred, target_names=CLASS_NAMES, digits=4, zero_division=0))

# 5) Save artifacts
report = {
    "loss": float(test_loss),
    "top1_acc": float(top1_acc),
    f"top{k}_acc": float(topk_acc),
    "precision": {"macro": float(prec_macro), "micro": float(rec_micro), "weighted": float(prec_weighted)},
    "recall":    {"macro": float(rec_macro), "micro": float(rec_micro), "weighted": float(rec_weighted)},
    "f1":        {"macro": float(f1_macro),  "micro": float(f1_micro),  "weighted": float(f1_weighted)},
    "roc_auc_macro_ovr": roc_auc_macro,
    "per_class_accuracy": per_class_acc,
    "class_names": CLASS_NAMES,
}
with open(os.path.join(OUT_DIR, "vit_test_metrics.json"), "w", encoding="utf-8") as f:
    json.dump(report, f, indent=2)
np.save(os.path.join(OUT_DIR, "vit_confusion_matrix.npy"), cm)
print(f"\nSaved: {OUT_DIR}/vit_test_metrics.json and {OUT_DIR}/vit_confusion_matrix.npy")



=== Test Metrics (ViT) ===
Loss        : 2.2504
Top-1 Acc   : 0.8375
Top-5 Acc : 0.9175
Precision   : macro=0.8404 | micro=0.8375 | weighted=0.8432
Recall      : macro=0.8293 | micro=0.8375 | weighted=0.8375
F1-score    : macro=0.8311  | micro=0.8375  | weighted=0.8366
ROC-AUC(m)  : 0.9316058509960582

Per-class accuracy:
 - Akshay Kumar: 1.0000
 - Alexandra Daddario: 0.9333
 - Alia Bhatt: 0.7500
 - Amitabh Bachchan: 0.8333
 - Andy Samberg: 0.8667
 - Anushka Sharma: 0.6364
 - Billie Eilish: 0.6429
 - Brad Pitt: 0.8947
 - Camila Cabello: 0.9231
 - Charlize Theron: 0.8462
 - Claire Holt: 0.6667
 - Courtney Cox: 0.7500
 - Dwayne Johnson: 1.0000
 - Elizabeth Olsen: 0.8182
 - Ellen Degeneres: 0.9167
 - Henry Cavill: 0.9412
 - Hrithik Roshan: 0.7333
 - Hugh Jackman: 0.7647
 - Jessica Alba: 0.7647
 - Kashyap: 0.6000
 - Lisa Kudrow: 0.9091
 - Margot Robbie: 0.8333
 - Marmik: 0.6000
 - Natalie Portman: 0.9412
 - Priyanka Chopra: 1.0000
 - Robert Downey Jr: 0.8889
 - Roger Federer: 0.7273
 - To

In [74]:
# ============================================
# Webcam -> two captures -> verify (SwinTiny)
# ============================================
# pip install opencv-python timm torch torchvision pillow

import os, sys, math, time, numpy as np
import torch, torch.nn as nn, torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import cv2, timm

# ---------------- Config ----------------
CKPT_PATH     = r"C:\Users\jians\Documents\GitHub\SMART-Barcode-Scanner-and-Face-Recognition\checkpoints_transformer\best.pt"   # your saved checkpoint
TEMPLATES_NPZ = "swin_templates.npz"      # optional (load tau)
CAM_INDEX     = 0                         # webcam index
EXPAND_X      = 0.15                      # face box horizontal expansion
EXPAND_Y      = 0.20                      # face box vertical expansion
MIN_FACE      = (60, 60)                  # min face size in pixels
DEFAULT_TAU   = 0.85                      # fallback threshold
# ----------------------------------------

# ------------- Device setup -------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    torch.backends.cudnn.benchmark = True
print("[INFO] Device:", device)

# ------------- Model (embedder) ----------
class L2Norm(nn.Module):
    def __init__(self, eps=1e-6): 
        super().__init__(); self.eps = eps
    def forward(self, x): 
        return F.normalize(x, dim=1, eps=self.eps)

class SwinTinyEmbedder(nn.Module):
    def __init__(self, embed_dim=256):
        super().__init__()
        self.backbone = timm.create_model(
            "swin_tiny_patch4_window7_224",
            pretrained=False, num_classes=0, global_pool="avg"
        )
        in_feats = self.backbone.num_features
        self.proj = nn.Linear(in_feats, embed_dim, bias=False)
        self.l2 = L2Norm()
    def forward(self, x):
        f = self.backbone(x)
        e = self.proj(f)
        return self.l2(e)

# --------- Load checkpoint & config ---------
if not os.path.exists(CKPT_PATH):
    print(f"[ERR] Missing checkpoint: {CKPT_PATH}")
    sys.exit(1)

ckpt = torch.load(CKPT_PATH, map_location="cpu")

# Try to get training-time preprocessing from ckpt; set sensible defaults otherwise
IMSIZE = int(ckpt.get("imsize", 224))  # default 224 (Swin)
MEAN   = ckpt.get("mean", [0.5, 0.5, 0.5])
STD    = ckpt.get("std",  [0.5, 0.5, 0.5])
EMBED_DIM = int(ckpt.get("embed_dim", 256))

print(f"[INFO] IMSIZE={IMSIZE} | EMBED_DIM={EMBED_DIM} | mean={MEAN} | std={STD}")

embedder = SwinTinyEmbedder(embed_dim=EMBED_DIM).to(device)

# Robustly extract an embedder state dict from various checkpoint formats
state = None
candidates = [
    "embedder_state", "model_state_dict", "state_dict", "model_state"
]
for k in candidates:
    if k in ckpt and isinstance(ckpt[k], dict):
        state = ckpt[k]
        break
if state is None and isinstance(ckpt, dict):
    # If top-level are params already
    if all(isinstance(v, torch.Tensor) for v in ckpt.values()):
        state = ckpt

if state is None:
    print("[ERR] Could not find a state_dict in checkpoint.")
    sys.exit(1)

# If keys are prefixed like "embedder.backbone....", strip "embedder."
def strip_prefix(d, prefix="embedder."):
    return { (k[len(prefix):] if k.startswith(prefix) else k): v for k,v in d.items() }

state = strip_prefix(state, "embedder.")
# Filter to only keys that exist in our embedder (prevents unexpected key errors)
expected = embedder.state_dict().keys()
filtered = {k: v for k, v in state.items() if k in expected}

missing, unexpected = embedder.load_state_dict(filtered, strict=False)
if missing:   print("[WARN] Missing keys:", missing[:5], "..." if len(missing)>5 else "")
if unexpected:print("[WARN] Unexpected keys (ignored):", unexpected[:5], "..." if len(unexpected)>5 else "")
embedder.eval()

# ------------- Threshold (tau) -------------
TAU = DEFAULT_TAU
if os.path.exists(TEMPLATES_NPZ):
    try:
        npz = np.load(TEMPLATES_NPZ, allow_pickle=True)
        if "tau" in npz.files:
            val = np.array(npz["tau"]).reshape(-1)[0]
            TAU = float(val)
            print(f"[INFO] Using tau from {TEMPLATES_NPZ}: {TAU:.3f}")
    except Exception as e:
        print("[WARN] Could not load tau from templates:", e)
print(f"[INFO] Threshold tau={TAU:.3f}")

# ------------- Preprocessing --------------
eval_tf = transforms.Compose([
    transforms.Resize((IMSIZE, IMSIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])

@torch.no_grad()
def embed_np_image_bgr(bgr):
    """
    bgr: (H,W,3) uint8 face crop
    returns: (D,) float32 L2-normalized embedding on CPU
    """
    rgb = cv2.cvtColor(bgr, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(rgb)
    x = eval_tf(img).unsqueeze(0).to(device)      # (1,3,H,W)
    e = embedder(x).detach().cpu().numpy()[0]
    return e.astype(np.float32)

def cosine(a, b):  # embeddings are L2 -> dot == cosine
    return float(np.dot(a, b))

# ----------- Face detector utils ----------
def get_face_cascade():
    candidates = [
        "models/haarcascade_frontalface_default.xml",
        os.path.join(os.getcwd(), "haarcascade_frontalface_default.xml"),
        os.path.join(cv2.data.haarcascades, "haarcascade_frontalface_default.xml"),
    ]
    for p in candidates:
        if os.path.exists(p):
            cc = cv2.CascadeClassifier(p)
            if not cc.empty():
                return cc
    raise IOError("Haar cascade not found. Put 'haarcascade_frontalface_default.xml' in ./models/ or project root.")

_FACE = get_face_cascade()

def detect_largest_face(gray):
    faces = _FACE.detectMultiScale(
        gray, scaleFactor=1.1, minNeighbors=4,
        flags=cv2.CASCADE_SCALE_IMAGE, minSize=MIN_FACE
    )
    if len(faces) == 0: return None
    return max(faces, key=lambda r: r[2]*r[3])  # (x,y,w,h)

def expand_box(x, y, w, h, fx=EXPAND_X, fy=EXPAND_Y, W=0, H=0):
    cx, cy = x + w//2, y + h//2
    w2, h2 = int(w*(1+2*fx)), int(h*(1+2*fy))
    x2, y2 = max(0, cx - w2//2), max(0, cy - h2//2)
    x2e, y2e = min(W, x2 + w2), min(H, y2 + h2)
    return x2, y2, max(1, x2e-x2), max(1, y2e-y2)

# -------- Capture one face & embed --------
def capture_and_embed(cam_index=CAM_INDEX, window_name="Camera"):
    cap = cv2.VideoCapture(cam_index, cv2.CAP_DSHOW)
    if not cap.isOpened():
        print("[ERR] Cannot open camera")
        return None
    print("Press SPACE to capture, ESC to quit.")
    emb = None
    while True:
        ok, frame = cap.read()
        if not ok:
            print("[ERR] Frame grab failed"); break
        frame = cv2.flip(frame, 1)  # mirror for user
        gray  = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
        box = detect_largest_face(gray)
        if box is not None:
            x,y,w,h = box
            cv2.rectangle(frame, (x,y), (x+w,y+h), (0,255,0), 2)
            cv2.putText(frame, "Face", (x, y-8), cv2.FONT_HERSHEY_SIMPLEX, 0.6, (0,255,0), 2, cv2.LINE_AA)
        cv2.imshow(window_name, frame)
        k = cv2.waitKey(1) & 0xFF
        if k == 27:  # ESC
            print("Exit."); break
        if k == 32:  # SPACE
            if box is None:
                print("❌ No face detected. Try again.")
                continue
            H, W = frame.shape[:2]
            x,y,w,h = expand_box(*box, W=W, H=H)
            face = frame[y:y+h, x:x+w].copy()
            emb = embed_np_image_bgr(face)
            print("✅ Captured embedding shape:", emb.shape)
            break
    cap.release(); cv2.destroyAllWindows()
    return emb

# ---------- Optional: enroll & verify helpers ----------
def enroll_and_save(name="user1", out_dir="enrollments"):
    os.makedirs(out_dir, exist_ok=True)
    e = capture_and_embed()
    if e is None: 
        print("❌ Enrollment failed.")
        return None
    path = os.path.join(out_dir, f"{name}.npy")
    np.save(path, e.astype(np.float32))
    print(f"✔ Saved: {path}")
    return path

def verify_against_file(path, tau=TAU):
    ref = np.load(path)
    probe = capture_and_embed()
    if probe is None:
        print("❌ Probe capture failed.")
        return None, None
    sim = cosine(ref, probe)
    ok = sim >= tau
    print(f"[VERIFY] cosine={sim:.4f} | tau={tau:.2f} → {'MATCH ✅' if ok else 'NOT MATCH ❌'}")
    return sim, ok

# -------------- Main: 2-capture verify --------------
if __name__ == "__main__":
    e1 = capture_and_embed()
    if e1 is None: sys.exit(0)
    e2 = capture_and_embed()
    if e2 is None: sys.exit(0)

    sim = cosine(e1, e2)
    verdict = "MATCH ✅" if sim >= TAU else "NOT MATCH ❌"
    print(f"\ncosine={sim:.3f}  |  tau={TAU:.3f}  ->  {verdict}")


[INFO] Device: cpu
[INFO] IMSIZE=224 | EMBED_DIM=256 | mean=[0.5, 0.5, 0.5] | std=[0.5, 0.5, 0.5]
[INFO] Threshold tau=0.850
Press SPACE to capture, ESC to quit.
✅ Captured embedding shape: (256,)
Press SPACE to capture, ESC to quit.
✅ Captured embedding shape: (256,)

cosine=0.892  |  tau=0.850  ->  MATCH ✅


In [None]:
# ============================================
# Webcam -> robust capture -> verify (SwinTiny)
# No MediaPipe; Haar only; multi-frame averaging + quality gate
# ============================================
# pip install opencv-python timm torch torchvision pillow

import os, sys, time, math, numpy as np
import torch, torch.nn as nn, torch.nn.functional as F
from torchvision import transforms
from PIL import Image
import cv2, timm
from pathlib import Path

# ---------------- Config ----------------
# Put your absolute or relative checkpoint path here:
# Examples:
#   CKPT_PATH = r"C:\Users\you\model\best.pth"
#   CKPT_PATH = "checkpoints/swin_arcface_best.pth"
CKPT_PATH    = r"C:\Users\jians\Documents\GitHub\SMART-Barcode-Scanner-and-Face-Recognition\checkpoints_transformer\best.pt"  # <-- change this
CAM_INDEX    = 0
DEFAULT_TAU  = 0.85

# Face crop & quality
IMSIZE_FALLBK = 224
EMB_FALLBK    = 256
EXPAND_X      = 0.20     # expand Haar box (left/right)
EXPAND_Y      = 0.30     # expand Haar box (up/down, with more forehead)
MIN_FACE_PX   = 80       # reject tiny faces
MIN_BLUR      = 60.0     # Laplacian var threshold
MIN_BRIGHT    = 35       # mean gray min
MAX_BRIGHT    = 230      # mean gray max
ENROLL_AVG_N  = 5        # frames to average for enrollment
PROBE_AVG_N   = 5        # frames to average for probe

# ------------- Device setup -------------
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
if device.type == "cuda":
    torch.backends.cudnn.benchmark = True
print("[INFO] Device:", device)

# ------------- Path resolve -------------
def resolve_ckpt(path_or_base):
    """
    Accepts:
      - exact file (absolute or relative): *.pth or *.pt
      - base name without ext: will try .pth then .pt
    Returns a normalized absolute path or raises FileNotFoundError.
    """
    p = Path(os.path.expanduser(str(path_or_base))).expanduser()
    # exact file
    if p.suffix.lower() in (".pth", ".pt"):
        if p.exists():
            return p.resolve()
        else:
            raise FileNotFoundError(f"Checkpoint not found: {p}")
    # try with common extensions
    for ext in (".pth", ".pt"):
        cand = (p.parent / (p.name + ext)).resolve()
        if cand.exists():
            return cand
    # if a directory was passed, try common filenames inside
    if p.exists() and p.is_dir():
        for name in ("best.pth", "best.pt", "last.pth", "last.pt"):
            cand = (p / name).resolve()
            if cand.exists():
                return cand
        # fallback: any .pth/.pt, pick newest
        any_cand = list(p.glob("*.pth")) + list(p.glob("*.pt"))
        if any_cand:
            return max(any_cand, key=lambda q: q.stat().st_mtime).resolve()
    raise FileNotFoundError(f"No checkpoint found for '{path_or_base}'")

# ------------- Model (embedder) ----------
class L2Norm(nn.Module):
    def __init__(self, eps=1e-6): super().__init__(); self.eps = eps
    def forward(self, x): return F.normalize(x, dim=1, eps=self.eps)

class SwinTinyEmbedder(nn.Module):
    def __init__(self, embed_dim=256):
        super().__init__()
        self.backbone = timm.create_model(
            "swin_tiny_patch4_window7_224",
            pretrained=False, num_classes=0, global_pool="avg"
        )
        in_feats = self.backbone.num_features
        self.proj = nn.Linear(in_feats, embed_dim, bias=False)
        self.l2 = L2Norm()
    def forward(self, x):
        f = self.backbone(x)
        e = self.proj(f)
        return self.l2(e)

# --------- Load checkpoint & config ---------
try:
    ckpt_path = resolve_ckpt(CKPT_PATH)
except FileNotFoundError as e:
    print("[ERR]", e)
    sys.exit(1)

print(f"[INFO] Using checkpoint: {ckpt_path}")
ckpt = torch.load(str(ckpt_path), map_location="cpu")

# Training-time preprocessing (fall back if missing)
IMSIZE    = int(ckpt.get("imsize", IMSIZE_FALLBK))
EMBED_DIM = int(ckpt.get("embed_dim", EMB_FALLBK))
MEAN      = ckpt.get("mean", [0.5, 0.5, 0.5])
STD       = ckpt.get("std",  [0.5, 0.5, 0.5])
print(f"[INFO] IMSIZE={IMSIZE} | EMBED_DIM={EMBED_DIM} | mean/std={MEAN}/{STD}")

embedder = SwinTinyEmbedder(embed_dim=EMBED_DIM).to(device)

# Robustly extract a state dict
state = None
for k in ("embedder_state","model_state_dict","state_dict","model_state"):
    if k in ckpt and isinstance(ckpt[k], dict):
        state = ckpt[k]; break
# if directly a state dict
if state is None and isinstance(ckpt, dict) and all(isinstance(v, torch.Tensor) for v in ckpt.values()):
    state = ckpt

if state is None:
    print("[ERR] Could not find a state_dict in checkpoint."); sys.exit(1)

# Strip possible prefix "embedder."
state = { (k.split("embedder.",1)[-1] if k.startswith("embedder.") else k): v for k,v in state.items() }

# Filter to expected keys
expected = embedder.state_dict().keys()
state = {k:v for k,v in state.items() if k in expected}

missing, unexpected = embedder.load_state_dict(state, strict=False)
if missing:   print("[WARN] Missing keys:", missing[:6], "..." if len(missing)>6 else "")
if unexpected: print("[WARN] Unexpected keys (ignored):", unexpected[:6], "..." if len(unexpected)>6 else "")
embedder.eval()
print("[INFO] Embedder ready, params loaded.")

# ------------- Threshold (tau) -------------
TAU = DEFAULT_TAU
print(f"[INFO] Threshold tau={TAU:.3f}")

# ------------- Preprocessing --------------
eval_tf = transforms.Compose([
    transforms.Resize((IMSIZE, IMSIZE)),
    transforms.ToTensor(),
    transforms.Normalize(MEAN, STD),
])

@torch.no_grad()
def embed_bgr(face_bgr):
    """Face BGR -> embedding (L2-normalized)"""
    rgb = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2RGB)
    img = Image.fromarray(rgb)
    x = eval_tf(img).unsqueeze(0).to(device)
    e = embedder(x).detach().cpu().numpy()[0].astype(np.float32)
    return e

def cosine(a, b):
    a = a / (np.linalg.norm(a)+1e-9)
    b = b / (np.linalg.norm(b)+1e-9)
    return float(np.dot(a, b))

# ----------- Haar face detector ----------
_HAAR = cv2.CascadeClassifier(cv2.data.haarcascades + "haarcascade_frontalface_default.xml")

def detect_largest(gray):
    faces = _HAAR.detectMultiScale(
        gray, scaleFactor=1.1, minNeighbors=4,
        flags=cv2.CASCADE_SCALE_IMAGE, minSize=(MIN_FACE_PX, MIN_FACE_PX)
    )
    if len(faces) == 0: return None
    return max(faces, key=lambda r: r[2]*r[3])

def expand_box(x, y, w, h, W, H, fx=EXPAND_X, fy=EXPAND_Y):
    cx, cy = x + w//2, y + h//2
    w2, h2 = int(w*(1+2*fx)), int(h*(1+2*fy))
    # bias upward to include forehead
    x2 = max(0, cx - w2//2)
    y2 = max(0, cy - int(0.55*h2))
    x2e, y2e = min(W, x2 + w2), min(H, y2 + h2)
    return x2, y2, max(1, x2e-x2), max(1, y2e-y2)

def crop_face(frame):
    gray = cv2.cvtColor(frame, cv2.COLOR_BGR2GRAY)
    box = detect_largest(gray)
    if box is None: return None
    H, W = frame.shape[:2]
    x,y,w,h = expand_box(*box, W=W, H=H)
    face = frame[y:y+h, x:x+w]
    if face.size == 0: return None
    return cv2.resize(face, (IMSIZE, IMSIZE), interpolation=cv2.INTER_AREA)

# -------------- Quality gate --------------
def quality_ok(face_bgr):
    if face_bgr is None or face_bgr.size == 0:
        return False, "empty"
    gray = cv2.cvtColor(face_bgr, cv2.COLOR_BGR2GRAY)
    if min(gray.shape[:2]) < MIN_FACE_PX:
        return False, "too small"
    blur = cv2.Laplacian(gray, cv2.CV_64F).var()
    if blur < MIN_BLUR: return False, f"blurry({blur:.1f})"
    m = float(gray.mean())
    if m < MIN_BRIGHT: return False, f"dark({m:.1f})"
    if m > MAX_BRIGHT: return False, f"bright({m:.1f})"
    return True, "ok"

# -------------- Capture helpers --------------
def draw_info(frame, msg, color=(0,255,0)):
    cv2.putText(frame, msg, (10, 24), cv2.FONT_HERSHEY_SIMPLEX, 0.7, color, 2, cv2.LINE_AA)

def capture_avg_embedding(n_frames=5, window="Camera"):
    cap = cv2.VideoCapture(CAM_INDEX, cv2.CAP_DSHOW)
    if not cap.isOpened():
        print("[ERR] cannot open camera"); return None
    print(f"Press SPACE to capture {n_frames} frames, ESC to quit.")
    vecs = []
    while True:
        ok, frame = cap.read()
        if not ok: print("[ERR] grab failed"); break
        frame = cv2.flip(frame, 1)
        show = frame.copy()
        draw_info(show, f"SPACE: capture {n_frames} | ESC: exit")
        cv2.imshow(window, show)
        k = cv2.waitKey(1) & 0xFF
        if k == 27:  # ESC
            print("Exit."); vecs=[]; break
        if k == 32:  # SPACE
            taken = 0
            while taken < n_frames:
                ok2, f2 = cap.read()
                if not ok2: break
                f2 = cv2.flip(f2, 1)
                face = crop_face(f2)
                okq, why = quality_ok(face)
                if not okq:
                    draw_info(f2, f"Skip: {why}", (0,0,255)); cv2.imshow(window, f2); cv2.waitKey(1)
                    continue
                vecs.append(embed_bgr(face))
                taken += 1
                draw_info(f2, f"Captured {taken}/{n_frames}", (0,255,0))
                cv2.imshow(window, f2); cv2.waitKey(1)
            break
    cap.release(); cv2.destroyAllWindows()
    if not vecs: return None
    v = np.mean(np.stack(vecs,0), axis=0)
    v = v / (np.linalg.norm(v) + 1e-9)
    return v.astype(np.float32)

# -------------- Verify two captures --------------
def verify_two_captures(tau=DEFAULT_TAU):
    e1 = capture_avg_embedding(ENROLL_AVG_N, window="Enroll")
    if e1 is None: return None, None, None
    e2 = capture_avg_embedding(PROBE_AVG_N, window="Probe")
    if e2 is None: return None, None, None
    sim = cosine(e1, e2)
    ok  = sim >= tau
    print(f"[VERIFY] cosine={sim:.4f} | tau={tau:.2f} → {'MATCH ✅' if ok else 'NOT MATCH ❌'}")
    return e1, e2, (sim, ok)

# -------------- Main --------------
if __name__ == "__main__":
    print("[INFO] Starting two-capture verify (Haar + averaging + quality gate).")
    e1, e2, res = verify_two_captures(tau=DEFAULT_TAU)
    if res is not None:
        sim, ok = res
        print(f"\nResult → cosine={sim:.4f} vs tau={DEFAULT_TAU:.2f} :: {'MATCH ✅' if ok else 'NOT MATCH ❌'}")


[INFO] Device: cpu
[INFO] Using checkpoint: C:\Users\jians\Documents\GitHub\SMART-Barcode-Scanner-and-Face-Recognition\checkpoints_transformer\best.pt
[INFO] IMSIZE=224 | EMBED_DIM=256 | mean/std=[0.5, 0.5, 0.5]/[0.5, 0.5, 0.5]
[INFO] Embedder ready, params loaded.
[INFO] Threshold tau=0.850
[INFO] Starting two-capture verify (Haar + averaging + quality gate).
Press SPACE to capture 5 frames, ESC to quit.
Press SPACE to capture 5 frames, ESC to quit.
[VERIFY] cosine=0.7772 | tau=0.85 → NOT MATCH ❌

Result → cosine=0.7772 vs tau=0.85 :: NOT MATCH ❌
