In [None]:
import os
import random as rand
import time

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import torch
import torch.nn as nn
from PIL import Image
from sklearn.model_selection import train_test_split
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from transformers import AutoTokenizer, RobertaModel
from transformers import ViTModel, ViTImageProcessor

In [None]:
def set_seed(seed):
    """Seed every random number generator this notebook draws from.

    Seeds Python's ``random`` module, NumPy's global generator and PyTorch's
    CPU and CUDA generators, then puts cuDNN into deterministic mode with
    auto-tuning (benchmark) disabled.

    Args:
        seed: integer seed passed unchanged to every generator.
    """
    rand.seed(seed)
    np.random.seed(seed)
    # Without this call the CPU generator stays unseeded, so anything drawn
    # from torch's default generator on the CPU differs from run to run.
    torch.manual_seed(seed)
    torch.cuda.manual_seed(seed)
    torch.cuda.manual_seed_all(seed)
    torch.backends.cudnn.deterministic = True
    torch.backends.cudnn.benchmark = False


seed = 59
# Use the variable rather than repeating the literal so the two cannot drift apart.
set_seed(seed)
print("Code ran successfully")

In [None]:
# Peek at the raw annotation file to see its line format before parsing it.
# The context manager closes the handle once the lines have been read.
with open('/kaggle/input/vqa-set/vaq2.0.TrainImages.txt', 'r') as data:
    lines = data.readlines()
print(lines[:5])  # Print the first 5 lines

In [None]:
# Load train data
# The parsing below assumes every line has the form
# "<image field>\t<question> ? <answer>".
train_data = []
train_path = '/kaggle/input/vqa-set/vaq2.0.TrainImages.txt'
with open(train_path, 'r') as f:
    # Stream the file line by line instead of materialising it with readlines().
    for i, line in enumerate(f):
        # A blank line (e.g. a trailing newline at the end of the file) holds no
        # sample; skip it rather than failing on the tab split below.
        if not line.strip():
            continue

        full_sentence = line.split('\t')
        if len(full_sentence) < 2:
            raise ValueError(
                f"{train_path}, line {i + 1}: no tab-separated question/answer field in {line!r}"
            )

        # Drop the last two characters of the image field.
        # NOTE(review): presumably a "#<n>" suffix after the file name - confirm against the data.
        img_path = full_sentence[0][:-2]

        qa = full_sentence[1].split('?')
        if len(qa) < 2:
            raise ValueError(
                f"{train_path}, line {i + 1}: no '?' separating question and answer in {line!r}"
            )

        question = qa[0]
        # The answer is whatever follows the final '?', so questions containing
        # several '?' no longer yield a fragment of the question as the answer.
        answer = qa[-1].strip()

        # Echo the first three records as a visual sanity check of the parsing.
        if i < 3:
            print("Full sentence: ", full_sentence)
            print("Image Path: ", img_path)
            print("Question: ", question)
            print("Answer: ", answer)
            print(" ")

        train_data.append({
            'Image Path': img_path,
            'Question': question + '?',
            'Answer': answer  # No trailing newline
        })

In [None]:
# Load val data
# The parsing below assumes every line has the form
# "<image field>\t<question> ? <answer>".
val_data = []
val_path = '/kaggle/input/vqa-set/vaq2.0.DevImages.txt'
with open(val_path, 'r') as f:
    # Stream the file line by line instead of materialising it with readlines().
    for i, line in enumerate(f):
        # A blank line (e.g. a trailing newline at the end of the file) holds no
        # sample; skip it rather than failing on the tab split below.
        if not line.strip():
            continue

        full_sentence = line.split('\t')
        if len(full_sentence) < 2:
            raise ValueError(
                f"{val_path}, line {i + 1}: no tab-separated question/answer field in {line!r}"
            )

        # Drop the last two characters of the image field.
        # NOTE(review): presumably a "#<n>" suffix after the file name - confirm against the data.
        img_path = full_sentence[0][:-2]

        qa = full_sentence[1].split('?')
        if len(qa) < 2:
            raise ValueError(
                f"{val_path}, line {i + 1}: no '?' separating question and answer in {line!r}"
            )

        question = qa[0]
        # The answer is whatever follows the final '?', so questions containing
        # several '?' no longer yield a fragment of the question as the answer.
        answer = qa[-1].strip()

        # Echo the first three records as a visual sanity check of the parsing.
        if i < 3:
            print("Full sentence: ", full_sentence)
            print("Image Path: ", img_path)
            print("Question: ", question)
            print("Answer: ", answer)
            print(" ")

        val_data.append({
            'Image Path': img_path,
            'Question': question + '?',
            'Answer': answer  # No trailing newline
        })

