# Clone the CLIP repository and install the required packages

In [None]:
# Environment setup cell (IPython shell magics).
# The clone runs in whatever directory the notebook starts in; the later
# `%cd CLIP` is relative to /kaggle/working.
# NOTE(review): this presumably relies on the notebook starting in
# /kaggle/working — confirm.
!git clone https://github.com/openai/CLIP
%cd /kaggle/working
!pip install ftfy
%cd CLIP
# Extra packages: pytorch-metric-learning (SupConLoss), faiss-cpu (retrieval
# index) and py_vncorenlp (word segmentation) are imported by later cells.
!pip install pytorch-metric-learning
!pip install faiss-cpu
!pip install wikipedia
!pip install py_vncorenlp


# Word segmentation using VNCORE-NLP

In [None]:
import py_vncorenlp

# Automatically download VnCoreNLP components from the original repository
# and save them in some local machine folder.
# NOTE(review): the download targets '/kaggle/input/vncore-nlp' while the
# segmenter below is loaded from '/kaggle/input/vncore-nlp/test_model' —
# confirm both paths are intended and that the download folder is writable.
py_vncorenlp.download_model(save_dir='/kaggle/input/vncore-nlp')

# Load the word and sentence segmentation component ("wseg" annotator only).
# `rdrsegmenter` is a notebook-level global that the evaluation function
# defined later in this file reads directly.
# if 'rdrsegmenter' not in globals() or not isinstance(rdrsegmenter, py_vncorenlp.VnCoreNLP):
rdrsegmenter = py_vncorenlp.VnCoreNLP(annotators=["wseg"], save_dir='/kaggle/input/vncore-nlp/test_model')

# Small smoke test: two Vietnamese sentences in one string.
text = "Ông Nguyễn Khắc Chúc  đang làm việc tại Đại học Quốc gia Hà Nội. Bà Lan, vợ ông Chúc, cũng làm việc tại đây."

# word_segment returns a list with one segmented string per sentence
# (multi-syllable words are joined with underscores), as shown below.
output1 = rdrsegmenter.word_segment(text)

print(output1)
# ['Ông Nguyễn_Khắc_Chúc đang làm_việc tại Đại_học Quốc_gia Hà_Nội .', 'Bà Lan , vợ ông Chúc , cũng làm_việc tại đây .']


['Ông Nguyễn_Khắc_Chúc đang làm_việc tại Đại_học Quốc_gia Hà_Nội .', 'Bà Lan , vợ ông Chúc , cũng làm_việc tại đây .']


# Training VNFOOD-CLIP

In [None]:
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from transformers import AutoModel, AutoTokenizer
import pandas as pd
from PIL import Image
from tqdm import tqdm
import os
import time
import torch.nn.functional as F
import numpy as np
from torch.optim import lr_scheduler
from pytorch_metric_learning.losses import SupConLoss
from transformers import ConvNextModel, ConvNextImageProcessor
# ---- Training configuration (module-level globals) ----
# Name of the LR schedule. NOTE: this string is rebound to the actual
# scheduler object inside the `__main__` block further down.
scheduler = "CosineAnnealingLR"
# Weight-decay coefficient.
weight_decay = 1e-6
# Initial learning rate handed to the optimizer.
learning_rate = 1e-5
# T_max handed to CosineAnnealingLR (counted in scheduler steps).
t_max = 500
# eta_min handed to CosineAnnealingLR.
# NOTE(review): equal to `learning_rate`, so the cosine schedule is flat —
# confirm this is intended.
min_lr = 1e-5
# NOTE(review): not referenced by any of the visible cells.
df_result = None
# Hugging Face checkpoints used for the image and text encoders.
backbone_vision = "facebook/convnext-large-384"
backbone_text = "vinai/phobert-base"


