In [2]:
import os
import numpy as np
import time
import random
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import DataLoader, Dataset, Subset
from transformers import AutoTokenizer, AutoModel, AutoImageProcessor, ViTModel
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
from datasets import load_dataset
from tqdm import tqdm
from copy import deepcopy
from transformers import Trainer, TrainingArguments


In [3]:
# Load the VQAv2 dataset from Hugging Face Hub
# %%timeC
dataset = load_dataset("HuggingFaceM4/VQAv2")

You can avoid this message in future by passing the argument `trust_remote_code=True`.
Passing `trust_remote_code=True` will be mandatory to load this dataset from the next major release of `datasets`.


Downloading builder script:   0%|          | 0.00/7.36k [00:00<?, ?B/s]

Downloading readme:   0%|          | 0.00/352 [00:00<?, ?B/s]

Repo card metadata block was not found. Setting CardData to empty.


Downloading data:   0%|          | 0.00/7.24M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/3.49M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/8.97M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/21.7M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/10.5M [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/13.5G [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/6.65G [00:00<?, ?B/s]

Downloading data:   0%|          | 0.00/13.3G [00:00<?, ?B/s]

Generating train split: 0 examples [00:00, ? examples/s]

Generating validation split: 0 examples [00:00, ? examples/s]

Generating testdev split: 0 examples [00:00, ? examples/s]

Generating test split: 0 examples [00:00, ? examples/s]

In [4]:
# Access train, validation, and test sets
train_dataset = dataset['train']
test_dataset = dataset['test']
val_dataset = dataset['validation']

print(train_dataset[0]['image'])

image=train_dataset[0]['image']
display(image)
answer=train_dataset[0]['answers']
print(answer)
# If you need to ensure the image is in RGB mode
image = image.convert("RGB")

def load_image(image):
    return image.convert("RGB")

def display_image(image):
    display(image)

In [5]:
# Set up environment
os.environ["TOKENIZERS_PARALLELISM"] = "false"

In [6]:
# Tokenizer and Image Processor
bert_tokenizer = AutoTokenizer.from_pretrained("bert-base-cased")
image_processor = AutoImageProcessor.from_pretrained("google/vit-base-patch16-224")

tokenizer_config.json:   0%|          | 0.00/49.0 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/570 [00:00<?, ?B/s]

vocab.txt:   0%|          | 0.00/213k [00:00<?, ?B/s]

tokenizer.json:   0%|          | 0.00/436k [00:00<?, ?B/s]

preprocessor_config.json:   0%|          | 0.00/160 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/69.7k [00:00<?, ?B/s]

In [7]:
class VQADataset(Dataset):
    def __init__(self, dataset, tokenizer, image_processor, label_encoder):
        self.dataset = dataset
        self.tokenizer = tokenizer
        self.image_processor = image_processor
        self.label_encoder = label_encoder

    def __len__(self):
        return len(self.dataset)

    def __getitem__(self, idx):
        item = self.dataset[idx]
        question = item['question']
        answers = item['answers']
        image = item['image'].convert("RGB")

        text_inputs = self.tokenizer(question, padding='max_length', truncation=True, return_tensors="pt")
        image_inputs = self.image_processor(images=[image], return_tensors="pt")

        text_inputs = {k: v.squeeze(0) for k, v in text_inputs.items()}
        image_inputs = {k: v.squeeze(0) for k, v in image_inputs.items()}

        # Convert answers to soft targets
        answer_texts = [answer['answer'] for answer in answers]
        encoded_labels = self.label_encoder.transform(answer_texts)
        one_hot_labels = np.zeros((len(self.label_encoder.classes_)))
        for encoded_label in encoded_labels:
            one_hot_labels[encoded_label] += 1
        one_hot_labels /= len(encoded_labels)  # Average the one-hot vectors

        label = torch.tensor(one_hot_labels, dtype=torch.float)  # Convert to tensor

        return {'text_inputs': text_inputs, 'image_inputs': image_inputs, 'labels': label}


In [None]:
class VQAModel(nn.Module):
    def __init__(self, text_model_name="bert-base-cased", image_model_name="google/vit-base-patch16-224", num_answers=1000):
        super(VQAModel, self).__init__()
        self.text_model = AutoModel.from_pretrained(text_model_name)
        self.image_model = ViTModel.from_pretrained(image_model_name)
        self.text_fc = nn.Linear(self.text_model.config.hidden_size, 512)
        self.image_fc = nn.Linear(self.image_model.config.hidden_size, 512)
        self.classifier = nn.Linear(1024, num_answers)
        
    def forward(self, text_inputs, image_inputs):
        text_outputs = self.text_model(**text_inputs).last_hidden_state[:, 0, :]  # CLS token
        image_outputs = self.image_model(**image_inputs).last_hidden_state[:, 0, :]  # CLS token
        text_features = self.text_fc(text_outputs)
        image_features = self.image_fc(image_outputs)
        combined_features = torch.cat((text_features, image_features), dim=1)
        logits = self.classifier(combined_features)
        return logits

In [None]:
label_encoder = LabelEncoder()
all_answer_texts = [answer['answer'] for example in train_dataset for answer in example['answers']]
label_encoder.fit(all_answer_texts)
random.seed(42)
subset_indices = random.sample(range(len(dataset['train'])), len(dataset['train']) // 4)
subset_train_dataset = Subset(VQADataset(dataset['train'], bert_tokenizer, image_processor, label_encoder), subset_indices)

In [None]:
# Instantiate the model
num_answers = 162496  # Assuming a fixed number of possible answers
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
model = VQAModel(num_answers=num_answers).to(device)
criterion = nn.BCEWithLogitsLoss()  # Suitable for multi-label classification with soft targets
optimizer = optim.Adam(model.parameters(), lr=1e-4)

In [16]:
def train(model, dataloader, criterion, optimizer, num_epochs=5):
    model.train()
    for epoch in range(num_epochs):
        total_loss = 0
        start_time = time.time()  # start time for epoch
        all_predictions=[]
        all_labels=[]
        for batch in tqdm(dataloader):
            text_inputs = {k: v.to(device) for k, v in batch['text_inputs'].items()}
            image_inputs = {k: v.to(device) for k, v in batch['image_inputs'].items()}
            labels = batch['labels'].to(device)

            optimizer.zero_grad()
            outputs = model(text_inputs, image_inputs)
            loss = criterion(outputs, labels)
            loss.backward()
            optimizer.step()

            total_loss += loss.item()
            all_labels.extend(labels.cpu().numpy())
            all_predictions.extend(torch.argmax(outputs, dim=1).cpu().numpy())
            
        end_time = time.time()  # End time for epoch
        epoch_time = end_time - start_time  # Time taken for epoch
        
        # Calculate metrics
        accuracy = accuracy_score(np.array(all_labels), binary_predictions)
        f1 = f1_score(all_labels, all_predictions, average='weighted')
        binary_predictions = np.argmax(all_predictions, axis=1)
        precision = precision_score(np.array(all_labels).argmax(axis=1), binary_predictions.argmax(axis=1), average='weighted')
        recall = recall_score(np.array(all_labels).argmax(axis=1), binary_predictions.argmax(axis=1), average='weighted')

        print(f'Epoch {epoch+1}/{num_epochs}, Loss: {total_loss / len(dataloader)}, '
          f'Accuracy: {accuracy}, F1 Score: {f1}, Precision: {precision}, '
          f'Recall: {recall}, Time Taken: {epoch_time:.2f} seconds')

# DataLoader
dataloader = DataLoader(subset_train_dataset, batch_size=2, shuffle=True)

# Train the model
train(model, dataloader, criterion, optimizer, num_epochs=3)



Epoch 1/3, Loss: 3.66818, Accuracy: 0.224831, F1 Score: 0.003869, Precision: 0.340584, Recall: 0.219438, Time Taken: 7246.38 seconds
Epoch 2/3, Loss: 3.29474, Accuracy: 0.252424, F1 Score: 0.010920, Precision: 0.370049, Recall: 0.221462, Time Taken: 7178.09 seconds
Epoch 3/3, Loss: 2.99719, Accuracy: 0.265961, F1 Score: 0.012784, Precision: 0.376632, Recall: 0.238232, Time Taken: 7234.18 seconds


In [18]:
# Save the trained model
lora_model_path = 'vqa_model_25percent.pth'
torch.save(lora_model.state_dict(), lora_model_path)
print(f'Model saved to {lora_model_path}')

Model saved to vqa_model_25percent.pth


In [None]:
def load_model(model_path):
    model = VQAModel(num_answers=num_answers)
    model.load_state_dict(torch.load(model_path, map_location=device))
    model.to(device)
    model.eval()
    return model

In [None]:
# Load model
loaded_model = load_model(model_path)

In [None]:
import os
import subprocess
from IPython.display import FileLink, display

def download_file(path, download_file_name):
    os.chdir('/kaggle/working/')
    zip_name = f"/kaggle/working/{download_file_name}.zip"
    command = f"zip {zip_name} {path} -r"
    result = subprocess.run(command, shell=True, capture_output=True, text=True)
    if result.returncode != 0:
        print("Unable to run zip command!")
        print(result.stderr)
        return
    display(FileLink(f'{download_file_name}.zip'))

In [None]:
download_file('/kaggle/working/vqa_model_25percent.zip', 'out')