In [1]:
import os
import json
import torch
import numpy as np
from PIL import Image
from transformers import CLIPModel, CLIPProcessor
import pandas as pd
from sklearn.metrics import classification_report, accuracy_score, precision_recall_fscore_support

# Prefer the GPU when torch can see one; otherwise everything runs on the CPU.
DEVICE = "cuda" if torch.cuda.is_available() else "cpu"
MODEL_NAME = "openai/clip-vit-base-patch32"

# Pretrained CLIP pre-processor and weights; the model is moved to DEVICE once, here.
processor = CLIPProcessor.from_pretrained(MODEL_NAME)
model = CLIPModel.from_pretrained(MODEL_NAME)
model = model.to(DEVICE)

# The JSON keys become the category names and the values become their prompt
# variations (presumably one list of prompt strings per category -- verify
# against prompt_cn.json).
with open("prompt_cn.json", "r", encoding="utf-8") as f:
    prompt_data = json.load(f)

# Both lists follow the dict's insertion order, so CATEGORIES[i] always pairs
# with PROMPT_VARIATIONS[i].
CATEGORIES = list(prompt_data)
PROMPT_VARIATIONS = [prompt_data[name] for name in CATEGORIES]


Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.


In [2]:
def init_text_features(variations: list, device: str):
    """
    Encode every prompt in one pass and return unit-length text features.

    Args:
        variations: one iterable of prompt strings per category; the position
            of each iterable is used as that category's index.
        device: device the tokenised inputs are moved to before encoding.

    Returns:
        text_features: Tensor[num_prompts, dim], each row L2-normalised.
        prompt_to_cat: List[num_prompts] giving, for every prompt row, the
            index of the category it belongs to.
    """
    # Category index repeated once per prompt, in the same order as the
    # flattened prompt list below.
    prompt_to_cat = [
        cat_idx
        for cat_idx, prompts in enumerate(variations)
        for _ in prompts
    ]
    all_prompts = [prompt for prompts in variations for prompt in prompts]

    encoded = processor(
        text=all_prompts,
        return_tensors="pt",
        padding=True,
        truncation=True,
    )
    encoded = encoded.to(device)

    # Inference only: no autograd graph is needed for the cached features.
    with torch.no_grad():
        raw = model.get_text_features(**encoded)
        feats = raw / raw.norm(dim=-1, keepdim=True)
    return feats, prompt_to_cat

# Encode the prompts a single time; the cached features are shared by later cells.
TEXT_FEATURES, PROMPT_TO_CAT = init_text_features(variations=PROMPT_VARIATIONS, device=DEVICE)


In [3]:
def classify_image(img: "Image.Image",
                   text_feats: torch.Tensor,
                   prompt_to_cat: list,
                   top_k: int = 5,
                   return_all: bool = False):
    """
    Zero-shot classify one image against pre-computed prompt features.

    The image is embedded with the module-level CLIP ``model``/``processor``,
    compared with every row of ``text_feats``, soft-maxed over prompts, and
    the prompt probabilities are summed per category and renormalised.

    Args:
        img: image to classify.
        text_feats: Tensor[num_prompts, dim] of L2-normalised prompt features.
        prompt_to_cat: for each prompt row, the index into ``CATEGORIES``.
        top_k: number of categories to return when ``return_all`` is False.
            Values larger than the number of categories return every
            category; values <= 0 return an empty list.
        return_all: when True, ignore ``top_k`` and return every category.

    Returns:
        List of ``(category_name, probability)`` tuples sorted by descending
        probability.

    NOTE(review): prompt probabilities are summed (not averaged) per category,
    so a category with more prompts can accumulate more mass -- confirm this
    is intended if categories have different prompt counts.
    """
    inputs = processor(images=img, return_tensors="pt", padding=True).to(DEVICE)
    with torch.no_grad():
        img_feat = model.get_image_features(**inputs)
        img_feat = img_feat / img_feat.norm(dim=-1, keepdim=True)
        # `*` and `@` associate left-to-right: (scale * img_feat) @ text_feats.T
        logits = model.logit_scale.exp() * img_feat @ text_feats.T
        probs = logits.softmax(dim=-1)[0].cpu().numpy()

    # Sum prompt probabilities per category in one vectorised call.
    cat_probs = np.bincount(
        np.asarray(prompt_to_cat, dtype=np.intp),
        weights=probs,
        minlength=len(CATEGORIES),
    )
    cat_probs /= cat_probs.sum()

    # Stable sort keeps tied categories in CATEGORIES order, so output is
    # deterministic. Slicing (instead of argpartition) cannot go out of bounds
    # when top_k exceeds the number of categories, and yields [] for top_k <= 0.
    order = np.argsort(-cat_probs, kind="stable")
    if not return_all:
        order = order[:max(top_k, 0)]

    return [(CATEGORIES[i], float(cat_probs[i])) for i in order]