# -------- Dataset --------
class ImageTextDataset(Dataset):
    """Dataset yielding (pixel_values, input_ids, attention_mask, label).

    Args:
        data: pandas DataFrame with the columns ``image`` (path to an image
            file), ``cap_full`` (caption text) and ``label``.
        processor: image processor called as ``processor(img, return_tensors="pt")``;
            its ``pixel_values`` entry is returned.
        tokenizer: tokenizer called on the caption; its ``input_ids`` and
            ``attention_mask`` entries are returned.
    """

    def __init__(self, data, processor, tokenizer):
        self.data = data
        self.img = self.data['image']
        self.cap_full = self.data['cap_full']
        self.label = self.data['label']
        self.processor = processor
        self.tokenizer = tokenizer

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # DataLoader samplers hand out positions 0..len-1, so access rows by
        # position (.iloc). Plain ``series[index]`` is label-based and fails
        # (or returns the wrong row) when the DataFrame index is not the
        # default RangeIndex, e.g. after filtering or shuffling rows.
        with Image.open(self.img.iloc[index]) as raw:
            # convert() loads the pixel data and returns a new image, so the
            # underlying file can be closed as soon as the block exits.
            img = raw.convert("RGB")
        img_process = self.processor(img, return_tensors = "pt")
        # Every caption is padded/truncated to 256 tokens so samples can be
        # stacked by the default collate function.
        token = self.tokenizer(
            self.cap_full.iloc[index],
            return_tensors = 'pt',
            truncation = True,
            padding = 'max_length',
            max_length = 256,
        )
        label = self.label.iloc[index]
        return img_process['pixel_values'], token['input_ids'], token['attention_mask'], label

# -------- Vision Encoder (ConvNext Large) --------
class VisionModel(nn.Module):
    """Image encoder: ConvNeXt backbone + linear projection, L2-normalised.

    Args:
        embed_dim: size of the shared image/text embedding space.
    """

    def __init__(self, embed_dim = 768):
        super().__init__()
        self.convnext_large = ConvNextModel.from_pretrained(backbone_vision)
        # Take the pooled feature width from the backbone config instead of
        # hard-coding 1536, so another ConvNeXt size can be swapped in via
        # `backbone_vision` without editing this class.
        feature_dim = self.convnext_large.config.hidden_sizes[-1]
        self.projection_head = nn.Linear(feature_dim, embed_dim)
        nn.init.xavier_normal_(self.projection_head.weight)
        nn.init.zeros_(self.projection_head.bias)
        self.drop = nn.Dropout(p = 0.2)

    def forward(self, x):
        """Return unit-length image embeddings for a batch of pixel values.

        ``x`` may carry an extra singleton axis at position 1 (the image
        processor returns a batch of one per sample, which the DataLoader
        then stacks); that axis is removed before the backbone is called.
        """
        if x.dim() == 5:
            x = x.squeeze(1)
        x = self.convnext_large(pixel_values = x)
        x = x.pooler_output
        x = self.projection_head(x)
        # Dropout is only active in training mode.
        outputs = self.drop(x)
        return F.normalize(outputs, dim = -1)

# -------- Text Encoder (PhoBert base) --------
class TextModel(nn.Module):
    """Text encoder: PhoBERT backbone + linear projection + LayerNorm, L2-normalised.

    Args:
        embed_dim: size of the shared image/text embedding space.
    """

    def __init__(self, embed_dim = 768):
        super().__init__()
        self.phobert = AutoModel.from_pretrained(backbone_text)
        self.projection_head = nn.Linear(self.phobert.config.hidden_size, embed_dim)
        nn.init.xavier_normal_(self.projection_head.weight)
        nn.init.zeros_(self.projection_head.bias)
        # The norm is applied to the *projected* vector, so it must be sized
        # with embed_dim. Sizing it with the backbone hidden size only works
        # by coincidence when both are equal (the default 768 case, for which
        # existing checkpoints remain loadable).
        self.layer_norm = nn.LayerNorm(embed_dim)

    def forward(self, input_ids, attention_mask):
        """Return unit-length text embeddings built from the first ([CLS]) token."""
        # Follow the device the weights live on rather than a notebook-level
        # `device` global, so the module also works when that global is
        # missing or out of sync with the model.
        target_device = self.projection_head.weight.device
        if input_ids.dim() == 3:
            # Drop the singleton axis added when per-sample tokenizer outputs
            # are stacked into a batch.
            input_ids = input_ids.squeeze(1)
        if attention_mask.dim() == 3:
            attention_mask = attention_mask.squeeze(1)
        input_ids = input_ids.to(target_device)
        attention_mask = attention_mask.to(target_device)
        outputs = self.phobert(input_ids = input_ids, attention_mask = attention_mask)
        # CLS token
        cls_embed = outputs.last_hidden_state[:, 0, :]
        projection = self.projection_head(cls_embed)
        outputs = self.layer_norm(projection)
        return F.normalize(outputs, dim = -1)

