<a href="https://colab.research.google.com/github/Trung0Minh/AIO2023-MODULE-6/blob/main/visual_question_answering.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import os

import matplotlib.pyplot as plt
import pandas as pd
import spacy
import timm
import torch
import torch.nn as nn
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms

In [None]:
!pip install torchtext==0.15.2
!pip install torch==1.13.0
import torchtext
from torchtext.data.utils import get_tokenizer
from torchtext.vocab import build_vocab_from_iterator

In [None]:
def load_data(dataset_path):
    """Parse a tab-separated annotation file into a list of sample dicts.

    Every non-blank line is split on tabs: field 0 is the image reference and
    field 1 holds the question and the answer, separated by '?'.

    Args:
        dataset_path: Path of the annotation text file.

    Returns:
        A list of dicts with the keys 'image_path', 'question' and 'answer'.

    Raises:
        IndexError: If a non-blank line has no tab, or field 1 has no '?'.
    """
    data = []
    with open(dataset_path, "r") as f:
        # Iterate the file object directly instead of loading all lines first.
        for line in f:
            # Blank lines (e.g. a trailing newline at the end of the file)
            # carry no sample and would otherwise fail on temp[1].
            if not line.strip():
                continue

            temp = line.split("\t")
            qa = temp[1].split('?')

            # Exactly three pieces means the question itself contained a '?';
            # the answer is then the third piece, otherwise the second.
            if len(qa) == 3:
                answer = qa[2].strip()
            else:
                answer = qa[1].strip()

            data_sample = {
                # Drop the last two characters of the image field
                # (presumably a "#<digit>" suffix - confirm against the data).
                'image_path': temp[0][:-2],
                'question': qa[0],
                'answer': answer
            }
            data.append(data_sample)
    return data

# Annotation files of the three splits, relative to the working directory.
train_set_path = './vaq2.0.TrainImages.txt'
val_set_path = './vaq2.0.DevImages.txt'
test_set_path = './vaq2.0.TestImages.txt'

# Parse every split with load_data.
train_data = load_data(train_set_path)
val_data = load_data(val_set_path)
test_data = load_data(test_set_path)

In [None]:
# spaCy English pipeline; only its tokenizer is used in this notebook.
eng = spacy.load("en_core_web_sm")


def get_tokens(data_iter):
    """Yield, for every sample, the list of token strings of its question."""
    for sample in data_iter:
        doc = eng.tokenizer(sample["question"])
        yield [tok.text for tok in doc]

# Vocabulary over the training questions. Tokens seen fewer than twice are
# left out, and the four special tokens are placed first.
vocab = build_vocab_from_iterator(
    get_tokens(train_data),
    specials=["<pad>", "<sos>", "<eos>", "<unk>"],
    special_first=True,
    min_freq=2,
)

# Any token missing from the vocabulary is looked up as "<unk>".
vocab.set_default_index(vocab["<unk>"])

In [None]:
# Distinct answers of the training split form the label set.
classes = set([sample['answer'] for sample in train_data])

# Enumerate in sorted order so label ids are reproducible between runs
# (the iteration order of a set of strings is not stable across sessions).
classes_to_idx = {
    class_name: idx for idx, class_name in enumerate(sorted(classes))
}

# Inverse lookup: label id -> answer string. Built from classes_to_idx so the
# two tables can never disagree.
idx_to_classes = {
    idx: class_name for class_name, idx in classes_to_idx.items()
}

In [None]:
def tokenize(quesion, max_sequence_length):
    """Turn a question string into exactly ``max_sequence_length`` vocab ids.

    The question is tokenized with ``eng.tokenizer`` and every token is looked
    up in ``vocab``. Short sequences are right-padded with the "<pad>" id and
    long ones are cut off at ``max_sequence_length``.
    """
    ids = [vocab[tok.text] for tok in eng.tokenizer(quesion)]
    missing = max_sequence_length - len(ids)
    if missing > 0:
        return ids + [vocab["<pad>"]] * missing
    return ids[:max_sequence_length]

