# Improved Visual Question Answering with Transformers

**Students:** DAO Hoai Linh and LE Thi Hoai Luong<br>
**Paper:** Vision Question Answering System Based on Roberta and Vit Model
<br>
**Date created:** 2023/04/12<br>
**Last modified:** 2023/04/14<br>
**Description:** Implementing the VQA task using a Vision Transformer (ViT) + RoBERTa (Transformer family)

## Setup

In [45]:
# Install the third-party packages imported in the next cell (timm, transformers).
! pip install timm transformers



In [46]:
import torch
import torch.nn as nn
import os
import numpy as np
import pandas as pd
import timm
import matplotlib.pyplot as plt

from PIL import Image
from torch.utils.data import Dataset, DataLoader
from transformers import ViTModel, ViTImageProcessor
from transformers import AutoTokenizer, RobertaModel
from sklearn.preprocessing import LabelEncoder
from torch.utils.data import DataLoader

## Downloading the data

In [47]:
import gdown

# Google Drive direct-download link of the zipped dataset; the archive is
# saved as "dataset.zip" in the current working directory.
dataset_url = "https://drive.google.com/uc?export=download&id=1kc6XNqHZJg27KeBuoAoYj70_1rT92191"
gdown.download(dataset_url, "dataset.zip", quiet=False)

Downloading...
From (original): https://drive.google.com/uc?export=download&id=1kc6XNqHZJg27KeBuoAoYj70_1rT92191
From (redirected): https://drive.google.com/uc?export=download&id=1kc6XNqHZJg27KeBuoAoYj70_1rT92191&confirm=t&uuid=6f9a1a59-e5c9-42c6-968e-f96c12769baa
To: /content/dataset.zip
100%|██████████| 196M/196M [00:01<00:00, 161MB/s]


'dataset.zip'

In [48]:
import zipfile

# Archive written by the download cell and the directory it is unpacked into.
zip_path = './dataset.zip'
extract_to_path = './dataset'

# The context manager closes the archive even if extraction fails.
with zipfile.ZipFile(zip_path, mode='r') as archive:
    archive.extractall(path=extract_to_path)

## Loading data


In [78]:
# Annotation files, one per split (each line is split on a tab by the loader).
train_set_path = "/content/dataset/vaq2.0.TrainImages.txt"
val_set_path = '/content/dataset/vaq2.0.DevImages.txt'
test_set_path = '/content/dataset/vaq2.0.TestImages.txt'

# Parsed samples of each split; these lists are filled by the loading cell.
train_data, val_data, test_data = [], [], []

In [80]:
def _read_vqa_split(path):
    """Parse one annotation file into a list of sample dicts.

    Every usable line consists of an image path, a TAB, and a text of the
    form ``<question>? <answer>``.  The text after the last ``?`` becomes the
    answer; everything before it (with the ``?`` restored) becomes the
    question.  Lines without a TAB or without a ``?`` are reported on stdout
    and skipped.

    Args:
        path: Location of the annotation file.

    Returns:
        A list of dicts with the keys ``'image_path'``, ``'question'`` and
        ``'answer'``.
    """
    samples = []
    # Explicit encoding so parsing does not depend on the machine's locale.
    with open(path, "r", encoding="utf-8") as f:
        for line in f:
            fields = line.strip().split('\t')
            if len(fields) < 2:
                print("Skipping invalid line:", line)
                continue
            # Split on the LAST '?' so questions containing several '?' stay intact.
            question, separator, answer = fields[1].rpartition('?')
            if not separator:
                print("Skipping invalid question-answer format:", line)
                continue
            samples.append({
                'image_path': fields[0].strip(),
                'question': question + '?',
                'answer': answer.strip(),
            })
    return samples


# Fill the (already created) per-split lists in place.
train_data.extend(_read_vqa_split(train_set_path))
val_data.extend(_read_vqa_split(val_set_path))
test_data.extend(_read_vqa_split(test_set_path))

## Dictionary mapping classes