# -------- VNFOOD-CLIP --------
class Model(nn.Module):
    """Dual-encoder wrapper pairing the vision and text towers.

    Both towers project into the same ``embed_dim``-sized space, so their
    outputs can be compared directly.
    """

    def __init__(self, embed_dim=768):
        super().__init__()
        # Attribute names are part of the checkpoint key layout; keep them.
        self.vision_encoder = VisionModel(embed_dim=embed_dim)
        self.text_encoder = TextModel(embed_dim=embed_dim)

    def get_image_features(self, images):
        """Embed a batch of images with the vision tower only."""
        return self.vision_encoder(images)

    def get_text_features(self, input_ids, attention_mask):
        """Embed a batch of tokenised captions with the text tower only."""
        return self.text_encoder(input_ids, attention_mask)

    def forward(self, images, input_ids, attention_mask):
        """Return ``(image_features, text_features)`` for a paired batch."""
        img_out = self.vision_encoder(images)
        txt_out = self.text_encoder(input_ids, attention_mask)
        return img_out, txt_out


# -------- Contrastive Loss --------
def supConLoss(image_embed, text_embed, labels):
    """Supervised contrastive loss over the pooled image and text embeddings.

    The image and text embeddings are stacked along the batch axis and the
    labels are repeated once to match, so an image and a caption sharing a
    label are treated as positives of each other.
    """
    criterion = SupConLoss(temperature=0.07)
    # Order matters: images first, then texts, matching the repeated labels.
    stacked_embeddings = torch.cat((image_embed, text_embed), dim=0)
    stacked_labels = labels.repeat(2)
    return criterion(stacked_embeddings, stacked_labels)


# -------- Training Loop --------
def train(model, dataloader, optimizer, device, scheduler = None, epochs = 5):
    """Fine-tune ``model`` with the supervised contrastive loss.

    After every epoch the mean loss is printed and the running loss table is
    rewritten to ``/kaggle/working/result/vn_food2.csv``; the total wall-clock
    training time is added to that table after the final epoch.

    Args:
        model: dual encoder called as ``model(images, input_ids, att_mask)``
            and returning ``(image_embed, text_embed)``.
        dataloader: yields ``(images, input_ids, att_mask, label)`` batches.
        optimizer: optimizer over the model parameters.
        device: device the batches are moved to.
        scheduler: optional LR scheduler, stepped once per epoch.
        epochs: number of passes over ``dataloader``.
    """
    result_path = '/kaggle/working/result/vn_food2.csv'
    # Create the output folder up front so a missing directory does not
    # surface only after a full epoch of training.
    os.makedirs(os.path.dirname(result_path), exist_ok = True)

    start = time.time()
    model.train()
    save_weight_loss = {'model_name': "VNFood-CLIP"}
    for epoch in range(1, epochs + 1):
        total_loss = 0.0
        for images, input_ids, att_mask, label in tqdm(dataloader, desc=f"Epoch_train {epoch}"):
            optimizer.zero_grad()
            images = images.to(device)
            input_ids = input_ids.to(device)
            att_mask = att_mask.to(device)
            label = label.to(device)
            image_embed, text_embed = model(images, input_ids, att_mask)
            loss = supConLoss(image_embed, text_embed, label)
            total_loss += loss.item()
            loss.backward()
            optimizer.step()

        # The scheduler is optional (default None); only step it when given.
        if scheduler is not None:
            scheduler.step()
        # Guard against an empty dataloader to avoid a ZeroDivisionError.
        total_loss /= max(len(dataloader), 1)
        save_weight_loss[f'epoch {epoch}'] = total_loss

        # Record the elapsed time after the last epoch, whatever `epochs` is
        # (previously this only fired when the epoch number was exactly 5).
        if epoch == epochs:
            save_weight_loss['total_time_training'] = time.time() - start
        df_loss = pd.DataFrame(save_weight_loss, index = [0])
        df_loss.to_csv(result_path, index = False)
        print(f"Epoch {epoch} Loss: {total_loss}")


if __name__ == "__main__":
    # Entry point of the training cell: build tokenizer, image processor,
    # model, data pipeline and optimizer, then run 5 epochs of fine-tuning.
    # `device` is left as a notebook-level global; later cells read it.
    device = "cuda" if torch.cuda.is_available() else "cpu"
    tokenizer = AutoTokenizer.from_pretrained(backbone_text)
    processor = ConvNextImageProcessor.from_pretrained(backbone_vision)
    model = Model(embed_dim = 768).to(device)
    data_train = pd.read_csv("/kaggle/working/data/train_5k.csv")
    dataset_train = ImageTextDataset(data_train, processor, tokenizer)
    train_dataloader = DataLoader(dataset_train, batch_size = 16, shuffle = True)
    # Pass the configured weight decay explicitly; without it AdamW silently
    # falls back to its own default and the `weight_decay` constant is unused.
    optimizer = torch.optim.AdamW(model.parameters(), lr = learning_rate, weight_decay = weight_decay)
    # NOTE(review): eta_min (min_lr) equals the initial learning rate, so this
    # cosine schedule is flat — confirm this is intended.
    scheduler = lr_scheduler.CosineAnnealingLR(optimizer, T_max = t_max, eta_min = min_lr)
    train(model, train_dataloader, optimizer, device, scheduler = scheduler, epochs = 5)
    