In [None]:
class VQADataset(Dataset):
    """Dataset yielding ``(image, question_ids, answer_id)`` triples.

    Args:
        data: Sequence of dicts with 'image_path', 'question' and 'answer'.
        classes_to_idx: Mapping from answer string to integer label.
        max_seq_len: Length every tokenized question is padded/cut to.
        transform: Optional callable applied to the loaded RGB image.
        root_dir: Directory the image paths are relative to.
            NOTE(review): other cells in this notebook spell the folder
            'val2014-resided' - confirm which name exists on disk.
    """

    def __init__(self, data, classes_to_idx, max_seq_len=30, transform=None, root_dir='val2014-resised'):
        self.data = data
        self.classes_to_idx = classes_to_idx
        self.root_dir = root_dir
        self.max_seq_len = max_seq_len
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]

        # Image: load from disk, force three channels, then optional transform.
        image = Image.open(os.path.join(self.root_dir, sample['image_path'])).convert('RGB')
        if self.transform:
            image = self.transform(image)

        # Question: fixed-length list of vocabulary ids as a long tensor.
        token_ids = tokenize(sample['question'], self.max_seq_len)
        question = torch.tensor(token_ids, dtype=torch.long)

        # Answer: integer class label as a scalar long tensor.
        answer = torch.tensor(self.classes_to_idx[sample['answer']], dtype=torch.long)

        return image, question, answer

In [None]:
# Image preprocessing: resize to 224x224, convert to a tensor, then normalise
# each channel with the fixed mean/std values below.
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=(0.485, 0.456, 0.406),
        std=(0.229, 0.224, 0.225),
    ),
])

In [None]:
# One dataset per split; all three share the label mapping and preprocessing.
train_dataset = VQADataset(data=train_data, classes_to_idx=classes_to_idx, transform=transform)
val_dataset = VQADataset(data=val_data, classes_to_idx=classes_to_idx, transform=transform)
test_dataset = VQADataset(data=test_data, classes_to_idx=classes_to_idx, transform=transform)

In [None]:
train_batch_size = 128
test_batch_size = 32

# Only the training loader shuffles; validation and test keep dataset order.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=train_batch_size,
    shuffle=True,
)
val_dataloader = DataLoader(
    dataset=val_dataset,
    batch_size=test_batch_size,
    shuffle=False,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=test_batch_size,
    shuffle=False,
)

In [None]:
class VQAModel(nn.Module):
    """Image encoder + bidirectional LSTM question encoder + MLP classifier.

    Args:
        n_classes: Number of answer classes (size of the output logits).
        img_model_name: Name passed to ``timm.create_model``.
        embedding_dim: Size of the token embeddings.
        n_layers: Number of stacked LSTM layers.
        hidden_size: LSTM hidden size per direction; also the size of the
            image feature vector produced by the image encoder head.
        dropout: Dropout probability for the classifier and, when
            ``n_layers > 1``, between LSTM layers.
    """

    def __init__(self, n_classes, img_model_name='resnet50', embedding_dim=300, n_layers=2, hidden_size=128, dropout=0.2):
        super().__init__()
        self.image_encoder = timm.create_model(
            img_model_name, pretrained=True, num_classes=hidden_size
        )

        self.embedding = nn.Embedding(len(vocab), embedding_dim)
        self.lstm = nn.LSTM(
            input_size=embedding_dim,
            hidden_size=hidden_size,
            num_layers=n_layers,
            # nn.LSTM applies dropout only between stacked layers and warns
            # when it is non-zero for a single layer.
            dropout=dropout if n_layers > 1 else 0.0,
            batch_first=True,
            # Both directions are needed: the layers below are sized for a
            # text feature of 2 * hidden_size.
            bidirectional=True
        )

        # Text feature: 2 * hidden_size (forward + backward direction).
        self.layernorm = nn.LayerNorm(hidden_size*2)
        # Fused feature: hidden_size (image) + 2 * hidden_size (text).
        self.fc1 = nn.Linear(hidden_size*3, 256)
        self.relu = nn.ReLU()
        self.dropout = nn.Dropout(dropout)
        self.fc2 = nn.Linear(256, n_classes)

    def forward(self, img, text):
        """Return class logits for a batch of images and token-id sequences."""
        img_features = self.image_encoder(img)

        text_emb = self.embedding(text)
        lstm_out, _ = self.lstm(text_emb)

        # Use the LSTM output at the final time step as the question feature.
        lstm_out = lstm_out[:, -1, :]
        lstm_out = self.layernorm(lstm_out)

        combined = torch.cat((img_features, lstm_out), dim=1)

        x = self.fc1(combined)
        x = self.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