In [82]:
# Distinct answers seen in the training split.  They are sorted because the
# iteration order of a set of strings changes between interpreter runs, which
# would silently change the class <-> index mapping (and invalidate any saved
# model weights or predictions).
classes = sorted({sample['answer'] for sample in train_data})
classes_to_idx = {cls_name: idx for idx, cls_name in enumerate(classes)}
idx_to_classes = dict(enumerate(classes))

## Dataset Class


In [83]:
class VQADataset(Dataset):
    """Dataset yielding (image, question, label) samples for VQA.

    Args:
        data: Sequence of dicts with the keys ``'image_path'``, ``'question'``
            and ``'answer'``.
        classes_to_idx: Mapping from answer string to class index; used when
            no ``label_encoder`` is given.
        img_feature_extractor: Optional callable invoked as
            ``extractor(images=img, return_tensors="pt")``; its output dict is
            returned as the image.  Without it the PIL image is returned.
        text_tokenizer: Optional tokenizer applied to the question (padded and
            truncated to ``max_question_length`` tokens).  Without it the raw
            question string is returned.
        label_encoder: Optional object whose ``transform([answer])[0]`` gives
            the class index.
        device: Device the tensors are moved to; ``None`` leaves them where
            they were created.
        root_dir: Directory prepended to every ``'image_path'``; ``None`` uses
            the paths as they are.
        max_question_length: Token length questions are padded/truncated to.
    """

    def __init__(self, data, classes_to_idx, img_feature_extractor=None, text_tokenizer=None, label_encoder=None, device=None, root_dir=None, max_question_length=20):
        self.data = data
        self.root_dir = root_dir
        self.classes_to_idx = classes_to_idx
        self.img_feature_extractor = img_feature_extractor
        self.text_tokenizer = text_tokenizer
        self.label_encoder = label_encoder
        self.device = device
        self.max_question_length = max_question_length

    def __len__(self):
        return len(self.data)

    def _resolve_image_path(self, index):
        """Return the path of sample ``index``, prefixed by ``root_dir`` if set."""
        image_path = self.data[index]['image_path']
        if self.root_dir is None:
            # os.path.join(None, ...) raises TypeError, so use the path as-is.
            return image_path
        return os.path.join(self.root_dir, image_path)

    def _to_device(self, tensor):
        """Move ``tensor`` to ``self.device`` unless no device was configured."""
        if self.device is None:
            return tensor
        return tensor.to(self.device)

    def __getitem__(self, index):
        # convert() returns a new, fully loaded image, so the file handle can
        # be closed right away instead of leaking until garbage collection.
        with Image.open(self._resolve_image_path(index)) as raw_img:
            img = raw_img.convert('RGB')

        if self.img_feature_extractor:
            img = self.img_feature_extractor(images=img, return_tensors="pt")
            # squeeze(0) removes the leading dimension of size 1, if present.
            img = {k: self._to_device(v).squeeze(0) for k, v in img.items()}

        question = self.data[index]['question']
        if self.text_tokenizer:
            question = self.text_tokenizer(
                question,
                padding="max_length",
                max_length=self.max_question_length,
                truncation=True,
                return_tensors="pt"
            )
            question = {k: self._to_device(v).squeeze(0) for k, v in question.items()}

        label = self.data[index]['answer']
        if self.label_encoder:
            label = self.label_encoder.transform([label])[0]
        else:
            label = self.classes_to_idx[label]
        label = self._to_device(torch.tensor(label, dtype=torch.long))

        sample = {
            'image': img,
            'question': question,
            'label': label
        }

        return sample


## Assign object for dataset

In [84]:
img_feature_extractor = ViTImageProcessor.from_pretrained("google/vit-base-patch16-224")
text_tokenizer = AutoTokenizer.from_pretrained("roberta-base")
device = 'cuda' if torch.cuda.is_available() else 'cpu'
root_dir = '/content/val2014-resised/'
# No scikit-learn LabelEncoder is fitted: with ``label_encoder=None`` the
# dataset maps answers to indices through ``classes_to_idx``.  The name must
# still be defined because it is passed to every VQADataset below.
label_encoder = None

