In [1]:
import os
import math
from typing import List, Dict

from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset, DataLoader

from transformers import CLIPModel, CLIPProcessor

2025-11-18 09:08:17.033605: I tensorflow/core/platform/cpu_feature_guard.cc:210] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print("Using device:", device)

Using device: cuda


In [3]:
class LoRALinear(nn.Module):
    """
    LoRA wrapper around a Linear layer:
      y = x W^T + b + (alpha/r) * B(A(x))
    where A: in -> r, B: r -> out.

    The wrapped layer's weight and bias are reused (not copied) and frozen;
    only ``lora_A`` and ``lora_B`` are trainable. ``lora_B`` is zero-initialized,
    so right after construction the wrapper reproduces ``base_layer`` exactly.

    Args:
        base_layer: Linear layer to adapt. Its parameters are shared with this
            module and get ``requires_grad = False`` as a side effect.
        r: Rank of the low-rank update; must be positive.
        alpha: Scaling numerator; the LoRA branch is multiplied by ``alpha / r``.

    Raises:
        ValueError: If ``r`` is not positive.
    """
    def __init__(self, base_layer: nn.Linear, r: int = 8, alpha: float = 16.0):
        super().__init__()
        if r <= 0:
            raise ValueError(f"LoRA rank r must be positive, got {r}")

        self.in_features = base_layer.in_features
        self.out_features = base_layer.out_features
        self.r = r
        self.alpha = alpha
        self.scaling = alpha / r

        # Original (frozen) weight & bias
        self.weight = base_layer.weight
        self.bias = base_layer.bias

        # Create the adapters on the same device as the wrapped weight, and in
        # the same dtype when it is a floating-point one, so the wrapper also
        # works when the base layer is not float32 or was already moved.
        base_weight = base_layer.weight
        adapter_dtype = base_weight.dtype if base_weight.is_floating_point() else None
        factory_kwargs = {"device": base_weight.device, "dtype": adapter_dtype}

        # LoRA trainable weights
        self.lora_A = nn.Linear(self.in_features, r, bias=False, **factory_kwargs)
        self.lora_B = nn.Linear(r, self.out_features, bias=False, **factory_kwargs)

        # Init LoRA: random A, zero B => the LoRA branch outputs zero at start.
        nn.init.kaiming_uniform_(self.lora_A.weight, a=math.sqrt(5))
        nn.init.zeros_(self.lora_B.weight)

        # Freeze original
        self.weight.requires_grad = False
        if self.bias is not None:
            self.bias.requires_grad = False

    def forward(self, x: torch.Tensor) -> torch.Tensor:
        """Return the frozen linear output plus the scaled low-rank update."""
        base = F.linear(x, self.weight, self.bias)
        lora_out = self.lora_B(self.lora_A(x)) * self.scaling
        return base + lora_out


In [4]:
def apply_lora_to_clip_attn(
    model: nn.Module,
    r: int = 8,
    alpha: float = 16.0,
    target_names=("q_proj", "v_proj"),
) -> int:
    """
    Replace matching nn.Linear children of ``model`` with LoRALinear, in place.

    By default this targets every child named ``q_proj`` or ``v_proj``.
    Children that are not plain ``nn.Linear`` are left untouched.

    Args:
        model: Module tree to modify in place.
        r: LoRA rank passed to each LoRALinear.
        alpha: LoRA alpha passed to each LoRALinear.
        target_names: Child attribute names to wrap. A single string is
            treated as one name.

    Returns:
        The number of layers that were replaced.
    """
    if isinstance(target_names, str):
        target_names = (target_names,)
    targets = frozenset(target_names)

    replaced = 0
    # Snapshot the module list first: children are swapped below, and the tree
    # should not change underneath a generator that is still walking it.
    for parent in list(model.modules()):
        for child_name, child in list(parent.named_children()):
            if child_name in targets and isinstance(child, nn.Linear):
                setattr(parent, child_name, LoRALinear(child, r=r, alpha=alpha))
                replaced += 1
    return replaced



In [5]:
# Every relative data path is anchored at the directory the kernel started in.
BASE_DIR = os.getcwd()
print("BASE_DIR:", BASE_DIR)


def ap(rel_path: str) -> str:
    """Join ``rel_path`` onto ``BASE_DIR`` and return the result."""
    anchored = os.path.join(BASE_DIR, rel_path)
    return anchored

BASE_DIR: /anvil/projects/x-cis250308/unlearning_test


In [6]:

