## COLAB

In [None]:
# Mount Google Drive at /content/drive (Colab-only import).
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
!git clone https://github.com/PopovychMariya/CV_LUN_same_room /content/drive/MyDrive/

In [None]:
# Copy the repository's `research` folder from Drive to local Colab storage,
# then make that copy the working directory for every later cell.
!cp -r /content/drive/MyDrive/CV_LUN_same_room/research /content/CV_LUN_same_room_research
%cd /content/CV_LUN_same_room_research

/content/CV_LUN_same_room_research


In [None]:
!pip install -r requirements.txt



In [None]:
# Extract the training images into dataset/ (-q: quiet, -o: overwrite without prompting).
!unzip -qo "archives/train_images.zip" -d "dataset"

In [None]:
# Extract the test images into dataset/ (-q: quiet, -o: overwrite without prompting).
!unzip -qo "archives/test_images.zip" -d "dataset"

In [19]:
# Run the project's lightglue_keypoints module; per its log it loads LightGlue weights,
# extracts keypoints and saves them.
!python -m lightglue_keypoints

Preparing dataloaders...
Loading LightGlue weights...
Extracting keypoints for test set...
Loading batches: 100% 42/42 [02:52<00:00,  4.11s/it]
✅ All keypoints detected and saved successfully.


In [20]:
# Run the project's keypoints_grid module; per its log it generates and saves one grid per file of a split.
!python -m keypoints_grid

Processing 2665 files for split 'test'...
test split: 100% 2665/2665 [00:38<00:00, 68.57it/s]
✅ All grids generated and saved successfully.


In [None]:
# Archive keypoints/train with the project's archive_folder script (the log reports archives/train_keypoints.zip).
!python -m scripts.archive_folder keypoints/train train_keypoints

Archive created: /content/CV_LUN_same_room_research/archives/train_keypoints.zip


In [21]:
# Archive keypoints/test with the project's archive_folder script (the log reports archives/test_keypoints.zip).
!python -m scripts.archive_folder keypoints/test test_keypoints

Archive created: /content/CV_LUN_same_room_research/archives/test_keypoints.zip


## IMPORTS

In [None]:
import random
from pathlib import Path
import numpy as np
import pandas as pd
import os


import timm
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, Subset, DataLoader
from sklearn.model_selection import train_test_split
from torch.optim.lr_scheduler import LinearLR, CosineAnnealingLR, SequentialLR
from torch.cuda.amp import autocast, GradScaler

from sklearn.metrics import precision_recall_curve, precision_score, recall_score, f1_score

In [None]:
from path_config import (
    ROOT,
    DATASET_FOLDER_PATHS,
    DATASET_ANNOTATIONS,
    TRAIN_LABELS,
    DETECTED_KEYPOINTS,
    MODELS_PATH
)

# Location of the train-split keypoint/grid outputs, as declared in path_config.
# NOTE(review): GRID_DIR and LABEL_CSV are not referenced again in the visible cells.
GRID_DIR = DETECTED_KEYPOINTS["train"]

# Training labels as declared in path_config (passed to GridTrainDataset as its label CSV later on).
LABEL_CSV = TRAIN_LABELS

# Folder for run artifacts; created up front so later writes cannot fail on a missing directory.
OUT_DIR = ROOT / "runs"
OUT_DIR.mkdir(parents=True, exist_ok=True)

# Image folders and annotation files for both splits, looked up by the keys path_config exposes.
TRAIN_DIR = DATASET_FOLDER_PATHS["train_folder_path"]
TEST_DIR  = DATASET_FOLDER_PATHS["test_folder_path"]
TRAIN_ANN = DATASET_ANNOTATIONS["train_annotation_path"]
TEST_ANN  = DATASET_ANNOTATIONS["test_annotation_path"]

In [None]:
# Use the GPU when one is visible to PyTorch, otherwise fall back to the CPU.
DEVICE = torch.device('cuda') if torch.cuda.is_available() else torch.device('cpu')

# One seed shared by every RNG source the notebook uses.
SEED = 1337
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)

## DATA