In [None]:
# Hyper-parameters of the CNN + LSTM model.
n_classes = len(classes)
img_model_name = "resnet50"
hidden_size = 128
n_layers = 1
embedding_dim = 128
dropout = 0.2
# 'cuda' is the device string PyTorch recognises; the former 'cude' typo made
# .to(device) fail whenever a GPU was available.
device = 'cuda' if torch.cuda.is_available() else 'cpu'

model = VQAModel(n_classes, img_model_name, embedding_dim, n_layers, hidden_size, dropout).to(device)

In [None]:
def evaluate(model, dataloader, criterion, device):
    """Compute mean batch loss and accuracy of ``model`` over ``dataloader``.

    Args:
        model: Module called as ``model(image, question)`` returning logits.
        dataloader: Iterable of ``(image, question, labels)`` batches.
        criterion: Loss callable taking ``(outputs, labels)``.
        device: Device the batch tensors are moved to.

    Returns:
        ``(loss, acc)``: the mean of the per-batch losses and the fraction of
        correctly predicted samples.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.eval()
    losses = []
    # Running counters for accuracy; they must exist before the loop.
    correct = 0
    total = 0
    with torch.no_grad():
        for image, question, labels in dataloader:
            image = image.to(device)
            question = question.to(device)
            labels = labels.to(device)

            outputs = model(image, question)
            loss = criterion(outputs, labels)
            losses.append(loss.item())

            # Predicted class = index of the largest logit.
            _, predicted = torch.max(outputs, 1)

            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    loss = sum(losses) / len(losses)
    acc = correct / total
    return loss, acc

In [None]:
def fit(model, train_dataloader, val_Dataloader, criterion, optimizer, scheduler, device, epochs):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    Args:
        model: Module called as ``model(image, question)``.
        train_dataloader: Iterable of ``(image, question, labels)`` batches.
        val_Dataloader: Validation batches in the same format (the parameter
            name keeps its original spelling for compatibility).
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer over the model parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        device: Device the batch tensors are moved to.
        epochs: Number of passes over ``train_dataloader``.

    Returns:
        ``(train_losses, val_losses)``: one mean loss per epoch each.
    """
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        batch_train_losses = []

        # evaluate() leaves the model in eval mode, so training mode has to be
        # restored at the start of every epoch.
        model.train()
        for image, question, labels in train_dataloader:
            image = image.to(device)
            question = question.to(device)
            labels = labels.to(device)

            optimizer.zero_grad()
            outputs = model(image, question)

            loss = criterion(outputs, labels)
            batch_train_losses.append(loss.item())

            loss.backward()
            optimizer.step()

        train_loss = sum(batch_train_losses) / len(batch_train_losses)
        train_losses.append(train_loss)

        # Validate on the loader that was passed in, not on a global.
        val_loss, val_acc = evaluate(model, val_Dataloader, criterion, device)
        val_losses.append(val_loss)

        print(f"Epoch {epoch+1}/{epochs} | Train Loss: {train_loss:.4f} | Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")

        # Only ReduceLROnPlateau takes the monitored metric; for the other
        # schedulers the positional argument of step() is an epoch index.
        if isinstance(scheduler, torch.optim.lr_scheduler.ReduceLROnPlateau):
            scheduler.step(val_loss)
        else:
            scheduler.step()

    return train_losses, val_losses