In [None]:
# Load test data
# The parsing below assumes every line has the form
# "<image field>\t<question> ? <answer>".
test_data = []
test_path = '/kaggle/input/vqa-set/vaq2.0.TestImages.txt'
with open(test_path, 'r') as f:
    # Stream the file line by line instead of materialising it with readlines().
    for i, line in enumerate(f):
        # A blank line (e.g. a trailing newline at the end of the file) holds no
        # sample; skip it rather than failing on the tab split below.
        if not line.strip():
            continue

        full_sentence = line.split('\t')
        if len(full_sentence) < 2:
            raise ValueError(
                f"{test_path}, line {i + 1}: no tab-separated question/answer field in {line!r}"
            )

        # Drop the last two characters of the image field.
        # NOTE(review): presumably a "#<n>" suffix after the file name - confirm against the data.
        img_path = full_sentence[0][:-2]

        qa = full_sentence[1].split('?')
        if len(qa) < 2:
            raise ValueError(
                f"{test_path}, line {i + 1}: no '?' separating question and answer in {line!r}"
            )

        question = qa[0]
        # The answer is whatever follows the final '?', so questions containing
        # several '?' no longer yield a fragment of the question as the answer.
        answer = qa[-1].strip()

        # Echo the first three records as a visual sanity check of the parsing.
        if i < 3:
            print("Full sentence: ", full_sentence)
            print("Image Path: ", img_path)
            print("Question: ", question)
            print("Answer: ", answer)
            print(" ")

        test_data.append({
            'Image Path': img_path,
            'Question': question + '?',
            'Answer': answer  # No trailing newline
        })

In [None]:
# Get all classes
# Sort the unique answers so every class gets the same index on every run.
# Iterating a bare set of strings can yield a different order after a kernel
# restart (string hashing is randomised per process), which would silently
# reshuffle the label ids.
classes = sorted({sample['Answer'] for sample in train_data})

# Answer string -> class index.
label2idx = {
    cls_name: idx for idx, cls_name in enumerate(classes)
}
print(label2idx)
print("Keys: ", label2idx.keys())
print("Values: ", label2idx.values())

print("")

# Class index -> answer string; derived from label2idx so the two mappings
# are inverses of each other by construction.
idx2label = {
    idx: cls_name for cls_name, idx in label2idx.items()
}
print(idx2label)
print("Keys: ", idx2label.keys())
print("Values: ", idx2label.values())

In [None]:
class VQA_Dataset(Dataset):
    """Dataset that turns one annotation record into an encoded sample.

    Every element of ``data`` is read through the keys ``'Image Path'``,
    ``'Question'`` and ``'Answer'``. ``__getitem__`` returns a dict with the
    keys ``'image'``, ``'question'`` and ``'label'``.
    """

    def __init__(self, data, label2idx, img_feature_extractor, text_tokenizer,
                device, transforms = None,
                img_dir = '/kaggle/input/vqa-set/val2014-resised'):
        """Store the records together with the objects used to encode them.

        Args:
            data: indexable collection of annotation records.
            label2idx: mapping from answer string to class index.
            img_feature_extractor: callable invoked as
                ``f(images=img, return_tensors='pt')``; a falsy value leaves
                the image unencoded.
            text_tokenizer: callable applied to the question string; a falsy
                value leaves the question as raw text.
            device: device every produced tensor is moved to.
            transforms: optional callable applied to the PIL image first.
            img_dir: directory that the records' image paths are joined onto.
        """
        self.data = data
        self.label2idx = label2idx
        self.img_dir = img_dir
        self.img_encoder = img_feature_extractor
        self.text_tokenizer = text_tokenizer
        self.transforms = transforms
        self.device = device

    def __len__(self):
        """Return the number of records."""
        return len(self.data)

    def _on_device(self, encoded):
        """Move each tensor of an encoder output to the device and squeeze dim 0."""
        return {key: tensor.to(self.device).squeeze(0) for key, tensor in encoded.items()}

    def __getitem__(self, idx):
        """Load, transform and encode the record at position ``idx``."""
        record = self.data[idx]

        # Image branch: open as RGB, apply the optional transforms, then encode.
        img = Image.open(os.path.join(self.img_dir, record['Image Path'])).convert('RGB')
        if self.transforms:
            img = self.transforms(img)
        if self.img_encoder:
            img = self._on_device(self.img_encoder(images = img, return_tensors = 'pt'))

        # Text branch: pad or truncate the question to exactly 20 tokens.
        question = record['Question']
        if self.text_tokenizer:
            tokens = self.text_tokenizer(
                question,
                padding = "max_length",
                max_length = 20,
                truncation = True,
                return_tensors = 'pt'
            )
            question = self._on_device(tokens)

        # Class index of the answer as a long tensor on the target device.
        class_idx = self.label2idx[record['Answer']]
        label = torch.tensor(class_idx, dtype = torch.long).to(self.device)

        return {'image': img, 'question': question, 'label': label}