# Save model

In [None]:
# torch.save does not create missing parent directories, so make sure the
# target folder exists before writing the checkpoint.
os.makedirs("/kaggle/working/weights", exist_ok = True)
torch.save(model.state_dict(), "/kaggle/working/weights/vnfood_clip_v1.pth")

# Load the VNFOOD-CLIP model after fine-tuning

In [None]:
# -------- Vision Encoder (ConvNext Large) --------
class VisionModel(nn.Module):
    """Image encoder: ConvNeXt backbone + linear projection, L2-normalised.

    Args:
        embed_dim: size of the shared image/text embedding space.
    """

    def __init__(self, embed_dim = 768):
        super().__init__()
        self.convnext_large = ConvNextModel.from_pretrained(backbone_vision)
        # Take the pooled feature width from the backbone config instead of
        # hard-coding 1536, so another ConvNeXt size can be swapped in via
        # `backbone_vision` without editing this class.
        feature_dim = self.convnext_large.config.hidden_sizes[-1]
        self.projection_head = nn.Linear(feature_dim, embed_dim)
        nn.init.xavier_normal_(self.projection_head.weight)
        nn.init.zeros_(self.projection_head.bias)
        self.drop = nn.Dropout(p = 0.2)

    def forward(self, x):
        """Return unit-length image embeddings for a batch of pixel values.

        ``x`` may carry an extra singleton axis at position 1 (the image
        processor returns a batch of one per sample); that axis is removed
        before the backbone is called.
        """
        if x.dim() == 5:
            x = x.squeeze(1)
        x = self.convnext_large(pixel_values = x)
        x = x.pooler_output
        x = self.projection_head(x)
        # Dropout is only active in training mode.
        outputs = self.drop(x)
        return F.normalize(outputs, dim = -1)

# -------- Text Encoder (PhoBert base) --------
class TextModel(nn.Module):
    """Text encoder: PhoBERT backbone + linear projection + LayerNorm, L2-normalised.

    Args:
        embed_dim: size of the shared image/text embedding space.
    """

    def __init__(self, embed_dim = 768):
        super().__init__()
        self.phobert = AutoModel.from_pretrained(backbone_text)
        self.projection_head = nn.Linear(self.phobert.config.hidden_size, embed_dim)
        nn.init.xavier_normal_(self.projection_head.weight)
        nn.init.zeros_(self.projection_head.bias)
        # The norm is applied to the *projected* vector, so it must be sized
        # with embed_dim. Sizing it with the backbone hidden size only works
        # by coincidence when both are equal (the default 768 case, for which
        # existing checkpoints remain loadable).
        self.layer_norm = nn.LayerNorm(embed_dim)

    def forward(self, input_ids, attention_mask):
        """Return unit-length text embeddings built from the first ([CLS]) token."""
        # Follow the device the weights live on rather than a notebook-level
        # `device` global, so the module also works when that global is
        # missing or out of sync with the model.
        target_device = self.projection_head.weight.device
        if input_ids.dim() == 3:
            # Drop the singleton axis added when per-sample tokenizer outputs
            # are stacked into a batch.
            input_ids = input_ids.squeeze(1)
        if attention_mask.dim() == 3:
            attention_mask = attention_mask.squeeze(1)
        input_ids = input_ids.to(target_device)
        attention_mask = attention_mask.to(target_device)
        outputs = self.phobert(input_ids = input_ids, attention_mask = attention_mask)
        # CLS token
        cls_embed = outputs.last_hidden_state[:, 0, :]
        projection = self.projection_head(cls_embed)
        outputs = self.layer_norm(projection)
        return F.normalize(outputs, dim = -1)

# -------- VNFOOD-CLIP --------
class Model(nn.Module):
    """Dual-encoder wrapper pairing the vision and text towers.

    Both towers project into the same ``embed_dim``-sized space, so their
    outputs can be compared directly.
    """

    def __init__(self, embed_dim=768):
        super().__init__()
        # Attribute names are part of the checkpoint key layout; keep them.
        self.vision_encoder = VisionModel(embed_dim=embed_dim)
        self.text_encoder = TextModel(embed_dim=embed_dim)

    def get_image_features(self, images):
        """Embed a batch of images with the vision tower only."""
        return self.vision_encoder(images)

    def get_text_features(self, input_ids, attention_mask):
        """Embed a batch of tokenised captions with the text tower only."""
        return self.text_encoder(input_ids, attention_mask)

    def forward(self, images, input_ids, attention_mask):
        """Return ``(image_features, text_features)`` for a paired batch."""
        img_out = self.vision_encoder(images)
        txt_out = self.text_encoder(input_ids, attention_mask)
        return img_out, txt_out