In [None]:
lr = 1e-2
epochs = 50
weight_decay = 1e-5
# StepLR documents step_size as an int; epochs * 0.6 alone is a float.
# The LR is decayed once, after 60% of the epochs (never less than 1).
scheduler_step_size = max(1, round(epochs * 0.6))
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=lr,
    weight_decay=weight_decay
)

# Multiply the learning rate by gamma every scheduler_step_size epochs.
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=scheduler_step_size,
    gamma=0.1
)

In [None]:
# Train the model; keeps the per-epoch training and validation losses.
train_losses, val_losses = fit(
    model,
    train_dataloader,
    val_dataloader,
    criterion,
    optimizer,
    scheduler,
    device,
    epochs,
)

In [None]:
# Final loss and accuracy on the held-out test split.
test_metrics = evaluate(model, test_dataloader, criterion, device)
test_loss, test_acc = test_metrics
print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}")

# ViT + RoBERTa

In [None]:
import torch
import torch.nn as nn
import os
import numpy as np
import pandas as pd
import timm
import matplotlib.pyplot as plt

from PIL import Image
from torch.utils.data import Dataset, DataLoader
from transformers import ViTModel, ViTImageProcessor
from transformers import AutoTokenizer, RobertaModel

In [None]:
def load_data(dataset_path):
    """Read a tab-separated annotation file and return its samples.

    Field 0 of each non-blank line is the image reference; field 1 contains
    the question and the answer separated by '?'.

    Args:
        dataset_path: Path of the annotation text file.

    Returns:
        A list of dicts with the keys 'image_path', 'question' and 'answer'.

    Raises:
        IndexError: If a non-blank line has no tab, or field 1 has no '?'.
    """
    data = []
    with open(dataset_path, "r") as f:
        # Stream line by line; no need to hold the whole file in a list.
        for line in f:
            # Skip empty lines (such as a trailing newline) instead of
            # failing on the missing second field.
            if not line.strip():
                continue

            temp = line.split("\t")
            qa = temp[1].split('?')

            # Three pieces: the question contained an extra '?', so the
            # answer is the third piece; otherwise it is the second.
            if len(qa) == 3:
                answer = qa[2].strip()
            else:
                answer = qa[1].strip()

            data_sample = {
                # The last two characters of the image field are removed
                # (presumably a "#<digit>" suffix - confirm against the data).
                'image_path': temp[0][:-2],
                'question': qa[0],
                'answer': answer
            }
            data.append(data_sample)
    return data

# Locations of the train / dev / test annotation files.
train_set_path = './vaq2.0.TrainImages.txt'
val_set_path = './vaq2.0.DevImages.txt'
test_set_path = './vaq2.0.TestImages.txt'

# Load each split through load_data.
train_data = load_data(train_set_path)
val_data = load_data(val_set_path)
test_data = load_data(test_set_path)

In [None]:
# English spaCy pipeline; its tokenizer splits the question text.
eng = spacy.load("en_core_web_sm")


def get_tokens(data_iter):
    """Generate the token-string list of each sample's question."""
    for sample in data_iter:
        doc = eng.tokenizer(sample["question"])
        yield [tok.text for tok in doc]

# Build the vocabulary from the training questions: tokens need at least two
# occurrences, and the special tokens come first.
vocab = build_vocab_from_iterator(
    get_tokens(train_data),
    specials=["<pad>", "<sos>", "<eos>", "<unk>"],
    special_first=True,
    min_freq=2,
)

# Out-of-vocabulary tokens resolve to the "<unk>" id.
vocab.set_default_index(vocab["<unk>"])

In [None]:
# The label set is every distinct answer in the training split.
classes = set([sample['answer'] for sample in train_data])

# Sorted enumeration keeps label ids identical from run to run
# (the iteration order of a set of strings is not stable across sessions).
classes_to_idx = {
    class_name: idx for idx, class_name in enumerate(sorted(classes))
}

# Reverse table: label id -> answer string, derived from classes_to_idx so
# both directions always agree.
idx_to_classes = {
    idx: class_name for class_name, idx in classes_to_idx.items()
}

