In [None]:
!pip install torchsummary timm scikit-learn faiss-cpu -q

In [None]:
import warnings, os, copy, time, json, pickle, numpy as np, torch, torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Subset
from torchvision import datasets, transforms, models
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics  import accuracy_score, classification_report, top_k_accuracy_score
from pathlib import Path
from tqdm.auto import tqdm
import timm
import torch.nn.functional as F
from torch.utils.data import WeightedRandomSampler
import faiss

# Single seed shared by the PyTorch and NumPy global random generators.
SEED = 42
torch.manual_seed(SEED)
np.random.seed(SEED)

In [None]:
#!git clone https://github.com/hbcbh1999/recaptcha-dataset.git

In [None]:
#!rm -rf ./recaptcha-dataset/Large/Mountain/
#!rm -rf ./recaptcha-dataset/Large/Other/
#!rm -rf ./recaptcha-dataset/Large/readme.txt

In [None]:
# Mount Google Drive at /content/drive (Colab only); the paths configured in the
# next cell all live under this mount point.
from google.colab import drive
drive.mount('/content/drive')

In [None]:
# Class names; labels produced by the dataset are used as indices into this list.
CLASS_NAMES   = ['Bicycle','Bridge','Bus','Car','Chimney',
                 'Crosswalk','Hydrant','Motorcycle','Palm','Traffic Light']

# Everything (dataset, weights, feature caches) lives under one Drive folder.
ROOT_DRIVE = Path('/content/drive/MyDrive/image_matching_challenge')
DATA_DIR      = ROOT_DRIVE / 'db_images'
MODEL_DIR  = ROOT_DRIVE / 'models'
FEAT_DIR   = ROOT_DRIVE / 'feature_spaces'
MODEL_DIR.mkdir(parents=True, exist_ok=True)
FEAT_DIR.mkdir(parents=True, exist_ok=True)

NUM_CLASSES   = len(CLASS_NAMES)
IMG_SIZE      = 224
BATCH_SIZE    = 16            # Convnext: 32 ResNet: 16
NUM_EPOCHS    = 20
LR            = 3e-4
MODEL_NAME    = 'resnet152'    # 'convnext_tiny' | 'resnet152' | 'efficientnet_b3' | 'mobilenet_v3_large' | 'densenet201'
WEIGHTS_OUT   = MODEL_DIR / f'{MODEL_NAME}_best.pth'
FEAT_CACHE    = FEAT_DIR / f'{MODEL_NAME}_features.npz'
KNN_PKL       = MODEL_DIR / f'{MODEL_NAME}_knn.pkl'


# Flags: True -> (re)compute the artifact / False -> load the saved one.
# The artifacts depend on each other (weights -> features -> KNN), so a stage is
# also recomputed whenever the stage it depends on is recomputed. Otherwise a
# freshly trained network would be paired with a stale feature cache / KNN.
TRAIN_CNN     = not WEIGHTS_OUT.exists()
EXTRACT_FEATS = TRAIN_CNN or not FEAT_CACHE.exists()
FIT_KNN       = EXTRACT_FEATS or not KNN_PKL.exists()

# Kept for backward compatibility; derived instead of hard-coded.
n_cls = NUM_CLASSES

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print('device =', device)

torch.cuda.empty_cache()

In [None]:
"""
Performing data augmentation and setting dataloader
"""
# Training transform: resize + light augmentation + ImageNet normalisation.
train_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.RandomHorizontalFlip(), transforms.RandomRotation(10),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])
# Evaluation transform: same preprocessing without the random augmentation.
val_tfms = transforms.Compose([
    transforms.Resize((IMG_SIZE, IMG_SIZE)),
    transforms.ToTensor(),
    transforms.Normalize([0.485,0.456,0.406],[0.229,0.224,0.225])
])

full_ds = datasets.ImageFolder(DATA_DIR, transform=train_tfms)
# Dataset labels index CLASS_NAMES and the classifier head has len(CLASS_NAMES)
# outputs, so a different number of class folders must fail loudly here.
assert len(full_ds.classes) == len(CLASS_NAMES), (
    f"expected {len(CLASS_NAMES)} class folders in {DATA_DIR}, "
    f"found {len(full_ds.classes)}: {full_ds.classes}")