# Settings shared by the three splits.
dataset_kwargs = dict(
    classes_to_idx=classes_to_idx,
    img_feature_extractor=img_feature_extractor,
    text_tokenizer=text_tokenizer,
    label_encoder=label_encoder,
    device=device,
    root_dir=root_dir,
)

train_dataset = VQADataset(train_data, **dataset_kwargs)
val_dataset = VQADataset(val_data, **dataset_kwargs)
test_dataset = VQADataset(test_data, **dataset_kwargs)

# Model

In [85]:
class TextEncoder(nn.Module):
    """RoBERTa encoder returning one feature vector per question."""

    def __init__(self):
        super().__init__()
        self.model = RobertaModel.from_pretrained("roberta-base")

    def forward(self, inputs):
        """Encode tokenized questions.

        Args:
            inputs: Dict of tokenizer outputs, unpacked as keyword arguments
                of the RoBERTa model.

        Returns:
            The final hidden state at the first token position of every
            sequence.
        """
        outputs = self.model(**inputs)
        # The "roberta-base" checkpoint ships without pooler weights (loading
        # it warns that they are newly initialized), so `pooler_output` would
        # push the features through a random dense layer.  Use the hidden
        # state of the first token instead.
        return outputs.last_hidden_state[:, 0, :]

class VisualEncoder(nn.Module):
    """ViT encoder returning one feature vector per image."""

    def __init__(self):
        super().__init__()
        self.model = ViTModel.from_pretrained("google/vit-base-patch16-224")

    def forward(self, inputs):
        """Encode preprocessed images.

        Args:
            inputs: Dict of image-processor outputs, unpacked as keyword
                arguments of the ViT model.

        Returns:
            The final hidden state at the first token position of every image
            sequence.
        """
        outputs = self.model(**inputs)
        # The "google/vit-base-patch16-224" checkpoint ships without pooler
        # weights (loading it warns that they are newly initialized), so
        # `pooler_output` would push the features through a random dense
        # layer.  Use the hidden state of the first token instead.
        return outputs.last_hidden_state[:, 0, :]

In [86]:
class Classifier(nn.Module):
    """Bidirectional-LSTM head mapping fused features to class logits.

    Args:
        input_size: Size of each input feature vector.
        hidden_size: Hidden size of the LSTM (per direction).
        n_layers: Number of stacked LSTM layers.
        dropout_prob: Dropout probability applied to the LSTM output.
        n_classes: Number of output classes.
    """

    def __init__(self, input_size=768*2, hidden_size=512, n_layers=1, dropout_prob=0.2, n_classes=2):
        super().__init__()
        self.lstm = nn.LSTM(
            input_size,
            hidden_size,
            num_layers=n_layers,
            batch_first=True,
            bidirectional=True
        )
        self.dropout = nn.Dropout(dropout_prob)
        # Both directions are concatenated, hence hidden_size * 2 inputs.
        self.fc1 = nn.Linear(hidden_size * 2, n_classes)

    def forward(self, x):
        """Return class logits.

        Args:
            x: Either ``(batch, input_size)`` or ``(batch, seq, input_size)``.

        Returns:
            ``(batch, n_classes)`` for 2-D input, ``(batch, seq, n_classes)``
            for 3-D input.
        """
        # nn.LSTM interprets a 2-D tensor as ONE unbatched sequence, which
        # would run the recurrence across the samples of a batch and mix
        # their information.  Give each sample its own length-1 sequence.
        is_flat_batch = x.dim() == 2
        if is_flat_batch:
            x = x.unsqueeze(1)
        x, _ = self.lstm(x)
        if is_flat_batch:
            x = x.squeeze(1)
        x = self.dropout(x)
        x = self.fc1(x)
        return x

