In [1]:
!pip install datasets
!pip install deep_translator



In [2]:
import json
from tqdm import tqdm
from PIL import Image
import pandas as pd
import torch
import random
import datasets
from torch.utils.data import Dataset, DataLoader
import json
import pandas as pd
from sqlalchemy import create_engine
from deep_translator import GoogleTranslator

In [3]:
from transformers import CLIPProcessor, CLIPModel

model_name = "openai/clip-vit-large-patch14"

model = CLIPModel.from_pretrained(model_name)
processor = CLIPProcessor.from_pretrained(model_name)

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


In [4]:
BASE_PATH = '/content/drive/MyDrive/social/'

images_path = f'{BASE_PATH}images/'
db_path = f'{BASE_PATH}metadata.db'
train_json_path = f'{BASE_PATH}train_images.json'

In [5]:
# Define the dataset class
class ImageCaptioningDataset(Dataset):
    def __init__(self, data, processor, max_length=77): # Add max_length
        """
        Args:
            data: List of tuples (image_path, caption).
            processor: CLIPProcessor for image and text preprocessing.
            max_length: Maximum sequence length for text.
        """
        self.data = data
        self.processor = processor
        self.max_length = max_length # Store max_length

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        image= self.data[idx]['image']
        caption = self.data[idx]['text']
        inputs = self.processor(images=image, text=caption, return_tensors="pt", padding=True, truncation=True, max_length=self.max_length) # Add truncation and max_length

        return {
            "pixel_values": inputs["pixel_values"].squeeze(0),
            "input_ids": inputs["input_ids"].squeeze(0),
            "attention_mask": inputs["attention_mask"].squeeze(0),
        }

In [6]:
engine = create_engine(f'sqlite:///{db_path}')
metadata_df = pd.read_sql('ImageData', engine)

translator = GoogleTranslator(source='es', target='en')
data_dict = {}
train_images_dict : dict[str, str]
with open(train_json_path, "r") as file:
    train_images_dict = json.load(file)

for id, name in train_images_dict.items():
    caption = metadata_df[metadata_df['id'] == int(id)][['caption']].values[0][0]
    image = Image.open(f'{images_path}{name}')
    caption = translator.translate(caption)
    data_dict[id] = {'image': image, 'text': [caption]}

In [7]:
data_list = list(data_dict.values())
dataset = datasets.Dataset.from_list(data_list)

In [8]:
train_dataset = ImageCaptioningDataset(dataset, processor)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=1)

In [9]:
print(train_dataset[0])

{'pixel_values': tensor([[[1.4486, 1.4194, 1.4340,  ..., 1.4194, 1.3464, 1.3902],
         [1.4632, 1.4486, 1.4340,  ..., 1.4048, 1.3902, 1.3902],
         [1.4632, 1.4340, 1.4340,  ..., 1.3902, 1.3902, 1.3756],
         ...,
         [1.5946, 1.5946, 1.5946,  ..., 1.5216, 1.4778, 1.5070],
         [1.5946, 1.6092, 1.5946,  ..., 1.5216, 1.4632, 1.4924],
         [1.5654, 1.5800, 1.5800,  ..., 1.5800, 1.5216, 1.5362]],

        [[1.4446, 1.3995, 1.4446,  ..., 1.4295, 1.3545, 1.3995],
         [1.4446, 1.4145, 1.4446,  ..., 1.4145, 1.3995, 1.3995],
         [1.4446, 1.4145, 1.4446,  ..., 1.3995, 1.3995, 1.3845],
         ...,
         [1.6697, 1.6697, 1.6697,  ..., 1.6096, 1.5496, 1.5646],
         [1.6697, 1.6697, 1.6697,  ..., 1.5946, 1.5346, 1.5496],
         [1.6997, 1.6997, 1.7147,  ..., 1.6096, 1.5646, 1.5646]],

        [[1.4491, 1.4065, 1.4491,  ..., 1.3780, 1.3069, 1.3496],
         [1.4633, 1.4349, 1.4349,  ..., 1.3638, 1.3496, 1.3496],
         [1.4633, 1.4349, 1.4349,  ..., 1

In [10]:
import torch.nn.functional as F

device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)

# Define optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-6)

# Training loop
num_epochs = 3  # Adjust epochs as needed
model.train()

for epoch in range(num_epochs):
    loop = tqdm(train_dataloader, leave=True)

    for batch in loop:
        optimizer.zero_grad()

        # Move data to device
        pixel_values = batch["pixel_values"].to(device)
        input_ids = batch["input_ids"].to(device)
        attention_mask = batch["attention_mask"].to(device)

        # Forward pass
        outputs = model(pixel_values=pixel_values, input_ids=input_ids, attention_mask=attention_mask)
        image_embeds = outputs.image_embeds  # Image embeddings
        text_embeds = outputs.text_embeds  # Text embeddings

        # Normalize embeddings
        image_embeds = F.normalize(image_embeds, p=2, dim=-1)
        text_embeds = F.normalize(text_embeds, p=2, dim=-1)

        # Compute cosine similarity
        logit_scale = model.logit_scale.exp()  # Scaling factor
        logits_per_image = torch.matmul(image_embeds, text_embeds.T) * logit_scale
        logits_per_text = logits_per_image.T  # Symmetric loss

        # Generate labels (identity matrix)
        batch_size = pixel_values.shape[0]
        labels = torch.arange(batch_size, device=device)

        # Compute contrastive loss
        loss_img = F.cross_entropy(logits_per_image, labels)
        loss_text = F.cross_entropy(logits_per_text, labels)
        loss = (loss_img + loss_text) / 2  # Final loss

        # Backward pass
        loss.backward()
        optimizer.step()

        # Update progress bar
        loop.set_description(f"Epoch {epoch+1}")
        loop.set_postfix(loss=loss.item())

print("Training complete!")

Epoch 1: 100%|██████████| 200/200 [01:34<00:00,  2.11it/s, loss=0]
Epoch 2: 100%|██████████| 200/200 [01:38<00:00,  2.03it/s, loss=0]
Epoch 3: 100%|██████████| 200/200 [01:42<00:00,  1.95it/s, loss=0]

Training complete!





In [11]:
# Save the model and processor locally
processor.save_pretrained(f"{BASE_PATH}trained_models/{model_name}_model_tuned_en_cap")
model.save_pretrained(f"{BASE_PATH}trained_models/{model_name}_processor_tuned_en_cap")