In [None]:
import os
import random
import numpy as np
from tqdm import tqdm
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader
import open_clip
from sketch_dataset import SketchDataset
import torch.nn.functional as F
import math

# ---------------------------
# Configuration
# ---------------------------
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
SEED = 42
BATCH_SIZE = 32
NUM_EPOCHS = 30
LR = 5e-6
WEIGHT_DECAY = 0.01
QUANTILE = 0.7
WARMUP_EPOCHS = 5
best_fine_acc = 0

SAVE_DIR = r"F:\\CLIP+MMD\\CTF\\pth"
os.makedirs(SAVE_DIR, exist_ok=True)
random.seed(SEED)
torch.manual_seed(SEED)
torch.backends.cudnn.benchmark = True
scaler = torch.amp.GradScaler(device='cuda')

# ---------------------------
# Dataset and DataLoader
# ---------------------------
train_ds = SketchDataset(split='train', root_dir=r"F:\\CLIP+MMD\\dataset")
val_ds = SketchDataset(split='val', root_dir=r"F:\\CLIP+MMD\\dataset")

# Build the pretrained CLIP model exactly once and reuse it for both the
# preprocessing transform and training (the original constructed it twice,
# loading the pretrained weights a second time just to obtain `preprocess`).
# The third return value is used as the transform for both splits, as before.
clip_model, _, preprocess = open_clip.create_model_and_transforms("ViT-B/32-quickgelu", pretrained="openai")
train_ds.transform = preprocess
val_ds.transform = preprocess

train_loader = DataLoader(train_ds, batch_size=BATCH_SIZE, shuffle=True, num_workers=0, pin_memory=True)
val_loader = DataLoader(val_ds, batch_size=BATCH_SIZE, shuffle=False, num_workers=0, pin_memory=True)

NUM_FINE = len(train_ds.classes)

# ---------------------------
# Model and optimizer
# ---------------------------
clip_model = clip_model.to(DEVICE)
# Freeze everything, then re-enable gradients for the image tower only.
clip_model.requires_grad_(False)
clip_model.visual.requires_grad_(True)

# ==========================
# LoRA Layer
# ==========================
class LoRALinear(nn.Module):
    """Wrap a linear layer and add a low-rank residual branch to its output.

    The forward pass returns ``orig(x) + alpha * lora_B(lora_A(x))`` where
    ``lora_A`` maps ``in_features -> r`` and ``lora_B`` maps
    ``r -> out_features``, both without bias. ``lora_B`` starts at zero, so a
    freshly constructed wrapper reproduces the wrapped layer's output.

    Args:
        orig_linear: Layer to wrap; must expose ``in_features`` and
            ``out_features``.
        r: Rank of the low-rank branch.
        alpha: Multiplier applied to the low-rank branch output.
    """

    def __init__(self, orig_linear, r=4, alpha=1.0):
        super().__init__()
        in_dim, out_dim = orig_linear.in_features, orig_linear.out_features
        self.orig = orig_linear
        self.lora_A = nn.Linear(in_dim, r, bias=False)
        self.lora_B = nn.Linear(r, out_dim, bias=False)
        self.alpha = alpha
        # Random down-projection, zero up-projection: the branch contributes
        # nothing until lora_B has been updated.
        nn.init.kaiming_uniform_(self.lora_A.weight, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B.weight)

    def forward(self, x):
        """Return the wrapped layer's output plus the scaled low-rank update."""
        base_out = self.orig(x)
        low_rank_out = self.lora_B(self.lora_A(x))
        return base_out + self.alpha * low_rank_out

# ==========================
# Insert LoRA
# ==========================
def insert_lora(model, r=4, alpha=1.0, device="cpu"):
    """Replace the MLP ``fc1``/``fc2`` layers of both encoders with LoRA wrappers.

    Walks ``model.vision_model.encoder.layers`` and then
    ``model.text_model.encoder.layers``; in every layer, ``mlp.fc1`` and
    ``mlp.fc2`` are swapped in place for ``LoRALinear`` wrappers moved to
    ``device``. The model is modified in place and nothing is returned.

    NOTE(review): this expects ``vision_model`` / ``text_model`` attributes,
    whereas the rest of this file reaches the image tower via
    ``clip_model.visual`` -- confirm the attribute layout before calling it on
    that model.

    Args:
        model: Model exposing the attribute layout described above.
        r: LoRA rank passed to ``LoRALinear``.
        alpha: LoRA scale passed to ``LoRALinear``.
        device: Device the new wrappers are moved to.
    """
    # Vision tower first, then text tower -- same order as the layers are visited
    # when done tower by tower.
    for tower_name in ("vision_model", "text_model"):
        tower = getattr(model, tower_name)
        for block in tower.encoder.layers:
            mlp = block.mlp
            mlp.fc1 = LoRALinear(mlp.fc1, r, alpha).to(device)
            mlp.fc2 = LoRALinear(mlp.fc2, r, alpha).to(device)

# Constant multiplier applied to the cosine-similarity logits in training and
# validation (used there instead of the model's own logit_scale parameter).
FIXED_LOGIT_SCALE = 1 / 0.07

# One text prompt per class; the unit-normalised text embeddings are computed
# once, without gradients, and act as the classifier weights.
fine_prompts = [f"a photo of a {c}" for c in train_ds.classes]
with torch.no_grad():
    prompt_tokens = open_clip.tokenize(fine_prompts).to(DEVICE)
    text_fine_feats = clip_model.encode_text(prompt_tokens)
    text_fine_feats /= text_fine_feats.norm(dim=-1, keepdim=True)

# Image-tower parameters plus logit_scale are handed to the optimizer.
trainable_params = [*clip_model.visual.parameters(), clip_model.logit_scale]
optimizer = optim.AdamW(trainable_params, lr=LR, weight_decay=WEIGHT_DECAY)


def _lr_factor(epoch):
    """Return the LR multiplier: linear warm-up, then cosine decay."""
    if epoch < WARMUP_EPOCHS:
        return (epoch + 1) / WARMUP_EPOCHS
    elapsed = epoch - WARMUP_EPOCHS
    span = NUM_EPOCHS - WARMUP_EPOCHS
    return 0.5 * (1 + np.cos(np.pi * elapsed / span))


scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=_lr_factor)

# ---------------------------
# QWM Loss
# ---------------------------
def qwm_loss(logits, labels, alpha=QUANTILE):
    """Cross-entropy that up-weights the least confident samples of a batch.

    The softmax probability of each sample's true class is compared with the
    batch's ``alpha``-quantile of those probabilities. Samples below the
    quantile are weighted by ``1 / (confidence + 1e-6)``; the others get
    weight 1. The quantile is computed on detached values, while the weights
    themselves remain part of the autograd graph.

    Args:
        logits: Class scores; softmax is taken over dim 1.
        labels: Integer class index for each row of ``logits``.
        alpha: Quantile used as the confidence threshold.

    Returns:
        Scalar tensor: the mean of the weighted per-sample cross-entropy.
    """
    per_sample_ce = F.cross_entropy(logits, labels, reduction='none')

    row_idx = torch.arange(len(labels), device=labels.device)
    true_class_conf = F.softmax(logits, dim=1)[row_idx, labels]
    threshold = torch.quantile(true_class_conf.detach(), alpha)

    below_threshold = true_class_conf < threshold
    sample_weights = torch.where(
        below_threshold,
        1.0 / (true_class_conf + 1e-6),
        torch.ones_like(true_class_conf),
    )
    return (sample_weights * per_sample_ce).mean()

# ---------------------------
# Training & Validation
# ---------------------------
def train_one_epoch(loader, epoch):
    """Train ``clip_model`` for one pass over ``loader``.

    Each batch is expected to unpack into ``(images, fine_labels, _)``; the
    third element is ignored. Image embeddings are unit-normalised, scored
    against the precomputed ``text_fine_feats`` with ``FIXED_LOGIT_SCALE``,
    and optimised with ``qwm_loss`` through the module-level ``optimizer``
    and ``scaler``.

    Args:
        loader: Iterable of training batches.
        epoch: Epoch number, used only for the progress-bar label.

    Returns:
        Sum of the per-batch losses divided by ``len(loader)``.
    """
    clip_model.train()
    # Mixed precision follows DEVICE instead of a hard-coded "cuda": identical
    # on GPU, and an explicit no-op on a CPU fallback.
    use_amp = DEVICE.type == "cuda"
    total_loss = 0.0

    for images, fine_lbls, _ in tqdm(loader, desc=f"Train E{epoch}"):
        images = images.to(DEVICE)
        fine_lbls = fine_lbls.to(DEVICE)

        optimizer.zero_grad()
        with torch.amp.autocast(DEVICE.type, enabled=use_amp):
            feats = clip_model.encode_image(images)
            feats = feats / feats.norm(dim=-1, keepdim=True)
            logits_f = FIXED_LOGIT_SCALE * feats @ text_fine_feats.T
            loss = qwm_loss(logits_f, fine_lbls)

        # Scale the loss before backward; scaler.step() runs the optimizer step.
        scaler.scale(loss).backward()
        scaler.step(optimizer)
        scaler.update()
        total_loss += loss.item()

    return total_loss / len(loader)

def validate(loader):
    """Evaluate ``clip_model`` on ``loader`` without gradient tracking.

    Each batch is expected to unpack into ``(images, fine_labels, _)``; the
    third element is ignored. Logits are built the same way as in training
    (unit-normalised image embeddings against ``text_fine_feats`` times
    ``FIXED_LOGIT_SCALE``) and scored with plain cross-entropy.

    Args:
        loader: Iterable of validation batches.

    Returns:
        Tuple ``(mean_loss, accuracy)``: the sum of per-batch losses divided
        by ``len(loader)``, and correct predictions divided by samples seen.
    """
    clip_model.eval()
    loss_sum = 0
    n_correct = 0
    n_seen = 0

    with torch.no_grad():
        for images, fine_lbls, _ in tqdm(loader, desc="Validation"):
            images = images.to(DEVICE)
            fine_lbls = fine_lbls.to(DEVICE)

            img_emb = clip_model.encode_image(images)
            img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)
            logits_f = FIXED_LOGIT_SCALE * img_emb @ text_fine_feats.T

            batch_loss = F.cross_entropy(logits_f, fine_lbls)
            batch_preds = logits_f.argmax(dim=1)

            loss_sum += batch_loss.item()
            n_correct += (batch_preds == fine_lbls).sum().item()
            n_seen += images.size(0)

    mean_loss = loss_sum / len(loader)
    accuracy = n_correct / n_seen
    return mean_loss, accuracy

# ---------------------------
# Training loop
# ---------------------------
# Main loop: train, validate, advance the LR schedule, and checkpoint whenever
# validation accuracy improves.
print("Starting training...")
for epoch in range(1, NUM_EPOCHS + 1):
    print(f"Epoch {epoch}/{NUM_EPOCHS}")
    # Read the learning rate BEFORE scheduler.step(): after stepping, the
    # scheduler reports the next epoch's rate, which made every log line show
    # the wrong value for the epoch it described.
    epoch_lr = optimizer.param_groups[0]["lr"]
    train_loss = train_one_epoch(train_loader, epoch)
    val_loss, fine_acc = validate(val_loader)
    scheduler.step()

    print(f"Epoch {epoch} | LR: {epoch_lr:.2e} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Fine Acc: {fine_acc:.4f}")

    if fine_acc > best_fine_acc:
        best_fine_acc = fine_acc
        # Keep only the best checkpoint: full model weights plus logit_scale.
        torch.save({
            'model_state': clip_model.state_dict(),
            'logit_scale': clip_model.logit_scale.detach().cpu(),
        }, os.path.join(SAVE_DIR, 'qwmbest.pth'))
        print(f"→ New best fine-accuracy: {fine_acc:.4f}")

print(f"Training complete. Best fine accuracy: {best_fine_acc:.4f}")


In [None]:
import os
import random
import torch
import numpy as np
from tqdm import tqdm
from sklearn.metrics import accuracy_score, precision_recall_fscore_support, confusion_matrix
import seaborn as sns
import matplotlib.pyplot as plt
import open_clip
from sketch_dataset import SketchDataset

# ---------------------------
# Seed everything for reproducibility
# ---------------------------
SEED = 42
random.seed(SEED)
np.random.seed(SEED)
torch.manual_seed(SEED)
if torch.cuda.is_available():
    torch.cuda.manual_seed_all(SEED)

# ---------------------------
# Settings
# ---------------------------
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
# Used only to name the accuracy report file written at the end.
# NOTE(review): this label does not match the checkpoint name (qwmbest) -- confirm it is intended.
model_name = "supconcf_con"
BATCH_SIZE = 128
CHECKPOINT_PATH = r"F:\CLIP+MMD\CTF\pth\qwmbest.pth"

# ---------------------------
# Dataset
# ---------------------------
test_ds = SketchDataset(split='test', root_dir=r"F:\CLIP+MMD\dataset")
# Build the pretrained model once and reuse it for both the preprocessing
# transform and evaluation (the original built it twice).
clip_model, _, preprocess = open_clip.create_model_and_transforms("ViT-B/32-quickgelu", pretrained="openai")
test_ds.transform = preprocess

test_loader = torch.utils.data.DataLoader(
    test_ds,
    batch_size=BATCH_SIZE,
    shuffle=False,
    num_workers=4,
    pin_memory=True
)

NUM_FINE = len(test_ds.classes)
fine_prompts = [f"a photo of a {c}" for c in test_ds.classes]
fine_tokens = open_clip.tokenize(fine_prompts).to(DEVICE)

# ---------------------------
# Load the model
# ---------------------------
clip_model = clip_model.to(DEVICE)
clip_model.eval()

# The checkpoint holds only tensors (a state dict and logit_scale), so restrict
# unpickling to tensor data instead of executing arbitrary pickled objects.
checkpoint = torch.load(CHECKPOINT_PATH, map_location=DEVICE, weights_only=True)
# strict=False is kept so a partial checkpoint still loads, but any mismatch is
# reported rather than silently evaluating partly pretrained weights.
load_result = clip_model.load_state_dict(checkpoint['model_state'], strict=False)
if load_result.missing_keys or load_result.unexpected_keys:
    print(f"[!] Checkpoint/model key mismatch | missing: {load_result.missing_keys} | unexpected: {load_result.unexpected_keys}")
clip_model.logit_scale.data = checkpoint['logit_scale'].to(DEVICE)

# Unit-normalised text embedding per class prompt, computed once.
with torch.no_grad():
    text_fine_feats = clip_model.encode_text(fine_tokens)
    text_fine_feats = text_fine_feats / text_fine_feats.norm(dim=-1, keepdim=True)

# ---------------------------
# Test-set evaluation
# ---------------------------
all_preds = []
all_targets = []
top3_correct = 0
top5_correct = 0
# Mixed precision only when running on CUDA; torch.amp.autocast replaces the
# deprecated torch.cuda.amp.autocast().
use_amp = DEVICE.type == "cuda"
with torch.no_grad():
    for images, fine_lbls, _ in tqdm(test_loader, desc="[Test]"):
        images = images.to(DEVICE)
        fine_lbls = fine_lbls.to(DEVICE)

        with torch.amp.autocast(DEVICE.type, enabled=use_amp):
            img_feats = clip_model.encode_image(images)
            img_feats = img_feats / img_feats.norm(dim=-1, keepdim=True)
            scale = clip_model.logit_scale.exp()
            logits = scale * img_feats @ text_fine_feats.T

        preds = logits.argmax(dim=1)

        # Never ask for more candidates than there are classes.
        k = min(5, logits.size(1))
        top5_preds = torch.topk(logits, k=k, dim=1).indices  # (B, k), best first

        # Vectorised hit test: one comparison per batch instead of a Python
        # loop with a device sync per sample.
        hits = top5_preds == fine_lbls.unsqueeze(1)
        top3_correct += hits[:, :3].any(dim=1).sum().item()
        top5_correct += hits.any(dim=1).sum().item()

        all_preds.extend(preds.cpu().numpy())
        all_targets.extend(fine_lbls.cpu().numpy())

# ---------------------------
# Compute metrics
# ---------------------------
accuracy = accuracy_score(all_targets, all_preds)
precision, recall, f1, _ = precision_recall_fscore_support(
    all_targets, all_preds, average='macro', zero_division=0
)

top3_acc = (top3_correct / len(test_ds)) * 100
top5_acc = (top5_correct / len(test_ds)) * 100

print(f"\n[✓] Top-1 Accuracy   : {accuracy  * 100:.2f}%")
print(f"[✓] Top-3 Accuracy     : {top3_acc:.2f}%")
print(f"[✓] Top-5 Accuracy     : {top5_acc:.2f}%")
print(f"[✓] Macro Precision    : {precision * 100:.2f}%")
print(f"[✓] Macro Recall       : {recall    * 100:.2f}%")
print(f"[✓] Macro F1-score     : {f1        * 100:.2f}%")

# ---------------------------
# Per-class accuracy
# ---------------------------
class_correct = np.zeros(NUM_FINE)
class_total = np.zeros(NUM_FINE)

for pred, target in zip(all_preds, all_targets):
    if pred == target:
        class_correct[target] += 1
    class_total[target] += 1

# Divide only where a class has samples. Classes absent from the test set stay
# NaN ("undefined") instead of triggering a 0/0 RuntimeWarning.
class_accuracy = np.full(NUM_FINE, np.nan)
has_samples = class_total > 0
class_accuracy[has_samples] = (class_correct[has_samples] / class_total[has_samples]) * 100  # as a percentage

# Pair each class name with its accuracy
class_accuracy_list = [(test_ds.classes[idx], class_accuracy[idx]) for idx in range(NUM_FINE)]

# Sort by accuracy, highest first. NaN does not order consistently, so classes
# without samples are explicitly placed last.
class_accuracy_list.sort(
    key=lambda item: -np.inf if np.isnan(item[1]) else item[1],
    reverse=True,
)

# Build the output path
save_dir = r"F:\CLIP+MMD\accuracy"
os.makedirs(save_dir, exist_ok=True)
save_path = os.path.join(save_dir, f"{model_name}_accuracy.txt")

# Write the report. UTF-8 is requested explicitly because the text contains
# "✓", which many Windows locale code pages cannot encode.
with open(save_path, "w", encoding="utf-8") as f:
    f.write(f"[✓] Top-1 Accuracy     : {accuracy  * 100:.2f}%\n")
    f.write(f"[✓] Top-3 Accuracy     : {top3_acc:.2f}%\n")
    f.write(f"[✓] Top-5 Accuracy     : {top5_acc:.2f}%\n")
    f.write(f"[✓] Macro Precision    : {precision * 100:.2f}%\n")
    f.write(f"[✓] Macro Recall       : {recall    * 100:.2f}%\n")
    f.write(f"[✓] Macro F1-score     : {f1        * 100:.2f}%\n\n")
    for class_name, acc in class_accuracy_list:
        f.write(f"{class_name:30s} : {acc:.2f}%\n")

In [None]:
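# Installs seaborn into the running kernel's environment; run this cell before the evaluation cell that imports seaborn.
%pip install seaborn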