In [None]:
import torch
from transformers import CLIPProcessor, CLIPModel
import torch.nn.functional as F

# Load the pretrained CLIP checkpoint, placing it on the GPU when one is available
model_name = "openai/clip-vit-base-patch16"
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
processor = CLIPProcessor.from_pretrained(model_name)
model = CLIPModel.from_pretrained(model_name)
model = model.to(device)

# Data loading (example data)
# A DataLoader over COCO Caption data is assumed to exist already; it is not built here

# Optimisation settings
num_epochs = 10
optimizer = torch.optim.Adam(model.parameters(), lr=0.001)

# Stochastic EM iterations
#
# NOTE(review): `dataloader` is not defined anywhere in this notebook. The loop
# assumes it yields dicts with batch['image'] (already-preprocessed pixel
# tensors, presumably (batch, 3, H, W)) and batch['text'] (a sequence of caption
# strings, where caption i describes image i) -- confirm against the real loader.
model.train()
for epoch in range(num_epochs):
    epoch_loss = 0.0
    num_batches = 0

    for batch in dataloader:
        pixel_values = batch['image'].to(device)  # Image data
        captions = list(batch['text'])  # Text descriptions

        # Captions must be tokenized before they can be fed to the text encoder.
        text_inputs = processor(
            text=captions, return_tensors="pt", padding=True, truncation=True
        ).to(device)

        # Step 3: Calculate similarity scores (computed using CLIP here).
        # Hugging Face's CLIPModel exposes get_image_features/get_text_features
        # (it has no encode_image/encode_text). Features are L2-normalised and
        # scaled by CLIP's learned temperature so the scores are proper logits.
        image_features = F.normalize(model.get_image_features(pixel_values=pixel_values), dim=-1)
        text_features = F.normalize(model.get_text_features(**text_inputs), dim=-1)
        similarity_scores = model.logit_scale.exp() * torch.matmul(image_features, text_features.t())

        # Step 5: Simulate weights (placeholder: needs to be implemented based on
        # model-specific requirements). Clamped away from zero so log() is finite.
        batch_size = similarity_scores.shape[0]
        w_plus = torch.rand(batch_size, device=device).clamp_min(1e-8)  # Example positive pair weights
        w_minus = torch.rand(batch_size, batch_size, device=device).clamp_min(1e-8)  # Example negative pair weights

        # Step 6: Calculate weighted contrastive loss
        #   -log( w+_i * exp(s_ii) / (w+_i * exp(s_ii) + sum_{j != i} w-_ij * exp(s_ij)) )
        # The matching caption of image i sits on the diagonal (not in column 0),
        # and the diagonal is masked out of the negatives so it is not counted twice.
        # Everything is done in log-space for numerical stability.
        is_positive = torch.eye(batch_size, dtype=torch.bool, device=device)
        log_positive = torch.log(w_plus) + similarity_scores.diagonal()
        log_negatives = (torch.log(w_minus) + similarity_scores).masked_fill(is_positive, float("-inf"))
        log_denominator = torch.logsumexp(
            torch.cat([log_positive.unsqueeze(1), log_negatives], dim=1), dim=1
        )

        # Calculate loss
        loss = torch.mean(log_denominator - log_positive)

        # Step 10: Update model parameters
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        epoch_loss += loss.item()
        num_batches += 1

    # Print the mean loss for each epoch (guarded so an empty loader cannot divide by zero)
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {epoch_loss / max(num_batches, 1)}")

# After training, model parameters have been updated


In [None]:
#https://github.com/openai/CLIP/issues/57

import os
import torch
import glob
from PIL import Image
import random
import clip
from tqdm.notebook import tqdm
import numpy as np
from torchvision import transforms
import torch.nn as nn
import torch.optim as optim
import numpy as np
from tqdm.notebook import tqdm

# Training schedule
EPOCH = 10
BATCH_SIZE = 256

# Run on the third GPU when CUDA is available (mixed precision is used there), otherwise on the CPU.
if torch.cuda.is_available():
    device = "cuda:2"
else:
    device = "cpu"

# jit=False is required so the returned model can be trained.
model, preprocess = clip.load("ViT-B/32", device=device, jit=False)

