In [2]:
import os
import torch
import requests
import numpy as np 
import pandas as pd 
from torch import nn
from PIL import Image
from torch.optim import Adam
from datasets import load_dataset
from torch.nn import DataParallel
from IPython.display import display
from torch.nn import CrossEntropyLoss
from transformers import AutoTokenizer
from huggingface_hub import hf_hub_download
from transformers import BertModel, ViTModel
from torch.utils.data import Dataset, DataLoader
from transformers import BertTokenizer, ViTFeatureExtractor
from transformers import AutoProcessor, AutoModelForCausalLM
from torchvision.transforms import Compose, Resize, Normalize, ToTensor

import warnings
warnings.filterwarnings("ignore")

2024-04-22 22:06:32.841526: E external/local_xla/xla/stream_executor/cuda/cuda_dnn.cc:9261] Unable to register cuDNN factory: Attempting to register factory for plugin cuDNN when one has already been registered
2024-04-22 22:06:32.841627: E external/local_xla/xla/stream_executor/cuda/cuda_fft.cc:607] Unable to register cuFFT factory: Attempting to register factory for plugin cuFFT when one has already been registered
2024-04-22 22:06:33.113433: E external/local_xla/xla/stream_executor/cuda/cuda_blas.cc:1515] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [3]:
dataset = load_dataset(
    "csv", 
    data_files={
        "train": os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset","data_train.csv"),
        "test": os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset", "data_eval.csv")
    }
)
with open(os.path.join("..","input","visual-question-answering-computer-vision-nlp","dataset", "answer_space.txt")) as f:
    answer_space = f.read().splitlines()
dataset = dataset.map(
    lambda examples: {
        'label': [
            answer_space.index(ans.replace(" ", "").split(",")[0]) # Select the 1st answer if multiple answers are provided
            for ans in examples['answer']
        ]
    },
    batched=True
)
dataset

Generating train split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

Map:   0%|          | 0/9974 [00:00<?, ? examples/s]

Map:   0%|          | 0/2494 [00:00<?, ? examples/s]

DatasetDict({
    train: Dataset({
        features: ['question', 'answer', 'image_id', 'label'],
        num_rows: 9974
    })
    test: Dataset({
        features: ['question', 'answer', 'image_id', 'label'],
        num_rows: 2494
    })
})

In [4]:
def load_image(image_id):
    base_path = os.path.join("..", "input", "visual-question-answering-computer-vision-nlp", "dataset", "images")
    image_path = os.path.join(base_path, f"{image_id}.png")
    image = Image.open(image_path).convert("RGB")
    return feature_extractor(image, return_tensors="pt")['pixel_values'].squeeze(0)

In [5]:
tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
feature_extractor = ViTFeatureExtractor.from_pretrained('google/vit-base-patch16-224-in21k')