# Rebuild the architecture on the target device before restoring weights:
# the evaluation code moves query tensors to `device`, so a model left on the
# CPU would raise a device-mismatch error whenever `device` is a GPU.
model = Model().to(device)
# Inference only from here on: switch off dropout.
model.eval()
# NOTE(review): the save cell writes vnfood_clip_v1.pth while this loads
# vnfood_clip_v2.pth — confirm which checkpoint is intended.
model.load_state_dict(torch.load("/kaggle/working/weights/vnfood_clip_v2.pth", map_location = device, weights_only = True))

<All keys matched successfully>

# Function to evaluate the model.
# This function implements the MAP@K (mean average precision at K) metric.

In [None]:
import faiss
def _average_precision(retrieved_labels, y_true):
    """Return the average precision of one ranked label list for ``y_true``.

    Precision is accumulated at every rank holding a correct label and divided
    by the number of correct labels found in the list (0 when none is found).
    """
    hits = 0
    precision_sum = 0.0
    for rank, retrieved in enumerate(retrieved_labels, start = 1):
        if retrieved == y_true:
            hits += 1
            precision_sum += hits / rank
    return precision_sum / hits if hits > 0 else 0.0


def mapK(queries, labels, df_sample, index, k, model, processor, tokenizer, task):
    """Compute and print MAP@K for a retrieval task; also return it.

    Args:
        queries: sequence of queries — raw caption strings when ``task`` is
            ``"T2I"``, image file paths for any other ``task`` value.
        labels: true label of each query, aligned with ``queries``.
        df_sample: DataFrame describing the indexed database; row ``i`` must
            correspond to vector ``i`` of ``index`` and have a ``label`` column.
        index: FAISS index searched with the query embedding.
        k: number of neighbours retrieved per query.
        model: model exposing ``get_text_features`` / ``get_image_features``.
        processor: image processor used for image queries.
        tokenizer: tokenizer used for text queries.
        task: ``"T2I"`` for text queries; anything else means image queries.

    Returns:
        MAP@K as a fraction in [0, 1] (0.0 when there are no queries).

    Text queries are word-segmented with the notebook-level ``rdrsegmenter``.
    """
    Q = len(queries)
    print(task)
    print(f"Number of queries = {Q}")
    print(f"Size of database = {index.ntotal}")
    ap_scores = []
    model.eval()
    # Send inputs to wherever the model weights live rather than relying on a
    # notebook-level `device` global.
    model_device = next(model.parameters()).device
    for q in range(Q):
        y_true = labels[q]
        if task == "T2I":
            # The segmenter returns one string per sentence; rejoin them into
            # a single caption before tokenising.
            segment = rdrsegmenter.word_segment(queries[q])
            caption = " ".join(segment)
            token = tokenizer(caption, return_tensors = 'pt', max_length = 256, padding = 'max_length', truncation = True)
            with torch.no_grad():
                embeddings = model.get_text_features(
                    token['input_ids'].to(model_device),
                    token['attention_mask'].to(model_device),
                )
        else:
            with Image.open(queries[q]) as raw:
                image = raw.convert('RGB')
            image = processor(image, return_tensors = "pt")
            with torch.no_grad():
                embeddings = model.get_image_features(image['pixel_values'].to(model_device))
        # FAISS expects a contiguous float32 array; normalize_L2 rescales it
        # in place to unit length.
        embeddings = embeddings.float().contiguous().cpu().numpy()
        faiss.normalize_L2(embeddings)
        D, I = index.search(embeddings, k)
        # FAISS pads with -1 when fewer than k neighbours exist; drop those,
        # otherwise iloc[-1] would silently pick the last database row.
        neighbours = I[0]
        neighbours = neighbours[neighbours >= 0]
        result = df_sample.iloc[neighbours]['label'].tolist() # list label model retrieval
        ap_scores.append(_average_precision(result, y_true))
    # Avoid a ZeroDivisionError when called without any query.
    map_at_k = sum(ap_scores) / Q if Q > 0 else 0.0
    print(f"{round(map_at_k * 100, 4)}%")
    return map_at_k