class cocodtrain(torch.utils.data.Dataset):
    """COCO image/caption dataset backed by parallel directories of files.

    Images are read from ``<image_path>/<mode>/*.jpg`` and captions from
    ``<text_path>/<mode>/*.txt``. Both listings are sorted and paired by
    position, so the i-th caption file must belong to the i-th image.

    Args:
        image_path: Root directory containing one sub-directory of images per split.
        text_path: Root directory containing one sub-directory of caption files per split.
        mode: Name of the split sub-directory (e.g. ``'train2014'``).
        transform: Optional callable applied to the resized PIL image. When
            omitted, the module-level ``preprocess`` (returned by ``clip.load``)
            is looked up each time an item is fetched.

    Raises:
        ValueError: If the number of images and caption files differ, since
            positional pairing would then be misaligned or run out of captions.
    """

    def __init__(self, image_path='/home/jason/data/coco2014/images', text_path='/home/jason/data/coco2014/text', mode='train2014', transform=None):
        self.image_list = sorted(glob.glob(os.path.join(image_path, mode, '*.jpg')))
        self.label_list = sorted(glob.glob(os.path.join(text_path, mode, '*.txt')))
        self.transform = transform

        # Fail fast: pairing is purely positional, so a count mismatch means
        # wrong captions (or an IndexError deep inside a DataLoader worker).
        if len(self.image_list) != len(self.label_list):
            raise ValueError(
                f"Found {len(self.image_list)} images but {len(self.label_list)} "
                f"caption files for split '{mode}'; they must match one-to-one."
            )

    def __len__(self):
        """Return the number of image/caption pairs."""
        return len(self.image_list)

    def __getitem__(self, index):
        """Return ``(image, caption)`` for ``index``.

        The image is converted to RGB, resized to 224x224 and passed through
        the transform; the caption is one randomly chosen non-blank line of the
        paired text file, stripped of surrounding whitespace.

        Raises:
            ValueError: If the caption file contains no non-blank line.
        """
        # Context manager closes the underlying file as soon as pixels are decoded.
        with Image.open(self.image_list[index]) as raw_image:
            image = raw_image.convert("RGB")
        # Squash to a square before the transform (kept from the original pipeline).
        image = image.resize((224, 224), Image.BILINEAR)
        transform = self.transform if self.transform is not None else preprocess
        image = transform(image)

        with open(self.label_list[index], "r") as f:
            captions = [line.strip() for line in f if line.strip()]
        if not captions:
            raise ValueError(f"No captions found in {self.label_list[index]}")
        label = random.choice(captions)

        return image, label
# Training split of COCO 2014, served in shuffled full-size batches by 16 worker processes.
trainset = cocodtrain(
    image_path='/home/jason/data/coco2014/images',
    text_path='/home/jason/data/coco2014/text',
    mode='train2014',
)
trainloader = torch.utils.data.DataLoader(
    dataset=trainset,
    batch_size=BATCH_SIZE,
    shuffle=True,
    num_workers=16,
    drop_last=True,  # discard the final incomplete batch
)

def convert_models_to_fp32(model):
    """Cast every parameter of ``model``, and its gradient when present, to float32 in place.

    Called before ``optimizer.step()`` so the update is applied in full precision.

    Args:
        model: Any ``torch.nn.Module``.
    """
    for p in model.parameters():
        p.data = p.data.float()
        # Frozen parameters, or ones unused in the last forward pass, have no gradient.
        if p.grad is not None:
            p.grad.data = p.grad.data.float()

#device = "cuda:3" if torch.cuda.is_available() else "cpu" # If using GPU then use mixed precision training.
#model, preprocess = clip.load("ViT-B/32",device=device,jit=False) #Must set jit=False for training

#clip.model.convert_weights(model) # Actually this line is unnecessary since clip by default already on float16

# One cross-entropy criterion per direction of the contrastive objective (image->text and text->image).
loss_img, loss_txt = nn.CrossEntropyLoss(), nn.CrossEntropyLoss()

# Hyper-parameters follow the paper, except for a smaller learning rate that is safer when fine-tuning on a new dataset.
optimizer = optim.Adam(
    model.parameters(),
    lr=5e-5,
    betas=(0.9, 0.98),
    eps=1e-6,
    weight_decay=0.2,
)

# Fine-tune CLIP with the symmetric image<->text cross-entropy objective.
for epoch in range(EPOCH):
    print('epoch:', epoch)
    progress = tqdm(trainloader)
    for batch in progress:
        optimizer.zero_grad()

        # The DataLoader has already collated the images into one tensor and the
        # captions into a list of strings.
        list_image, list_txt = batch
        images = list_image.to(device)
        # truncate=True keeps captions longer than CLIP's 77-token context from raising.
        texts = clip.tokenize(list_txt, truncate=True).to(device)

        logits_per_image, logits_per_text = model(images, texts)
        # The i-th image matches the i-th caption; size the labels from the
        # actual batch rather than assuming it is always BATCH_SIZE.
        ground_truth = torch.arange(len(images), dtype=torch.long, device=device)

        total_loss = (loss_img(logits_per_image, ground_truth) + loss_txt(logits_per_text, ground_truth)) / 2
        total_loss.backward()
        progress.set_postfix(loss=total_loss.item())

        if device == "cpu":
            # Weights are already fp32 on CPU; casting them to fp16 would break training there.
            optimizer.step()
        else:
            # Step in fp32 for numerical stability, then return to CLIP's fp16 weights.
            convert_models_to_fp32(model)
            optimizer.step()
            clip.model.convert_weights(model)