In [87]:
class VQAModel(nn.Module):
    """Fuse text and image features and classify the result.

    Args:
        visual_encoder: Module called with the image inputs.
        text_encoder: Module called with the text inputs.
        classifier: Module called with the concatenated features.
    """

    def __init__(self, visual_encoder, text_encoder, classifier):
        super().__init__()
        self.visual_encoder = visual_encoder
        self.text_encoder = text_encoder
        self.classifier = classifier

    def forward(self, image, answer):
        """Return the classifier output for a batch.

        Note: despite its name, ``answer`` is the text input given to the text
        encoder; the training and evaluation loops pass the tokenized
        questions here.
        """
        # Text features first, image features second, joined along dim 1.
        fused = torch.cat(
            (self.text_encoder(answer), self.visual_encoder(image)),
            dim=1,
        )
        return self.classifier(fused)

    def freeze(self, visual=True, textual=True, clas=False):
        """Disable gradients for the selected sub-modules.

        Args:
            visual: Freeze the visual encoder.
            textual: Freeze the text encoder.
            clas: Freeze the classifier.
        """
        selection = (
            (visual, self.visual_encoder),
            (textual, self.text_encoder),
            (clas, self.classifier),
        )
        for should_freeze, module in selection:
            if should_freeze:
                for param in module.parameters():
                    param.requires_grad = False


In [88]:
# Hyper-parameters of the classification head.
n_classes = len(classes)
hidden_size = 1024
n_layers = 1
dropout_prob = 0.2

# Build the three components.  Moving the assembled model to `device` below
# moves every sub-module in place, so these names end up on `device` as well.
text_encoder = TextEncoder()
visual_encoder = VisualEncoder()
classifier = Classifier(
    hidden_size=hidden_size,
    n_layers=n_layers,
    dropout_prob=dropout_prob,
    n_classes=n_classes,
)

model = VQAModel(
    visual_encoder=visual_encoder,
    text_encoder=text_encoder,
    classifier=classifier,
)
model = model.to(device)

# Default freeze(): both encoders are frozen, only the classifier is trained.
model.freeze()
print(model)