In [None]:
# Image pipeline for the training split, applied in the order listed:
# resize, centre crop, light colour jitter, random horizontal flip, blur.
data_transform = transforms.Compose([
    transforms.Resize((224, 224)),
    transforms.CenterCrop(180),
    transforms.ColorJitter(
        brightness = 0.1,
        contrast = 0.1,
        saturation = 0.1,
    ),
    transforms.RandomHorizontalFlip(),
    transforms.GaussianBlur(kernel_size = 3),
])

In [None]:
# Pre-processing objects shared by the three dataset splits.
img_encoder = ViTImageProcessor.from_pretrained("google/vit-base-patch16-224")
# Bound to `text_tokenizer` rather than `text_encoder`: a later cell assigns
# `text_encoder` to the RoBERTa nn.Module, so reusing that name here would
# hand the model (not the tokenizer) to the datasets if this cell were re-run.
text_tokenizer = AutoTokenizer.from_pretrained('roberta-base')
device = 'cuda' if torch.cuda.is_available() else 'cpu'

# Only the training split receives the image transforms.
train_set = VQA_Dataset(
    train_data,
    label2idx = label2idx,
    img_feature_extractor = img_encoder,
    text_tokenizer = text_tokenizer,
    device = device,
    transforms = data_transform
)

val_set = VQA_Dataset(
    val_data,
    label2idx = label2idx,
    img_feature_extractor = img_encoder,
    text_tokenizer = text_tokenizer,
    device = device
)


test_set = VQA_Dataset(
    test_data,
    label2idx = label2idx,
    img_feature_extractor = img_encoder,
    text_tokenizer = text_tokenizer,
    device = device
)

print(train_set)
print(val_set)
print(test_set)

In [None]:
# Batch sizes: one for training, one shared by validation and test.
train_batch = 256
test_batch = 32

# Only the training loader shuffles; the evaluation loaders keep dataset order.
train_loader = DataLoader(train_set, batch_size = train_batch, shuffle = True)
val_loader = DataLoader(val_set, batch_size = test_batch, shuffle = False)
test_loader = DataLoader(test_set, batch_size = test_batch, shuffle = False)

print(train_loader, val_loader, test_loader, sep = "\n")

In [None]:
class TextEncoder(nn.Module):
    """Wraps the pretrained ``roberta-base`` model and exposes its pooled output."""

    def __init__(self):
        super().__init__()
        self.model = RobertaModel.from_pretrained('roberta-base')

    def forward(self, x):
        """Unpack the mapping ``x`` as keyword arguments for the wrapped model
        and return the ``pooler_output`` attribute of its result."""
        return self.model(**x).pooler_output

In [None]:
class VisualEncoder(nn.Module):
    """Wraps the pretrained ``google/vit-base-patch16-224`` model and exposes its pooled output."""

    def __init__(self):
        super().__init__()
        self.model = ViTModel.from_pretrained("google/vit-base-patch16-224")

    def forward(self, x):
        """Unpack the mapping ``x`` as keyword arguments for the wrapped model
        and return the ``pooler_output`` attribute of its result."""
        return self.model(**x).pooler_output

In [None]:
class Classifier(nn.Module):
    """Two-layer head: Linear -> GELU -> Dropout -> Linear.

    The first layer takes ``768 * 2`` input features.

    Args:
        hidden_size: width of the hidden layer.
        dropout_prob: dropout probability applied after the activation.
        n_classes: number of output logits.
    """

    def __init__(self, hidden_size=512, dropout_prob = .2, n_classes = 2):
        super().__init__()
        in_features = 768 * 2
        self.fc1 = nn.Linear(in_features, hidden_size)
        self.dropout = nn.Dropout(p = dropout_prob)
        self.gelu = nn.GELU()
        self.fc2 = nn.Linear(hidden_size, n_classes)

    def forward(self, x):
        """Return the class logits for the feature tensor ``x``."""
        hidden = self.gelu(self.fc1(x))
        return self.fc2(self.dropout(hidden))

