In [None]:
# -*- coding: utf-8 -*-
# mvtec2_clip_pipeline.ipynb
# 整合版 Pipeline：gen_txt + train + inference + 統計 AUROC

import os
import random
import json
import csv
from pathlib import Path
from tqdm import tqdm
from PIL import Image, ImageOps
import shutil
import numpy as np
import torch
from torch.utils.data import Dataset, DataLoader
from torch.optim import AdamW
from torch.optim.lr_scheduler import CosineAnnealingLR
from transformers import CLIPProcessor, CLIPModel
from sklearn.metrics import roc_auc_score
import pandas as pd

# ================== 全域設定 ==================
MVTEC2_CLASSES = [
    "fabric"
]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\mvtec2")
CHECKPOINT_ROOT = Path("clipcheckpoints")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_clip_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# 跑三組 (3,3) (4,4) (5,5)
CONFIGS = [(3,3), (4,4), (5,5)]


# ======================================================
# Part 1. gen_txt (生成 txt + clip2/clip4)
# ======================================================
NORMAL_PROMPTS = ["a flawless {}", "a good {}", "a perfect {}"]
ANOMALY_PROMPTS = [
    "a defective {}", "a broken {}", "a damaged {}", "a cracked {}",
    "a faulty {}", "a scratched {}", "a {} with visible defects",
    "a malfunctioning {}", "a {} with flaws", "a deteriorated {}"
]
IMG_EXTS = {".png", ".jpg", ".jpeg"}
TXT_SUFFIX = ".txt"
IMG_COPY_FORMAT = "{:03d}.png"
TXT_COPY_FORMAT = "{:03d}.txt"

class TextGenerator:
    def __init__(self, folder: Path, clip2_aug_num: int, clip4_aug_num: int):
        self.folder = folder
        self.images = sorted([p for p in self.folder.iterdir() if p.suffix.lower() in IMG_EXTS])
        self.class_name = self.folder.parent.parent.name
        self.clip2_aug_num = clip2_aug_num
        self.clip4_aug_num = clip4_aug_num

    def reset_dir(self, out_dir: Path):
        out_dir.mkdir(parents=True, exist_ok=True)
        for file in out_dir.iterdir():
            if file.is_file():
                file.unlink()
            elif file.is_dir():
                shutil.rmtree(file)

    def _save_resized(self, src_img: Image.Image, dst_img: Path):
        src_img.resize((IMGSIZE, IMGSIZE), Image.BICUBIC).save(dst_img)

    def generate_good_prompts(self):
        for img_path in tqdm(self.images, desc=f"[{self.class_name}] GOOD", unit="img"):
            txt_path = img_path.with_suffix(TXT_SUFFIX)
            prompt = random.choice(NORMAL_PROMPTS).format(self.class_name)
            with open(txt_path, "w", encoding="utf-8") as f:
                f.write(prompt)

    def augment_good_prompts(self, out_dir: Path):
        self.reset_dir(out_dir)
        counter = 0
        for img_path in tqdm(self.images, desc=f"[{self.class_name}] CLIP2", unit="img"):
            for k in range(self.clip2_aug_num):
                dst_img = out_dir / IMG_COPY_FORMAT.format(counter)
                img = Image.open(img_path).convert("RGB")
                self._save_resized(img, dst_img)
                txt_path = out_dir / TXT_COPY_FORMAT.format(counter)
                prompt = random.choice(NORMAL_PROMPTS).format(self.class_name)
                with open(txt_path, "w", encoding="utf-8") as f:
                    f.write(prompt)
                counter += 1

    def augment_good_clip4(self, out_dir: Path):
        self.reset_dir(out_dir)
        counter = 0
        for img_path in tqdm(self.images, desc=f"[{self.class_name}] CLIP4", unit="img"):
            img = Image.open(img_path).convert("RGB")
            for k in range(self.clip4_aug_num):
                angle = random.choice([0, 15, -15, 30, -30])
                scale = random.uniform(0.8, 1.2)
                w, h = img.size
                new_w, new_h = int(w * scale), int(h * scale)
                aug_img = img.resize((new_w, new_h), Image.BICUBIC).rotate(angle, expand=True)
                if random.random() < 0.5:
                    aug_img = ImageOps.mirror(aug_img)
                if random.random() < 0.3:
                    aug_img = ImageOps.flip(aug_img)
                dst_img = out_dir / IMG_COPY_FORMAT.format(counter)
                self._save_resized(aug_img, dst_img)
                txt_path = out_dir / TXT_COPY_FORMAT.format(counter)
                prompt = random.choice(NORMAL_PROMPTS).format(self.class_name)
                with open(txt_path, "w", encoding="utf-8") as f:
                    f.write(prompt)
                counter += 1

def run_gen_txt(cls, clip2_num, clip4_num):
    src_dir = BASE_DIR / cls / "train" / "good"
    gen = TextGenerator(src_dir, clip2_num, clip4_num)
    gen.generate_good_prompts()
    gen.augment_good_prompts(src_dir.parent / "clip2")
    gen.augment_good_clip4(src_dir.parent / "clip4")

# ======================================================
# Part 2. train_clip (訓練)
# ======================================================
class ImageTextPairDataset(Dataset):
    def __init__(self, base_dir: Path):
        self.samples = []
        self.class_name = base_dir.name
        for sub in ["good", "clip2", "clip4"]:
            p = base_dir / f"train/{sub}"
            if not p.exists():
                continue
            for img_path in sorted(p.glob("*.png")):
                txt_path = img_path.with_suffix(".txt")
                if txt_path.exists():
                    self.samples.append((img_path, txt_path))

    def __len__(self): return len(self.samples)
    def __getitem__(self, idx):
        img_path, txt_path = self.samples[idx]
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
        text = Path(txt_path).read_text().strip()
        return {"image": img, "text": text, "path": str(img_path)}

def collate_fn(batch, processor):
    images = [b["image"] for b in batch]
    texts = [b["text"] for b in batch]
    inputs = processor(text=texts, images=images, return_tensors="pt", padding=True)
    inputs["paths"] = [b["path"] for b in batch]
    return inputs

