In [28]:
import torch
import torch.nn as nn
# import torchtext
import os
import random
import numpy as np
import pandas as pd
import spacy
import timm
import matplotlib.pyplot as plt

from PIL import Image
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
# from torchtext.data.utils import get_tokenizer
# from torchtext.vocab import build_vocab_from_iterator
from torchvision import transforms
from transformers import ViTFeatureExtractor, ViTModel, RobertaTokenizer, RobertaModel  

In [29]:
def set_seed(seed):
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    np.random.seed(seed)
    random.seed(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed = 59
set_seed(seed)

### Chia bộ train-test

In [30]:
import re

# Chuỗi ban đầu
line = "COCO_val2014_000000396568.jpg#0 extra text"

# Sử dụng regex để xóa mọi thứ sau dấu # và dừng lại khi gặp khoảng trắng
line = re.sub(r'#\S*', '', line)

# Loại bỏ khoảng trắng thừa ở cuối (nếu cần)
line = line.strip()

print(line)  # Kết quả: COCO_val2014_000000396568.jpg

COCO_val2014_000000396568.jpg extra text


In [31]:
train_data = []
train_set_path = './vqa_coco_dataset/vaq2.0.TrainImages.txt'

with open(train_set_path, 'r') as f:
    lines = f.readlines()
    for line in lines:
        line = re.sub(r'#\S*', '', line)
        temp = line.split('\t')
        qa = temp[1].split('?')

        if len(qa) == 2:
            answer = qa[1].strip()
        elif len(qa) == 3:
            answer = qa[2].strip()

        data_sample = {
            'image_path': temp[0],
            'question': qa[0] + '?',
            'answer': answer
        }
        train_data.append(data_sample)


val_data = []
val_set_path = './vqa_coco_dataset/vaq2.0.DevImages.txt'

with open(train_set_path, 'r') as f:
    lines = f.readlines()
    for line in lines:
        line = re.sub(r'#\S*', '', line)
        temp = line.split('\t')
        qa = temp[1].split('?')

        if len(qa) == 2:
            answer = qa[1].strip()
        elif len(qa) == 3:
            answer = qa[2].strip()

        data_sample = {
            'image_path': temp[0],
            'question': qa[0] + '?',
            'answer': answer
        }
        val_data.append(data_sample)


test_data = []
test_set_path = './vqa_coco_dataset/vaq2.0.TestImages.txt'

with open(train_set_path, 'r') as f:
    lines = f.readlines()
    for line in lines:
        line = re.sub(r'#\S*', '', line)
        temp = line.split('\t')
        qa = temp[1].split('?')

        if len(qa) == 2:
            answer = qa[1].strip()
        elif len(qa) == 3:
            answer = qa[2].strip()

        data_sample = {
            'image_path': temp[0],
            'question': qa[0] + '?',
            'answer': answer
        }
        test_data.append(data_sample)

### Xây dựng tokenize

In [43]:
def tokenize(text, max_seq_length, device):
    tokenizer = RobertaTokenizer.from_pretrained("FacebookAI/roberta-base")
    encode_text = tokenizer(
        text,
        max_length=max_seq_length,
        padding="max_length",
        truncation=True,
        return_tensors="pt"
    )
    encoded = {key: value.squeeze().to(device)
               for key, value in encode_text.items()}
    if len(encode_text['input_ids']) > max_seq_length:
        encoded = {key: value[:max_seq_length]
                   for key, value in encode_text.items()}
    return encoded

def feature_extractor(img, device):
    feature_extractor = ViTFeatureExtractor.from_pretrained(
        'google/vit-base-patch16-224-in21k'
    )
    inputs = feature_extractor(images=img, return_tensors="pt")
    encoded = {key: value.squeeze().to(device)
               for key, value in inputs.items()}
    return encoded

In [46]:
## Test hàm tokenize và feature_extractor

import torch
from PIL import Image

# Thiết bị
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Test hàm tokenize
sample_text = "This is a sample text."
max_seq_length = 20
encoded_text = tokenize(sample_text, max_seq_length, device)
print("Tokenized output shape:", encoded_text["input_ids"].shape)

# Test hàm feature_extractor
sample_img_path = "./vqa_coco_dataset/val2014-resised/COCO_val2014_000000000133.jpg"
img = Image.open(sample_img_path).convert('RGB')
features = feature_extractor(img, device)
print("Extracted feature shape:", features["pixel_values"].shape)

Tokenized output shape: torch.Size([20])
Extracted feature shape: torch.Size([3, 224, 224])


### Xây dựng mapping labels dictionary

In [47]:
labels = set(
    sample['answer'] for sample in train_data
)

label2id = {label: i for i, label in enumerate(labels)}
id2label = {i: label for label, i in label2id.items()}

print(label2id)

{'yes': 0, 'no': 1}


### Xây dựng Pytorch datasets

In [48]:
class VQADatasets(Dataset):
    def __init__(self, data, label2idx, max_seq_length=20, transform=None, tokenize=None, feature_extractor=None, device='cpu', img_dir='./vqa_coco_dataset/val2014-resised'):
        super().__init__()
        self.data = data
        self.label2idx = label2idx
        self.max_seq_length = max_seq_length
        self.transform = transform
        self.tokenize = tokenize
        self.feature_extractor = feature_extractor
        self.img_dir = img_dir
        self.device = device

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        # Lấy đường dẫn ảnh
        img_path = os.path.join(self.img_dir, self.data[index]['image_path'])

        # Mở ảnh và chuyển sang RGB
        img = Image.open(img_path).convert('RGB')

        # Áp dụng transform nếu có
        if self.transform:
            img = self.transform(img)

        # Trích xuất đặc trưng ảnh
        img = self.feature_extractor(img, self.device)

        # Xử lý câu hỏi
        questions = self.data[index]['question']
        questions = self.tokenize(questions, self.max_seq_length, self.device)

        # Xử lý nhãn
        answer = self.data[index]['answer']
        id_label = self.label2idx[answer]
        id_label = torch.tensor(id_label).to(self.device)

        # Tạo sample
        sample = {
            'image': img,
            'question': questions,
            'label': id_label
        }

        return sample

### Xây dựng Transforms

In [52]:
data_transform = {
    'train': transforms.Compose([
        transforms.Resize(size=(224, 224)),
        transforms.CenterCrop(size=180),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        transforms.RandomHorizontalFlip(),
        transforms.GaussianBlur(3),
    ]),
    'val': transforms.Compose([
        transforms.Resize(size=(224, 224)),
        transforms.CenterCrop(size=180),
        transforms.ColorJitter(brightness=0.1, contrast=0.1, saturation=0.1),
        transforms.RandomHorizontalFlip(),
        transforms.GaussianBlur(3),
    ])
}

### Khai báo datasets object

In [53]:
device = 'cuda' if torch.cuda.is_available() else 'cpu'

train_dataset = VQADatasets(
    train_data,
    label2idx=label2id,
    max_seq_length=20, transform=data_transform['train'], tokenize=tokenize, feature_extractor=feature_extractor, device=device
)
val_dataset = VQADatasets(
    val_data,
    label2idx=label2id,
    max_seq_length=20, transform=data_transform['val'], tokenize=tokenize, feature_extractor=feature_extractor, device=device
)
test_dataset = VQADatasets(
    test_data,
    label2idx=label2id,
    max_seq_length=20, transform=data_transform['val'], tokenize=tokenize, feature_extractor=feature_extractor, device=device
)

In [54]:
# Duyệt qua train_dataset và in thử một vài phần tử
for idx, sample in enumerate(train_dataset):
    print(f"Sample {idx}:")
    print("Image shape:", sample['image'])  # Kiểm tra hình dạng của ảnh
    print("Question:", sample['question'])        # Kiểm tra câu hỏi
    print("Label:", sample['label'])              # Kiểm tra nhãn
    print("-" * 50)

    break

Sample 0:
Image shape: {'pixel_values': tensor([[[ 0.8745,  0.8667,  0.8588,  ...,  0.1294, -0.0353, -0.0980],
         [ 0.8667,  0.8588,  0.8510,  ...,  0.2784,  0.0588, -0.0353],
         [ 0.8588,  0.8510,  0.8431,  ...,  0.4667,  0.2157,  0.0275],
         ...,
         [ 0.0039,  0.0275,  0.0118,  ..., -0.4745, -0.5059, -0.4824],
         [-0.0745, -0.0353, -0.0118,  ..., -0.3490, -0.2863, -0.2706],
         [-0.1059, -0.1294, -0.0980,  ..., -0.0980, -0.1059, -0.1608]],

        [[ 0.9451,  0.9373,  0.9294,  ...,  0.1686,  0.0510,  0.0196],
         [ 0.9373,  0.9294,  0.9216,  ...,  0.3098,  0.1294,  0.0667],
         [ 0.9294,  0.9216,  0.9137,  ...,  0.4902,  0.2784,  0.1216],
         ...,
         [-0.0824, -0.0431, -0.0275,  ..., -0.6471, -0.6627, -0.6157],
         [-0.1686, -0.1137, -0.0745,  ..., -0.5373, -0.4667, -0.4196],
         [-0.2157, -0.2235, -0.1765,  ..., -0.2863, -0.2863, -0.3255]],

        [[ 0.9373,  0.9294,  0.9216,  ...,  0.1059,  0.0275,  0.0196],
     

In [55]:
train_batch_size = 256
test_batch_size = 32

train_loader = DataLoader(
    train_dataset,
    batch_size=train_batch_size,
    shuffle=True
)
val_loader = DataLoader(
    val_dataset,
    batch_size=test_batch_size,
    shuffle=False
)
test_loader = DataLoader(
    test_dataset,
    batch_size=test_batch_size,
    shuffle=False
)

In [57]:
sample_batch = next(iter(train_loader))
print("Sample batch:")
print(sample_batch['label'].size(0))



Sample batch:
256


In [58]:
total = 0
total += sample_batch['label'].size(0)
total

256

### Xây dựng model 

#### Text encoder

In [59]:
class TextEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        self.model = RobertaModel.from_pretrained("FacebookAI/roberta-base")

    def forward(self, inputs):
        return self.model(**inputs)

#### VisualEncoder

In [60]:
class VissionEncoder(nn.Module):
    def __init__(self):
        super().__init__()

        self.model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')

    def forward(self, inputs):
        return self.model(**inputs)

### Classifier

In [61]:
class ClassfiyModel(nn.Module):
    def __init__(self, hidden_size = 512,dropout_prob = 0.2 ,num_classes = 2):
        super().__init__()

        self.fc1 = nn.Linear(768*2, hidden_size)
        self.dropout = nn.Dropout(dropout_prob)
        self.gelu = nn.GELU()
        self.fc2 = nn.Linear(hidden_size, num_classes)
    def forward(self, x):
        x = self.fc1(x)
        x = self.gelu(x)
        x = self.dropout(x)
        x = self.fc2(x)

        return x


#### VQA Model

In [64]:
class VQAModel(nn.Module):
    def __init__(self, visual_encoder, text_encoder, classifier):
        super().__init__()
        self.visual_encoder = visual_encoder
        self.text_encoder = text_encoder
        self.classifier = classifier

    def forward(self, image, answer):
        text_out = self.text_encoder(answer)
        image_out = self.visual_encoder(image)

        x = torch.cat((image_out, text_out), dim =1)
        x = self.classifier(x)

        return x
    
    def freeze(self, visual=True, textual=True, classifier=True):
        if visual:
            for n, p in self.visual_encoder.named_parameters():
                p.requires_grad = visual

        if textual:
            for n, p in self.text_encoder.named_parameters():
                p.requires_grad = textual

        if classifier:
            for param in self.classifier.parameters():
                param.requires_grad = classifier
        

In [65]:
n_classes = len(labels)
hidden_size = 512
dropout_prob = 0.2

text_encoder = TextEncoder().to(device)
visual_encoder = VissionEncoder().to(device)

classifier = ClassfiyModel(hidden_size=hidden_size, dropout_prob=dropout_prob, num_classes=n_classes).to(device)

model = VQAModel(visual_encoder, text_encoder, classifier).to(device)
model.freeze()


Some weights of RobertaModel were not initialized from the model checkpoint at FacebookAI/roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [66]:
lr = 1e-3
epochs = 50
scheduler_step_size = epochs * 0.8
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=lr
)
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=scheduler_step_size,
    gamma=0.1
)

In [70]:
def evaluate(model, dataLoader, criterion):
    model.eval()
    correct = 0
    total = 0
    losses = []

    for i, sample in enumerate(dataLoader):
        img = sample["image"]
        question = sample["question"]
        label = sample["label"]

        outputs = model(img, question)
        loss = criterion(outputs, label)
        losses.append(loss.item())

        total += label.size(0)
        _, predicted = torch.max(outputs.data, 1)
        correct = (predicted == label).sum().item()
        break
    loss = sum(losses) / len(dataLoader)
    acc = correct / total

    return loss , acc


In [None]:
def fit(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    epochs
):
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        batch_train_losses = []

        model.train()
        for idx, inputs in enumerate(train_loader):
            images = inputs['image']
            questions = inputs['question']
            labels = inputs['label']

            optimizer.zero_grad()
            outputs = model(images, questions)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            batch_train_losses.append(loss.item())

        train_loss = sum(batch_train_losses) / len(batch_train_losses)
        train_losses.append(train_loss)

        val_loss, val_acc = evaluate(
            model, val_loader,
            criterion
        )
        val_losses.append(val_loss)

        print(
            f'EPOCH {epoch + 1}: Train loss: {train_loss:.4f} Val loss: {val_loss:.4f} Val Acc: {val_acc}')

        scheduler.step()

    return train_losses, val_losses

In [73]:
def fit(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    epochs
):
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        batch_train_losses = []

        model.train()
        for idx, inputs in enumerate(train_loader):
            images = inputs['image']
            questions = inputs['question']
            labels = inputs['label']

            optimizer.zero_grad()
            outputs = model(images, questions)
            loss = criterion(outputs, labels)

            loss.backward()
            optimizer.step()

            batch_train_losses.append(loss.item())

        train_loss = sum(batch_train_losses) / len(batch_train_losses)
        train_losses.append(train_loss)

        val_loss, val_acc = evaluate(
            model, val_loader,
            criterion
        )
        val_losses.append(val_loss)

        print(
            f'EPOCH {epoch + 1}: Train loss: {train_loss:.4f} Val loss: {val_loss:.4f} Val Acc: {val_acc}')

        scheduler.step()

    return train_losses, val_losses