# Random 80/20 split over sample indices.
indices = np.random.permutation(len(full_ds))
split   = int(0.8*len(full_ds))
train_idx, val_idx = indices[:split], indices[split:]

train_ds = Subset(full_ds, train_idx)
# The validation subset wraps its own copy of the dataset so that switching its
# transform does not affect the training subset.
val_ds   = Subset(copy.deepcopy(full_ds), val_idx)
val_ds.dataset.transform = val_tfms

# Inverse-frequency sampling weights, one per training sample.
targets_arr  = np.array([t for _, t in full_ds.samples])
class_counts = np.bincount(targets_arr, minlength=len(CLASS_NAMES))
weights      = torch.as_tensor(1. / class_counts[targets_arr[train_idx]],
                               dtype=torch.double)
train_sampler= WeightedRandomSampler(weights, len(weights), replacement=True)

dataloaders = {
    'train': DataLoader(train_ds, batch_size=BATCH_SIZE,
                        sampler=train_sampler, num_workers=2, pin_memory=True, persistent_workers=False),
    'val'  : DataLoader(val_ds,   batch_size=BATCH_SIZE,
                        shuffle=False, num_workers=2, pin_memory=True, persistent_workers=False)
}
print(f"train / val = {len(train_ds)} / {len(val_ds)}")


class BalancedSoftmaxCE(nn.Module):
    """Cross-entropy computed on logits shifted by the log class prior.

    Args:
        cls_cnt: per-class sample counts (any sequence/array convertible to a
            tensor); normalised to frequencies internally.
    """
    def __init__(self, cls_cnt):
        super().__init__()
        freq = torch.as_tensor(cls_cnt, dtype=torch.float32)
        # Registered as a (non-persistent) buffer so that module.to(device)
        # moves it once instead of copying it to the device on every forward;
        # non-persistent keeps the module's state_dict empty as before.
        self.register_buffer('log_freq', (freq / freq.sum()).log(), persistent=False)

    def forward(self, logits, target):
        """Return cross_entropy(logits + log_prior, target)."""
        # .to() is a no-op when the buffer already lives on the logits' device;
        # kept so the loss still works if the module was never moved.
        return F.cross_entropy(logits + self.log_freq.to(logits.device), target)

class ArcFaceLoss(nn.Module):
    """Additive angular-margin softmax loss with a learnable class-weight matrix.

    Args:
        in_feat: dimensionality of the embeddings passed to forward().
        n_cls:   number of classes (rows of the weight matrix W).
        s:       scale applied to the cosine logits before cross-entropy.
        m:       angular margin (radians) added to the target-class angle.
    """
    def __init__(self, in_feat, n_cls, s=30.0, m=0.50):
        super().__init__()
        self.W = nn.Parameter(torch.randn(n_cls, in_feat))
        nn.init.xavier_uniform_(self.W); self.s, self.m = s, m

    def forward(self, emb, label):
        """Return the margin-penalised cross-entropy for embeddings `emb` and integer labels `label`."""
        import math

        # Cosine between L2-normalised embeddings and L2-normalised class weights;
        # clamped so acos stays finite.
        emb = F.normalize(emb); W = F.normalize(self.W)
        cos = F.linear(emb, W).clamp(-1+1e-7, 1-1e-7)
        theta = torch.acos(cos)

        # cos(theta + m) only decreases with theta while theta + m <= pi. Beyond
        # that it increases again, which would make the margin *reward* the
        # hardest samples. Use the usual linear fallback there so the target
        # logit stays monotonically decreasing in theta.
        threshold = math.cos(math.pi - self.m)
        fallback  = cos - math.sin(math.pi - self.m) * self.m
        target_cos = torch.where(cos > threshold, torch.cos(theta + self.m), fallback)

        # Replace only the target-class cosine with its margin-penalised value.
        onehot = torch.zeros_like(cos); onehot.scatter_(1, label.view(-1,1), 1.)
        logits = cos*(1-onehot) + target_cos*onehot
        return F.cross_entropy(self.s*logits, label)

In [None]:
"""
Building Model
"""