def clip_loss(image_embeds, text_embeds, logit_scale):
    img = image_embeds / (image_embeds.norm(dim=-1, keepdim=True)+1e-12)
    txt = text_embeds / (text_embeds.norm(dim=-1, keepdim=True)+1e-12)
    logits = torch.matmul(img, txt.t()) * logit_scale
    labels = torch.arange(img.size(0), device=img.device)
    loss_i = torch.nn.functional.cross_entropy(logits, labels)
    loss_t = torch.nn.functional.cross_entropy(logits.t(), labels)
    return (loss_i + loss_t) / 2.0

def run_train_clip(cls):
    base_dir = BASE_DIR / cls
    dataset = ImageTextPairDataset(base_dir)
    processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
    dataloader = DataLoader(dataset, batch_size=64, shuffle=True,
                            collate_fn=lambda b: collate_fn(b, processor))

    model = CLIPModel.from_pretrained("openai/clip-vit-base-patch32").to(DEVICE)
    optimizer = AdamW(model.parameters(), lr=5e-7)
    scheduler = CosineAnnealingLR(optimizer, T_max=2)

    ckpt_root = CHECKPOINT_ROOT / cls
    ckpt_root.mkdir(parents=True, exist_ok=True)

    for epoch in range(2):
        total_loss = 0
        for batch in tqdm(dataloader, desc=f"[{cls}] Epoch {epoch+1}", unit="batch"):
            inputs = {k:v.to(DEVICE) for k,v in batch.items() if k!="paths"}
            outputs = model(**inputs)
            loss = clip_loss(outputs.image_embeds, outputs.text_embeds, model.logit_scale.exp())
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            total_loss += loss.item()
        scheduler.step()
        print(f"[{cls}] Epoch {epoch+1} - AvgLoss {total_loss/len(dataloader):.6f}")
        model.save_pretrained(ckpt_root / f"epoch_{epoch+1}")
        processor.save_pretrained(ckpt_root / f"epoch_{epoch+1}")

# ======================================================
# Part 3. inference (計算 AUROC)
# ======================================================
def anomaly_score(model, processor, image, text_prompt):
    inputs = processor(images=image, text=[text_prompt], return_tensors="pt", padding=True).to(DEVICE)
    with torch.no_grad():
        outputs = model(**inputs)
    img = outputs.image_embeds
    txt = outputs.text_embeds
    sim = torch.matmul(img/img.norm(dim=-1,keepdim=True),
                       (txt/txt.norm(dim=-1,keepdim=True)).t()).item()
    return 1.0 - sim

def run_inference(cls, prompt="a flawless {}"):
    model_dir = CHECKPOINT_ROOT / cls / "epoch_2"
    processor = CLIPProcessor.from_pretrained(str(model_dir))
    model = CLIPModel.from_pretrained(str(model_dir)).to(DEVICE)
    model.eval()

    test_root = BASE_DIR / cls / "test"
    y_true, y_score = [], []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name!="good"]:
        label = 0 if sub=="good" else 1
        for img_path in (test_root/sub).glob("*.png"):
            img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
            score = anomaly_score(model, processor, img, prompt.format(cls))
            y_true.append(label); y_score.append(score)

    return roc_auc_score(y_true, y_score)

# ======================================================
# Part 4. Pipeline 主程式
# ======================================================
results = []
for c2, c4 in CONFIGS:
    print(f"\n===== Running config (CLIP2={c2}, CLIP4={c4}) =====")
    for cls in MVTEC2_CLASSES:
        print(f"\n>>> Class: {cls}")
        run_gen_txt(cls, c2, c4)
        run_train_clip(cls)
        auroc = run_inference(cls)
        print(f"[RESULT] {cls} clip2={c2} clip4={c4} AUROC={auroc:.4f}")
        results.append({"class": cls, "clip2": c2, "clip4": c4, "auroc": auroc})

# 存檔
df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
df


In [4]:
# -*- coding: utf-8 -*-
# mvtec2_winclip_pipeline.py
# WinCLIP Pipeline：CPE prompt + window-based feature extraction + AUROC (含進度條)

import os
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from sklearn.metrics import roc_auc_score
import pandas as pd
import open_clip   # pip install open_clip_torch

# ================== 全域設定 ==================
MVTEC2_CLASSES = ["one"]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\btad")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_winclip_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Window/patch 設定
WINDOW_SIZES = [32, 48, 64]
STRIDE = 16

# ================== CPE Prompt ==================
STATE_WORDS_NORMAL = ["flawless", "intact", "perfect", "clean", "good"]
STATE_WORDS_ANOM = ["broken", "cracked", "damaged", "scratched", "defective", "faulty"]
TEMPLATES = [
    "a photo of a {}",
    "a cropped photo of a {}",
    "a photo of a {} for visual inspection"
]

def build_cpe_prompts(cls_name):
    normal_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_NORMAL for t in TEMPLATES]
    anomaly_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_ANOM for t in TEMPLATES]
    return normal_prompts, anomaly_prompts

# ================== WinCLIP 特徵抽取 ==================
def extract_winclip_features(model, preprocess, image, normal_prompts, anomaly_prompts,
                             window_size=32, stride=16):
    W, H = image.size
    scores = np.zeros((H, W))
    counts = np.zeros((H, W))

    # encode text prompts
    with torch.no_grad():
        txt_norm = model.encode_text(open_clip.tokenize(normal_prompts).to(DEVICE))
        txt_anom = model.encode_text(open_clip.tokenize(anomaly_prompts).to(DEVICE))
        txt_norm = txt_norm / txt_norm.norm(dim=-1, keepdim=True)
        txt_anom = txt_anom / txt_anom.norm(dim=-1, keepdim=True)

    # sliding window
    for y in range(0, H - window_size + 1, stride):
        for x in range(0, W - window_size + 1, stride):
            crop = image.crop((x, y, x + window_size, y + window_size))
            crop_tensor = preprocess(crop).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                img_emb = model.encode_image(crop_tensor)
                img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)

            sim_norm = (img_emb @ txt_norm.T).max().item()
            sim_anom = (img_emb @ txt_anom.T).max().item()
            anomaly_score = sim_anom - sim_norm  # 越大越異常

            scores[y:y + window_size, x:x + window_size] += anomaly_score
            counts[y:y + window_size, x:x + window_size] += 1

    return scores / (counts + 1e-6)