In [None]:
def tokenize(quesion, max_sequence_length):
    """Encode a question as a list of ``max_sequence_length`` vocabulary ids.

    Tokens come from ``eng.tokenizer`` and ids from ``vocab``; the result is
    right-padded with the "<pad>" id or truncated to the requested length.
    """
    ids = [vocab[tok.text] for tok in eng.tokenizer(quesion)]
    missing = max_sequence_length - len(ids)
    if missing > 0:
        return ids + [vocab["<pad>"]] * missing
    return ids[:max_sequence_length]

In [None]:
class VQADataset(Dataset):
    """Dataset returning ``(image, question_ids, answer_id)`` per sample.

    Args:
        data: Sequence of dicts with 'image_path', 'question' and 'answer'.
        classes_to_idx: Mapping from answer string to integer label.
        max_seq_len: Fixed length of the tokenized question.
        transform: Optional callable applied to the loaded RGB image.
        root_dir: Directory the image paths are relative to.
            NOTE(review): an earlier cell spells this folder
            'val2014-resised' - confirm which name exists on disk.
    """

    def __init__(self, data, classes_to_idx, max_seq_len=30, transform=None, root_dir='val2014-resided'):
        self.data = data
        self.classes_to_idx = classes_to_idx
        self.root_dir = root_dir
        self.max_seq_len = max_seq_len
        self.transform = transform

    def __len__(self):
        return len(self.data)

    def __getitem__(self, idx):
        sample = self.data[idx]

        # Load the image as RGB and apply the optional transform.
        image = Image.open(os.path.join(self.root_dir, sample['image_path'])).convert('RGB')
        if self.transform:
            image = self.transform(image)

        # Fixed-length vocabulary ids of the question.
        token_ids = tokenize(sample['question'], self.max_seq_len)
        question = torch.tensor(token_ids, dtype=torch.long)

        # Integer class label of the answer.
        answer = torch.tensor(self.classes_to_idx[sample['answer']], dtype=torch.long)

        return image, question, answer

In [None]:
from torchvision import transforms

# Preprocessing pipeline: resize to 224x224, convert to a tensor, then
# normalise each channel with the fixed mean/std values below.
transform = transforms.Compose([
    transforms.Resize(size=(224, 224)),
    transforms.ToTensor(),
    transforms.Normalize(
        mean=(0.485, 0.456, 0.406),
        std=(0.229, 0.224, 0.225),
    ),
])

In [None]:
# Per-split datasets sharing one label mapping and one preprocessing pipeline.
train_dataset = VQADataset(data=train_data, classes_to_idx=classes_to_idx, transform=transform)
val_dataset = VQADataset(data=val_data, classes_to_idx=classes_to_idx, transform=transform)
test_dataset = VQADataset(data=test_data, classes_to_idx=classes_to_idx, transform=transform)

In [None]:
train_batch_size = 128
test_batch_size = 32

# Shuffle only during training; evaluation loaders keep the dataset order.
train_dataloader = DataLoader(
    dataset=train_dataset,
    batch_size=train_batch_size,
    shuffle=True,
)
val_dataloader = DataLoader(
    dataset=val_dataset,
    batch_size=test_batch_size,
    shuffle=False,
)
test_dataloader = DataLoader(
    dataset=test_dataset,
    batch_size=test_batch_size,
    shuffle=False,
)