def logits_from_emb(model, emb):
    """Apply the model's classification head to an already-computed embedding.

    The head is looked up in this order: `get_classifier()` (timm-style models),
    `fc` (torchvision ResNet), `classifier` (EfficientNet / DenseNet style).

    Raises:
        RuntimeError: if the model exposes none of these attributes.
    """
    for attr in ('get_classifier', 'fc', 'classifier'):
        if not hasattr(model, attr):
            continue
        head = getattr(model, attr)
        if attr == 'get_classifier':
            # Accessor method: call it to obtain the actual head module.
            head = head()
        return head(emb)
    raise RuntimeError("No classifier head found!")

def build_model(name:str, n_cls:int):
    """Create a pretrained backbone with an `n_cls`-way head.

    Args:
        name:  'convnext*' (timm), 'resnet152' or 'efficientnet*' (torchvision).
        n_cls: number of output classes of the classification head.

    Returns:
        (model, feat_forward, in_feat) where `model` is on the global `device`,
        `feat_forward(x)` returns the pooled, flattened pre-classifier features
        and `in_feat` is the input width of the classification head.

    Raises:
        ValueError: for an unsupported model name.

    NOTE: `feat_forward` runs the same layers as `model(x)` up to the final
    classifier, so `logits_from_emb(model, feat_forward(x))` agrees with
    `model(x)`. Earlier versions skipped ResNet's max-pool and ConvNeXt's head
    norm; weights / feature caches produced with those versions should be
    regenerated.
    """
    if name.startswith('convnext'):
        model = timm.create_model(name, pretrained=True,
                                  num_classes=n_cls, global_pool='avg')
        in_feat = model.num_features
        # forward_head(..., pre_logits=True) applies the whole head except the
        # final fc, i.e. it includes the head norm that a bare global_pool skips.
        feat_forward = lambda x: torch.flatten(
            model.forward_head(model.forward_features(x), pre_logits=True), 1)

    elif name == 'resnet152':
        model = models.resnet152(weights=models.ResNet152_Weights.IMAGENET1K_V2)
        in_feat = model.fc.in_features; model.fc = nn.Linear(in_feat, n_cls)

        def feat_forward(x):
            # Stem including the max-pool, matching torchvision's ResNet.forward.
            x = model.maxpool(model.relu(model.bn1(model.conv1(x))))
            x = model.layer4(model.layer3(model.layer2(model.layer1(x))))
            return torch.flatten(model.avgpool(x), 1)

    elif name.startswith('efficientnet'):
        model = getattr(models, name)(weights='IMAGENET1K_V1')
        # Replace the last Linear layer of the classifier.
        in_feat = model.classifier[-1].in_features
        model.classifier[-1] = nn.Linear(in_feat, n_cls)
        # Feature extraction function.
        feat_forward = lambda x: torch.flatten(
            model.avgpool(model.features(x)), 1)
    else: raise ValueError("지원되지 않는 모델")
    return model.to(device), feat_forward, in_feat

model, feat_forward, EMB_DIM = build_model(MODEL_NAME, len(CLASS_NAMES))
print(f"{MODEL_NAME}: {sum(p.numel() for p in model.parameters())/1e6:.2f}M params, feat_dim={EMB_DIM}")