# ================== 推理 (WinCLIP Zero-shot) ==================
def run_inference_winclip(cls):
    model, _, preprocess = open_clip.create_model_and_transforms(
        "ViT-B-16", pretrained="laion400m_e32"
    )
    model = model.to(DEVICE).eval()

    test_root = BASE_DIR / cls / "test"
    y_true, y_score = [], []
    normal_prompts, anomaly_prompts = build_cpe_prompts(cls)

    # 計算總影像數量，用於 tqdm
    all_imgs = []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name != "good"]:
        for img_path in (test_root / sub).glob("*.png"):
            all_imgs.append((img_path, sub))
    
    for img_path, sub in tqdm(all_imgs, desc=f"[{cls}] 推理中", unit="img"):
        label = 0 if sub == "good" else 1
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))

        # 多尺度融合
        score_maps = []
        for ws in WINDOW_SIZES:
            score_map = extract_winclip_features(
                model, preprocess, img, normal_prompts, anomaly_prompts,
                window_size=ws, stride=STRIDE
            )
            score_maps.append(score_map)

        final_map = np.mean(score_maps, axis=0)
        score = final_map.max()  # image-level score

        y_true.append(label)
        y_score.append(score)

    return roc_auc_score(y_true, y_score)

# ================== Pipeline 主程式 ==================
results = []
for cls in MVTEC2_CLASSES:
    print(f"\n>>> Class: {cls}")
    auroc = run_inference_winclip(cls)
    print(f"[RESULT] {cls} AUROC={auroc:.4f}")
    results.append({"class": cls, "auroc": auroc})

df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
print(f"Results saved to {RESULTS_CSV}")



>>> Class: one


[one] 推理中: 100%|██████████| 70/70 [03:00<00:00,  2.57s/img]

[RESULT] one AUROC=0.7123
Results saved to C:\Users\anywhere4090\Desktop\DDAD-main\mvtec2_winclip_results.csv





In [6]:
# -*- coding: utf-8 -*-
# mvtec2_winclip_plus_pipeline.py
# WinCLIP+ Pipeline：CPE prompt + Few-shot Normal Support + window-based feature extraction + AUROC (含進度條)

import os
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from sklearn.metrics import roc_auc_score
import pandas as pd
import open_clip   # pip install open_clip_torch

# ================== 全域設定 ==================
MVTEC2_CLASSES = ["one"]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\btad")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_winclip_plus_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Window/patch 設定
WINDOW_SIZES = [32, 48, 64]
STRIDE = 16

# ================== CPE Prompt ==================
STATE_WORDS_NORMAL = ["flawless", "intact", "perfect", "clean", "good"]
STATE_WORDS_ANOM = ["broken", "cracked", "damaged", "scratched", "defective", "faulty"]
TEMPLATES = [
    "a photo of a {}",
    "a cropped photo of a {}",
    "a photo of a {} for visual inspection"
]

def build_cpe_prompts(cls_name):
    normal_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_NORMAL for t in TEMPLATES]
    anomaly_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_ANOM for t in TEMPLATES]
    return normal_prompts, anomaly_prompts

# ================== 建立 Support Embeddings ==================
def build_support_embeddings(model, preprocess, cls, max_support=5):
    """從 train/good 抽少量正常影像，生成 support embedding"""
    support_dir = BASE_DIR / cls / "train" / "good"
    support_imgs = sorted(list(support_dir.glob("*.png")))[:max_support]
    support_embs = []

    for img_path in support_imgs:
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
        tensor = preprocess(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            emb = model.encode_image(tensor)
            emb = emb / emb.norm(dim=-1, keepdim=True)
        support_embs.append(emb)

    if len(support_embs) > 0:
        return torch.cat(support_embs, dim=0)  # shape = [N, D]
    else:
        return None

# ================== WinCLIP+ 特徵抽取 ==================
def extract_winclip_plus_features(model, preprocess, image, normal_prompts, anomaly_prompts,
                                  emb_norm_support=None, window_size=32, stride=16):
    W, H = image.size
    scores = np.zeros((H, W))
    counts = np.zeros((H, W))

    # encode text prompts
    with torch.no_grad():
        txt_norm = model.encode_text(open_clip.tokenize(normal_prompts).to(DEVICE))
        txt_anom = model.encode_text(open_clip.tokenize(anomaly_prompts).to(DEVICE))
        txt_norm = txt_norm / txt_norm.norm(dim=-1, keepdim=True)
        txt_anom = txt_anom / txt_anom.norm(dim=-1, keepdim=True)

    # sliding window
    for y in range(0, H - window_size + 1, stride):
        for x in range(0, W - window_size + 1, stride):
            crop = image.crop((x, y, x + window_size, y + window_size))
            crop_tensor = preprocess(crop).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                img_emb = model.encode_image(crop_tensor)
                img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)

            # normal 相似度 (文字 + 支援影像)
            sim_norm_text = (img_emb @ txt_norm.T).max().item()
            sim_norm_support = (img_emb @ emb_norm_support.T).max().item() if emb_norm_support is not None else -1e9
            sim_norm = max(sim_norm_text, sim_norm_support)

            # anomaly 相似度
            sim_anom = (img_emb @ txt_anom.T).max().item()

            anomaly_score = sim_anom - sim_norm  # 越大越異常

            scores[y:y + window_size, x:x + window_size] += anomaly_score
            counts[y:y + window_size, x:x + window_size] += 1

    return scores / (counts + 1e-6)

# ================== 推理 (WinCLIP+) ==================
def run_inference_winclip_plus(cls, max_support=10):
    model, _, preprocess = open_clip.create_model_and_transforms(
        "ViT-B-16", pretrained="laion400m_e32"
    )
    model = model.to(DEVICE).eval()

    # 建立 support embeddings
    emb_norm_support = build_support_embeddings(model, preprocess, cls, max_support=max_support)

    test_root = BASE_DIR / cls / "test"
    y_true, y_score = [], []
    normal_prompts, anomaly_prompts = build_cpe_prompts(cls)

    # 收集所有測試影像
    all_imgs = []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name != "good"]:
        for img_path in (test_root / sub).glob("*.png"):
            all_imgs.append((img_path, sub))
    
    for img_path, sub in tqdm(all_imgs, desc=f"[{cls}] 推理中", unit="img"):
        label = 0 if sub == "good" else 1
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))

        # 多尺度融合
        score_maps = []
        for ws in WINDOW_SIZES:
            score_map = extract_winclip_plus_features(
                model, preprocess, img, normal_prompts, anomaly_prompts,
                emb_norm_support=emb_norm_support,
                window_size=ws, stride=STRIDE
            )
            score_maps.append(score_map)

        final_map = np.mean(score_maps, axis=0)
        score = final_map.max()  # image-level score

        y_true.append(label)
        y_score.append(score)

    return roc_auc_score(y_true, y_score)