In [None]:
class VQADataset(Dataset):
    """Dataset yielding dict samples for the ViT + RoBERTa model.

    Args:
        data: Sequence of dicts with 'image_path', 'question' and 'answer'.
        classes_to_idx: Mapping from answer string to integer label.
        img_feature_extractor: Optional image processor called as
            ``processor(img, return_tensors='pt')``.
        text_tokenizer: Optional tokenizer applied to the question string.
        device: Device every returned tensor is moved to.
        root_dir: Directory the image paths are relative to.
            NOTE(review): an earlier cell spells this folder
            'val2014-resised' - confirm which name exists on disk.
    """

    def __init__(self, data, classes_to_idx, img_feature_extractor, text_tokenizer, device, root_dir='./val2014-resided'):
        self.data = data
        self.root_dir = root_dir
        self.classes_to_idx = classes_to_idx
        self.img_feature_extractor = img_feature_extractor
        self.text_tokenizer = text_tokenizer
        self.device = device

    def __len__(self):
        return len(self.data)

    def __getitem__(self, index):
        """Return ``{'image': ..., 'question': ..., 'label': ...}`` for one sample."""
        img_path = os.path.join(self.root_dir, self.data[index]['image_path'])
        img = Image.open(img_path).convert('RGB')

        if self.img_feature_extractor:
            img = self.img_feature_extractor(img, return_tensors='pt')
            # return_tensors='pt' adds a leading batch axis of size 1. Drop it
            # so the DataLoader's own batching does not yield an extra
            # dimension (B, 1, ...) that the encoders cannot consume.
            img = {k: v.squeeze(0).to(self.device) for k, v in img.items()}

        question = self.data[index]['question']
        if self.text_tokenizer:
            question = self.text_tokenizer(
                question,
                padding='max_length',
                truncation=True,
                max_length=30,
                return_tensors='pt'
            )
            # Same leading-axis removal as for the image tensors.
            question = {k: v.squeeze(0).to(self.device) for k, v in question.items()}

        label = self.data[index]['answer']
        # Use the device given to this dataset rather than a module global.
        label = torch.tensor(self.classes_to_idx[label], dtype=torch.long).to(self.device)
        sample = {
            'image': img,
            'question': question,
            'label': label
        }

        return sample

In [None]:
# Pretrained preprocessing objects for the two encoders.
img_feature_extractor = ViTImageProcessor.from_pretrained("google/vit-base-patch16-224")
text_tokenizer = AutoTokenizer.from_pretrained("roberta-base")

device = 'cuda' if torch.cuda.is_available() else 'cpu'

train_dataset = VQADataset(
    train_data,
    classes_to_idx=classes_to_idx,
    img_feature_extractor=img_feature_extractor,
    text_tokenizer=text_tokenizer,
    device=device
)

val_dataset = VQADataset(
    val_data,
    classes_to_idx=classes_to_idx,
    img_feature_extractor=img_feature_extractor,
    text_tokenizer=text_tokenizer,
    device=device
)

test_dataset = VQADataset(
    test_data,
    classes_to_idx=classes_to_idx,
    img_feature_extractor=img_feature_extractor,
    text_tokenizer=text_tokenizer,
    device=device
)

# Rebuild the loaders on the new datasets. The loaders created earlier still
# wrap the previous tuple-yielding datasets, while the training and evaluation
# loops of this section index each batch by the keys 'image', 'question' and
# 'label'.
train_dataloader = DataLoader(train_dataset, batch_size=train_batch_size, shuffle=True)
val_dataloader = DataLoader(val_dataset, batch_size=test_batch_size, shuffle=False)
test_dataloader = DataLoader(test_dataset, batch_size=test_batch_size, shuffle=False)

In [None]:
class TextEncoder(nn.Module):
    """Wraps the pretrained "roberta-base" model and returns its pooled output."""

    def __init__(self):
        super().__init__()
        self.model = RobertaModel.from_pretrained("roberta-base")

    def forward(self, inputs):
        # `inputs` is unpacked as keyword arguments of the wrapped model.
        return self.model(**inputs).pooler_output

class VisualEncoder(nn.Module):
    """Wraps the pretrained ViT model and returns its pooled output."""

    def __init__(self):
        super().__init__()
        self.model = ViTModel.from_pretrained("google/vit-base-patch16-224")

    def forward(self, inputs):
        # `inputs` is unpacked as keyword arguments of the wrapped model.
        return self.model(**inputs).pooler_output