def classify_batch(imgs: list, **kwargs) -> list:
    """
    Classify each image in ``imgs`` against the cached prompt features.

    Images are handled one at a time by ``classify_image`` using the
    module-level ``TEXT_FEATURES`` and ``PROMPT_TO_CAT``; any keyword
    arguments are forwarded unchanged. Returns one result per image, in
    input order.
    """
    results = []
    for image in imgs:
        results.append(classify_image(image, TEXT_FEATURES, PROMPT_TO_CAT, **kwargs))
    return results


In [None]:
def evaluate_directory(directory: str,
                       ground_truth: dict,
                       batch_size: int = 16,
                       top_k: int = 5) -> pd.DataFrame:
    """
    Classify every ``.png`` image in ``directory`` and tabulate the top-k
    predictions per image.

    Args:
        directory: folder whose ``.png`` files (case-insensitive) are scored.
        ground_truth: maps file name -> true label; files missing from the
            mapping get the label ``"Unknown"``.
        batch_size: number of images loaded and classified per step.
        top_k: number of ranked predictions kept per image (default 5).

    Returns:
        DataFrame with one row per image and the columns ``Image``,
        ``True Label``, ``Top{i}_Label`` / ``Top{i}_Prob`` for i in 1..top_k,
        and ``Rank`` (1-based position of the true label within the top-k,
        or -1 when it is not among them). Rows are ordered by file name. The
        columns are present even when the directory holds no images.

    Raises:
        ValueError: if ``batch_size`` is smaller than 1.
    """
    if batch_size < 1:
        raise ValueError("batch_size must be a positive integer")
    top_k = max(top_k, 0)

    # os.listdir order is arbitrary; sort so the output is reproducible.
    files = sorted(f for f in os.listdir(directory) if f.lower().endswith(".png"))

    columns = ["Image", "True Label"]
    for rank in range(1, top_k + 1):
        columns += [f"Top{rank}_Label", f"Top{rank}_Prob"]
    columns.append("Rank")

    rows = []
    for start in range(0, len(files), batch_size):
        batch = files[start:start + batch_size]
        imgs = []
        for fn in batch:
            # convert() returns a new image, so the handle opened by
            # Image.open can be closed right away.
            with Image.open(os.path.join(directory, fn)) as raw:
                imgs.append(raw.convert("RGB"))

        # Ask for the full ranking and slice locally, so fewer than top_k
        # categories is handled by the padding below.
        preds_batch = classify_batch(imgs, top_k=top_k, return_all=True)
        for fn, preds in zip(batch, preds_batch):
            true = ground_truth.get(fn, "Unknown")
            top = preds[:top_k]
            labels = [label for label, _ in top]
            scores = [score for _, score in top]
            # Pad so every row has exactly top_k label/probability pairs.
            missing = top_k - len(top)
            labels += [""] * missing
            scores += [0.0] * missing

            row = {"Image": fn, "True Label": true}
            for rank, (label, score) in enumerate(zip(labels, scores), start=1):
                row[f"Top{rank}_Label"] = label
                row[f"Top{rank}_Prob"] = score
            row["Rank"] = labels.index(true) + 1 if true in labels else -1
            rows.append(row)
    return pd.DataFrame(rows, columns=columns)


In [None]:
with open("ground_truth_cn.json", "r", encoding="utf-8") as f:
    gt = json.load(f)

# DataFrame.to_csv does not create missing parent folders, so make sure the
# output directory exists before the first CSV is written.
os.makedirs("rs/zeroshotRS", exist_ok=True)

# Per-image predictions (labels and probabilities) and rank of the true label
df_results = evaluate_directory("cn", ground_truth=gt, batch_size=16, top_k=5)
df_results.to_csv("rs/zeroshotRS/per_image_top5.csv", index=False)
df_results.head()

Unnamed: 0,Image,True Label,Top1_Label,Top1_Prob,Top2_Label,Top2_Prob,Top3_Label,Top3_Prob,Top4_Label,Top4_Prob,Top5_Label,Top5_Prob,Rank
0,animal_crossing_0.png,animal crossing,animal crossing,0.153623,round about,0.145946,stop,0.085567,give way,0.059918,steep descent,0.054768,1
1,animal_crossing_1.png,animal crossing,steep descent,0.159773,steep ascent,0.130587,pedestrian crossing,0.081538,cycle crossing,0.06719,road work ahead,0.064027,-1
2,animal_crossing_2.png,animal crossing,steep descent,0.12922,no overtaking,0.09427,animal crossing,0.075137,steep ascent,0.073996,narrow roads ahead,0.057431,3
3,animal_crossing_3.png,animal crossing,animal crossing,0.439263,round about,0.124138,slippery road,0.038707,no overtaking,0.03373,bumpy road,0.033481,1
4,animal_crossing_4.png,animal crossing,steep descent,0.127017,slippery road,0.118197,steep ascent,0.11065,pedestrian crossing,0.077748,bumpy road,0.058492,-1


In [None]:
# Overall top-1 accuracy plus macro-averaged precision / recall / F1
with open("ground_truth_cn.json", "r", encoding="utf-8") as f:
    gt = json.load(f)