# ================== Pipeline 主程式 ==================
results = []
for cls in MVTEC2_CLASSES:
    print(f"\n>>> Class: {cls}")
    auroc = run_inference_winclip_plus(cls, max_support=5)
    print(f"[RESULT] {cls} AUROC={auroc:.4f}")
    results.append({"class": cls, "auroc": auroc})

df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
print(f"Results saved to {RESULTS_CSV}")



>>> Class: one


[one] 推理中: 100%|██████████| 70/70 [02:49<00:00,  2.42s/img]

[RESULT] one AUROC=0.8873
Results saved to C:\Users\anywhere4090\Desktop\DDAD-main\mvtec2_winclip_plus_results.csv





In [7]:
# -*- coding: utf-8 -*-
# mvtec2_winclip_plus_pipeline.py
# WinCLIP+ Pipeline：CPE prompt + Few-shot Normal Support + Top-k pooling + backbone selection

import os
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from sklearn.metrics import roc_auc_score
import pandas as pd
import open_clip   # pip install open_clip_torch

# ================== 全域設定 ==================
MVTEC2_CLASSES = ["one"]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\btad")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_winclip_plus_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Backbone 可選: "ViT-B-16" / "ViT-L-14"
BACKBONE = "ViT-B-16"
PRETRAINED = "laion400m_e32" if BACKBONE == "ViT-B-16" else "laion2b_s32b_b82k"

# Window/patch 設定
WINDOW_SIZES = [32, 48, 64]
STRIDE = 16

# Top-k pooling 設定
TOPK_RATIO = 0.05  # 取前 5%

# ================== CPE Prompt ==================
STATE_WORDS_NORMAL = ["flawless", "intact", "perfect", "clean", "good"]
STATE_WORDS_ANOM = ["broken", "cracked", "damaged", "scratched", "defective", "faulty"]
TEMPLATES = [
    "a photo of a {}",
    "a cropped photo of a {}",
    "a photo of a {} for visual inspection"
]

def build_cpe_prompts(cls_name):
    normal_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_NORMAL for t in TEMPLATES]
    anomaly_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_ANOM for t in TEMPLATES]
    return normal_prompts, anomaly_prompts

# ================== 建立 Support Embeddings ==================
def build_support_embeddings(model, preprocess, cls, max_support=9999):
    """使用 train/good 生成 support embedding (可用全部正常影像)"""
    support_dir = BASE_DIR / cls / "train" / "good"
    support_imgs = sorted(list(support_dir.glob("*.png")))[:max_support]
    support_embs = []

    for img_path in support_imgs:
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
        tensor = preprocess(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            emb = model.encode_image(tensor)
            emb = emb / emb.norm(dim=-1, keepdim=True)
        support_embs.append(emb)

    if len(support_embs) > 0:
        return torch.cat(support_embs, dim=0)  # shape = [N, D]
    else:
        return None

# ================== WinCLIP+ 特徵抽取 ==================
def extract_winclip_plus_features(model, preprocess, image, normal_prompts, anomaly_prompts,
                                  emb_norm_support=None, window_size=32, stride=16):
    W, H = image.size
    scores = np.zeros((H, W))
    counts = np.zeros((H, W))

    # encode text prompts
    with torch.no_grad():
        txt_norm = model.encode_text(open_clip.tokenize(normal_prompts).to(DEVICE))
        txt_anom = model.encode_text(open_clip.tokenize(anomaly_prompts).to(DEVICE))
        txt_norm = txt_norm / txt_norm.norm(dim=-1, keepdim=True)
        txt_anom = txt_anom / txt_anom.norm(dim=-1, keepdim=True)

    # sliding window
    for y in range(0, H - window_size + 1, stride):
        for x in range(0, W - window_size + 1, stride):
            crop = image.crop((x, y, x + window_size, y + window_size))
            crop_tensor = preprocess(crop).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                img_emb = model.encode_image(crop_tensor)
                img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)

            # normal 相似度 (文字 + 支援影像)
            sim_norm_text = (img_emb @ txt_norm.T).max().item()
            sim_norm_support = (img_emb @ emb_norm_support.T).max().item() if emb_norm_support is not None else -1e9
            sim_norm = max(sim_norm_text, sim_norm_support)

            # anomaly 相似度
            sim_anom = (img_emb @ txt_anom.T).max().item()

            anomaly_score = sim_anom - sim_norm  # 越大越異常

            scores[y:y + window_size, x:x + window_size] += anomaly_score
            counts[y:y + window_size, x:x + window_size] += 1

    return scores / (counts + 1e-6)

# ================== 推理 (WinCLIP+) ==================
def run_inference_winclip_plus(cls, max_support=9999):
    model, _, preprocess = open_clip.create_model_and_transforms(
        BACKBONE, pretrained=PRETRAINED
    )
    model = model.to(DEVICE).eval()

    # 建立 support embeddings
    emb_norm_support = build_support_embeddings(model, preprocess, cls, max_support=max_support)

    test_root = BASE_DIR / cls / "test"
    y_true, y_score = [], []
    normal_prompts, anomaly_prompts = build_cpe_prompts(cls)

    # 收集所有測試影像
    all_imgs = []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name != "good"]:
        for img_path in (test_root / sub).glob("*.png"):
            all_imgs.append((img_path, sub))
    
    for img_path, sub in tqdm(all_imgs, desc=f"[{cls}] 推理中", unit="img"):
        label = 0 if sub == "good" else 1
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))

        # 多尺度融合
        score_maps = []
        for ws in WINDOW_SIZES:
            score_map = extract_winclip_plus_features(
                model, preprocess, img, normal_prompts, anomaly_prompts,
                emb_norm_support=emb_norm_support,
                window_size=ws, stride=STRIDE
            )
            score_maps.append(score_map)

        final_map = np.mean(score_maps, axis=0)

        # Top-k pooling (前5% pixel 平均)
        flat = final_map.flatten()
        k = max(1, int(len(flat) * TOPK_RATIO))
        topk = np.partition(flat, -k)[-k:]
        score = topk.mean()

        y_true.append(label)
        y_score.append(score)

    return roc_auc_score(y_true, y_score)