# -----------------------------------------------------------
# 2-1. Training loop
# -----------------------------------------------------------
if TRAIN_CNN:
    print(">>> training CNN with BalancedSoftmax + ArcFace")
    crit_bs = BalancedSoftmaxCE(class_counts).to(device)
    arcloss = ArcFaceLoss(EMB_DIM, len(CLASS_NAMES)).to(device)
    # The ArcFace class-weight matrix is optimised together with the network.
    optimizer = optim.AdamW(list(model.parameters())+list(arcloss.parameters()),
                            lr=LR, weight_decay=1e-2)
    scheduler = optim.lr_scheduler.CosineAnnealingLR(optimizer, T_max=NUM_EPOCHS)

    # Best weights are kept on the CPU so the snapshot costs no device memory.
    best_acc  = 0.
    best_wts  = {k: v.cpu().clone() for k, v in model.state_dict().items()}
    for epoch in range(NUM_EPOCHS):
        print(f"\n[Epoch {epoch+1}/{NUM_EPOCHS}]")
        for phase in ('train','val'):
            is_train = phase == 'train'
            model.train() if is_train else model.eval()
            tot_loss = tot_hit = 0
            for X, y in tqdm(dataloaders[phase], leave=False):
                X, y = X.to(device, non_blocking=True), y.to(device, non_blocking=True)
                if is_train:
                    optimizer.zero_grad(set_to_none=True)
                with torch.set_grad_enabled(is_train):
                    emb = feat_forward(X)                  # (1) single backbone pass
                    if emb.dim() == 4:                     # (2) (B,C,H,W) -> (B,C)
                        emb = emb.mean(dim=[2, 3])
                    logits = logits_from_emb(model, emb)   # (3) classifier head only
                    loss   = crit_bs(logits, y) + 0.5 * arcloss(emb, y)
                    if is_train:
                        loss.backward(); optimizer.step()
                tot_loss += loss.item()*len(X)
                tot_hit  += (logits.argmax(1)==y).sum().item()

            epoch_loss = tot_loss/len(dataloaders[phase].dataset)
            epoch_acc  = tot_hit / len(dataloaders[phase].dataset)
            print(f" {phase:5s} | loss {epoch_loss:.4f} | acc {epoch_acc:.4f}")
            if phase=='val' and epoch_acc > best_acc:
                best_acc = epoch_acc
                best_wts = {k: v.cpu().clone() for k,v in model.state_dict().items()}
        scheduler.step()

    print(f"\nBest val acc = {best_acc:.4f}")
    torch.save(best_wts, WEIGHTS_OUT)
    # Continue with the *best* weights, not the last-epoch ones: the feature
    # cache built below must come from the same weights that are saved to disk
    # and reloaded by the query cells.
    model.load_state_dict(best_wts)
else:
    print(">>> Found weights – loading")
    model.load_state_dict(torch.load(WEIGHTS_OUT, map_location=device))


In [None]:
"""
Caching feature vector
"""
def extract_feats(dataset, batch=BATCH_SIZE):
    """Compute L2-normalised embeddings for every sample of `dataset`.

    Uses the global `model` (put into eval mode), `feat_forward` and `device`.

    Args:
        dataset: an ImageFolder-like dataset exposing `.samples`, or a `Subset`
            of one.
        batch:   batch size for the sequential (unshuffled) loader.

    Returns:
        (features, labels, paths) as NumPy arrays, in dataset order.
    """
    loader = DataLoader(dataset, batch_size=batch, shuffle=False,
                        num_workers=0, pin_memory=True)
    feat_chunks, label_chunks = [], []
    model.eval()
    with torch.inference_mode():
        for images, targets in tqdm(loader, desc="extract"):
            emb = feat_forward(images.to(device))    # (B,C,H,W) or (B,C)
            if emb.dim() == 4:
                emb = emb.mean(dim=[2,3])            # spatial average -> (B,C)
            emb = F.normalize(emb, p=2, dim=1)       # unit L2 norm per row
            feat_chunks.append(emb.cpu())
            label_chunks.append(targets)

    # File paths in the same order the loader visited the samples.
    if isinstance(dataset, Subset):
        sample_paths = [dataset.dataset.samples[i][0] for i in dataset.indices]
    else:
        sample_paths = [p for p,_ in dataset.samples]

    return (torch.cat(feat_chunks).numpy(),
            torch.cat(label_chunks).numpy(),
            np.array(sample_paths))

# Load cached features when available, otherwise extract them from the whole
# dataset (with the evaluation transform) and write the cache.
if not EXTRACT_FEATS:
    data = np.load(FEAT_CACHE, allow_pickle=True)
    feats  = data['feats']
    labels = data['labels']
    paths  = data['paths']
else:
    full_ds_eval = copy.deepcopy(full_ds)
    full_ds_eval.transform = val_tfms
    feats, labels, paths = extract_feats(full_ds_eval)
    np.savez_compressed(FEAT_CACHE, feats=feats, labels=labels, paths=paths)

In [None]:
train_feats, val_feats = feats[train_idx], feats[val_idx]
train_lbls , val_lbls  = labels[train_idx], labels[val_idx]

# The retrieval gallery (and the persisted KNN) uses every database image.
gallery_feats = feats
gallery_lbls  = labels