In [54]:
class GridTrainDataset(Dataset):
    """Labelled grids for training.

    Reads ``label_csv`` (columns ``task_id`` and ``label``) and keeps only the rows
    whose grid file ``<grid_dir>/<task_id>/<grid_name>`` exists on disk.

    Args:
        grid_dir: Root folder holding one sub-folder per task id.
        label_csv: CSV file with ``task_id`` and ``label`` columns.
        grid_name: File name of the grid inside each task folder.

    Each item is ``(grid, label)``: the loaded array as a float32 tensor and the
    label as a float32 scalar tensor.
    """

    def __init__(self, grid_dir: Path, label_csv: Path, grid_name: str = "grid_G32.npy"):
        self.grid_dir = Path(grid_dir)
        self.grid_name = grid_name
        # Read ids as text so numeric-looking ids keep their spelling and can be joined to a path.
        df = pd.read_csv(label_csv, dtype={"task_id": str})

        # One existence check per row, collected into a boolean mask. An explicit bool
        # array keeps the selection well-defined even when the CSV has no rows.
        has_grid = np.array(
            [(self.grid_dir / str(tid) / grid_name).exists() for tid in df["task_id"]],
            dtype=bool,
        )

        # Filtering with a mask keeps the columns even when nothing survives, so an
        # empty dataset has length 0 instead of raising KeyError.
        self.df = df.loc[has_grid].reset_index(drop=True)
        self.task_ids = self.df["task_id"].to_numpy()
        self.labels = self.df["label"].astype(np.float32).to_numpy()

    def __len__(self):
        return len(self.task_ids)

    def __getitem__(self, idx):
        tid = self.task_ids[idx]
        y = self.labels[idx]
        grid = np.load(self.grid_dir / tid / self.grid_name)
        grid = torch.as_tensor(grid, dtype=torch.float32)
        return grid, torch.tensor(y, dtype=torch.float32)

In [55]:
class GridTestDataset(Dataset):
    """Unlabelled grids for inference.

    Scans ``grid_dir`` for sub-folders that contain ``grid_G32.npy`` and exposes
    them in sorted folder-name order. Each item is ``(grid, task_id)``: the loaded
    array as a float32 tensor and the folder name.
    """

    def __init__(self, grid_dir: Path):
        self.grid_dir = Path(grid_dir)
        found = []
        for entry in self.grid_dir.iterdir():
            # Plain files in the root are ignored; only task folders count.
            if not entry.is_dir():
                continue
            if (entry / "grid_G32.npy").exists():
                found.append(entry.name)
        found.sort()
        self.task_ids = found

    def __len__(self):
        return len(self.task_ids)

    def __getitem__(self, idx):
        task_id = self.task_ids[idx]
        grid_path = self.grid_dir / task_id / "grid_G32.npy"
        return torch.tensor(np.load(grid_path), dtype=torch.float32), task_id

In [None]:
# 1) Build dataset + stratified splits (80/10/10)
# The dataset is built first and the split is derived from ITS ids/labels, so every
# index handed to Subset refers to the same row the stratification saw. The dataset
# drops rows without a grid file, so indexing the raw label source would not line up.
train_full = GridTrainDataset(DETECTED_KEYPOINTS["train"], TRAIN_LABELS)

keys = list(train_full.task_ids)              # ordered ids, exactly as the dataset serves them
y_all = train_full.labels.astype(int)         # labels aligned to keys
idx_all = np.arange(len(train_full))

# 80% train, then the remaining 20% is halved into two validation sets.
train_idx, temp_idx = train_test_split(
	idx_all, test_size=0.2, stratify=y_all, random_state=SEED
)
val1_idx, val2_idx = train_test_split(
	temp_idx, test_size=0.5, stratify=y_all[temp_idx], random_state=SEED
)

train_ds = Subset(train_full, train_idx.tolist())
val1_ds  = Subset(train_full, val1_idx.tolist())
val2_ds  = Subset(train_full, val2_idx.tolist())

In [None]:
# Settings shared by all three loaders; only the training loader shuffles.
# NOTE(review): 8 workers may exceed the CPU count of the runtime — confirm.
_loader_kwargs = dict(batch_size=32, num_workers=8, pin_memory=True)

train_loader = DataLoader(train_ds, shuffle=True, **_loader_kwargs)
val1_loader = DataLoader(val1_ds, shuffle=False, **_loader_kwargs)
val2_loader = DataLoader(val2_ds, shuffle=False, **_loader_kwargs)

## MODEL