# ================== Pipeline 主程式 ==================
results = []
for cls in MVTEC2_CLASSES:
    print(f"\n>>> Class: {cls} (Backbone={BACKBONE})")
    auroc = run_inference_winclip_plus(cls, max_support=9999)  # 預設用全部 good 當 support
    print(f"[RESULT] {cls} AUROC={auroc:.4f}")
    results.append({"class": cls, "auroc": auroc, "backbone": BACKBONE})

df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
print(f"Results saved to {RESULTS_CSV}")



>>> Class: one (Backbone=ViT-B-16)


[one] 推理中: 100%|██████████| 70/70 [02:56<00:00,  2.52s/img]

[RESULT] one AUROC=0.9349
Results saved to C:\Users\anywhere4090\Desktop\DDAD-main\mvtec2_winclip_plus_results.csv





In [8]:
# -*- coding: utf-8 -*-
# mvtec2_winclip_plus_pipeline.py
# WinCLIP+ Pipeline：CPE prompt + Few-shot Normal Support + Top-k pooling + backbone selection

import os
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from sklearn.metrics import roc_auc_score
import pandas as pd
import open_clip   # pip install open_clip_torch

# ================== 全域設定 ==================
MVTEC2_CLASSES = ["two"]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\btad")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_winclip_plus_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Backbone 可選: "ViT-B-16" / "ViT-L-14"
BACKBONE = "ViT-B-16"
PRETRAINED = "laion400m_e32" if BACKBONE == "ViT-B-16" else "laion2b_s32b_b82k"

# Window/patch 設定
WINDOW_SIZES = [32, 48, 64]
STRIDE = 16

# Top-k pooling 設定
TOPK_RATIO = 0.05  # 取前 5%

# ================== CPE Prompt ==================
STATE_WORDS_NORMAL = ["flawless", "intact", "perfect", "clean", "good"]
STATE_WORDS_ANOM = ["broken", "cracked", "damaged", "scratched", "defective", "faulty"]
TEMPLATES = [
    "a photo of a {}",
    "a cropped photo of a {}",
    "a photo of a {} for visual inspection"
]

def build_cpe_prompts(cls_name):
    normal_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_NORMAL for t in TEMPLATES]
    anomaly_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_ANOM for t in TEMPLATES]
    return normal_prompts, anomaly_prompts

# ================== 建立 Support Embeddings ==================
def build_support_embeddings(model, preprocess, cls, max_support=9999):
    """使用 train/good 生成 support embedding (可用全部正常影像)"""
    support_dir = BASE_DIR / cls / "train" / "good"
    support_imgs = sorted(list(support_dir.glob("*.png")))[:max_support]
    support_embs = []

    for img_path in support_imgs:
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
        tensor = preprocess(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            emb = model.encode_image(tensor)
            emb = emb / emb.norm(dim=-1, keepdim=True)
        support_embs.append(emb)

    if len(support_embs) > 0:
        return torch.cat(support_embs, dim=0)  # shape = [N, D]
    else:
        return None

# ================== WinCLIP+ 特徵抽取 ==================
def extract_winclip_plus_features(model, preprocess, image, normal_prompts, anomaly_prompts,
                                  emb_norm_support=None, window_size=32, stride=16):
    W, H = image.size
    scores = np.zeros((H, W))
    counts = np.zeros((H, W))

    # encode text prompts
    with torch.no_grad():
        txt_norm = model.encode_text(open_clip.tokenize(normal_prompts).to(DEVICE))
        txt_anom = model.encode_text(open_clip.tokenize(anomaly_prompts).to(DEVICE))
        txt_norm = txt_norm / txt_norm.norm(dim=-1, keepdim=True)
        txt_anom = txt_anom / txt_anom.norm(dim=-1, keepdim=True)

    # sliding window
    for y in range(0, H - window_size + 1, stride):
        for x in range(0, W - window_size + 1, stride):
            crop = image.crop((x, y, x + window_size, y + window_size))
            crop_tensor = preprocess(crop).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                img_emb = model.encode_image(crop_tensor)
                img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)

            # normal 相似度 (文字 + 支援影像)
            sim_norm_text = (img_emb @ txt_norm.T).max().item()
            sim_norm_support = (img_emb @ emb_norm_support.T).max().item() if emb_norm_support is not None else -1e9
            sim_norm = max(sim_norm_text, sim_norm_support)

            # anomaly 相似度
            sim_anom = (img_emb @ txt_anom.T).max().item()

            anomaly_score = sim_anom - sim_norm  # 越大越異常

            scores[y:y + window_size, x:x + window_size] += anomaly_score
            counts[y:y + window_size, x:x + window_size] += 1

    return scores / (counts + 1e-6)

# ================== 推理 (WinCLIP+) ==================
def run_inference_winclip_plus(cls, max_support=9999):
    model, _, preprocess = open_clip.create_model_and_transforms(
        BACKBONE, pretrained=PRETRAINED
    )
    model = model.to(DEVICE).eval()

    # 建立 support embeddings
    emb_norm_support = build_support_embeddings(model, preprocess, cls, max_support=max_support)

    test_root = BASE_DIR / cls / "test"
    y_true, y_score = [], []
    normal_prompts, anomaly_prompts = build_cpe_prompts(cls)

    # 收集所有測試影像
    all_imgs = []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name != "good"]:
        for img_path in (test_root / sub).glob("*.png"):
            all_imgs.append((img_path, sub))
    
    for img_path, sub in tqdm(all_imgs, desc=f"[{cls}] 推理中", unit="img"):
        label = 0 if sub == "good" else 1
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))

        # 多尺度融合
        score_maps = []
        for ws in WINDOW_SIZES:
            score_map = extract_winclip_plus_features(
                model, preprocess, img, normal_prompts, anomaly_prompts,
                emb_norm_support=emb_norm_support,
                window_size=ws, stride=STRIDE
            )
            score_maps.append(score_map)

        final_map = np.mean(score_maps, axis=0)

        # Top-k pooling (前5% pixel 平均)
        flat = final_map.flatten()
        k = max(1, int(len(flat) * TOPK_RATIO))
        topk = np.partition(flat, -k)[-k:]
        score = topk.mean()

        y_true.append(label)
        y_score.append(score)

    return roc_auc_score(y_true, y_score)

# ================== Pipeline 主程式 ==================
results = []
for cls in MVTEC2_CLASSES:
    print(f"\n>>> Class: {cls} (Backbone={BACKBONE})")
    auroc = run_inference_winclip_plus(cls, max_support=9999)  # 預設用全部 good 當 support
    print(f"[RESULT] {cls} AUROC={auroc:.4f}")
    results.append({"class": cls, "auroc": auroc, "backbone": BACKBONE})