if FIT_KNN:
    knn = KNeighborsClassifier(n_neighbors=5, metric='cosine', weights='distance')
    knn.fit(gallery_feats, gallery_lbls)
    with open(KNN_PKL, 'wb') as f: pickle.dump(knn, f)
else:
    # NOTE: pickle.load can execute arbitrary code; only load files written by this notebook.
    with open(KNN_PKL, 'rb') as f: knn = pickle.load(f)

# Validation must not use the gallery KNN: the gallery contains the validation
# samples themselves, so each query would match itself at distance 0 and, with
# weights='distance', the reported accuracy would be trivially inflated.
# Score with a KNN fitted on the training split only.
eval_knn = KNeighborsClassifier(n_neighbors=5, metric='cosine', weights='distance')
eval_knn.fit(train_feats, train_lbls)
val_pred = eval_knn.predict(val_feats)
print("\nKNN val ACC =", accuracy_score(val_lbls, val_pred))
print(classification_report(val_lbls, val_pred, target_names=CLASS_NAMES, digits=3))

# ----- (선택) Faiss 인덱스 – 대규모용 ----------
# faiss_index = faiss.IndexFlatIP(train_feats.shape[1])
# faiss_index.add(train_feats.astype('float32'))

def k_reciprocal_rerank(qf, gf, initial_rank, k1=20, k2=6, lamb=0.3):
    """Re-rank gallery rows by blending query distance with a neighbourhood score.

    Args:
        qf:           query feature vector.
        gf:           gallery feature matrix, one row per item.
        initial_rank: gallery indices ordered by the initial ranking.
        k1:           how many leading entries of `initial_rank` receive a score.
        k2:           number of highest gallery similarities averaged per candidate.
        lamb:         weight of the neighbourhood term in the final distance.

    Returns:
        Gallery indices sorted by the blended distance (ascending).
    """
    # Base distance of every gallery row to the query: 1 - dot product.
    base_dist = 1 - (gf @ qf.T).squeeze()

    # Neighbourhood score: for each of the first k1 candidates, the mean of its
    # k2 largest similarities to the gallery; every other row keeps 0.
    support = np.zeros_like(base_dist)
    for cand in initial_rank[:k1]:
        cand_sims = gf @ gf[cand]
        support[cand] = np.sort(cand_sims)[-k2:].mean()

    # Min-max scale the score (epsilon avoids division by zero).
    lo, hi = support.min(), support.max()
    support = (support - lo) / (hi - lo + 1e-12)

    blended = (1 - lamb) * base_dist + lamb * (1 - support)
    return blended.argsort()

def retrieve(query_idx: int, k: int = 10, k1: int = 20, k2: int = 6, lamb: float = 0.3):
    """Retrieve the top-k gallery items for one validation-set query.

    Args:
        query_idx: row of `val_feats` to use as the query.
        k:         number of results to return.
        k1, k2, lamb: forwarded to `k_reciprocal_rerank`.

    Returns:
        List of (path, class name, distance) tuples, where distance is
        1 - dot product between the gallery feature and the query feature.
    """
    query_vec = val_feats[query_idx]

    # Distance from the query to every gallery feature.
    dist_all = 1.0 - gallery_feats @ query_vec

    # Shortlist the 50 closest items, then re-rank.
    shortlist = dist_all.argsort()[:50]
    reranked = k_reciprocal_rerank(query_vec, gallery_feats, shortlist,
                                   k1=k1, k2=k2, lamb=lamb)

    hits = []
    for i in reranked[:k]:
        hits.append((paths[i], CLASS_NAMES[gallery_lbls[i]], float(dist_all[i])))
    return hits

# Show the ten nearest gallery images for the first validation sample.
print("\n[Query 예시]")
rank_no = 0
for img_path, cls_name, dist in retrieve(0,10):
    rank_no += 1
    print(f"{rank_no:02d}. {Path(img_path).name:30s} | {cls_name:12s} | dist={dist:.4f}")

In [None]:
"""
Performing classification and retrieval for query images
"""

import pickle, numpy as np, torch
from PIL import Image
from sklearn.preprocessing import normalize
from pathlib import Path
from tqdm.auto import tqdm
import csv
import matplotlib.pyplot as plt
from IPython.display import display