tokenizer_config.json:   0%|          | 0.00/48.0 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/232k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/466k [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/160 [00:00<?, ?B/s]

In [6]:
def preprocess_text(text):
    encoding = tokenizer(text, padding="max_length", truncation=True, max_length=128, return_tensors="pt")
    return encoding['input_ids'].squeeze(0), encoding['attention_mask'].squeeze(0)

In [7]:
class VQADataset(Dataset):
    def __init__(self, dataset):
        self.dataset = dataset
    def __len__(self):
        return len(self.dataset)
    def __getitem__(self, idx):
        item = self.dataset[idx]
        combined_text = f"Question: {item['question']}"
        input_ids, attention_mask = preprocess_text(combined_text)
        image = load_image(item['image_id'])
        label = torch.tensor(item['label'])
        return {
            'input_ids': input_ids,
            'attention_mask': attention_mask,
            'pixel_values': image,
            'labels': label
        }

In [8]:
train_loader = DataLoader(VQADataset(dataset['train']), batch_size=32, shuffle=True)
test_loader = DataLoader(VQADataset(dataset['test']), batch_size=32, shuffle=False)

In [9]:
class VisualQuestionAnsweringModel(nn.Module):
    def __init__(self, num_labels):
        super(VisualQuestionAnsweringModel, self).__init__()
        self.text_model = BertModel.from_pretrained('bert-base-uncased')
        self.image_model = ViTModel.from_pretrained('google/vit-base-patch16-224-in21k')
        self.classifier = nn.Linear(self.text_model.config.hidden_size + self.image_model.config.hidden_size, num_labels)

    def forward(self, input_ids, attention_mask, pixel_values):
        text_features = self.text_model(input_ids=input_ids, attention_mask=attention_mask).pooler_output
        image_features = self.image_model(pixel_values=pixel_values).pooler_output
        combined_features = torch.cat((text_features, image_features), dim=1)
        logits = self.classifier(combined_features)
        return logits
    def freeze_layers(self, freeze_fraction=0.7):
        all_parameters = list(self.text_model.named_parameters()) + list(self.image_model.named_parameters())
        total_params = len(all_parameters)
        freeze_until = int(total_params * freeze_fraction)
        all_parameters.sort(key=lambda x: x[0])
        for name, param in all_parameters[:freeze_until]:
            param.requires_grad = False
        for name, param in all_parameters:
            print(f"{name}: {'frozen' if not param.requires_grad else 'not frozen'}")

In [10]:
num_labels = len(answer_space)

In [11]:
model = VisualQuestionAnsweringModel(num_labels)
model.freeze_layers()
model = DataParallel(model)
model.to('cuda')

model.safetensors:   0%|          | 0.00/440M [00:00<?, ?B/s]

config.json:   0%|          | 0.00/502 [00:00<?, ?B/s]

model.safetensors:   0%|          | 0.00/346M [00:00<?, ?B/s]

embeddings.LayerNorm.bias: frozen
embeddings.LayerNorm.weight: frozen
embeddings.cls_token: frozen
embeddings.patch_embeddings.projection.bias: frozen
embeddings.patch_embeddings.projection.weight: frozen
embeddings.position_embeddings: frozen
embeddings.position_embeddings.weight: frozen
embeddings.token_type_embeddings.weight: frozen
embeddings.word_embeddings.weight: frozen
encoder.layer.0.attention.attention.key.bias: frozen
encoder.layer.0.attention.attention.key.weight: frozen
encoder.layer.0.attention.attention.query.bias: frozen
encoder.layer.0.attention.attention.query.weight: frozen
encoder.layer.0.attention.attention.value.bias: frozen
encoder.layer.0.attention.attention.value.weight: frozen
encoder.layer.0.attention.output.LayerNorm.bias: frozen
encoder.layer.0.attention.output.LayerNorm.weight: frozen
encoder.layer.0.attention.output.dense.bias: frozen
encoder.layer.0.attention.output.dense.bias: frozen
encoder.layer.0.attention.output.dense.weight: frozen
encoder.layer.0.

DataParallel(
  (module): VisualQuestionAnsweringModel(
    (text_model): BertModel(
      (embeddings): BertEmbeddings(
        (word_embeddings): Embedding(30522, 768, padding_idx=0)
        (position_embeddings): Embedding(512, 768)
        (token_type_embeddings): Embedding(2, 768)
        (LayerNorm): LayerNorm((768,), eps=1e-12, elementwise_affine=True)
        (dropout): Dropout(p=0.1, inplace=False)
      )
      (encoder): BertEncoder(
        (layer): ModuleList(
          (0-11): 12 x BertLayer(
            (attention): BertAttention(
              (self): BertSelfAttention(
                (query): Linear(in_features=768, out_features=768, bias=True)
                (key): Linear(in_features=768, out_features=768, bias=True)
                (value): Linear(in_features=768, out_features=768, bias=True)
                (dropout): Dropout(p=0.1, inplace=False)
              )
              (output): BertSelfOutput(
                (dense): Linear(in_features=768, out_features=

In [12]:
optimizer = Adam(model.parameters(), lr=5e-5)
criterion = CrossEntropyLoss()
num_epochs = 100
model.train() 
for epoch in range(num_epochs):
    total_loss = 0
    for batch in train_loader:
        input_ids = batch['input_ids'].to('cuda')
        attention_mask = batch['attention_mask'].to('cuda')
        pixel_values = batch['pixel_values'].to('cuda')
        labels = batch['labels'].to('cuda')
        optimizer.zero_grad()
        outputs = model(input_ids, attention_mask, pixel_values)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()
        total_loss += loss.item()
    overall_loss += total_loss
print(f"Total Loss over all epochs: {overall_loss}")

Total Loss over all epochs: 0.009664024118755535


In [13]:
test_loader = DataLoader(VQADataset(dataset['test']), batch_size=32, shuffle=False)
model.eval()
torch.no_grad()

predictions = []
true_labels = []

with torch.no_grad(): 
    for batch in test_loader:
        input_ids = batch['input_ids'].to('cuda')
        attention_mask = batch['attention_mask'].to('cuda')
        images = batch['pixel_values'].to('cuda')
        labels = batch['labels'].to('cuda')
        outputs = model(input_ids, attention_mask, images)
        preds = torch.argmax(outputs, dim=1)
        predictions.extend(preds.cpu().numpy())
        true_labels.extend(labels.cpu().numpy())
        
        
predicted_answers = [answer_space[pred] for pred in predictions]
true_answers = [answer_space[label] for label in true_labels]

correct_count = sum(p == t for p, t in zip(predicted_answers, true_answers))
total = len(predicted_answers)
accuracy = correct_count / total
print(f"Accuracy: {accuracy:.2f}")

Accuracy: 0.13