In [None]:
class Classifier(nn.Module):
    """Bidirectional LSTM followed by dropout and a linear output layer.

    Args:
        input_size: Size of each input feature vector.
        hidden_size: LSTM hidden size per direction.
        n_layers: Number of stacked LSTM layers.
        dropout: Dropout probability before the output layer and, when
            ``n_layers > 1``, between LSTM layers.
        n_classes: Number of output logits.
    """

    def __init__(self, input_size=768*2, hidden_size=512, n_layers=1, dropout=0.2,  n_classes=2):
        super(Classifier, self).__init__()
        self.lstm = nn.LSTM(
            input_size=input_size,
            hidden_size=hidden_size,
            num_layers=n_layers,
            # nn.LSTM applies dropout only between stacked layers and warns
            # when it is non-zero for a single layer.
            dropout=dropout if n_layers > 1 else 0.0,
            batch_first=True,
            # The output layer is sized for 2 * hidden_size, i.e. the
            # concatenated forward and backward states.
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout)
        self.fc = nn.Linear(hidden_size*2, n_classes)

    def forward(self, x):
        """Return logits.

        A 2-D input ``(batch, features)`` is treated as a batch of length-1
        sequences and gives ``(batch, n_classes)``. Passed to the LSTM as is,
        it would be read as ONE unbatched sequence, so every sample's
        prediction would depend on the other samples in the batch.
        A 3-D input ``(batch, seq, features)`` gives ``(batch, seq, n_classes)``.
        """
        is_flat = x.dim() == 2
        if is_flat:
            x = x.unsqueeze(1)

        x, _ = self.lstm(x)

        if is_flat:
            x = x.squeeze(1)

        x = self.dropout(x)
        x = self.fc(x)

        return x

In [None]:
class VQAModel(nn.Module):
    """Combines a text encoder, a visual encoder and a classifier.

    Args:
        text_encoder: Module mapping question inputs to a feature tensor.
        visual_encoder: Module mapping image inputs to a feature tensor.
        classifier: Module mapping the concatenated features to logits.
    """

    def __init__(self, text_encoder, visual_encoder, classifier):
        super(VQAModel, self).__init__()
        self.text_encoder = text_encoder
        self.visual_encoder = visual_encoder
        self.classifier = classifier

    def forward(self, x, question=None):
        """Return the classifier output for a batch.

        Two call forms are accepted:
          * ``model(batch)`` where ``batch`` has the keys 'image' and
            'question';
          * ``model(image_inputs, question_inputs)``.
        """
        if question is None:
            image_inputs, question_inputs = x['image'], x['question']
        else:
            image_inputs, question_inputs = x, question

        text_out = self.text_encoder(question_inputs)
        img_out = self.visual_encoder(image_inputs)
        # Concatenate text and image features along the feature axis.
        fused = torch.cat((text_out, img_out), dim=1)
        return self.classifier(fused)

    def freeze(self, visual=True, textual=True, clas=False):
        """Disable gradients for the selected sub-modules."""
        if visual:
            for p in self.visual_encoder.parameters():
                p.requires_grad = False
        if textual:
            for p in self.text_encoder.parameters():
                p.requires_grad = False
        if clas:
            for p in self.classifier.parameters():
                p.requires_grad = False

In [None]:
# Classifier hyper-parameters.
n_classes = len(classes)
hidden_size = 1024
n_layers = 1
dropout_prob = 0.2

# The two pretrained encoders, moved to the target device.
text_encoder = TextEncoder().to(device)
visual_encoder = VisualEncoder().to(device)

classifier = Classifier(
    n_classes=n_classes,
    hidden_size=hidden_size,
    n_layers=n_layers,
    dropout=dropout_prob,
).to(device)

model = VQAModel(
    text_encoder=text_encoder,
    visual_encoder=visual_encoder,
    classifier=classifier,
).to(device)