# --- 0) Drive paths and required artifacts ------------------------------------
# NOTE(review): this cell does not create the 'results' directory that
# RESULT_DIR_1 / RESULT_DIR_2 point into.
ROOT_DRIVE = Path('/content/drive/MyDrive/image_matching_challenge')
WEIGHTS_OUT = ROOT_DRIVE / 'models' / f'{MODEL_NAME}_best.pth'
FEAT_CACHE  = ROOT_DRIVE / 'feature_spaces' / f'{MODEL_NAME}_features.npz'
KNN_PKL     = ROOT_DRIVE / 'models' / f'{MODEL_NAME}_knn.pkl'
RESULT_DIR_1 = ROOT_DRIVE / 'results' / f'{MODEL_NAME}_c2_t1_a1.csv'
RESULT_DIR_2 = ROOT_DRIVE / 'results' / f'{MODEL_NAME}_c2_t2_a2.csv'

# Fail early if any artifact produced by the earlier cells is missing.
assert WEIGHTS_OUT.exists(), "❌ CNN 가중치(pth)가 없습니다."
assert FEAT_CACHE.exists() , "❌ feature 캐시(npz)가 없습니다."
assert KNN_PKL.exists()    , "❌ KNN 모델(pkl)이 없습니다."

# --- 1) Load the model and helper objects -------------------------------------
# Rebuilds the network and loads the saved weights into it.
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model, feat_forward, _ = build_model(MODEL_NAME, len(CLASS_NAMES))
model.load_state_dict(torch.load(WEIGHTS_OUT, map_location=device))
model.eval()

# Load feature vectors, labels and paths (used for retrieval)
data   = np.load(FEAT_CACHE, allow_pickle=True)
feats  = normalize(data['feats'].astype('float32'))      # can be dropped if the cache is already L2-normalised
labels = data['labels']
paths  = data['paths']

# Load the KNN classifier
# NOTE: pickle.load can execute arbitrary code; only load files you created yourself.
with open(KNN_PKL, 'rb') as f:
    knn = pickle.load(f)