Some weights of RobertaModel were not initialized from the model checkpoint at roberta-base and are newly initialized: ['roberta.pooler.dense.bias', 'roberta.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.
Some weights of ViTModel were not initialized from the model checkpoint at google/vit-base-patch16-224 and are newly initialized: ['vit.pooler.dense.bias', 'vit.pooler.dense.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


VQAModel(
  (visual_encoder): VisualEncoder(
    (model): ViTModel(
      (embeddings): ViTEmbeddings(
        (patch_embeddings): ViTPatchEmbeddings(
          (projection): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
        )
        (dropout): Dropout(p=0.0, inplace=False)
      )
      (encoder): ViTEncoder(
        (layer): ModuleList(
          (0-11): 12 x ViTLayer(
            (attention): ViTSdpaAttention(
              (attention): ViTSdpaSelfAttention(
                (query): Linear(in_features=768, out_features=768, bias=True)
                (key): Linear(in_features=768, out_features=768, bias=True)
                (value): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.0, inplace=False)
              )
              (output): ViTSelfOutput(
                (dense): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.0, inplace=False)
              )
            )
            

# Utils

In [89]:
def evaluate(model, dataloader, criterion):
    """Compute the mean batch loss and the accuracy of ``model``.

    The model is switched to eval mode and stays in it; gradients are
    disabled while iterating.

    Args:
        model: Callable as ``model(images, questions)`` returning per-class
            scores of shape ``(batch, n_classes)``.
        dataloader: Iterable of batches, each a dict with the keys
            ``'image'``, ``'question'`` and ``'label'``.
        criterion: Loss called as ``criterion(outputs, labels)``.

    Returns:
        ``(loss, acc)``: the unweighted mean of the per-batch losses and the
        fraction of correctly classified samples.

    Raises:
        ValueError: If ``dataloader`` yields no samples.
    """
    model.eval()
    correct = 0
    total = 0
    losses = []
    with torch.no_grad():
        for batch in dataloader:
            labels = batch['label']
            outputs = model(batch['image'], batch['question'])
            losses.append(criterion(outputs, labels).item())
            # Predicted class = index of the highest score.
            predicted = outputs.argmax(dim=1)
            total += labels.size(0)
            correct += (predicted == labels).sum().item()
    if not losses or total == 0:
        # Fail with a clear message instead of an opaque ZeroDivisionError.
        raise ValueError("evaluate() received an empty dataloader")
    loss = sum(losses) / len(losses)
    acc = correct / total
    return loss, acc


In [90]:
def fit(model, train_loader, val_loader, criterion, optimizer, scheduler, epochs):
    """Train ``model`` and validate it after every epoch.

    Args:
        model: Callable as ``model(images, questions)``.
        train_loader: Iterable of training batches (dicts with the keys
            ``'image'``, ``'question'`` and ``'label'``).
        val_loader: Batches passed to ``evaluate`` after each epoch.
        criterion: Loss called as ``criterion(outputs, labels)``.
        optimizer: Optimizer stepped once per batch.
        scheduler: Learning-rate scheduler stepped once per epoch.
        epochs: Number of passes over ``train_loader``.

    Returns:
        ``(train_losses, val_losses)``: one mean training loss and one
        validation loss per epoch.
    """
    train_losses, val_losses = [], []
    for epoch in range(epochs):
        # evaluate() leaves the model in eval mode, so re-enable train mode
        # at the start of every epoch.
        model.train()
        epoch_losses = []
        for batch in train_loader:
            images = batch['image']
            questions = batch['question']
            labels = batch['label']

            optimizer.zero_grad()
            loss = criterion(model(images, questions), labels)
            loss.backward()
            optimizer.step()
            epoch_losses.append(loss.item())

        train_loss = sum(epoch_losses) / len(epoch_losses)
        train_losses.append(train_loss)

        val_loss, val_acc = evaluate(model, val_loader, criterion)
        val_losses.append(val_loss)

        print(f'EPOCH {epoch+1}: Train loss: {train_loss:.4f} Val loss: {val_loss:.4f} Val Acc: {val_acc:.4f}')
        scheduler.step()
    return train_losses, val_losses

# Train and Evaluate

In [91]:
batch_size = 32

# Reuse the datasets instantiated earlier: they are configured with
# `root_dir`, which VQADataset needs to locate the image files.
train_loader = DataLoader(dataset=train_dataset,
                          batch_size=batch_size,
                          shuffle=True,
                          num_workers=4)

val_loader = DataLoader(dataset=val_dataset,
                        batch_size=batch_size,
                        shuffle=False,
                        num_workers=4)

test_loader = DataLoader(dataset=test_dataset,
                         batch_size=batch_size,
                         shuffle=False,
                         num_workers=4)


In [94]:
import torch.multiprocessing as mp

# Start DataLoader worker processes with "spawn"; force=True overrides a start
# method that was already set.
# NOTE(review): presumably needed because the dataset moves tensors to `device`
# inside the workers - confirm.
mp.set_start_method(method='spawn', force=True)

In [None]:
lr = 1e-2
epochs = 50
# Decay the learning rate once, after 60% of the epochs.  StepLR expects an
# integer step size; a non-integral float would never hit the decay step.
scheduler_step_size = max(1, int(epochs * 0.6))
criterion = nn.CrossEntropyLoss()

optimizer = torch.optim.Adam(model.parameters(), lr=lr)
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=scheduler_step_size, gamma=0.1)

train_losses, val_losses = fit(
    model,
    train_loader,
    val_loader,
    criterion,
    optimizer,
    scheduler,
    epochs
)

# Final metrics of the trained model on the validation and test splits.
val_loss, val_acc = evaluate(model, val_loader, criterion)
test_loss, test_acc = evaluate(model, test_loader, criterion)

print('Evaluation on val/test dataset')
print('Val accuracy:', val_acc)
print('Test accuracy:', test_acc)