df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
print(f"Results saved to {RESULTS_CSV}")



>>> Class: two (Backbone=ViT-B-16)


[two] 推理中: 100%|██████████| 230/230 [09:29<00:00,  2.47s/img]

[RESULT] two AUROC=0.7510
Results saved to C:\Users\anywhere4090\Desktop\DDAD-main\mvtec2_winclip_plus_results.csv





In [13]:
# -*- coding: utf-8 -*-
# mvtec2_winclip_plus_pipeline.py
# WinCLIP+ Pipeline：CPE prompt + Few-shot Normal Support + Top-k pooling + backbone selection

import os
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from sklearn.metrics import roc_auc_score
import pandas as pd
import open_clip   # pip install open_clip_torch

# ================== 全域設定 ==================
MVTEC2_CLASSES = ["three"]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\btad")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_winclip_plus_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Backbone 可選: "ViT-B-16" / "ViT-L-14"
BACKBONE = "ViT-L-14"
PRETRAINED = "laion400m_e32" if BACKBONE == "ViT-B-16" else "laion2b_s32b_b82k"

# Window/patch 設定
WINDOW_SIZES = [32, 48, 64]
STRIDE = 16

# Top-k pooling 設定
TOPK_RATIO = 0.05  # 取前 5%

# ================== CPE Prompt ==================
STATE_WORDS_NORMAL = ["flawless", "intact", "perfect", "clean", "good"]
STATE_WORDS_ANOM = ["broken", "cracked", "damaged", "scratched", "defective", "faulty"]
TEMPLATES = [
    "a photo of a {}",
    "a cropped photo of a {}",
    "a photo of a {} for visual inspection"
]

def build_cpe_prompts(cls_name):
    normal_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_NORMAL for t in TEMPLATES]
    anomaly_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_ANOM for t in TEMPLATES]
    return normal_prompts, anomaly_prompts

# ================== 建立 Support Embeddings ==================
def build_support_embeddings(model, preprocess, cls, max_support=9999):
    """使用 train/good 生成 support embedding (可用全部正常影像)"""
    support_dir = BASE_DIR / cls / "train" / "good"
    support_imgs = sorted(list(support_dir.glob("*.png")))[:max_support]
    support_embs = []

    for img_path in support_imgs:
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
        tensor = preprocess(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            emb = model.encode_image(tensor)
            emb = emb / emb.norm(dim=-1, keepdim=True)
        support_embs.append(emb)

    if len(support_embs) > 0:
        return torch.cat(support_embs, dim=0)  # shape = [N, D]
    else:
        return None

# ================== WinCLIP+ 特徵抽取 ==================
def extract_winclip_plus_features(model, preprocess, image, normal_prompts, anomaly_prompts,
                                  emb_norm_support=None, window_size=32, stride=16):
    W, H = image.size
    scores = np.zeros((H, W))
    counts = np.zeros((H, W))

    # encode text prompts
    with torch.no_grad():
        txt_norm = model.encode_text(open_clip.tokenize(normal_prompts).to(DEVICE))
        txt_anom = model.encode_text(open_clip.tokenize(anomaly_prompts).to(DEVICE))
        txt_norm = txt_norm / txt_norm.norm(dim=-1, keepdim=True)
        txt_anom = txt_anom / txt_anom.norm(dim=-1, keepdim=True)

    # sliding window
    for y in range(0, H - window_size + 1, stride):
        for x in range(0, W - window_size + 1, stride):
            crop = image.crop((x, y, x + window_size, y + window_size))
            crop_tensor = preprocess(crop).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                img_emb = model.encode_image(crop_tensor)
                img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)

            # normal 相似度 (文字 + 支援影像)
            sim_norm_text = (img_emb @ txt_norm.T).max().item()
            sim_norm_support = (img_emb @ emb_norm_support.T).max().item() if emb_norm_support is not None else -1e9
            sim_norm = max(sim_norm_text, sim_norm_support)

            # anomaly 相似度
            sim_anom = (img_emb @ txt_anom.T).max().item()

            anomaly_score = sim_anom - sim_norm  # 越大越異常

            scores[y:y + window_size, x:x + window_size] += anomaly_score
            counts[y:y + window_size, x:x + window_size] += 1

    return scores / (counts + 1e-6)

# ================== 推理 (WinCLIP+) ==================
def run_inference_winclip_plus(cls, max_support=9999):
    model, _, preprocess = open_clip.create_model_and_transforms(
        BACKBONE, pretrained=PRETRAINED
    )
    model = model.to(DEVICE).eval()

    # 建立 support embeddings
    emb_norm_support = build_support_embeddings(model, preprocess, cls, max_support=max_support)

    test_root = BASE_DIR / cls / "test"
    y_true, y_score = [], []
    normal_prompts, anomaly_prompts = build_cpe_prompts(cls)

    # 收集所有測試影像
    all_imgs = []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name != "good"]:
        for img_path in (test_root / sub).glob("*.png"):
            all_imgs.append((img_path, sub))
    
    for img_path, sub in tqdm(all_imgs, desc=f"[{cls}] 推理中", unit="img"):
        label = 0 if sub == "good" else 1
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))

        # 多尺度融合
        score_maps = []
        for ws in WINDOW_SIZES:
            score_map = extract_winclip_plus_features(
                model, preprocess, img, normal_prompts, anomaly_prompts,
                emb_norm_support=emb_norm_support,
                window_size=ws, stride=STRIDE
            )
            score_maps.append(score_map)

        final_map = np.mean(score_maps, axis=0)

        # Top-k pooling (前5% pixel 平均)
        flat = final_map.flatten()
        k = max(1, int(len(flat) * TOPK_RATIO))
        topk = np.partition(flat, -k)[-k:]
        score = topk.mean()

        y_true.append(label)
        y_score.append(score)

    return roc_auc_score(y_true, y_score)

# ================== Pipeline 主程式 ==================
results = []
for cls in MVTEC2_CLASSES:
    print(f"\n>>> Class: {cls} (Backbone={BACKBONE})")
    auroc = run_inference_winclip_plus(cls, max_support=9999)  # 預設用全部 good 當 support
    print(f"[RESULT] {cls} AUROC={auroc:.4f}")
    results.append({"class": cls, "auroc": auroc, "backbone": BACKBONE})