In [None]:
# 1) Typed stems (7ch -> 3ch adapter), mask-gated
class TypedStem(nn.Module):
	"""Adapter from a 7-channel grid to a 3-channel image-like tensor.

	Channels are routed to three conv branches: (0, 1, 6) -> ``scalar``,
	(2, 3) -> ``motion``, (4, 5) -> ``orient``. The concatenated branch outputs
	are gated by a mask that is 1 where channel 0 is positive, passed through
	channel dropout and projected to 3 channels by a 1x1 conv.
	NOTE(review): what each input channel holds is not visible here; the branch
	names are the only hint — confirm against the grid generator.
	"""

	@staticmethod
	def _branch(c_in, c_out, kernel, groups):
		# conv -> GroupNorm -> GELU; padding keeps the spatial size (0 for 1x1, 1 for 3x3).
		return nn.Sequential(
			nn.Conv2d(c_in, c_out, kernel, padding=kernel // 2),
			nn.GroupNorm(groups, c_out),
			nn.GELU(),
		)

	def __init__(self):
		super().__init__()
		# Layers are created in the same order as before so seeded initialisation is unchanged.
		self.scalar = self._branch(3, 8, 1, 1)
		self.motion = self._branch(2, 16, 3, 2)
		self.orient = self._branch(2, 8, 3, 1)
		self.drop = nn.Dropout2d(0.10)
		self.to3 = nn.Conv2d(8 + 16 + 8, 3, 1)

	def forward(self, x):  # x: [B,7,32,32]
		gate = (x[:, :1] > 0).float()
		scalar_in = torch.cat((x[:, 0:2], x[:, 6:7]), dim=1)
		branches = (
			self.scalar(scalar_in),
			self.motion(x[:, 2:4]),
			self.orient(x[:, 4:6]),
		)
		# Cells where channel 0 is not positive contribute nothing to the projection.
		gated = torch.cat(branches, dim=1) * gate
		return self.to3(self.drop(gated))

In [None]:
# 2) Single-tower ConvNeXt feature encoder to 8x8 map
class Tower(nn.Module):
	"""One encoder branch: TypedStem adapter followed by a frozen timm backbone.

	The backbone is built with pretrained weights and ``features_only=True``,
	returning only feature level 0; all of its parameters start frozen. The output
	is L2-normalised over the channel axis so that channel-wise dot products
	between two towers behave like cosine similarities.
	"""

	def __init__(self, backbone='convnext_tiny', drop_path=0.10):
		super().__init__()
		self.stem = TypedStem()
		self.bk = timm.create_model(
			backbone,
			pretrained=True,
			features_only=True,
			out_indices=(0,),
			drop_path_rate=drop_path,
		)
		# Freeze every backbone parameter; only the stem trains until it is unfrozen.
		self.bk.requires_grad_(False)

	def forward(self, x):  # x: [B,7,32,32]
		adapted = self.stem(x)           # -> [B,3,32,32]
		level0 = self.bk(adapted)[0]     # -> [B,C,8,8] per the original shape notes
		return F.normalize(level0, dim=1)

In [None]:
# 3) Local correlation volume (radius r)
def local_corr(Fa, Fb, r=2):  # Fa,Fb: [B,C,H,W] -> [B,(2r+1)^2,H,W]
	"""Correlate each location of ``Fa`` with a (2r+1)x(2r+1) neighbourhood of ``Fb``.

	``Fb`` is zero-padded by ``r`` on every side. For each offset, in row-major order
	(vertical offset outermost), the channel-wise dot product between ``Fa`` and the
	shifted ``Fb`` becomes one output plane. Neighbours that fall outside ``Fb``
	therefore contribute 0.
	"""
	_, _, H, W = Fa.shape
	side = 2 * r + 1
	padded = F.pad(Fb, (r, r, r, r))
	planes = [
		(Fa * padded[:, :, dy:dy + H, dx:dx + W]).sum(1, keepdim=True)
		for dy in range(side)
		for dx in range(side)
	]
	return torch.cat(planes, dim=1)

In [None]:
# 4) 3D cost-volume aggregator -> 2D map
class Vol3D(nn.Module):
	"""3D-conv aggregator that turns a correlation volume into a 2D feature map.

	The ``[B,D,H,W]`` volume gets a singleton channel axis, goes through dropout and
	three 3x3x3 convolutions, and is averaged over the D axis.
	NOTE(review): ``nn.Dropout3d`` zeroes whole channels and the input has exactly
	one channel, so in training it keeps or drops a sample's entire volume — confirm
	that this, rather than per-displacement dropout, is what was intended.
	"""

	def __init__(self, c_out=16, p_drop=0.10):
		super().__init__()
		self.do3d = nn.Dropout3d(p_drop)
		hidden = 8
		layers = [
			nn.Conv3d(1, hidden, 3, padding=1), nn.GELU(), nn.GroupNorm(1, hidden),
			nn.Conv3d(hidden, hidden, 3, padding=1), nn.GELU(),
			nn.Conv3d(hidden, c_out, 3, padding=1), nn.GELU(),
		]
		self.net = nn.Sequential(*layers)

	def forward(self, vol):  # vol: [B,D,H,W]
		x = self.do3d(vol.unsqueeze(1))   # [B,1,D,H,W]
		feats = self.net(x)               # [B,c_out,D,H,W]
		return feats.mean(dim=2)          # collapse D -> [B,c_out,H,W]

In [None]:
# 5) Fusion + head
class FusionHead(nn.Module):
	"""Fuses the aggregated volume with both feature maps and emits one logit per pair.

	``Fa``, ``Fb`` and ``Fa - Fb`` are each projected to 16 channels by a 1x1 conv,
	concatenated with ``agg2d``, mixed to ``width`` channels, averaged over the
	spatial axes, and fed through dropout and a two-layer MLP.
	"""

	def __init__(self, c_in_bk, c_vol=16, width=64, p_drop=0.20):
		super().__init__()
		proj_dim = 16
		self.pa = nn.Conv2d(c_in_bk, proj_dim, 1)
		self.pb = nn.Conv2d(c_in_bk, proj_dim, 1)
		self.pd = nn.Conv2d(c_in_bk, proj_dim, 1)
		self.mix = nn.Conv2d(c_vol + 3 * proj_dim, width, 1)
		self.drop = nn.Dropout(p_drop)
		hidden = width // 2
		self.mlp = nn.Sequential(nn.Linear(width, hidden), nn.GELU(), nn.Linear(hidden, 1))

	def forward(self, agg2d, Fa, Fb):
		parts = (agg2d, self.pa(Fa), self.pb(Fb), self.pd(Fa - Fb))
		fused = self.mix(torch.cat(parts, dim=1))
		pooled = fused.mean(dim=(2, 3))      # global average pool -> [B,width]
		logits = self.mlp(self.drop(pooled))
		return logits.squeeze(1)  # [B]

In [None]:
# 6) Full pair model
class PairSameRoomModel(nn.Module):
	"""Pair classifier: two towers -> local correlation -> 3D aggregation -> fusion head.

	Args:
		backbone: timm model name used by both towers.
		r: radius of the local correlation window, giving (2r+1)^2 displacement planes.

	``forward`` takes the two grids of a pair and returns one logit per pair.
	The towers are separate modules and do not share weights.
	"""

	def __init__(self, backbone='convnext_tiny', r=2):
		super().__init__()
		self.towerA = Tower(backbone)
		self.towerB = Tower(backbone)
		# Read the channel count from the backbone that already exists instead of
		# building a throw-away third model and pushing a dummy batch through it.
		c_bk = self.towerA.bk.feature_info.channels()[0]
		self.r = r
		self.vol = Vol3D(c_out=16)
		self.head = FusionHead(c_in_bk=c_bk, c_vol=16, width=64)

	def freeze_backbone(self, freeze=True):
		"""Freeze (default) or unfreeze the backbone parameters of both towers."""
		trainable = not freeze
		for tower_bk in (self.towerA.bk, self.towerB.bk):
			for p in tower_bk.parameters():
				p.requires_grad = trainable

	def forward(self, A7, B7):  # [B,7,32,32] each
		Fa = self.towerA(A7)
		Fb = self.towerB(B7)
		vol = local_corr(Fa, Fb, r=self.r)  # [B,D,8,8]
		agg = self.vol(vol)                 # [B,16,8,8]
		return self.head(agg, Fa, Fb)       # logits [B]

## TRAINING

In [None]:
# Build the pair model on the selected device and (re)freeze both tower backbones.
model = PairSameRoomModel(backbone='convnext_tiny', r=2).to(DEVICE)
model.freeze_backbone(True)

In [None]:
# Optimisation hyperparameters.
epochs = 20
lr = 3e-4
wd = 1e-2
eta_min = 1e-6

# The schedule is stepped once per batch, so its length is counted in batches;
# the first 10% of steps (at least one) are warm-up.
steps_per_epoch = len(train_loader)
total_steps = epochs * steps_per_epoch
warmup_steps = max(1, int(0.1 * total_steps))

# Where the best checkpoint is written.
ckpt_path = MODELS_PATH / "best_convnext_siamese.pt"

In [None]:
# Class balance of the training split; the negative/positive ratio becomes the
# positive-class weight of the loss (the max() guards against division by zero).
y_train = y_all[train_idx]
n_pos = int(np.count_nonzero(y_train == 1))
n_neg = int(np.count_nonzero(y_train == 0))
pos_weight_value = n_neg / max(n_pos, 1)

print(f"[train] pos={n_pos}, neg={n_neg}, pos_weight={pos_weight_value:.3f}")

In [None]:
# Weighted binary cross-entropy on logits.
crit = nn.BCEWithLogitsLoss(pos_weight=torch.tensor([pos_weight_value], device=DEVICE))
opt = torch.optim.AdamW(model.parameters(), lr=lr, weight_decay=wd, betas=(0.9, 0.999))

# Linear warm-up for `warmup_steps`, then cosine decay down to `eta_min` for the rest.
_warmup = LinearLR(opt, start_factor=1e-3, end_factor=1.0, total_iters=warmup_steps)
_cosine = CosineAnnealingLR(opt, T_max=total_steps - warmup_steps, eta_min=eta_min)
sched = SequentialLR(opt, schedulers=[_warmup, _cosine], milestones=[warmup_steps])

# Gradient scaling is only enabled on CUDA.
scaler = GradScaler(enabled=(DEVICE.type == 'cuda'))

In [None]:
def train_step(batch, clip=1.0):
	"""Run one optimisation step on ``batch`` and return ``(loss, current_lr)``.

	Uses the notebook-level ``model``, ``opt``, ``crit``, ``scaler``, ``sched`` and ``DEVICE``.

	Args:
		batch: ``(A7, B7, y)`` — the two model inputs and the labels.
		clip: max gradient norm passed to ``clip_grad_norm_``.

	NOTE(review): the datasets visible in this notebook yield ``(grid, label)``
	pairs, which cannot be unpacked into three values here — confirm how one grid
	maps onto ``A7``/``B7`` and adapt either the dataset or this unpacking.
	"""
	model.train()
	A7, B7, y = batch
	A7, B7, y = A7.to(DEVICE), B7.to(DEVICE), y.float().to(DEVICE)

	# `scaler` is always an object, so "is not None" never told AMP on from off;
	# ask the scaler whether it is actually enabled (it is not on CPU).
	use_amp = scaler is not None and scaler.is_enabled()

	opt.zero_grad(set_to_none=True)
	with autocast(enabled=use_amp):
		logits = model(A7, B7)
		loss = crit(logits, y)

	if use_amp:
		scaler.scale(loss).backward()
		# Unscale before clipping so the threshold applies to the true gradient norm.
		scaler.unscale_(opt)
		nn.utils.clip_grad_norm_(model.parameters(), clip)
		scale_before = scaler.get_scale()
		scaler.step(opt)
		scaler.update()
		# The scale shrinks only when inf/NaN gradients made the scaler skip opt.step().
		stepped = scaler.get_scale() >= scale_before
	else:
		loss.backward()
		nn.utils.clip_grad_norm_(model.parameters(), clip)
		opt.step()
		stepped = True

	# Advance the schedule only after a real optimizer step.
	if stepped:
		sched.step()
	return float(loss.detach()), opt.param_groups[0]['lr']

In [None]:
@torch.no_grad()
def predict_proba(model, loader, device=DEVICE):
	"""Return ``(probabilities, labels)`` for every batch of ``loader`` as CPU tensors.

	The model is switched to eval mode; probabilities are the sigmoid of its logits.
	Batches are expected as ``(A7, B7, y)``.
	NOTE(review): the datasets visible in this notebook yield two items per sample,
	not three — confirm the batch layout.
	"""
	model.eval()
	probs, targets = [], []
	for A7, B7, y in loader:
		logits = model(A7.to(device), B7.to(device))
		probs.append(torch.sigmoid(logits).cpu())
		targets.append(y.cpu().float())
	return torch.cat(probs), torch.cat(targets)

def f1_pr_rec_at_threshold(p, y, thr):
	"""Return ``(f1, precision, recall)`` for the hard predictions ``p >= thr``.

	Tensors are converted to numpy first (labels cast to int); anything without a
	``detach`` attribute is used as given. Undefined metrics are reported as 0.
	"""
	if hasattr(p, "detach"):
		p = p.detach().cpu().numpy()
	if hasattr(y, "detach"):
		y = y.detach().cpu().numpy().astype(int)
	hard = (p >= thr).astype(int)
	metric_fns = (f1_score, precision_score, recall_score)
	return tuple(float(fn(y, hard, zero_division=0)) for fn in metric_fns)

def pick_best_threshold(p, y):
	"""Pick the probability threshold that maximises F1 on ``(p, y)``.

	Returns a dict with ``thr``, ``f1``, ``prec`` and ``rec``. All four describe the
	same operating point, i.e. the metrics of the predictions ``p >= thr``.
	"""
	if hasattr(p, "detach"): p = p.detach().cpu().numpy()
	if hasattr(y, "detach"): y = y.detach().cpu().numpy().astype(int)

	prec, rec, th = precision_recall_curve(y, p)
	# precision/recall have one more entry than thresholds: element i belongs to
	# th[i], and the trailing (precision=1, recall=0) point has no threshold.
	# Drop that last point so index i lines up across all three arrays.
	prec, rec = prec[:-1], rec[:-1]
	f1 = 2*prec*rec/(prec+rec+1e-9)
	i = int(np.nanargmax(f1))
	return {"thr": float(th[i]), "f1": float(f1[i]),
	        "prec": float(prec[i]), "rec": float(rec[i])}

In [None]:
def save_ckpt(path, model, opt, epoch, best_thr, best_val1_f1, backbone='convnext_tiny', r=2):
    """Write a checkpoint to ``path`` with ``torch.save``.

    Stored keys: ``model_state``, ``optimizer_state``, ``epoch``, ``best_thr``,
    ``best_val1_f1`` and ``config`` (the ``backbone`` name and ``r``).
    """
    payload = {
        "model_state": model.state_dict(),
        "optimizer_state": opt.state_dict(),
        "epoch": epoch,
        "best_thr": float(best_thr),
        "best_val1_f1": float(best_val1_f1),
        "config": {"backbone": backbone, "r": r},
    }
    torch.save(payload, path)

def load_ckpt(path, model, opt=None, map_location="cpu"):
	"""Load a checkpoint from ``path`` into ``model`` and return the raw checkpoint dict.

	The optimizer state is restored only when ``opt`` is given and the checkpoint
	contains an ``optimizer_state`` entry.
	"""
	ckpt = torch.load(path, map_location=map_location)
	model.load_state_dict(ckpt["model_state"])
	restore_opt = opt is not None and "optimizer_state" in ckpt
	if restore_opt:
		opt.load_state_dict(ckpt["optimizer_state"])
	return ckpt

In [None]:
# Running best val1 F1 and its threshold; re-run this cell to reset them before training again.
best_f1, best_thr = 0.0, 0.5

In [None]:
# Make sure the checkpoint folder exists before the first save.
Path(ckpt_path).parent.mkdir(parents=True, exist_ok=True)

for ep in range(1, epochs+1):
    # ---- train ----
    model.train()
    running = 0.0
    # Kept under its own name so the `lr` hyperparameter is not overwritten
    # (re-running the optimizer cell would otherwise pick up a decayed value).
    cur_lr = opt.param_groups[0]['lr']
    for batch in train_loader:
        loss, cur_lr = train_step(batch)
        running += loss
    avg_loss = running / max(1, len(train_loader))

    # ---- val1: pick this epoch's threshold ----
    p1, y1 = predict_proba(model, val1_loader)
    sel = pick_best_threshold(p1, y1)
    thr = sel["thr"]
    val1_f1, val1_pr, val1_rc = sel["f1"], sel["prec"], sel["rec"]

    # ---- val2: evaluate at that threshold ----
    p2, y2 = predict_proba(model, val2_loader)
    val2_f1, val2_pr, val2_rc = f1_pr_rec_at_threshold(p2, y2, thr)

    # ---- log ----
    print(f"Epoch {ep:03d} | loss {avg_loss:.4f} | lr {cur_lr:.2e} | "
            f"val1 F1 {val1_f1:.4f} @ {thr:.2f} (P {val1_pr:.3f}, R {val1_rc:.3f}) | "
            f"val2 F1 {val2_f1:.4f} (P {val2_pr:.3f}, R {val2_rc:.3f})")

    # ---- checkpoint on val1 F1 improvement ----
    # best_thr moves only together with best_f1, so the final report and the saved
    # checkpoint always describe the same epoch.
    if val1_f1 > best_f1 + 1e-6:
        best_f1, best_thr = val1_f1, thr
        save_ckpt(ckpt_path, model, opt, ep, best_thr, best_f1)

print(f"Best val1 F1={best_f1:.4f} at thr={best_thr:.2f}. "
        f"Checkpoint saved to {os.path.abspath(ckpt_path)}")