def build_data() -> List[Dict]:
    """
    Build the sample entries used by the notebook.

    Each entry is a dict with an "images" list (three paths resolved through
    ``ap``) and a "texts" list of three captions. Both entries point at the
    same three image files and differ only in the wording of their captions.
    """
    # TODO: make sure these files actually exist
    image_files = (
        "set1_data/1.png",
        "set2_data/1.png",
        "set3_data/1.png",
    )
    caption_sets = (
        [
            "A tasty hamburger served with fries and ketchup",
            "A kid enjoying their time outside",
            "A kid enjoying a delicious burger",
        ],
        [
            "A delicious hamburger served with fries and ketchup",
            "A kid enjoying their time in the park",
            "A kid eating a good burger",
        ],
    )
    # Resolve the paths inside the loop so each entry owns its own list.
    return [
        {"images": [ap(rel) for rel in image_files], "texts": captions}
        for captions in caption_sets
    ]



In [7]:
class TripleDataset(Dataset):
    """
    Dataset over entries of the form {"images": [path, ...], "texts": [str, ...]}.

    Images are opened when an item is indexed and returned as RGB PIL images;
    the texts are passed through unchanged.
    """
    def __init__(self, data: List[Dict]):
        self.data = data

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        """
        Load every image of entry ``idx``.

        Returns:
            {"images": list of RGB PIL images, "texts": the entry's texts}

        Raises:
            FileNotFoundError: If any listed image path does not exist.
        """
        entry = self.data[idx]
        imgs = []
        for p in entry["images"]:
            if not os.path.exists(p):
                raise FileNotFoundError(f"Image not found: {p}")
            # convert() returns an independent, fully loaded copy, so the
            # source file handle can be closed right away.
            with Image.open(p) as im:
                imgs.append(im.convert("RGB"))
        return {"images": imgs, "texts": entry["texts"]}



In [8]:
def collate_fn(batch: List[Dict]) -> Dict:
    """
    Flatten a list of samples into one dict of parallel lists.

    The "images" of all samples are concatenated in batch order, and likewise
    the "texts".
    """
    flat_images = [image for sample in batch for image in sample["images"]]
    flat_texts = [text for sample in batch for text in sample["texts"]]
    return {"images": flat_images, "texts": flat_texts}


In [9]:
def load_clip_with_manual_lora(
    model_name: str = "openai/clip-vit-base-patch32",
    cache_dir: str = "./clip_cache",
    r: int = 8,
    alpha: float = 16.0,
):
    """
    Load a CLIP model and processor, freeze the model, and inject LoRA adapters.

    Args:
        model_name: Checkpoint identifier used for both model and processor.
        cache_dir: Directory passed to ``from_pretrained`` as the cache.
        r: LoRA rank forwarded to ``apply_lora_to_clip_attn``.
        alpha: LoRA alpha forwarded to ``apply_lora_to_clip_attn``.

    Returns:
        ``(model, processor)`` with the model moved to the notebook's ``device``.

    Side effects:
        Prints the total and trainable parameter counts.
    """
    # 1) Load CLIP (before moving it to the target device)
    model = CLIPModel.from_pretrained(model_name, cache_dir=cache_dir)
    processor = CLIPProcessor.from_pretrained(model_name, cache_dir=cache_dir)

    # 2) Freeze all original params
    for p in model.parameters():
        p.requires_grad = False

    # 3) Inject LoRA before the device move so the adapters move with the model
    apply_lora_to_clip_attn(model, r=r, alpha=alpha)

    # 4) Move entire model (including LoRA layers) to device
    model.to(device)

    # 5) Count params
    total = sum(p.numel() for p in model.parameters())
    trainable = sum(p.numel() for p in model.parameters() if p.requires_grad)
    print(f"Total params: {total}, trainable (LoRA): {trainable}")

    return model, processor


In [10]:
def clip_loss(image_embeds, text_embeds, temp: float = 0.07):
    """
    Symmetric cross-entropy loss over image/text cosine similarities.

    Both inputs are L2-normalized along the last dimension, and their pairwise
    similarities are divided by ``temp``. Row i of ``image_embeds`` is treated
    as the match for row i of ``text_embeds``. The result is the mean of the
    image-to-text and text-to-image cross-entropies.
    """
    img_unit = F.normalize(image_embeds, dim=-1)
    txt_unit = F.normalize(text_embeds, dim=-1)

    similarity = (img_unit @ txt_unit.T) / temp
    # The i-th image matches the i-th text, so the targets are the diagonal.
    targets = torch.arange(similarity.size(0), device=similarity.device)

    image_to_text = F.cross_entropy(similarity, targets)
    text_to_image = F.cross_entropy(similarity.T, targets)
    return (image_to_text + text_to_image) / 2



In [11]:
# Build the list of sample entries (image paths plus their captions).
data = build_data()

In [13]:
# Wrap the entries; images are only opened when an item is indexed.
dataset = TripleDataset(data)

In [20]:
# One dataset entry per step, in fixed order; collate_fn flattens each step
# into parallel "images" / "texts" lists.
loader = DataLoader(dataset, batch_size=1, shuffle=False, collate_fn=collate_fn)