# --- 2) 쿼리 이미지 → 분류 + Retrieval ----------------------------------------
def classify_and_retrieve(query_paths, k=10, verbose=True, pred_csv=RESULT_DIR_1, neigh_csv=RESULT_DIR_2, show_img=True):
    """Classify each query image, retrieve its nearest gallery images and save CSVs.

    Args
    ----
    query_paths : list[str | Path]
        Paths of the images to classify and search for.
    k           : int, optional
        Number of retrieval results kept per query.
    verbose     : bool
        If True, print the prediction and neighbours of every query.
    pred_csv    : str | Path
        Output CSV with one `[query name, predicted class]` row per query.
    neigh_csv   : str | Path
        Output CSV with one `[query name, neighbour classes...]` row per query.
    show_img    : bool
        If True, display every query image with matplotlib.

    Returns
    -------
    list[dict] with keys 'query', 'prediction' and 'neighbors'
    (a list of (path, class name, distance) tuples).
    """
    pred_rows, neigh_rows = [], []      # rows accumulated for the two CSV files
    results = []

    for idx_query, qpath in enumerate(tqdm(query_paths, desc="query")):
        # convert() loads the pixel data, so the file can be closed right away.
        with Image.open(qpath) as fh:
            img = fh.convert('RGB')
        if show_img:
            plt.figure(figsize=(3,3))
            plt.imshow(img); plt.axis('off')
            plt.title(f"Query {idx_query+1}: {Path(qpath).name}")
            plt.show()

        x   = val_tfms(img).unsqueeze(0).to(device)      # (1,3,H,W)

        # 1) Classification and embedding from ONE backbone pass. The logits
        #    are computed exactly as during training (head applied to the
        #    feat_forward embedding) instead of through a second model(x) call.
        with torch.inference_mode():
            raw = feat_forward(x)
            if raw.dim() == 4:                           # (1,C,H,W) -> (1,C), as in extract_feats
                raw = raw.mean(dim=[2, 3])
            logits = logits_from_emb(model, raw)
            pred   = logits.argmax(1).item()
            emb    = F.normalize(raw, dim=1).cpu().numpy().squeeze()   # (D,)

        # 2) Cosine similarity to the whole gallery -> distance
        cos_all  = feats @ emb               # (N,)
        dist_all = 1.0 - cos_all             # (N,)

        # 3) Shortlist of 50, then re-rank
        init_rank = dist_all.argsort()[:50]
        rank      = k_reciprocal_rerank(emb, feats, init_rank)
        final_idx = rank[:k]

        # 4) Neighbour information
        neigh_info = [
            (paths[i], CLASS_NAMES[labels[i]], float(dist_all[i]))
            for i in final_idx
        ]

        # 5) Rows for the CSV files (queries are named by their 1-based position)
        qname_png   = f'query{idx_query+1:03}.png'
        pred_rows.append([qname_png, CLASS_NAMES[pred]])

        neigh_labels = [CLASS_NAMES[labels[i]] for i in final_idx]
        neigh_rows.append([qname_png, *neigh_labels])

        if verbose:
            print(f"\n🖼️  Query: {Path(qpath).name}")
            print(f"   ➤ Predicted class: {CLASS_NAMES[pred]}")
            print(f"   ➤ Top-{k} nearest images")
            for r, (p, cls, d) in enumerate(neigh_info, 1):
                print(f"     {r:02d}. {Path(p).name:30s} | {cls:12s} | dist={d:.4f}")

        results.append({
            'query'     : str(qpath),
            'prediction': CLASS_NAMES[pred],
            'neighbors' : neigh_info
        })

    # Make sure the output folders exist; otherwise open() fails only after
    # every query has already been processed.
    for out_path in (pred_csv, neigh_csv):
        Path(out_path).parent.mkdir(parents=True, exist_ok=True)

    with open(pred_csv,  'w', newline='') as f1:
        csv.writer(f1).writerows(pred_rows)

    with open(neigh_csv, 'w', newline='') as f2:
        csv.writer(f2).writerows(neigh_rows)

    print(f"\n✅ Saved {len(pred_rows)} predictions → {pred_csv}")
    print(f"✅ Saved {len(neigh_rows)} neighbor lists → {neigh_csv}")

    return results

# --- 3) Usage -----------------------------------------------------------------
# (1) Single image
# classify_and_retrieve(['/content/sample_data/cat.png'], k=8)

# (2) Every image (.jpg / .jpeg / .png) found recursively in the query folder
valid_ext = {'.jpg', '.jpeg', '.png'}
query_dir = Path('/content/drive/MyDrive/image_matching_challenge/query_images')
query_list = sorted(filter(lambda f: f.suffix.lower() in valid_ext,
                           query_dir.rglob('*')))
print(f"↳ found {len(query_list)} query images")
classify_and_retrieve(query_list, k=10, verbose=True)


In [None]:
# --- Ensemble + Save Results Cell ---
from pathlib import Path
import torch
import torch.nn.functional as F
import numpy as np
from sklearn.preprocessing import normalize
from PIL import Image
import csv

# Defined in earlier cells: build_model, val_tfms, NUM_CLASSES, CLASS_NAMES

# 1) Model and data locations
MODEL_DIR   = Path('/content/drive/MyDrive/image_matching_challenge/models')
FEAT_DIR    = Path('/content/drive/MyDrive/image_matching_challenge/feature_spaces')
QUERY_DIR   = Path('/content/drive/MyDrive/image_matching_challenge/query_images')
RESULT_DIR  = Path('/content/drive/MyDrive/image_matching_challenge/results')
RESULT_DIR.mkdir(parents=True, exist_ok=True)

# 2) Models in the ensemble and the output file prefix
MODEL_NAMES = ['convnext_tiny', 'resnet152']
SAVE_PREFIX = '_'.join(MODEL_NAMES) + '_ensemble'
PRED_CSV    = RESULT_DIR / f"{SAVE_PREFIX}_predictions.csv"
NEIGH_CSV   = RESULT_DIR / f"{SAVE_PREFIX}_neighbors.csv"

# 3) Device
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