# With its default arguments freeze() disables gradients for both encoders
# and leaves the classifier trainable.
model.freeze()

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
def evaluate(model, dataloader, criterion):
    """Compute mean batch loss and accuracy of ``model`` over ``dataloader``.

    Args:
        model: Module whose forward takes one dict with the keys 'image' and
            'question' and returns logits.
        dataloader: Iterable of dict batches with the keys 'image',
            'question' and 'label'.
        criterion: Loss callable taking ``(outputs, labels)``.

    Returns:
        ``(loss, acc)``: the mean of the per-batch losses and the fraction of
        correctly predicted samples.

    Raises:
        ZeroDivisionError: If ``dataloader`` yields no batches.
    """
    model.eval()
    correct = 0
    total = 0
    losses = []

    with torch.no_grad():
        for inputs in dataloader:
            labels = inputs['label']
            # The model's forward takes a single dict argument, so the image
            # and question inputs are passed together.
            outputs = model({'image': inputs['image'], 'question': inputs['question']})
            loss = criterion(outputs, labels)
            losses.append(loss.item())
            # Predicted class = index of the largest logit.
            _, predicted = torch.max(outputs, 1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()

    loss = sum(losses) / len(losses)
    acc = correct / total

    return loss, acc

In [None]:
def fit(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    epochs
):
    """Train ``model`` for ``epochs`` epochs, validating after each one.

    Args:
        model: Module whose forward takes one dict with the keys 'image' and
            'question' and returns logits.
        train_loader: Iterable of dict batches with the keys 'image',
            'question' and 'label'.
        val_loader: Validation batches in the same format.
        criterion: Loss callable taking ``(outputs, labels)``.
        optimizer: Optimizer over the model parameters.
        scheduler: Learning-rate scheduler, stepped once per epoch.
        epochs: Number of passes over ``train_loader``.

    Returns:
        ``(train_losses, val_losses)``: one mean loss per epoch each.
    """
    train_losses = []
    val_losses = []

    for epoch in range(epochs):
        batch_train_losses = []

        # evaluate() switches to eval mode, so re-enable training mode here.
        model.train()
        for inputs in train_loader:
            labels = inputs['label']

            optimizer.zero_grad()

            # The model's forward takes a single dict argument.
            outputs = model({'image': inputs['image'], 'question': inputs['question']})
            loss = criterion(outputs, labels)

            # Without the backward pass the gradients stay empty and
            # optimizer.step() updates nothing.
            loss.backward()
            optimizer.step()

            batch_train_losses.append(loss.item())

        train_loss = sum(batch_train_losses) / len(batch_train_losses)
        train_losses.append(train_loss)

        val_loss, val_acc = evaluate(
            model, val_loader,
            criterion
        )
        val_losses.append(val_loss)

        # '\t' is a real tab; the doubled backslash printed the two
        # characters '\' and 't' literally.
        print(f'EPOCH {epoch + 1}:\tTrain loss: {train_loss: .4f}\tVal loss: {val_loss: .4f}\tVal Acc: {val_acc}')

        scheduler.step()

    return train_losses, val_losses

In [None]:
lr = 1e-2
epochs = 50
# StepLR documents step_size as an int; epochs * 0.6 alone is a float.
# The LR is decayed once, after 60% of the epochs (never less than 1).
scheduler_step_size = max(1, round(epochs * 0.6))
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(
    model.parameters(),
    lr=lr
)

# Multiply the learning rate by gamma every scheduler_step_size epochs.
scheduler = torch.optim.lr_scheduler.StepLR(
    optimizer,
    step_size=scheduler_step_size,
    gamma=0.1
)

In [None]:
# Run training; both returned values are per-epoch loss lists.
# NOTE(review): `val_loss` is rebound to a scalar by the evaluation cell that
# follows, so keep a copy if the history is needed afterwards.
fit_args = (model, train_dataloader, val_dataloader, criterion, optimizer, scheduler, epochs)
train_loss, val_loss = fit(*fit_args)

In [None]:
# Final metrics on the validation and test splits.
val_metrics = evaluate(model, val_dataloader, criterion)
test_metrics = evaluate(model, test_dataloader, criterion)
val_loss, val_acc = val_metrics
test_loss, test_acc = test_metrics

print(f"Val Loss: {val_loss:.4f} | Val Acc: {val_acc:.4f}")
print(f"Test Loss: {test_loss:.4f} | Test Acc: {test_acc:.4f}")