df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
print(f"Results saved to {RESULTS_CSV}")



>>> Class: three (Backbone=ViT-B-16)


[three] 推理中: 100%|██████████| 441/441 [17:48<00:00,  2.42s/img]

[RESULT] three AUROC=0.6763
Results saved to C:\Users\anywhere4090\Desktop\DDAD-main\mvtec2_winclip_plus_results.csv





In [None]:
\begin{table}[H]
\centering
\caption{MVTec AD 2 dataset: Image AUROC (\%) across different CLIP training strategies.}
\label{tab:mvtec2_clip_perclass}
\resizebox{\linewidth}{!}{
\begin{tabular}{l c c c c c c c c c}
\toprule
Setting & can & fabric & fruit\_jelly & rice & sheet\_metal & vial & wallplugs & walnuts & Average \\
\midrule
Only CLIP (only good) & 44.6451 & 38.05 & 49.25 & 51.16 & 42.12 & 66.1497 & 43.03 & 58.51 & 49.1011 \\
Only CLIP (good + 1-fold augmentation + 1-fold duplication) & 39.7531 & 33.97 & 55.83 & 52.54 & 32.64 & 66.18 & 52.39 & 49.74 & 47.3804 \\
Only CLIP (good + 2-fold augmentation + 2-fold duplication) & 45.52 & 37.21 & 49.83 & 51.24 & 29.26 & 55.16 & 58.09 & 56.4074 & 47.8397 \\
Only CLIP (good + 3-fold augmentation + 3-fold duplication) & 46.5741 & 43.569 & 51.833 & 47.9894 & 37.6852 & 67.1293 & 59.7778 & 63.2778 & \textbf{52.9795} \\
Only CLIP (good + 4-fold augmentation + 4-fold duplication) & 47.3765 & 38.67 & 54.9167 & 51.8254 & 33.4259 & 60.1361 & 64.4074 & 48.9259 & 49.7105 \\
Only CLIP (good + 5-fold augmentation + 5-fold duplication) & 49.25 & 37.3064 & 54.5833 & 45.7011 & 35.3241 & 60.6259 & 65.9259 & 48.6481 & 49.6706 \\
\textbf{Diffusion baseline + normalization anomaly score and guidance by scale + CLIP (good + 3-fold augmentation + 3-fold duplication)} & 73.61 & 80.37 & 95.33 & 75.93 & 60.83 & 67.14 & 58.33 & 98.56 & \textbf{76.26} \\
\textbf{Base (diffusion, only image)} & 73.04 & 53.45 & 96.58 & 75.13 & 72.96 & 79.81 & 66.20 & 79.44 & \textbf{74.58} \\
\bottomrule
\end{tabular}
}
\end{table}


In [14]:
# -*- coding: utf-8 -*-
# mvtec2_winclip_plus_seg_pipeline.py
# WinCLIP+ Segmentation 版：
# - Classification: Image AUROC (Top-k pooling)
# - Segmentation: Pixel AUROC, PRO
# - Few-shot Normal Support 可選
# - 多尺度 sliding-window feature extraction
# - 含 tqdm 進度條

import os
from pathlib import Path
from tqdm import tqdm
from PIL import Image
import numpy as np
import torch
from sklearn.metrics import roc_auc_score
import pandas as pd
import open_clip   # pip install open_clip_torch
from skimage import measure

# ================== 全域設定 ==================
MVTEC2_CLASSES = ["bagel"]

BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\0902 finalcode\dataset\newmvtec3d")
OUT_BASE_DIR = Path(r"C:\Users\anywhere4090\Desktop\DDAD-main")
RESULTS_CSV = OUT_BASE_DIR / "mvtec2_winclip_plus_seg_results.csv"

IMGSIZE = 256
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"

# Backbone 可選: "ViT-B-16" / "ViT-L-14"
BACKBONE = "ViT-L-14"
PRETRAINED = "laion400m_e32" if BACKBONE == "ViT-B-16" else "laion2b_s32b_b82k"

# Window/patch 設定
WINDOW_SIZES = [32, 48, 64]
STRIDE = 16

# Top-k pooling 設定
TOPK_RATIO = 0.05  # 取前 5%

# Few-shot Support
SUPPORT_MAX = 5  # 可調：取多少張 train/good 作 support

# ================== CPE Prompt ==================
STATE_WORDS_NORMAL = ["flawless", "intact", "perfect", "clean", "good"]
STATE_WORDS_ANOM = ["broken", "cracked", "damaged", "scratched", "defective", "faulty"]
TEMPLATES = [
    "a photo of a {}",
    "a cropped photo of a {}",
    "a photo of a {} for visual inspection"
]

def build_cpe_prompts(cls_name):
    normal_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_NORMAL for t in TEMPLATES]
    anomaly_prompts = [t.format(w + " " + cls_name) for w in STATE_WORDS_ANOM for t in TEMPLATES]
    return normal_prompts, anomaly_prompts

# ================== 建立 Support Embeddings ==================
def build_support_embeddings(model, preprocess, cls, max_support=SUPPORT_MAX):
    """從 train/good 抽少量正常影像，生成 support embedding"""
    support_dir = BASE_DIR / cls / "train" / "good"
    support_imgs = sorted(list(support_dir.glob("*.png")))[:max_support]
    support_embs = []

    for img_path in support_imgs:
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))
        tensor = preprocess(img).unsqueeze(0).to(DEVICE)
        with torch.no_grad():
            emb = model.encode_image(tensor)
            emb = emb / emb.norm(dim=-1, keepdim=True)
        support_embs.append(emb)

    if len(support_embs) > 0:
        return torch.cat(support_embs, dim=0)  # shape = [N, D]
    else:
        return None