In [26]:
# Sanity check: walk the loader once and dump every collated batch.
# Note that `batch` stays bound to the last batch after this loop.
for batch in loader:
    print(batch)

{'images': [<PIL.Image.Image image mode=RGB size=1024x1024 at 0x14A8FFEAE250>, <PIL.Image.Image image mode=RGB size=1024x682 at 0x14A8FFEAC7C0>, <PIL.Image.Image image mode=RGB size=1024x684 at 0x14A90C46E250>], 'texts': ['A tasty hamburger served with fries and ketchup', 'A kid enjoying their time outside', 'A kid enjoying a delicious burger']}
{'images': [<PIL.Image.Image image mode=RGB size=1024x1024 at 0x14A90C46FA50>, <PIL.Image.Image image mode=RGB size=1024x682 at 0x14A8FFE17E30>, <PIL.Image.Image image mode=RGB size=1024x684 at 0x14A8FFB458B0>], 'texts': ['A delicious hamburger served with fries and ketchup', 'A kid enjoying their time in the park', 'A kid eating a good burger']}


In [27]:
# Load CLIP and its processor with LoRA adapters injected (prints param counts).
model, processor = load_clip_with_manual_lora()

Total params: 151768833, trainable (LoRA): 491520


In [28]:
# Optimize only the parameters that are still marked trainable on the model.
trainable_params = [p for p in model.parameters() if p.requires_grad]
optimizer = torch.optim.AdamW(trainable_params, lr=1e-4)

In [None]:
# Put the model into training mode.
# NOTE(review): this cell shows no execution count ("In [None]"), so it may not
# have been run in the recorded session - confirm before relying on train mode.
model.train()

In [31]:
# Take the first collated batch (None if the loader yields nothing) without
# leaving a stray loop variable in the notebook namespace.
batch = next(iter(loader), None)

batch

{'images': [<PIL.Image.Image image mode=RGB size=1024x1024>,
  <PIL.Image.Image image mode=RGB size=1024x682>,
  <PIL.Image.Image image mode=RGB size=1024x684>],
 'texts': ['A tasty hamburger served with fries and ketchup',
  'A kid enjoying their time outside',
  'A kid enjoying a delicious burger']}

In [32]:
# Tokenize the captions and preprocess the images into one padded batch.
# truncation=True clips over-long captions to the tokenizer's maximum length
# instead of letting them overflow the text encoder's context window.
inputs = processor(
    text = batch["texts"],
    images = batch["images"],
    padding = True,
    truncation = True,
    return_tensors="pt",
).to(device)

# Single forward pass; the image and text embeddings are read from `outputs`.
outputs = model(
    input_ids = inputs["input_ids"],
    attention_mask = inputs["attention_mask"],
    pixel_values = inputs["pixel_values"],
)

In [33]:
# Image embeddings produced by the forward pass.
image_embeds = outputs.image_embeds

In [43]:
# Inspect one image embedding; the recorded output is torch.Size([512]).
image_embeds[0].shape

torch.Size([512])

In [37]:
# Text embeddings produced by the forward pass.
text_embeds = outputs.text_embeds

In [42]:
# Inspect one text embedding; the recorded output is torch.Size([512]).
text_embeds[0].shape

torch.Size([512])

In [64]:
from torch.nn.functional import cosine_similarity

def calculate_loss(image_embeds, text_embeds):
    """
    Weighted combination of per-pair image/text cosine similarities.

    Row i of ``image_embeds`` is paired with row i of ``text_embeds``; only the
    first three pairs are used:

        loss = sim_0 + 0.3 * sim_1 - 0.7 * sim_2

    Minimizing the loss therefore lowers the similarity of pairs 0 and 1 and
    raises the similarity of pair 2.

    Args:
        image_embeds: Indexable collection of at least three 1-D image vectors.
        text_embeds: Indexable collection of at least three 1-D text vectors.

    Returns:
        Scalar tensor holding the weighted sum.
    """
    l1 = cosine_similarity(image_embeds[0], text_embeds[0], dim = 0)
    # Pair 1: the previous code recomputed pair 0 here, so the second
    # image/text pair never contributed to the loss.
    l2 = cosine_similarity(image_embeds[1], text_embeds[1], dim = 0)
    l3 = cosine_similarity(image_embeds[2], text_embeds[2], dim = 0)

    loss = l1 + 0.3*l2 - 0.7*l3

    return loss

In [65]:
# Scalar training objective for the current batch's embeddings.
loss = calculate_loss(image_embeds, text_embeds)

In [67]:
# One optimization step: clear old gradients, backpropagate the loss, update
# the parameters held by the optimizer.
# NOTE(review): re-running this cell without recomputing `outputs` / `loss`
# will likely fail, since backward() frees the graph by default - confirm.
optimizer.zero_grad()
loss.backward()
optimizer.step()