In [None]:
class VQA_Model(nn.Module):
    """Combines a visual encoder, a text encoder and a classifier head.

    The two encoder outputs are concatenated along dimension 1 (image
    features first) and passed to the classifier.
    """

    def __init__(self, visual_encoder, text_encoder, classifier):
        super(VQA_Model, self).__init__()
        self.visual_encoder = visual_encoder
        self.text_encoder = text_encoder
        self.classifier = classifier

    def forward(self, img, ans):
        """Return the classifier output for one batch.

        Args:
            img: input forwarded unchanged to the visual encoder.
            ans: input forwarded unchanged to the text encoder. The callers in
                this notebook pass the tokenized question here, despite the name.
        """
        text_out = self.text_encoder(ans)
        img_out = self.visual_encoder(img)
        x = torch.cat((img_out, text_out), dim = 1)
        return self.classifier(x)

    def freeze(self, visual=True, textual=True, classifier = False):
        """Disable gradients for the selected sub-modules.

        Each flag freezes its own sub-module. Previously all three branches
        iterated over ``visual_encoder``, so the text encoder and the
        classifier could never be frozen.

        Args:
            visual: freeze the visual encoder.
            textual: freeze the text encoder.
            classifier: freeze the classifier head.
        """
        selected = (
            (visual, self.visual_encoder),
            (textual, self.text_encoder),
            (classifier, self.classifier),
        )
        for enabled, module in selected:
            if enabled:
                for p in module.parameters():
                    p.requires_grad = False

In [None]:
# Settings of the classification head.
hidden_size = 256
dropout_prob = 0.2
n_classes = len(classes)

# Instantiate the sub-modules on the target device, then assemble the model.
text_encoder = TextEncoder().to(device)
visual_encoder = VisualEncoder().to(device)
classifier = Classifier(
    hidden_size = hidden_size,
    dropout_prob = dropout_prob,
    n_classes = n_classes,
).to(device)
model = VQA_Model(
    visual_encoder = visual_encoder,
    text_encoder = text_encoder,
    classifier = classifier,
).to(device)

# Call freeze() with its default flags.
model.freeze()

In [None]:
def eval(model, val_set, criterion, device):
    """Evaluate ``model`` on every batch yielded by ``val_set``.

    Note: this function shadows the builtin ``eval`` for the rest of the
    notebook; the name is kept because other cells call it.

    Args:
        model: module called as ``model(image, question)``, or as
            ``model(image)`` for batches that carry no question.
        val_set: iterable of batch dicts with the keys ``'image'`` and
            ``'label'`` and, optionally, ``'question'``.
        criterion: callable ``criterion(outputs, labels)`` whose result
            supports ``.item()``.
        device: device each batch is moved to before the forward pass;
            ``None`` leaves the batch where it is.

    Returns:
        ``(loss, acc)``: the mean of the per-batch losses and the fraction of
        correctly predicted labels. Both are ``0.0`` when ``val_set`` yields
        no batches.
    """
    def _to_device(obj):
        # Handles a tensor, a (possibly nested) dict of tensors, or None.
        if obj is None or device is None:
            return obj
        if isinstance(obj, dict):
            return {key: _to_device(value) for key, value in obj.items()}
        return obj.to(device) if hasattr(obj, 'to') else obj

    model.eval()
    corr = 0
    total = 0
    losses = []

    with torch.no_grad():
        for inputs in val_set:
            # Move inputs to device (the `device` argument used to be ignored).
            img = _to_device(inputs['image'])
            labels = _to_device(inputs['label'])

            # 'question' is optional: .get() returns None instead of raising
            # KeyError, which makes the image-only branch below reachable.
            questions = _to_device(inputs.get('question'))

            # Forward pass
            if questions is not None:
                outputs = model(img, questions)
            else:
                outputs = model(img)

            # Calculate loss
            loss = criterion(outputs, labels)
            losses.append(loss.item())

            # Calculate accuracy; gradients are already disabled here, so the
            # deprecated `.data` accessor is unnecessary.
            _, pred = torch.max(outputs, 1)  # Index of max logits
            total += labels.size(0)
            corr += (pred == labels).sum().item()

    # Calculate average loss and accuracy
    loss = sum(losses) / len(losses) if losses else 0.0
    acc = corr / total if total > 0 else 0.0

    return loss, acc
print("Done!")