# Class vocabulary: the distinct ground-truth labels in sorted order, with
# lookup tables in both directions.
label_names = sorted(set(gt.values()))
label2id = dict(zip(label_names, range(len(label_names))))
id2label = dict(enumerate(label_names))
NUM_CLASSES = len(label2id)

y_true = df_results["True Label"].values
y_pred = df_results["Top1_Label"].values

acc = accuracy_score(y_true, y_pred)
# With average='macro' the support slot `s` is returned as None; the number
# of evaluated images is reported instead.
p, r, f1, s = precision_recall_fscore_support(
    y_true,
    y_pred,
    labels=label_names,
    average='macro',
    zero_division=0,
)

# Single-row table: each metric is wrapped in a one-element list.
overall_metrics = {
    metric: [value]
    for metric, value in (
        ("Accuracy", acc),
        ("Macro Precision", p),
        ("Macro Recall", r),
        ("Macro F1", f1),
        ("Support", len(y_true)),
    )
}
df_overall_metrics = pd.DataFrame(overall_metrics)
df_overall_metrics.to_csv("rs/zeroshotRS/overall_macro_metrics.csv", index=False)

df_overall_metrics.head()

Unnamed: 0,Accuracy,Macro Precision,Macro Recall,Macro F1,Support
0,0.450893,0.470479,0.439722,0.411967,448


In [7]:
# Per-class top-1 precision / recall / F1 (average=None returns one value per
# entry of label_names, in that order)
y_true = df_results["True Label"].values
y_pred = df_results["Top1_Label"].values

p_c, r_c, f1_c, s_c = precision_recall_fscore_support(
    y_true,
    y_pred,
    labels=label_names,
    average=None,
    zero_division=0,
)

df_per_class_metrics = pd.DataFrame(
    dict(
        zip(
            ["Class", "Precision", "Recall", "F1", "Support"],
            [label_names, p_c, r_c, f1_c, s_c],
        )
    )
)

df_per_class_metrics.to_csv("rs/zeroshotRS/per_class_macro_metrics.csv", index=False)
df_per_class_metrics.head()

Unnamed: 0,Class,Precision,Recall,F1,Support
0,animal crossing,0.428571,0.3,0.352941,10
1,bumpy road,0.0,0.0,0.0,10
2,cross road,1.0,0.4,0.571429,10
3,cycle crossing,0.666667,1.0,0.8,20
4,dip,0.0,0.0,0.0,10


In [None]:
# Overall top-K accuracy: share of images whose true label appears among the
# first K predicted labels.
Ks = [1, 2, 3, 4, 5]
overall_acc = {}

true_labels = df_results["True Label"]
n_images = len(df_results)

for k in Ks:
    top_k_cols = [f"Top{j}_Label" for j in range(1, k + 1)]
    # Compare every Top{j}_Label column with the true label row-by-row in one
    # vectorised call instead of iterating over the frame with iterrows().
    hits = df_results[top_k_cols].eq(true_labels, axis=0).any(axis=1)
    overall_acc[f"Top-{k}"] = int(hits.sum()) / n_images

overall_acc_df = pd.DataFrame(list(overall_acc.items()), columns=["Top-K", "Accuracy"])
overall_acc_df.to_csv("rs/zeroshotRS/overall_top5_accuracy.csv", index=False)
overall_acc_df.head()

Unnamed: 0,Top-K,Accuracy
0,Top-1,0.450893
1,Top-2,0.649554
2,Top-3,0.758929
3,Top-4,0.808036
4,Top-5,0.84375


In [None]:
# Per-class top-K accuracy: for each true class, the share of its images whose
# true label appears among the first K predicted labels.
true_labels = df_results["True Label"]

# One boolean Series per K marking the images that are a top-K hit. Computed
# once for the whole frame (vectorised) and then sliced per class, instead of
# looping over rows with iterrows() for every class and every K.
hit_within = {
    k: df_results[[f"Top{j}_Label" for j in range(1, k + 1)]]
    .eq(true_labels, axis=0)
    .any(axis=1)
    for k in Ks
}

per_class_acc = []
for class_name in label_names:
    in_class = true_labels == class_name
    n_in_class = int(in_class.sum())
    # Classes with no images report 0.0 rather than dividing by zero.
    class_accs = [
        int(hit_within[k][in_class].sum()) / n_in_class if n_in_class > 0 else 0.0
        for k in Ks
    ]
    per_class_acc.append([class_name] + class_accs)

header = ["Class"] + [f"Top-{k} Acc" for k in Ks]
df_per_class_acc = pd.DataFrame(per_class_acc, columns=header)
df_per_class_acc.to_csv("rs/zeroshotRS/per_class_top5_accuracy.csv", index=False)
df_per_class_acc.head()

Unnamed: 0,Class,Top-1 Acc,Top-2 Acc,Top-3 Acc,Top-4 Acc,Top-5 Acc
0,animal crossing,0.3,0.5,0.6,0.7,0.8
1,bumpy road,0.0,0.0,0.4,0.7,0.9
2,cross road,0.4,0.7,0.9,1.0,1.0
3,cycle crossing,1.0,1.0,1.0,1.0,1.0
4,dip,0.0,0.0,0.0,0.0,0.0