# Create the output folder first so a missing directory cannot discard the trained weights.
os.makedirs("model_checkpoint", exist_ok=True)
torch.save({
    'epoch': epoch,
    'model_state_dict': model.state_dict(),
    'optimizer_state_dict': optimizer.state_dict(),
    'loss': total_loss.detach().cpu(),  # drop the autograd graph and the device binding
    }, "model_checkpoint/model_10.pt") #just change to your preferred folder/filename

In [None]:
# InfoNCE loss


import torch
import torchvision.transforms as T
from transformers import CLIPProcessor, CLIPModel
from torch.utils.data import DataLoader
from torchvision.datasets import CocoCaptions
import torch.nn.functional as F

# Load the pretrained CLIP weights and the matching processor, using the GPU when one is available
device = "cpu"
if torch.cuda.is_available():
    device = "cuda"
model = CLIPModel.from_pretrained("openai/clip-vit-base-patch16")
model = model.to(device)
processor = CLIPProcessor.from_pretrained("openai/clip-vit-base-patch16")

# Load the COCO caption dataset.
# Images are kept as PIL objects: CLIPProcessor applies CLIP's own resize, crop
# and normalisation, so a torchvision transform with ImageNet statistics here
# would normalise the images twice and with the wrong mean/std.
coco_dataset = CocoCaptions(root='path_to_coco_dataset', annFile='path_to_annotations_file')


def collate_image_caption_pairs(samples):
    """Turn CocoCaptions samples into parallel lists of images and captions.

    Each sample is ``(image, captions)`` where ``captions`` is the list of
    reference captions for that image. One caption is drawn at random per image
    so that position i of both returned lists forms a positive pair.

    Args:
        samples: List of ``(image, list_of_captions)`` tuples from the dataset.

    Returns:
        Tuple ``(images, captions)`` of two equally long lists.
    """
    images = [image for image, _ in samples]
    captions = [
        image_captions[torch.randint(len(image_captions), (1,)).item()]
        for _, image_captions in samples
    ]
    return images, captions


# Create a data loader (custom collation: the default one cannot batch PIL
# images or per-image caption lists of varying length)
data_loader = DataLoader(coco_dataset, batch_size=16, shuffle=True, collate_fn=collate_image_caption_pairs)

# Define an optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=1e-4)

# Train the model
num_epochs = 5
temperature = 0.07  # InfoNCE temperature parameter

for epoch in range(num_epochs):
    model.train()
    total_loss = 0.0

    for images, captions in data_loader:
        # Image preprocessing and text tokenization in one call; keyword
        # arguments are required because the processor's first positional
        # parameter is the text.
        inputs = processor(
            text=captions, images=images, return_tensors="pt", padding=True, truncation=True
        ).to(device)

        # Compute image and text features with gradients enabled so both
        # encoders are trained, then L2-normalise so the dot product is a
        # cosine similarity (which is what the 0.07 temperature presumes).
        image_features = F.normalize(
            model.get_image_features(pixel_values=inputs["pixel_values"]), dim=-1
        )
        text_features = F.normalize(
            model.get_text_features(
                input_ids=inputs["input_ids"], attention_mask=inputs["attention_mask"]
            ),
            dim=-1,
        )

        # Pairwise similarities: entry (i, j) compares caption i with image j.
        # The diagonal holds the positive pairs; every other entry in a row or
        # column serves as an in-batch negative.
        logits_per_text = (text_features @ image_features.T) / temperature
        labels = torch.arange(len(images), dtype=torch.long, device=device)

        # Calculate InfoNCE loss, averaged over the text->image and image->text directions
        loss = (F.cross_entropy(logits_per_text, labels) + F.cross_entropy(logits_per_text.T, labels)) / 2

        # Backpropagation and optimization
        optimizer.zero_grad()
        loss.backward()
        optimizer.step()

        total_loss += loss.item()

    avg_loss = total_loss / max(len(data_loader), 1)
    print(f"Epoch [{epoch + 1}/{num_epochs}] - Loss: {avg_loss:.4f}")

# Persist the fine-tuned weights together with the processor configuration
model.save_pretrained(save_directory="path_to_save_model")
processor.save_pretrained(save_directory="path_to_save_processor")