# ================== WinCLIP+ 特徵抽取 ==================
def extract_winclip_plus_features(model, preprocess, image, normal_prompts, anomaly_prompts,
                                  emb_norm_support=None, window_size=32, stride=16):
    W, H = image.size
    scores = np.zeros((H, W))
    counts = np.zeros((H, W))

    # encode text prompts
    with torch.no_grad():
        txt_norm = model.encode_text(open_clip.tokenize(normal_prompts).to(DEVICE))
        txt_anom = model.encode_text(open_clip.tokenize(anomaly_prompts).to(DEVICE))
        txt_norm = txt_norm / txt_norm.norm(dim=-1, keepdim=True)
        txt_anom = txt_anom / txt_anom.norm(dim=-1, keepdim=True)

    # sliding window
    for y in range(0, H - window_size + 1, stride):
        for x in range(0, W - window_size + 1, stride):
            crop = image.crop((x, y, x + window_size, y + window_size))
            crop_tensor = preprocess(crop).unsqueeze(0).to(DEVICE)

            with torch.no_grad():
                img_emb = model.encode_image(crop_tensor)
                img_emb = img_emb / img_emb.norm(dim=-1, keepdim=True)

            # normal 相似度 (文字 + 支援影像)
            sim_norm_text = (img_emb @ txt_norm.T).max().item()
            sim_norm_support = (img_emb @ emb_norm_support.T).max().item() if emb_norm_support is not None else -1e9
            sim_norm = max(sim_norm_text, sim_norm_support)

            # anomaly 相似度
            sim_anom = (img_emb @ txt_anom.T).max().item()

            anomaly_score = sim_anom - sim_norm  # 越大越異常

            scores[y:y + window_size, x:x + window_size] += anomaly_score
            counts[y:y + window_size, x:x + window_size] += 1

    return scores / (counts + 1e-6)

# ================== 計算 PRO (Per-Region-Overlap) ==================
def compute_pro(masks, heatmaps, num_th=50):
    pros = []
    for th in np.linspace(0, 1, num_th):
        bin_preds = (heatmaps >= th).astype(np.uint8)
        for mask, pred in zip(masks, bin_preds):
            label_mask = measure.label(mask, connectivity=2)
            regions = np.unique(label_mask)[1:]  # skip background
            for r in regions:
                region = (label_mask == r)
                inter = (pred * region).sum()
                union = region.sum()
                if union > 0:
                    pros.append(inter / union)
    return np.mean(pros) if len(pros) > 0 else 0.0

# ================== 推理 (WinCLIP+ Segmentation) ==================
def run_inference_winclip_plus_seg(cls, max_support=SUPPORT_MAX):
    model, _, preprocess = open_clip.create_model_and_transforms(
        BACKBONE, pretrained=PRETRAINED
    )
    model = model.to(DEVICE).eval()

    # 建立 support embeddings
    emb_norm_support = build_support_embeddings(model, preprocess, cls, max_support=max_support)

    test_root = BASE_DIR / cls / "test"
    gt_root = BASE_DIR / cls / "ground_truth"

    y_true, y_score = [], []
    masks_all, maps_all = [], []

    normal_prompts, anomaly_prompts = build_cpe_prompts(cls)

    # 收集所有測試影像
    all_imgs = []
    for sub in ["good"] + [d.name for d in test_root.iterdir() if d.is_dir() and d.name != "good"]:
        for img_path in (test_root / sub).glob("*.png"):
            all_imgs.append((img_path, sub))
    
    for img_path, sub in tqdm(all_imgs, desc=f"[{cls}] 推理中", unit="img"):
        label = 0 if sub == "good" else 1
        img = Image.open(img_path).convert("RGB").resize((IMGSIZE, IMGSIZE))

        # 多尺度融合
        score_maps = []
        for ws in WINDOW_SIZES:
            score_map = extract_winclip_plus_features(
                model, preprocess, img, normal_prompts, anomaly_prompts,
                emb_norm_support=emb_norm_support,
                window_size=ws, stride=STRIDE
            )
            score_maps.append(score_map)

        final_map = np.mean(score_maps, axis=0)

        # === Classification score (Top-k pooling) ===
        flat = final_map.flatten()
        k = max(1, int(len(flat) * TOPK_RATIO))
        topk = np.partition(flat, -k)[-k:]
        score = topk.mean()

        y_true.append(label)
        y_score.append(score)

        # === Segmentation (only anomaly has mask) ===
        if label == 1:
            mask_path = gt_root / sub / img_path.name
            if mask_path.exists():
                mask = Image.open(mask_path).convert("L").resize(final_map.shape[::-1])
                mask = (np.array(mask) > 127).astype(np.uint8)
                masks_all.append(mask)
                maps_all.append((final_map - final_map.min()) / (final_map.max() - final_map.min() + 1e-6))

    # Image-level AUROC
    img_auroc = roc_auc_score(y_true, y_score)

    # Pixel-level AUROC & PRO
    if len(masks_all) > 0:
        masks_all = np.array(masks_all)
        maps_all = np.array(maps_all)
        px_auroc = roc_auc_score(masks_all.flatten(), maps_all.flatten())
        pro = compute_pro(masks_all, maps_all)
    else:
        px_auroc, pro = np.nan, np.nan

    return img_auroc, px_auroc, pro

# ================== Pipeline 主程式 ==================
results = []
for cls in MVTEC2_CLASSES:
    print(f"\n>>> Class: {cls} (Backbone={BACKBONE})")
    img_auroc, px_auroc, pro = run_inference_winclip_plus_seg(cls, max_support=SUPPORT_MAX)
    print(f"[RESULT] {cls} Image-AUROC={img_auroc:.4f}, Pixel-AUROC={px_auroc:.4f}, PRO={pro:.4f}")
    results.append({
        "class": cls,
        "backbone": BACKBONE,
        "support": SUPPORT_MAX,
        "image_auroc": img_auroc,
        "pixel_auroc": px_auroc,
        "pro": pro
    })

df = pd.DataFrame(results)
df.to_csv(RESULTS_CSV, index=False)
print(f"Results saved to {RESULTS_CSV}")



>>> Class: bagel (Backbone=ViT-L-14)


Xet Storage is enabled for this repo, but the 'hf_xet' package is not installed. Falling back to regular HTTP download. For better performance, install the package with: `pip install huggingface_hub[hf_xet]` or `pip install hf_xet`


open_clip_pytorch_model.bin:   0%|          | 0.00/1.71G [00:00<?, ?B/s]

To support symlinks on Windows, you either need to activate Developer Mode or to run Python as an administrator. In order to activate developer mode, see this article: https://docs.microsoft.com/en-us/windows/apps/get-started/enable-your-device-for-development
[bagel] 推理中: 100%|██████████| 110/110 [11:02<00:00,  6.02s/img]


[RESULT] bagel Image-AUROC=0.8079, Pixel-AUROC=0.4367, PRO=0.4812
Results saved to C:\Users\anywhere4090\Desktop\DDAD-main\mvtec2_winclip_plus_seg_results.csv
