In [None]:
import os
import torch
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader
from torchvision.transforms import Compose, Resize, CenterCrop, ToTensor, Normalize
from PIL import Image
import pandas as pd
from transformers import CLIPConfig, CLIPProcessor, CLIPModel
from torch.optim import AdamW
from transformers import get_linear_schedule_with_warmup, get_scheduler

In [None]:
from pathlib import Path
import json
import matplotlib.pyplot as plt

In [None]:
from google.colab import drive

In [None]:
# Mount Google Drive at /content/drive so the data paths used below resolve.
drive.mount('/content/drive')

Mounted at /content/drive


In [None]:
# Drive folder holding the project data (same location as BASE defined further down).
# NOTE(review): not referenced by any later cell visible here — confirm it is still needed.
image_folder = '/content/drive/MyDrive/545data'

Generate CSV file for data

In [None]:
!wget http://images.cocodataset.org/zips/train2014.zip
!unzip train2014.zip


[1;30;43mStreaming output truncated to the last 5000 lines.[0m
 extracting: train2014/COCO_train2014_000000408557.jpg  
 extracting: train2014/COCO_train2014_000000013714.jpg  
 extracting: train2014/COCO_train2014_000000194043.jpg  
 extracting: train2014/COCO_train2014_000000219859.jpg  
 extracting: train2014/COCO_train2014_000000278135.jpg  
 extracting: train2014/COCO_train2014_000000141015.jpg  
 extracting: train2014/COCO_train2014_000000280923.jpg  
 extracting: train2014/COCO_train2014_000000200024.jpg  
 extracting: train2014/COCO_train2014_000000435713.jpg  
 extracting: train2014/COCO_train2014_000000249993.jpg  
 extracting: train2014/COCO_train2014_000000424160.jpg  
 extracting: train2014/COCO_train2014_000000142761.jpg  
 extracting: train2014/COCO_train2014_000000532668.jpg  
 extracting: train2014/COCO_train2014_000000564904.jpg  
 extracting: train2014/COCO_train2014_000000346384.jpg  
 extracting: train2014/COCO_train2014_000000560934.jpg  
 extracting: train2014/

In [None]:
# Data locations. The two "origin" files are opened by bare filename (resolved against
# the current working directory); the two "dual" files use absolute Drive paths.
BASE = Path("/content/drive/MyDrive/545data")
ORIGIN_CAPS     = "origin_captions.json"
ORIGIN_IMAGES   = "origin_images_paths.json"
DUAL_IMAGES     = "/content/drive/MyDrive/545data/dual_images_paths.json"
DUAL_CAPS       = "/content/drive/MyDrive/545data/dual_captions.json"


def _read_json(path):
    """Open `path` in text mode and return the parsed JSON content."""
    with open(path, "r") as handle:
        return json.load(handle)


# 1) Parse the four JSON files.
origin_captions     = _read_json(ORIGIN_CAPS)
origin_images_paths = _read_json(ORIGIN_IMAGES)
dual_images_paths   = _read_json(DUAL_IMAGES)
dual_captions       = _read_json(DUAL_CAPS)

# 2) Working aliases used by the CSV-building cells. These are the same dict objects,
#    not copies; `dual_captions` already carries its working name.
captions    = origin_captions
true_images = origin_images_paths
neg_images  = dual_images_paths

# 3) The first 10000 entries (in file order) feed the from-scratch CLIP run.
simpleClip_caption = {key: origin_captions[key] for key in list(origin_captions)[:10000]}
simpleClip_image   = {key: origin_images_paths[key] for key in list(origin_images_paths)[:10000]}

In [None]:
def _preview(mapping, limit=3):
    """Return the first `limit` (key, value) pairs of `mapping` as a list."""
    return list(mapping.items())[:limit]


# Print the size and a few entries of each dictionary rather than the full contents,
# which would bury the notebook under thousands of lines of output.
for label, mapping in [
    ("captions", captions),
    ("true_images", true_images),
    ("simpleClip_caption", simpleClip_caption),
    ("simpleClip_image", simpleClip_image),
]:
    print(f"{label}: {len(mapping)} entries; first items: {_preview(mapping)}")




{'1': 'A man preparing desserts in a kitchen covered in frosting.', '5': 'A black and white image of a man in a suit wearing glasses walking through a door.', '7': 'A large bus and some people on the street.', '8': 'A bicycle parked in a kitchen with a stove and cabinets.', '9': 'Two people in a food truck, one looking at an order.', '11': 'A person is cutting a roast with a fork and knife.', '12': 'a kitchen with a table and some chairs ', '14': 'A chef preparing food inside of a kitchen near  a window.', '15': 'Adults using laptop computers while sitting at outdoor venue.', '16': 'A group of men at a table preparing food together', '19': 'Dining room table set for a casual meal, with flowers.', '21': 'Two people flying a kite above pine trees.', '23': 'Several kitchen workers making dishes in commercial kitchen.', '25': 'A man laying on his stomach with a towel on his head.', '26': 'A small cluttered kitchen with a window and sink.', '28': 'People on a skateboard ramp with one doing 

In [None]:
# Keep only the samples that have at least one negative image.
#
# Captions and image paths are derived from the dictionaries loaded from JSON
# (origin_captions, origin_images_paths, dual_images_paths) instead of from the names
# this cell rebinds. The previous version applied `v[0]` to its own output when the
# cell was run a second time, which reduced every caption and path to its first
# character; this version gives the same result on every run.
valid_ids = [k for k, v in dual_images_paths.items() if v]

# Element 0 of each record is the caption text / the image path.
captions    = {k: origin_captions[k][0]     for k in valid_ids}
true_images = {k: origin_images_paths[k][0] for k in valid_ids}

# Negative-image paths are joined onto BASE and stored as strings.
neg_images  = {k: [str(BASE / p) for p in dual_images_paths[k]] for k in valid_ids}

# Dual captions restricted to the same ids (filtering an already-filtered dict is a no-op).
dual_captions = {k: dual_captions[k] for k in valid_ids}

In [None]:
import os
import pandas as pd

# captions   = {"1": "ewfwfwef",
#               "2": "Dining room table set for a casual meal,with flowers.",
#               "3": "A large bus and some people on the street.",
#               "4": "A bicycle parked in a kitchen with a stove and cabinets.",
#               "5": "A group of men at a table preparing food together.",
#               "6": "Adults using laptop computers while sitting at outer venue",
#               "7": "A man preparing desserts in a kitchen covered in frosting",
#               "8": "Two people in a food truck, one looking at an order"}
# true_images = {"1": "./data/COCO_train2014_000000161919.jpg", "2": "./data/COCO_train2014_000000071631.jpg",
#                "3": "./data/COCO_train2014_000000392136.jpg", "4": "./data/COCO_train2014_000000398494.jpg",
#                "5": "./data/COCO_train2014_000000405613.jpg", "6": "./data/COCO_train2014_000000170558.jpg",
#                "7": "./data/COCO_train2014_000000384029.jpg", "8": "./data/COCO_train2014_000000090570.jpg"}
# neg_images  = {
#     "1": ["./data/161919-person.jpg"],
#     "2": ["./data/71631-vase.jpg", "./data/71631-dining table.jpg"],
#     "3": ["./data/392136-person.jpg","./data/392136-bus.jpg"],
#     "4": ["./data/398494-bicycle.jpg"],
#     "5": ["./data/405613-person.jpg", "./data/405613-dining table.jpg"],
#     "6": ["./data/170558-laptop.jpg", "./data/170558-person.jpg"],
#     "7": ["./data/384029-cake.jpg", "./data/384029-person.jpg"],
#     "8": ["./data/90570-person.jpg"]
# }
# dual_captions   = {"1": ["dual "],
#               "2": ["dualmeal,with flowers.","dual2meal,with flowers."],
#               "3": ["dual street.","dua2lhen with a stove and cabinets."],
#               "4": ["dualhen with a stove and cabinets."],
#               "5": ["Adualring food together.","dua2lhen with a stove and cabinets."],
#               "6": ["Adualring food together.","dua2lhen with a stove and cabinets."],
#               "7": ["Adualring food together.","dua2lhen with a stove and cabinets."],
#               "8": ["Twdualing at an order"]}

# ─── 2. Determine column counts ──────────────────────────────────
# default=0 keeps max() from raising when there are no samples.
max_negs = max((len(v) for v in neg_images.values()), default=0)
# Dual captions get their own width so a sample with more dual captions than the
# largest negative-image count is not silently truncated. The width is never smaller
# than max_negs, so the column layout of earlier runs is preserved.
max_duals = max(max_negs, max((len(v) for v in dual_captions.values()), default=0))

neg_columns  = [f"neg_image_{i+1}" for i in range(max_negs)]
dual_columns = [f"dual_caption_{j+1}" for j in range(max_duals)]

# ─── 3. Build CSV rows ───────────────────────────────────────────
rows = []
for sample_id in captions:
    negs = neg_images.get(sample_id, [])
    dual = dual_captions.get(sample_id, [])

    row = {"true_image": true_images[sample_id], "caption": captions[sample_id]}
    # Pad missing negatives / dual captions with empty strings.
    for i, col_name in enumerate(neg_columns):
        row[col_name] = negs[i] if i < len(negs) else ""
    for j, col in enumerate(dual_columns):
        row[col] = dual[j] if j < len(dual) else ""
    rows.append(row)

# ─── 4. Convert to DataFrame with an explicit column order ──────
# Order: true_image, neg_image_*, dual_caption_*, caption (caption last).
# Passing `columns` also yields a well-formed empty frame when there are no rows.
cols = ["true_image"] + neg_columns + dual_columns + ["caption"]
df = pd.DataFrame(rows, columns=cols)

# ─── 5. Save CSV ────────────────────────────────────────────────
df.to_csv("data.csv", index=False)
print(f"✅ Saved {len(df)} rows to 'data.csv' with {max_negs} negative columns and caption last.")


✅ Saved 8066 rows to 'data.csv' with 5 negative columns and caption last.


In [None]:
# Input CSV (wide format: true_image, neg_image_1…neg_image_k, caption) and the
# directory that receives checkpoints.
CSV_FILE, OUTPUT_DIR = "data.csv", "clip_hardneg_margin"

# Hugging Face model identifier.
MODEL_NAME = "openai/clip-vit-base-patch32"

# Optimisation settings; MARGIN is the margin for the contrastive loss.
NUM_EPOCHS, BATCH_SIZE = 1, 32
LR, MARGIN = 1e-5, 0.1

# Use the GPU when one is visible to torch, otherwise the CPU.
if torch.cuda.is_available():
    DEVICE = "cuda"
else:
    DEVICE = "cpu"

# Create the checkpoint directory up front; an existing directory is fine.
os.makedirs(OUTPUT_DIR, exist_ok=True)

Train a CLIP model from scratch

In [None]:
from torch.utils.data import Dataset, DataLoader

In [None]:
def _to_example(key, caption_record):
    """Build one training record from a caption entry and its matching image entry.

    `caption_record` must unpack into exactly three items (the caption first) and
    `simpleClip_image[key]` into exactly two (image path, COCO id).
    """
    caption, _, _ = caption_record
    img_path, coco_id = simpleClip_image[key]
    return {
        "image_id":  int(key),
        "coco_id":   coco_id,
        "caption":   caption,
        "image_path": img_path
    }


# One record per caption entry, in dictionary order.
examples = [_to_example(key, record) for key, record in simpleClip_caption.items()]

In [None]:
# Show the record count and the first few records instead of dumping the whole list.
print(f"{len(examples)} examples; first 5:")
print(examples[:5])

[{'image_id': 0, 'coco_id': 57870, 'caption': 'A restaurant has modern wooden tables and chairs.', 'image_path': 'train2014/COCO_train2014_000000057870.jpg'}, {'image_id': 1, 'coco_id': 384029, 'caption': 'A man preparing desserts in a kitchen covered in frosting.', 'image_path': 'train2014/COCO_train2014_000000384029.jpg'}, {'image_id': 2, 'coco_id': 222016, 'caption': 'a big red telephone booth that a man is standing in', 'image_path': 'train2014/COCO_train2014_000000222016.jpg'}, {'image_id': 3, 'coco_id': 520950, 'caption': 'the kitchen is full of spices on the rack', 'image_path': 'train2014/COCO_train2014_000000520950.jpg'}, {'image_id': 4, 'coco_id': 69675, 'caption': 'A child and woman are cooking in the kitchen.', 'image_path': 'train2014/COCO_train2014_000000069675.jpg'}, {'image_id': 5, 'coco_id': 547471, 'caption': 'A black and white image of a man in a suit wearing glasses walking through a door.', 'image_path': 'train2014/COCO_train2014_000000547471.jpg'}, {'image_id': 6,

In [None]:

# 3. Define a simple Dataset
class SimpleCLIPDataset(Dataset):
    """Image–caption pairs pre-processed for CLIP training.

    examples:  sequence of dicts; each must provide "image_path" and "caption".
    processor: callable with the CLIPProcessor call signature (text=…, images=…).

    Each item is a dict with "input_ids", "attention_mask" and "pixel_values",
    with the leading batch dimension of size 1 removed.
    """

    def __init__(self, examples, processor):
        self.examples  = examples
        self.processor = processor

    def __len__(self):
        return len(self.examples)

    def __getitem__(self, idx):
        ex = self.examples[idx]
        # Close the file handle promptly; convert() returns an independent RGB copy.
        with Image.open(ex["image_path"]) as raw:
            img = raw.convert("RGB")
        # Use the processor handed to this dataset, not whatever global happens to be
        # named `processor` in the notebook.
        proc = self.processor(
            text=[ex["caption"]],
            images=[img],
            return_tensors="pt",
            padding="max_length",      # pad every caption to max_length
            truncation=True,           # and cut longer ones so all items share one shape
            max_length=77,             # CLIP's default context length
        )
        return {
            "input_ids":      proc["input_ids"].squeeze(0),
            "attention_mask": proc["attention_mask"].squeeze(0),
            "pixel_values":   proc["pixel_values"].squeeze(0),
        }

# 4. Hyperparameters for the from-scratch run.
#    DEVICE, BATCH_SIZE, NUM_EPOCHS and LR rebind the values set in the earlier settings cell.
MODEL_SIZE = "openai/clip-vit-base-patch32"
DEVICE     = "cuda" if torch.cuda.is_available() else "cpu"
BATCH_SIZE, NUM_EPOCHS, LR = 32, 5, 5e-6

# 5. Processor (image transforms & tokenization) from the published checkpoint.
processor = CLIPProcessor.from_pretrained(MODEL_SIZE)

# 6. Model: only the configuration is fetched, so the weights are randomly initialised.
config = CLIPConfig.from_pretrained(MODEL_SIZE)
model  = CLIPModel(config)
model.to(DEVICE)

# 7. Dataset and shuffled DataLoader.
dataset = SimpleCLIPDataset(examples=examples, processor=processor)
loader  = DataLoader(
    dataset=dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=2,
)

# 8. Optimizer over every model parameter.
optimizer = torch.optim.AdamW(model.parameters(), lr=LR)

# 9. Symmetric contrastive training: within a batch, image i matches caption i.
num_batches = len(loader)
for epoch in range(1, NUM_EPOCHS+1):
    model.train()
    running_loss = 0.0

    for batch_idx, batch in enumerate(loader, start=1):
        # Move every tensor of the batch to the training device.
        batch = {name: tensor.to(DEVICE) for name, tensor in batch.items()}

        # Forward pass: both (B, B) similarity matrices come from the model.
        outputs = model(**batch)
        logits_per_image = outputs.logits_per_image
        logits_per_text  = outputs.logits_per_text

        # Row i should select column i.
        labels = torch.arange(logits_per_image.size(0), device=DEVICE)

        # Mean of the image→text and text→image cross-entropies.
        loss_i = F.cross_entropy(logits_per_image, labels)
        loss_t = F.cross_entropy(logits_per_text,  labels)
        loss   = (loss_i + loss_t) / 2

        # Backprop and parameter update.
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        batch_loss = loss.item()
        running_loss += batch_loss

        # Per-batch progress line.
        print(f"Epoch {epoch}/{NUM_EPOCHS} — Batch {batch_idx}/{num_batches} — Batch Loss: {batch_loss:.4f}")

    avg_loss = running_loss / num_batches
    print(f"✅ Finished Epoch {epoch}/{NUM_EPOCHS} — Avg Loss: {avg_loss:.4f}\n")

# 10. Persist model and processor side by side so both can be reloaded with from_pretrained.
save_dir = "my_finetuned_clip4"
model.save_pretrained(save_dir)
processor.save_pretrained(save_dir)

Epoch 1/4 — Batch 1/313 — Batch Loss: 3.4720
Epoch 1/4 — Batch 2/313 — Batch Loss: 3.4570
Epoch 1/4 — Batch 3/313 — Batch Loss: 3.4289
Epoch 1/4 — Batch 4/313 — Batch Loss: 3.4591
Epoch 1/4 — Batch 5/313 — Batch Loss: 3.4346
Epoch 1/4 — Batch 6/313 — Batch Loss: 3.4021
Epoch 1/4 — Batch 7/313 — Batch Loss: 3.4456
Epoch 1/4 — Batch 8/313 — Batch Loss: 3.4637
Epoch 1/4 — Batch 9/313 — Batch Loss: 3.4164
Epoch 1/4 — Batch 10/313 — Batch Loss: 3.5269
Epoch 1/4 — Batch 11/313 — Batch Loss: 3.5618
Epoch 1/4 — Batch 12/313 — Batch Loss: 3.4310
Epoch 1/4 — Batch 13/313 — Batch Loss: 3.4019
Epoch 1/4 — Batch 14/313 — Batch Loss: 3.4531
Epoch 1/4 — Batch 15/313 — Batch Loss: 3.4435
Epoch 1/4 — Batch 16/313 — Batch Loss: 3.4919
Epoch 1/4 — Batch 17/313 — Batch Loss: 3.4355
Epoch 1/4 — Batch 18/313 — Batch Loss: 3.4171
Epoch 1/4 — Batch 19/313 — Batch Loss: 3.3949
Epoch 1/4 — Batch 20/313 — Batch Loss: 3.4867
Epoch 1/4 — Batch 21/313 — Batch Loss: 3.4570
Epoch 1/4 — Batch 22/313 — Batch Loss: 3.35

[]

Our fine-tuned model

In [None]:
class CLIPHardNegDataset(Dataset):
    """Rows of the wide-format CSV: one true image, optional negative images, a caption
    and optional dual captions.

    csv_file:  path of the CSV; columns used are "true_image", "caption", every column
               starting with "neg_image" and every column starting with "dual_caption".
    processor: CLIPProcessor; only its image mean/std are read here (kept on self).

    Each item is a dict:
      "pixel_values": tensor of stacked transformed images, true image first
      "caption":      caption text of the row
      "dual_caption": list of the row's non-empty dual captions
    """

    def __init__(self, csv_file, processor):
        # Read every cell as text and disable NA parsing: empty cells stay "" and a
        # caption such as "NA" or "null" is not converted to NaN.
        self.df = pd.read_csv(csv_file, dtype=str, keep_default_na=False)
        self.processor = processor
        # Columns holding negative image paths / dual captions.
        self.neg_cols = [c for c in self.df.columns if c.startswith("neg_image")]
        self.dual_captions = [c for c in self.df.columns if c.startswith("dual_caption")]
        # Newer transformers releases expose the image settings as `image_processor`;
        # `feature_extractor` is the older, deprecated name. Accept either.
        image_settings = getattr(processor, "image_processor", None)
        if image_settings is None:
            image_settings = processor.feature_extractor
        # Standard CLIP image transforms for ViT-B/32.
        self.transforms = Compose([
            Resize(224, interpolation=Image.BICUBIC),  # scale shortest side to 224
            CenterCrop(224),                          # take center 224×224 patch
            ToTensor(),                               # convert to [0,1] tensor, shape (C,H,W)
            Normalize(
              mean=image_settings.image_mean,
              std=image_settings.image_std
            )
        ])

    def __len__(self):
        return len(self.df)  # one sample per CSV row

    @staticmethod
    def _non_empty(row, columns):
        """Return the values of `columns` in `row` that are non-blank strings, in column order."""
        return [row[c] for c in columns if isinstance(row[c], str) and row[c].strip()]

    def __getitem__(self, idx):
        row = self.df.iloc[idx]
        # True image first, then every non-empty negative image path.
        paths = [row["true_image"]] + self._non_empty(row, self.neg_cols)
        imgs = []
        for p in paths:
            # Close each file as soon as it has been decoded and transformed.
            with Image.open(p) as raw:
                imgs.append(self.transforms(raw.convert("RGB")))
        pixel_values = torch.stack(imgs)        # (N_images, C, H, W)
        caption = row["caption"]
        dual_caption = self._non_empty(row, self.dual_captions)
        return {"pixel_values": pixel_values, "caption": caption, "dual_caption": dual_caption}

def collate_fn(batch):
    """Regroup a list of per-sample dicts into one dict of parallel lists.

    Nothing is stacked or padded: samples may carry different numbers of images and
    dual captions, so each field stays a plain Python list with one entry per sample.

    Returns {"images": [...], "captions": [...], "dual_captions": [...]}.
    """
    images, captions, dual_captions = [], [], []
    for sample in batch:
        images.append(sample["pixel_values"])
        captions.append(sample["caption"])
        dual_captions.append(sample["dual_caption"])
    return {"images": images, "captions": captions, "dual_captions": dual_captions}

In [None]:
# Sanity check: show which CSV file the hard-negative dataset will be read from.
print(CSV_FILE)

data.csv


In [None]:
# Start hard-negative fine-tuning from a locally saved checkpoint directory.
# To start from the published weights instead, pass MODEL_NAME to both from_pretrained calls.
# NOTE(review): the from-scratch cell saves to "my_finetuned_clip4" while this cell
# loads "my_finetuned_clip2" — confirm that this is the intended checkpoint.
FINETUNE_INIT_DIR = "my_finetuned_clip2"
model = CLIPModel.from_pretrained(FINETUNE_INIT_DIR)
model.to(DEVICE)
processor = CLIPProcessor.from_pretrained(FINETUNE_INIT_DIR)

# Fine-tuning hyperparameters (rebind the earlier values).
BATCH_SIZE, NUM_EPOCHS = 32, 2
LR, MARGIN = 5e-6, 0.1

# Dataset over the wide-format CSV, batched with the list-preserving collate_fn.
dataset = CLIPHardNegDataset(csv_file=CSV_FILE, processor=processor)
dataloader = DataLoader(
    dataset=dataset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=4,
    collate_fn=collate_fn,
)




In [None]:
# Batches per epoch: the hard-negative fine-tuning loader, then the from-scratch loader.
print (len(dataloader))
print (len(loader))

253
313


Partially freeze the model

In [None]:
# ─── 1. Freeze every parameter ─────────────────────────────────────────────
for param in model.parameters():
    param.requires_grad = False

# NOTE: assigning `module.requires_grad = True` on an nn.Module only creates a plain
# Python attribute and leaves its parameters frozen. Module.requires_grad_(True) is
# the call that flips the flag on every parameter of the module.

# ─── 2. Un-freeze last N_text layers of text encoder ───────────────────────
N_text = 2
text_layers = model.text_model.encoder.layers
for layer in text_layers[-N_text:]:
    layer.requires_grad_(True)
# also un-freeze the final layer norm + projection head
model.text_model.final_layer_norm.requires_grad_(True)
model.text_projection.requires_grad_(True)

# ─── 3. Un-freeze last M_vision layers of vision encoder ──────────────────
M_vision = 6
vision_layers = model.vision_model.encoder.layers
for layer in vision_layers[-M_vision:]:
    layer.requires_grad_(True)
# and the visual projection head
model.visual_projection.requires_grad_(True)


In [None]:
# Linear schedule: the first 10% of all steps warm up, the remainder decays.
# Both training cells below step this same scheduler, so re-run this cell before
# starting another training run; otherwise the schedule is already exhausted.
total_steps  = NUM_EPOCHS * len(dataloader)
warmup_steps = total_steps // 10

optimizer = AdamW(model.parameters(), lr=LR)
scheduler = get_scheduler(
    name="linear",
    optimizer=optimizer,
    num_warmup_steps=warmup_steps,
    num_training_steps=total_steps,
)



# Train method 1

In [None]:


epoch_losses = []
DUAL_WEIGHT = 0.1   # weight of the dual-caption hinge term added to the InfoNCE loss

# ─── 6. TRAINING LOOP (method 1: InfoNCE + dual-caption hinge) ────────────────
for epoch in range(1, NUM_EPOCHS + 1):
    model.train()                 # set model to training mode
    running_loss = 0.0            # accumulator for epoch loss

    for batch_idx, batch in enumerate(dataloader, start=1):
        images_list = batch["images"]    # list of B tensors, N_i images per sample
        # Batch-local names on purpose: `captions` and `dual_captions` are the id→text
        # dictionaries used by the CSV-building cells and must not be overwritten here.
        batch_captions      = batch["captions"]       # list of B captions
        batch_dual_captions = batch["dual_captions"]  # list of B lists of dual captions
        B = len(images_list)             # actual batch size

        # ─── 6.1 Text embeddings ───────────────────────────────────
        text_inputs = processor(
            text=batch_captions,
            return_tensors="pt",
            padding=True,
            truncation=True      # over-long captions are cut to the tokenizer's maximum
        ).to(DEVICE)
        caption_embeds = model.get_text_features(**text_inputs)  # (B, D)
        caption_embeds = F.normalize(caption_embeds, dim=-1)     # unit-norm

        # ─── Dual-caption embeddings: one encoder pass for the whole batch ───
        # All dual captions are encoded together, then split back per sample.
        # Nothing is averaged; every dual caption keeps its own embedding.
        dual_counts = [len(duals) for duals in batch_dual_captions]
        flat_duals  = [text for duals in batch_dual_captions for text in duals]
        if flat_duals:
            dual_inputs = processor(
                text=flat_duals, return_tensors="pt", padding=True, truncation=True
            ).to(DEVICE)
            dual_embeds = model.get_text_features(**dual_inputs)   # (sum n_duals, D)
            dual_embeds = F.normalize(dual_embeds, dim=-1)
            # one list of (D,) tensors per sample; empty list when a sample has none
            dual_embeds_list = [list(chunk) for chunk in dual_embeds.split(dual_counts, dim=0)]
        else:
            dual_embeds_list = [[] for _ in batch_dual_captions]

        # ─── 6.2 Image embeddings ──────────────────────────────────
        # concatenate all samples' images into one big batch
        flat_imgs = torch.cat(images_list, dim=0).to(DEVICE)   # (sum N_i, 3,224,224)
        img_embeds_flat = model.get_image_features(pixel_values=flat_imgs)  # (sum N_i, D)
        img_embeds_flat = F.normalize(img_embeds_flat, dim=-1)

        # split back into per-sample groups of embeddings
        sizes  = [imgs.shape[0] for imgs in images_list]  # [N1, N2, ...]
        groups = img_embeds_flat.split(sizes, dim=0)      # tuple of length B

        # the "true" image is the first image of every group → (B, D)
        true_imgs = torch.stack([g[0] for g in groups], dim=0)

        # ─── 6.3 InfoNCE + dual-caption hinge loss ─────────────────
        # InfoNCE portion (standard CLIP loss): image i matches caption i
        scale = model.logit_scale.exp()
        logits_per_image = scale * true_imgs @ caption_embeds.T    # (B, B)
        logits_per_text  = scale * caption_embeds @ true_imgs.T    # (B, B)
        labels = torch.arange(B, device=DEVICE)
        loss_i = F.cross_entropy(logits_per_image, labels)
        loss_t = F.cross_entropy(logits_per_text,  labels)
        infoce_loss = (loss_i + loss_t) / 2

        # Dual-caption hinge penalty (no margin): penalise a dual caption that is
        # closer to the true image than the true caption is.
        dual_terms = []
        for i, group in enumerate(groups):
            true_img = group[0]                  # (D,)
            sim_pos  = F.cosine_similarity(true_img, caption_embeds[i], dim=0)
            for d in dual_embeds_list[i]:
                dual_terms.append(F.relu(F.cosine_similarity(true_img, d, dim=0) - sim_pos))

        if len(dual_terms) > 0:
            dual_loss = torch.stack(dual_terms).mean()
        else:
            dual_loss = torch.tensor(0.0, device=DEVICE)

        # final combined loss
        loss = infoce_loss + DUAL_WEIGHT * dual_loss

        # ─── 6.4 BACKPROP & OPTIMIZE ───────────────────────────────
        optimizer.zero_grad()   # clear previous gradients
        loss.backward()         # compute new gradients
        optimizer.step()        # update parameters
        scheduler.step()        # update learning rate schedule
        print(f"   Epoch {epoch} — batch {batch_idx}/{len(dataloader)} — loss {loss.item():.4f}")

        running_loss += loss.item()

    avg_loss = running_loss / len(dataloader)
    epoch_losses.append(avg_loss)

    # save model weights each epoch
    ckpt_path = os.path.join(OUTPUT_DIR, f"epoch{epoch:02d}.pt")
    torch.save(model.state_dict(), ckpt_path)

print("✅ Fine‑tuning complete. Checkpoints saved in", OUTPUT_DIR)


# Train method 2


In [None]:


epoch_losses = []
# ─── 6. TRAINING LOOP (method 2: per-sample margin terms + in-batch hardest negative) ───
for epoch in range(1, NUM_EPOCHS + 1):
    model.train()                 # set model to training mode
    running_loss = 0.0            # accumulator for epoch loss

    for batch_idx, batch in enumerate(dataloader, start=1):
        images_list = batch["images"]    # list of B tensors, N_i images per sample
        # Batch-local names on purpose: `captions` and `dual_captions` are the id→text
        # dictionaries used by the CSV-building cells and must not be overwritten here.
        batch_captions      = batch["captions"]       # list of B captions
        batch_dual_captions = batch["dual_captions"]  # list of B lists of dual captions
        B = len(images_list)             # actual batch size

        # ─── 6.1 Text embeddings ───────────────────────────────────
        text_inputs = processor(
            text=batch_captions,
            return_tensors="pt",
            padding=True,
            truncation=True      # over-long captions are cut to the tokenizer's maximum
        ).to(DEVICE)
        caption_embeds = model.get_text_features(**text_inputs)  # (B, D)
        caption_embeds = F.normalize(caption_embeds, dim=-1)     # unit-norm

        # ─── Dual-caption embeddings: one encoder pass for the whole batch ───
        # All dual captions are encoded together, then split back per sample.
        # Nothing is averaged; every dual caption keeps its own embedding.
        dual_counts = [len(duals) for duals in batch_dual_captions]
        flat_duals  = [text for duals in batch_dual_captions for text in duals]
        if flat_duals:
            dual_inputs = processor(
                text=flat_duals, return_tensors="pt", padding=True, truncation=True
            ).to(DEVICE)
            dual_embeds = model.get_text_features(**dual_inputs)   # (sum n_duals, D)
            dual_embeds = F.normalize(dual_embeds, dim=-1)
            # one list of (D,) tensors per sample; empty list when a sample has none
            dual_embeds_list = [list(chunk) for chunk in dual_embeds.split(dual_counts, dim=0)]
        else:
            dual_embeds_list = [[] for _ in batch_dual_captions]

        # ─── 6.2 Image embeddings ──────────────────────────────────
        # concatenate all samples' images into one big batch
        flat_imgs = torch.cat(images_list, dim=0).to(DEVICE)   # (sum N_i, 3,224,224)
        img_embeds_flat = model.get_image_features(pixel_values=flat_imgs)  # (sum N_i, D)
        img_embeds_flat = F.normalize(img_embeds_flat, dim=-1)

        # split back into per-sample groups of embeddings
        sizes  = [imgs.shape[0] for imgs in images_list]  # [N1, N2, ...]
        groups = img_embeds_flat.split(sizes, dim=0)      # tuple of length B

        # 1) the "true" image is the first image of every group → (B, D)
        true_imgs = torch.stack([g[0] for g in groups], dim=0)

        # 2) full batch similarity: (B, B); dot product = cosine for unit vectors
        sims = true_imgs @ caption_embeds.T

        # 3) mask out the diagonal (the matching pairs)
        mask = torch.eye(B, device=sims.device).bool()
        sims.masked_fill_(mask, -1e9)

        # 4) hardest in-batch negative caption per image, hinged at -MARGIN.
        #    The positive similarity does not enter this term.
        hardest_sims, _ = sims.max(dim=1)    # (B,)
        loss_batch_hardneg = F.relu(MARGIN + hardest_sims).mean()

        # ─── 6.3 MARGIN-BASED LOSS ────────────────────────────────
        losses = []
        for i, group in enumerate(groups):
            true_img = group[0]
            neg_imgs = group[1:]        # (N_neg, D)

            caption_vec = caption_embeds[i]
            duals_i = dual_embeds_list[i]  # list of (D,) tensors

            # (1) similarity: true image ↔ true caption (maximised via -sim_pos below)
            sim_pos = F.cosine_similarity(true_img, caption_vec, dim=0)

            # (2) penalise any dual caption that is closer to the true image than the true caption
            loss_trueimg_duals = sum(
              F.relu(F.cosine_similarity(true_img, d, dim=0) - sim_pos)
              for d in duals_i
            )

            # (3) + (4) use negative image j together with dual caption j; zip stops at
            #     the shorter of the two lists.
            # NOTE(review): loss_negimg_dualcaption accumulates -sim and is then
            # *subtracted* in total_loss, so its net contribution is
            # +0.5 * sim(neg_img_j, dual_caption_j): minimising the loss lowers that
            # similarity. The original comment on this term said
            # "POSITIVE … -> MAXIMIZE", which is the opposite. Term (3) likewise
            # penalises a negative image that is closer to its dual caption than to
            # the true caption. The behaviour is kept unchanged here — confirm the
            # intended signs.
            loss_negimgs_caption = 0.0
            loss_negimg_dualcaption = 0.0
            for neg_img, dual_vec in zip(neg_imgs, duals_i):
                sim_neg_dual = F.cosine_similarity(neg_img, dual_vec, dim=0)
                sim_neg_true = F.cosine_similarity(neg_img, caption_vec, dim=0)
                loss_negimgs_caption += F.relu(sim_neg_dual - sim_neg_true)
                loss_negimg_dualcaption += -sim_neg_dual

            total_loss = (-sim_pos) + (0.1 * loss_trueimg_duals) + (0.1 * loss_negimgs_caption) - (0.5*loss_negimg_dualcaption)
            losses.append(total_loss)

        # average the per-sample losses and add the in-batch hardest-negative term
        loss = torch.stack(losses).mean() + 0.8 * loss_batch_hardneg

        # ─── 6.4 BACKPROP & OPTIMIZE ───────────────────────────────
        optimizer.zero_grad()   # clear previous gradients
        loss.backward()         # compute new gradients
        optimizer.step()        # update parameters
        scheduler.step()        # update learning rate schedule
        print(f"   Epoch {epoch} — batch {batch_idx}/{len(dataloader)} — loss {loss.item():.4f}")

        running_loss += loss.item()

    avg_loss = running_loss / len(dataloader)
    epoch_losses.append(avg_loss)

    # save model weights each epoch
    ckpt_path = os.path.join(OUTPUT_DIR, f"epoch{epoch:02d}.pt")
    torch.save(model.state_dict(), ckpt_path)

print("✅ Fine‑tuning complete. Checkpoints saved in", OUTPUT_DIR)


In [None]:
# Plot the average loss of every completed epoch.
# The x-axis is built from len(epoch_losses) rather than NUM_EPOCHS, so the plot still
# works when training stopped early or NUM_EPOCHS was changed after the run.
epochs_completed = list(range(1, len(epoch_losses) + 1))

fig, ax = plt.subplots(figsize=(8, 5))
ax.plot(epochs_completed, epoch_losses, marker='o', linestyle='-')
ax.set_title("Training Loss per Epoch")
ax.set_xlabel("Epoch")
ax.set_ylabel("Average Loss")
ax.grid(True)
fig.tight_layout()
plt.show()

The cells below are for testing.

In [None]:
!pip install datasets
import torch
from torchvision.datasets import CIFAR10
from torchvision.datasets import Food101
from torchvision.datasets import CIFAR100
from torchvision.datasets import StanfordCars
from torchvision.datasets import Caltech101
from transformers import CLIPProcessor, CLIPModel
from tqdm import tqdm

import torch
from torchvision import datasets
from transformers import CLIPProcessor, CLIPModel
from tqdm import tqdm

# Set up the model. Model and inputs use the same `device` variable; the model was
# previously moved to the unrelated global DEVICE.
device = "cuda" if torch.cuda.is_available() else "cpu"
model     = CLIPModel.from_pretrained("my_CLIP base from 3").to(device)
processor = CLIPProcessor.from_pretrained("my_CLIP base from 3")
model.eval()   # inference only

# Supported datasets: name → zero-argument factory, so each one is only downloaded
# and loaded when its turn comes.
DATASETS = {
    "cifar10": lambda: datasets.CIFAR10("./data", train=False, download=True),
    "cifar100": lambda: datasets.CIFAR100("./data", train=False, download=True),
    "food101": lambda: datasets.Food101("./data", split="test", download=True),
    "stl10": lambda: datasets.STL10("./data", split="test", download=True),
}

for dataset_name, make_dataset in DATASETS.items():
    dataset = make_dataset()
    classnames = dataset.classes
    prompts = [f"a photo of a {name}" for name in classnames]

    with torch.no_grad():
        # Encode the text prompts once per dataset.
        text_inputs = processor(text=prompts, return_tensors="pt", padding=True).to(device)
        text_features = model.get_text_features(**text_inputs)
        text_features = text_features / text_features.norm(dim=-1, keepdim=True)

        # Classify the test images one at a time.
        correct = 0
        for image, label in tqdm(dataset, desc=f"Zero-shot on {dataset_name}"):
            inputs = processor(images=image, return_tensors="pt").to(device)
            image_features = model.get_image_features(**inputs)
            image_features = image_features / image_features.norm(dim=-1, keepdim=True)
            # argmax over cosine similarities; a softmax would not change the winner
            sims = image_features @ text_features.T
            pred = sims.argmax(dim=-1).item()
            correct += int(pred == label)

    # Report the result.
    accuracy = correct / len(dataset)
    print(f"\n✅ Zero-shot Accuracy on {dataset_name}: {accuracy:.4f}")


Test 2: similarity check on a simple sample

In [None]:

def compute_two_similarities(image_paths, caption, model, processor, device=None):
    """
    Cosine similarity between one caption and each of the given images.

    image_paths: list of image filepaths (typically two, but any number works)
    caption:     single string
    model:       your fine-tuned CLIPModel (it is switched to eval() mode here)
    processor:   your CLIPProcessor
    device:      torch device (e.g. "cuda" or "cpu"); if None, infer from model

    Returns a list of floats, one per entry of image_paths and in the same
    order. An empty image_paths yields an empty list.
    """
    image_paths = list(image_paths)
    if not image_paths:
        # Nothing to score; avoid handing an empty image batch to the processor.
        return []

    model.eval()
    if device is None:
        device = next(model.parameters()).device

    # 1. Load & preprocess. convert() returns a new in-memory image, so the
    #    underlying file handle can be closed as soon as it has been read.
    images = []
    for path in image_paths:
        with Image.open(path) as img:
            images.append(img.convert("RGB"))

    # The caption is identical for every image, so it is encoded only once.
    inputs = processor(
        text=[caption],
        images=images,
        return_tensors="pt",
        padding=True
    ).to(device)

    # 2. Embed
    with torch.no_grad():
        image_feats = model.get_image_features(pixel_values=inputs["pixel_values"])
        text_feats  = model.get_text_features(
            input_ids=inputs["input_ids"],
            attention_mask=inputs["attention_mask"]
        )

        # 3. Normalize
        image_feats = F.normalize(image_feats, dim=-1)  # shape (N, D)
        text_feats  = F.normalize(text_feats,  dim=-1)  # shape (1, D)

        # 4. Cosine sims: one column (the single caption), one row per image.
        sims = (image_feats @ text_feats.T).squeeze(-1)  # shape (N,)

    return sims.cpu().tolist()


# ─── Example usage ─────────────────────────────────────────────────────────────
# One caption scored against two images: a COCO train2014 image and a second
# local file.
caption   = "Adults using laptop computers while sitting at outer venue"
img_paths = ["./train2014/COCO_train2014_000000170558.jpg", "./170558person.jpg"]

# Relies on `model` and `processor` having been loaded by an earlier cell.
scores = compute_two_similarities(
    image_paths=img_paths,
    caption=caption,
    model=model,
    processor=processor,
)
print(f"Similarity to image1: {scores[0]:.4f}")
print(f"Similarity to image2: {scores[1]:.4f}")

Similarity to image1: 0.4362
Similarity to image2: -0.2005


In [None]:
# Same caption scored against two local image files.
caption   = "People enjoying the weather in boats on a canal."
img_paths = ["./OriImg.jpg", "./modified.jpg"]

# Relies on `model` and `processor` having been loaded by an earlier cell.
scores = compute_two_similarities(
    image_paths=img_paths,
    caption=caption,
    model=model,
    processor=processor,
)
print(f"Similarity to image1: {scores[0]:.4f}")
print(f"Similarity to image2: {scores[1]:.4f}")

Similarity to image1: 0.4197
Similarity to image2: 0.1250


In [None]:
# Canal caption scored against "./OriImg.jpg" and "./modified.jpg", using
# whichever `model` / `processor` are currently loaded in the kernel.
caption   = "People enjoying the weather in boats on a canal."
img_paths = ["./OriImg.jpg", "./modified.jpg"]

scores = compute_two_similarities(
    image_paths=img_paths,
    caption=caption,
    model=model,
    processor=processor,
)
print(f"Similarity to image1: {scores[0]:.4f}")
print(f"Similarity to image2: {scores[1]:.4f}")

Test three: find the examples where the two models differ in correctness

In [None]:
import torch
from torchvision import datasets
from transformers import CLIPProcessor, CLIPModel
from tqdm.auto import tqdm
from PIL import Image

# ————— Setup —————
# Both checkpoints are fed through the stock CLIP ViT-B/32 processor.
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch32")
device    = "cuda" if torch.cuda.is_available() else "cpu"

# Model A and Model B are two local checkpoints that get compared below.
model_a = CLIPModel.from_pretrained("my_finetuned_clip5").to(device)
model_b = CLIPModel.from_pretrained("my_CLIP base from 3").to(device)

# CIFAR-10 test split
cifar10 = datasets.CIFAR10("./data", train=False, download=True)

# The class prompts are tokenised once and shared by both models.
classnames  = cifar10.classes
prompts     = [f"a photo of a {c}" for c in classnames]
text_inputs = processor(text=prompts, return_tensors="pt", padding=True).to(device)


def _unit_norm(feats):
    """Scale each row of `feats` to unit L2 norm."""
    return feats / feats.norm(dim=-1, keepdim=True)


# Per-model text embeddings and logit scales, computed a single time.
with torch.no_grad():
    tf_a    = _unit_norm(model_a.get_text_features(**text_inputs))
    scale_a = model_a.logit_scale.exp()

    tf_b    = _unit_norm(model_b.get_text_features(**text_inputs))
    scale_b = model_b.logit_scale.exp()

# ————— Compare across the test set —————
# Keep only the samples on which exactly one of the two models is correct.
mismatches = []
for idx, (pil_img, label) in enumerate(tqdm(cifar10, desc="CIFAR-10 eval")):
    inputs = processor(images=pil_img, return_tensors="pt").to(device)
    with torch.no_grad():
        ia = _unit_norm(model_a.get_image_features(**inputs))
        ib = _unit_norm(model_b.get_image_features(**inputs))

        # Scaled similarities -> predicted class index for each model.
        la = (ia @ tf_a.T) * scale_a
        lb = (ib @ tf_b.T) * scale_b
        pa = int(la.argmax(dim=-1))
        pb = int(lb.argmax(dim=-1))

    ca, cb = pa == label, pb == label
    if ca == cb:
        continue  # both right or both wrong: not interesting here
    mismatches.append(dict(
        index=idx,
        true_label=classnames[label],
        pred_A=classnames[pa],
        pred_B=classnames[pb],
        correct_A=ca,
        correct_B=cb,
    ))

# ————— Inspect the first few —————
for m in mismatches[:10]:
    print(m)


CIFAR-10 eval:   0%|          | 0/10000 [00:00<?, ?it/s]

{'index': 4, 'true_label': 'frog', 'pred_A': 'bird', 'pred_B': 'frog', 'correct_A': False, 'correct_B': True}
{'index': 25, 'true_label': 'bird', 'pred_A': 'bird', 'pred_B': 'frog', 'correct_A': True, 'correct_B': False}
{'index': 31, 'true_label': 'dog', 'pred_A': 'bird', 'pred_B': 'dog', 'correct_A': False, 'correct_B': True}
{'index': 39, 'true_label': 'dog', 'pred_A': 'dog', 'pred_B': 'ship', 'correct_A': True, 'correct_B': False}
{'index': 44, 'true_label': 'airplane', 'pred_A': 'bird', 'pred_B': 'airplane', 'correct_A': False, 'correct_B': True}
{'index': 49, 'true_label': 'frog', 'pred_A': 'deer', 'pred_B': 'frog', 'correct_A': False, 'correct_B': True}
{'index': 62, 'true_label': 'frog', 'pred_A': 'deer', 'pred_B': 'frog', 'correct_A': False, 'correct_B': True}
{'index': 63, 'true_label': 'cat', 'pred_A': 'deer', 'pred_B': 'cat', 'correct_A': False, 'correct_B': True}
{'index': 67, 'true_label': 'bird', 'pred_A': 'bird', 'pred_B': 'airplane', 'correct_A': True, 'correct_B': Fal

ARO Test

In [None]:
from datasets import load_dataset
from tqdm import tqdm

In [None]:
# Load the processor and the model from the same local checkpoint directory;
# the model is moved to DEVICE.
CLIP_CHECKPOINT = "my_finetuned_clip5"
processor = CLIPProcessor.from_pretrained(CLIP_CHECKPOINT)
model     = CLIPModel.from_pretrained(CLIP_CHECKPOINT).to(DEVICE)

In [None]:
# 2. Load the ARO dataset (test split)
dataset = load_dataset("gowitheflow/ARO-Visual-Attribution", split="test")

# 3. Evaluate the ARO score: a sample counts as correct when the image scores
#    strictly higher against its true caption than against the false one.
correct = 0
total = 0
skipped = 0
# Send inputs to wherever the model's weights actually live, so the two can
# never end up on different devices.
device = next(model.parameters()).device
model.eval()

for sample in tqdm(dataset, desc="Evaluating ARO..."):
    try:
        image = sample['image']  # PIL Image
        true_caption = sample['true_caption']
        false_caption = sample['false_caption']

        inputs = processor(text=[true_caption, false_caption], images=image, return_tensors="pt", padding=True).to(device)
        # Pure inference: do not build an autograd graph for every sample.
        with torch.no_grad():
            outputs = model(**inputs)
        logits = outputs.logits_per_image  # shape [1, 2]

        if logits[0, 0] > logits[0, 1]:
            correct += 1
        total += 1
    except Exception as e:
        # Best-effort evaluation: keep going, but count what was dropped so a
        # systematic failure cannot go unnoticed.
        skipped += 1
        print(f"Skipping due to error: {e}")

if skipped:
    print(f"Skipped {skipped} of {skipped + total} samples due to errors")



Evaluating ARO...: 100%|██████████| 28748/28748 [14:51<00:00, 32.24it/s]


In [None]:
# Fraction of evaluated samples where the true caption outscored the false one.
aro_accuracy = correct / total
print(aro_accuracy)

0.4982955336023376