In [None]:
def train(model, train_data, val_data, criterion, optim, scheduler, device, epochs, log_interval = 10):
    """Train ``model`` for ``epochs`` epochs, evaluating after each one.

    Args:
        model: module called as ``model(image, question)``.
        train_data: sized iterable of batch dicts with the keys ``'image'``,
            ``'question'`` and ``'label'``.
        val_data: batches handed to ``eval`` at the end of every epoch.
        criterion: loss callable ``criterion(outputs, labels)``.
        optim: optimizer stepped once per batch.
        scheduler: learning-rate scheduler stepped once per epoch.
        device: device each batch is moved to before the forward pass;
            ``None`` leaves the batch where it is.
        epochs: number of passes over ``train_data``.
        log_interval: a progress line is printed whenever the batch index is
            a non-zero multiple of this value.

    Returns:
        ``(train_loss, val_loss)``: two lists with one entry per epoch, the
        mean training batch loss and the loss returned by ``eval``.
    """
    def _to_device(obj):
        # Handles a tensor, a (possibly nested) dict of tensors, or None.
        if obj is None or device is None:
            return obj
        if isinstance(obj, dict):
            return {key: _to_device(value) for key, value in obj.items()}
        return obj.to(device) if hasattr(obj, 'to') else obj

    train_loss = []
    val_loss = []
    for i in range(epochs):
        print("-" * 59)
        print(f"Starting Epoch {i + 1}...")
        epoch_start_time = time.time()

        batch_loss = []
        model.train()

        for idx, inputs in enumerate(train_data):
            # Move the batch to the target device (the `device` argument used
            # to be ignored).
            image = _to_device(inputs['image'])
            questions = _to_device(inputs['question'])
            labels = _to_device(inputs['label'])

            optim.zero_grad()

            out = model(image, questions)
            loss = criterion(out, labels)
            loss.backward()
            optim.step()
            batch_loss.append(loss.item())
            if idx % log_interval == 0 and idx > 0:
                print(f"| Epoch: {i + 1} | {idx + 1}/{len(train_data)} Batches | Train Loss: {loss.item():.4f} |")

        # An empty loader yields 0.0 instead of a ZeroDivisionError, matching
        # how eval() reports an empty validation set.
        train_loss1 = sum(batch_loss)/len(batch_loss) if batch_loss else 0.0
        train_loss.append(train_loss1)
        val_loss1, val_acc = eval(
            model, val_data, criterion, device
        )
        val_loss.append(val_loss1)

        print(f"| Epoch: {i + 1}/{epochs:3d} | Train Loss: {train_loss1:.4f} | Val Loss: {val_loss1:.4f} | Val Accuracy: {val_acc:.2f} | Time: {(time.time() - epoch_start_time):.2f}s |")
        print(f"Epoch {i + 1} was ran successfully")
        print("-"*59)

        # The learning-rate schedule advances once per epoch.
        scheduler.step()
    return train_loss, val_loss
print("Done!")

In [None]:
lr = 1e-3
epochs = 50

# Decay the learning rate once, after 80% of the epochs. StepLR documents
# step_size as an int, so truncate the product (it used to be the float 40.0)
# and keep it at least 1 so a very small `epochs` cannot produce a zero step.
scheduler_step_size = max(1, int(epochs * 0.8))
criterion = nn.CrossEntropyLoss()
# Hand the optimizer only the parameters that still require gradients;
# frozen parameters never receive updates.
optim = torch.optim.Adam(
    [p for p in model.parameters() if p.requires_grad],
    lr = lr
)

scheduler = torch.optim.lr_scheduler.StepLR(
    optim,
    step_size = scheduler_step_size,
    gamma = 0.1
)
print("Done!")

In [None]:
# Run the training loop; it returns one training loss and one validation
# loss per epoch.
train_loss, val_loss = train(
    model = model,
    train_data = train_loader,
    val_data = val_loader,
    criterion = criterion,
    optim = optim,
    scheduler = scheduler,
    device = device,
    epochs = epochs,
)
print("Done!")

In [None]:
# Final evaluation on the validation and test splits.
# The validation loss gets its own name so the per-epoch `val_loss` history
# returned by train() is not overwritten by a single float.
final_val_loss, val_acc = eval(
    model,
    val_loader,
    criterion,
    device
)

test_loss, test_acc = eval(
    model,
    test_loader,
    criterion,
    device
)
print("Done!")

In [None]:
# Report the final accuracy on the validation and test splits.
print("Val Accuracy:", val_acc)
print("Test Accuracy:", test_acc)
print("Done!")