# 4) Load every model (dict order == MODEL_NAMES order)
models_dict = {}
for name in MODEL_NAMES:
    m, fwd, _ = build_model(name, NUM_CLASSES)
    m.load_state_dict(torch.load(MODEL_DIR/f'{name}_best.pth', map_location=device))
    m.to(device).eval()
    models_dict[name] = (m, fwd)

# 5) Load and concatenate the feature spaces.
#    Caches are read in MODEL_NAMES order (no hard-coded file names) so the
#    gallery columns line up with the query embedding, which is concatenated
#    in the same order.
feat_caches = [np.load(FEAT_DIR / f'{name}_features.npz', allow_pickle=True)
               for name in MODEL_NAMES]
labels = feat_caches[0]['labels']
paths  = feat_caches[0]['paths']
# Row i of every cache must describe the same image, otherwise the
# concatenation would mix features of different images.
for name, cache in zip(MODEL_NAMES[1:], feat_caches[1:]):
    assert cache['feats'].shape[0] == len(labels) and np.array_equal(cache['labels'], labels), \
        f"feature cache of {name} is not aligned with {MODEL_NAMES[0]}"
feats = np.concatenate([cache['feats'] for cache in feat_caches], axis=1).astype('float32')
feats = normalize(feats)

# 6) 앙상블 분류+리트리벌 + CSV 저장

def ensemble_classify_and_retrieve_and_save(query_paths, k=10):
    """Classify and retrieve with the model ensemble, then write both CSV files.

    For every query the class is the argmax of the logits averaged over all
    models in `models_dict`; retrieval ranks the concatenated gallery features
    (`feats`) against the concatenation of the per-model L2-normalised
    query embeddings.

    Args:
        query_paths: iterable of image paths.
        k:           number of neighbours kept per query.

    Returns:
        (pred_rows, neigh_rows): the rows written to PRED_CSV and NEIGH_CSV.
    """
    pred_rows = []
    neigh_rows = []

    for idx, qpath in enumerate(query_paths, start=1):
        # convert() loads the pixel data, so the file can be closed right away.
        with Image.open(qpath) as fh:
            img = fh.convert('RGB')
        x   = val_tfms(img).unsqueeze(0).to(device)

        with torch.inference_mode():
            # One backbone pass per model; the logits are computed the same way
            # as during training (head applied to the feat_forward embedding)
            # instead of through an additional m(x) forward pass.
            raw_embs = [fwd(x).reshape(x.size(0), -1) for _, fwd in models_dict.values()]
            # logits ensemble
            logits = torch.stack([
                logits_from_emb(m, raw)
                for (m, _), raw in zip(models_dict.values(), raw_embs)
            ]).mean(0)
            pred   = logits.argmax(1).item()
            # embedding ensemble
            emb = torch.cat([F.normalize(raw, dim=1) for raw in raw_embs],
                            dim=1).cpu().numpy().squeeze()

        # The gallery rows are re-normalised after concatenation; do the same
        # for the query so that 1 - dot product is a cosine distance.
        emb = emb / (np.linalg.norm(emb) + 1e-12)

        # retrieval
        dist_all = 1.0 - (feats @ emb)
        idxs     = dist_all.argsort()[:k]

        # rows for the CSV files (queries are named by their 1-based position)
        qname = f"query{idx:03}.png"
        pred_rows.append([qname, CLASS_NAMES[pred]])
        neigh_labels = [CLASS_NAMES[labels[i]] for i in idxs]
        neigh_rows.append([qname, *neigh_labels])

    # save
    with open(PRED_CSV, 'w', newline='') as f1:
        csv.writer(f1).writerows(pred_rows)
    with open(NEIGH_CSV, 'w', newline='') as f2:
        csv.writer(f2).writerows(neigh_rows)

    print(f"✅ Saved predictions -> {PRED_CSV}")
    print(f"✅ Saved neighbors   -> {NEIGH_CSV}")

    return pred_rows, neigh_rows

# 7) Run the ensemble on every image found recursively in the query folder
valid_exts = {'.jpg', '.jpeg', '.png'}
queries    = sorted(filter(lambda f: f.suffix.lower() in valid_exts,
                           QUERY_DIR.rglob('*')))
print(f"↳ found {len(queries)} queries")
ensemble_classify_and_retrieve_and_save(queries, k=10)
