In [1]:
!pip install transformers
import os
os.environ["WANDB_NOTEBOOK_NAME"] = "model_train.ipynb"




In [2]:
!pip install datasets



In [1]:
# Importing necessary libraries
import os

from PIL import Image
import numpy as np
import torch
import torch.nn as nn
# The F1 score is the harmonic mean of precision and recall. F1 achieves its best value at 1 and its worst score at 0.
# Accuracy_score is used to calculate the accuracy of the fraction or number of correct predictions.
# Precision is a measure of the relevance of the results, while recall is a measure of the number of actually relevant results returned.

from sklearn.metrics import accuracy_score, f1_score, precision_score, recall_score
from sklearn import preprocessing
from datasets import load_dataset, set_caching_enabled
from datasets.arrow_dataset import DatasetTransformationNotAllowedError
from typing import Dict, List, Optional, Tuple
from dataclasses import dataclass

from transformers import (
    # Preprocessing
    AutoTokenizer, AutoFeatureExtractor,
    # Text and image models (now, image transformers like ViTModel, DeiTModel, BEiT can also be loaded using AutoModel)
    AutoModel,            
    # Training / Evaluation
    TrainingArguments, Trainer,
    # OPTIONAL: if you want to have more information on what's happening, activate the logger as follows
    logging )
# nltk is an open-source library written in Python dedicated to natural language processing (English language)
import nltk
nltk.download('wordnet')
# WordNet is a lexical database of semantic relations between words in more than 200 languages. 
# WordNet links words in semantic relations including synonyms, hyponyms, and meronyms.


from copy import deepcopy


[nltk_data] Downloading package wordnet to
[nltk_data]     C:\Users\giria\AppData\Roaming\nltk_data...
[nltk_data]   Package wordnet is already up-to-date!


In [2]:

# Pick the compute device once: the first CUDA GPU when one is visible,
# otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device('cuda:0')
else:
    device = torch.device('cpu')

print(device)
# Only query the GPU name when a GPU was actually selected.
if device.type == 'cuda':
    print(torch.cuda.get_device_name(0))

cpu


### GATHERING OUR DATA: 

Our Capstone Project data is split into three sets:

* 1. Training Data with 12792 rows 
* 2. Validation Data with 2000 rows 
* 3. Testing Data with 500 rows and questions  

In [3]:
# Load the training, validation and test splits, which are stored as CSV files
# next to the notebook.
dataset = load_dataset(
    "csv",
    data_files={
        "training": os.path.join('data_train.csv'),
        "validation": os.path.join('data_val.csv'),
        "test": os.path.join('data_test.csv')
    }
)

# Load the space of all possible answers (one answer per line). The position of
# an answer in this list is its class index.
with open(os.path.join('answer_space.txt')) as f:
    answer_space = f.read().splitlines()

# Answer -> class index, built once so that labelling a row is a dictionary
# lookup instead of a linear `list.index` scan. `setdefault` keeps the FIRST
# position of a duplicated answer, exactly like `list.index` does.
answer_to_label = {}
for label_idx, answer in enumerate(answer_space):
    answer_to_label.setdefault(answer, label_idx)


def _answers_to_labels(examples):
    """Map a batch of answer strings to their integer class labels.

    :param examples: A batch as passed by `Dataset.map(batched=True)`; only the
        'answer' column is read.
    :return: A dict with a single 'label' column (list of class indices).
    :raises ValueError: If an answer is not listed in the answer space.
    """
    try:
        return {'label': [answer_to_label[ans] for ans in examples['answer']]}
    except KeyError as err:
        raise ValueError(
            f"answer {err.args[0]!r} is not listed in answer_space.txt"
        ) from None


# Since we are modeling the VQA task as a multiclass classification problem,
# we need to create labels from the actual answers.
dataset = dataset.map(_answers_to_labels, batched=True)
dataset

DatasetDict({
    training: Dataset({
        features: ['img_id', 'question', 'answer', 'label'],
        num_rows: 12792
    })
    validation: Dataset({
        features: ['img_id', 'question', 'answer', 'label'],
        num_rows: 2000
    })
    test: Dataset({
        features: ['img_id', 'question', 'answer', 'label'],
        num_rows: 500
    })
})

### MULTI MODAL COLLATOR: 

In our multi modal collator class we implement two transformers: 
* 1. AutoTokenizer: converts the raw question sentences into model inputs
* 2. AutoFeatureExtractor: converts the raw images into model inputs

In [4]:
@dataclass
class MultimodalCollator:
    """Collate raw VQA rows into one batch of model-ready tensors.

    The collator tokenizes the questions, loads and preprocesses the images
    referenced by `img_id`, and packs the integer labels into a tensor.

    Attributes:
        tokenizer: Callable text tokenizer (e.g. loaded via
            `AutoTokenizer.from_pretrained`).
        preprocessor: Callable image preprocessor (e.g. loaded via
            `AutoFeatureExtractor.from_pretrained`).
        image_dir: Directory containing the `<img_id>.jpg` files.
        max_length: Maximum number of tokens kept per question.
    """
    # The annotations are quoted because they are documentation only: the Auto*
    # classes are factories, not the runtime types of the loaded objects.
    tokenizer: "AutoTokenizer"
    preprocessor: "AutoFeatureExtractor"
    # Built with os.path.join so the path works on every OS (the previous
    # literal 'train\Train_images' was Windows-only and contained the invalid
    # escape sequence '\T').
    image_dir: str = os.path.join('train', 'Train_images')
    max_length: int = 24

    def tokenize_text(self, texts: List[str]):
        """Tokenize a list of questions.

        :param texts: The raw question strings of one batch.
        :return: A dict with 'input_ids', 'token_type_ids' and 'attention_mask'
            exactly as returned by the tokenizer (batch dimension preserved).
        """
        encoded_text = self.tokenizer(
            text=texts,
            return_tensors='pt',        # ask the tokenizer for PyTorch tensors
            padding='longest',          # pad to the longest question of the batch
            truncation=True,            # cut questions longer than max_length
            max_length=self.max_length,
            add_special_tokens=True,    # model-specific special tokens
            return_token_type_ids=True,
            return_attention_mask=True, # lets the model ignore padded positions
        )
        # No .squeeze() here: squeezing would also remove the batch dimension
        # whenever a batch holds a single sample (e.g. a trailing batch),
        # handing the encoder a 1-D tensor.
        return {
            "input_ids": encoded_text['input_ids'],
            "token_type_ids": encoded_text['token_type_ids'],
            "attention_mask": encoded_text['attention_mask'],
        }

    def preprocess_images(self, images: List[str]):
        """Load the images with the given ids and run the image preprocessor.

        :param images: Image ids; each is resolved to `<image_dir>/<id>.jpg`.
        :return: A dict with 'pixel_values' as returned by the preprocessor
            (batch dimension preserved).
        """
        rgb_images = []
        for img_id in images:
            # The context manager closes the file handle as soon as the pixels
            # have been copied into the converted image.
            with Image.open(os.path.join(self.image_dir, img_id + ".jpg")) as img:
                rgb_images.append(img.convert('RGB'))
        processed_images = self.preprocessor(
            images=rgb_images,
            return_tensors="pt",
        )
        # As in tokenize_text, keep the batch dimension (no .squeeze()).
        return {
            "pixel_values": processed_images['pixel_values'],
        }

    def __call__(self, raw_batch_dict):
        """Build one batch.

        :param raw_batch_dict: Either a dict of columns
            ({'question': [...], 'img_id': [...], 'label': [...]}) or a list of
            row dicts, which is what a DataLoader passes to its collate_fn.
        :return: A dict holding the tokenizer outputs, 'pixel_values' and
            'labels' (int64 tensor).
        """
        if isinstance(raw_batch_dict, dict):
            # Column-oriented input.
            questions = raw_batch_dict['question']
            image_ids = raw_batch_dict['img_id']
            labels = raw_batch_dict['label']
        else:
            # Row-oriented input: gather each column from the rows.
            questions = [row['question'] for row in raw_batch_dict]
            image_ids = [row['img_id'] for row in raw_batch_dict]
            labels = [row['label'] for row in raw_batch_dict]

        return {
            **self.tokenize_text(questions),
            **self.preprocess_images(image_ids),
            'labels': torch.tensor(labels, dtype=torch.int64),
        }


# Multimodal Visual Question Answering (VQA) Model

## Overview
This PyTorch module, `MultimodalVQAModel`, is designed for Visual Question Answering tasks. It leverages pretrained models for processing both textual and visual inputs, fuses these two modalities, and performs classification to provide answers to visual questions. The class inherits from `nn.Module`, the base class for all neural network modules in PyTorch.

## Components
- **Text and Image Encoders**: Utilizes `AutoModel.from_pretrained` to load pretrained models specified by `pretrained_text_name` and `pretrained_image_name`. These encoders are used to process the text and image inputs, respectively.

- **Fusion Layer**: A sequential module that combines the outputs from the text and image encoders, processes them through a linear layer, applies a ReLU activation, and finally uses dropout for regularization.

- **Classifier**: A linear layer that takes the fused representation and outputs logits over the possible answers.

- **Loss Function**: Utilizes CrossEntropyLoss for training the model.

## Initialization Parameters
- `pretrained_text_name` (str): Identifier for the pretrained text model to be loaded.
- `pretrained_image_name` (str): Identifier for the pretrained image model to be loaded.
- `num_labels` (int): Number of possible answers or labels. Defaults to the length of `answer_space`.
- `intermediate_dim` (int): Dimension of the fused representation. Defaults to 512.
- `dropout` (float): Dropout rate for regularization. Defaults to 0.5.

## Forward Pass Arguments
- `input_ids` (torch.LongTensor): Tokenized text input.
- `pixel_values` (torch.FloatTensor): Preprocessed image input.
- `attention_mask` (Optional[torch.LongTensor]): Mask to avoid attention on padding token indices for text.
- `token_type_ids` (Optional[torch.LongTensor]): Segment token indices to indicate first and second portions of the inputs for models that require them.
- `labels` (Optional[torch.LongTensor]): True labels for computing loss during training.

## Outputs
- A dictionary containing:
  - `logits`: The classification logits.
  - `loss`: The computed loss, returned only if `labels` is provided.

## Usage
The model can be instantiated with specific pretrained models for text and image processing. It is designed to work within a training and evaluation framework, where it can be trained on a dataset of visual questions and images, evaluated for accuracy and other metrics, and used to make predictions.


In [5]:
import torch
from torch import nn
from transformers import AutoModel

# nn.Module is the base class for PyTorch's neural network
class MultimodalVQAModel(nn.Module):
    """Late-fusion VQA classifier built on a pretrained text and image encoder.

    The pooled outputs of both encoders are concatenated, passed through a
    Linear -> ReLU -> Dropout fusion block and classified over the answer space.

    :param pretrained_text_name: Model id passed to `AutoModel.from_pretrained`
        for the text encoder.
    :param pretrained_image_name: Model id passed to `AutoModel.from_pretrained`
        for the image encoder.
    :param num_labels: Number of answer classes. When omitted (None) it is
        resolved to `len(answer_space)` at construction time, so the model
        always matches the currently loaded answer space rather than the one
        that existed when this class was defined.
    :param intermediate_dim: Size of the fused representation.
    :param dropout: Dropout probability applied after the fusion layer.
    """

    def __init__(self, pretrained_text_name, pretrained_image_name, num_labels=None, intermediate_dim=512, dropout=0.5):
        super().__init__()
        if num_labels is None:
            # Looked up lazily on purpose: a default of `len(answer_space)` in
            # the signature would be frozen when the class cell was executed.
            num_labels = len(answer_space)
        self.num_labels = num_labels
        self.pretrained_text_name = pretrained_text_name
        self.pretrained_image_name = pretrained_image_name

        # Text and image encoders
        self.text_encoder = AutoModel.from_pretrained(self.pretrained_text_name)
        self.image_encoder = AutoModel.from_pretrained(self.pretrained_image_name)

        # Fusion layer: its input width is the sum of both encoders' hidden
        # sizes because their pooled outputs are concatenated in forward().
        fused_in_features = (
            self.text_encoder.config.hidden_size
            + self.image_encoder.config.hidden_size
        )
        self.fusion = nn.Sequential(
            nn.Linear(fused_in_features, intermediate_dim),
            nn.ReLU(),
            nn.Dropout(dropout),
        )

        # Classifier over the answer space
        self.classifier = nn.Linear(intermediate_dim, self.num_labels)
        # Loss function (expects raw logits and integer class labels)
        self.criterion = nn.CrossEntropyLoss()

    def forward(
            self,
            input_ids: torch.LongTensor,
            pixel_values: torch.FloatTensor,
            attention_mask: Optional[torch.LongTensor] = None,
            token_type_ids: Optional[torch.LongTensor] = None,
            labels: Optional[torch.LongTensor] = None):
        """Run both encoders, fuse their pooled outputs and classify.

        :param input_ids: Token ids of the questions.
        :param pixel_values: Preprocessed images.
        :param attention_mask: Optional padding mask for the text encoder.
        :param token_type_ids: Optional segment ids for the text encoder.
        :param labels: Optional target class indices; when given, the
            cross-entropy loss is computed as well.
        :return: A dict with 'logits' and, only if `labels` is given, 'loss'.
        """
        encoded_text = self.text_encoder(
            input_ids=input_ids,
            attention_mask=attention_mask,
            token_type_ids=token_type_ids,
            return_dict=True,
        )

        encoded_image = self.image_encoder(
            pixel_values=pixel_values,
            return_dict=True,
        )

        # Concatenate along the feature axis (dim=1); both encoders must expose
        # a 'pooler_output' entry.
        fused_input = torch.cat(
            [
                encoded_text['pooler_output'],
                encoded_image['pooler_output'],
            ],
            dim=1
        )
        fused_output = self.fusion(fused_input)

        logits = self.classifier(fused_output)

        out = {
            "logits": logits
        }
        if labels is not None:
            out["loss"] = self.criterion(logits, labels)

        return out


### Function to Create Multimodal VQA Collator and Model

##### Overview
This function, `createMultimodalVQACollatorAndModel`, initializes and returns a data collator and a multimodal Visual Question Answering (VQA) model for use in training and evaluation. It specifically prepares the components for handling both text and image inputs.

###### Parameters
- `text` (str): Identifier for the pretrained text model. Defaults to `'dmis-lab/biobert-v1.1'`.
- `image` (str): Identifier for the pretrained image model. Defaults to `'microsoft/swin-tiny-patch4-window7-224'`.

##### Returns
- `multi_collator`: An instance of `MultimodalCollator`, configured with the specified text tokenizer and image feature extractor.
- `multi_model`: An instance of `MultimodalVQAModel`, prepared with the given pretrained text and image models, ready for training or inference.

##### Functionality
1. **Tokenizer and Feature Extractor Initialization**: Loads the specified tokenizer for text and feature extractor for images.
2. **Collator Creation**: Combines the tokenizer and feature extractor into a collator that prepares batches of data.
3. **Model Initialization**: Constructs the multimodal model with the specified text and image encoders.

##### Usage
This setup is essential for multimodal learning tasks where both textual questions and visual inputs are involved, facilitating seamless preprocessing, model training, and evaluation.


In [6]:
from transformers import AutoTokenizer, AutoFeatureExtractor

def createMultimodalVQACollatorAndModel(text='dmis-lab/biobert-v1.1', image='microsoft/swin-tiny-patch4-window7-224'):
    """Build the batch collator and the multimodal model for one encoder pair.

    :param text: Pretrained model id used for both the tokenizer and the text encoder.
    :param image: Pretrained model id used for both the feature extractor and the image encoder.
    :return: `(collator, model)`; the model has been moved to the global `device`.
    """
    # The collator owns the two preprocessing objects: the text tokenizer and
    # the image feature extractor.
    batch_collator = MultimodalCollator(
        tokenizer=AutoTokenizer.from_pretrained(text),
        preprocessor=AutoFeatureExtractor.from_pretrained(image),
    )

    # The model loads the matching pretrained encoders for the same two ids.
    vqa_model = MultimodalVQAModel(
        pretrained_text_name=text,
        pretrained_image_name=image,
    )

    return batch_collator, vqa_model.to(device)


### Wu-Palmer Similarity Measure Function

#### Overview
Calculates the Wu-Palmer similarity score between two words using WordNet's synsets, facilitating semantic similarity assessment in NLP tasks.

#### Functions

- **wup_measure(a, b, similarity_threshold=0.925)**: Main function to compute the Wu-Palmer similarity score between two words `a` and `b`. It considers the best semantic field match for each word and applies a similarity threshold to adjust the weighting.

- **get_semantic_field(word)**: Retrieves the semantic field (synsets) for a given word, used to interpret its meanings in different contexts.

- **get_stem_word(word)**: Returns the base word and its weight. It is the hook for answers that come with an additional identifier (e.g., `word\d+:wordid`); the current implementation returns the word unchanged with a weight of 1.0.

#### Process
1. **Initialization**: Sets a global weight and extracts the stem word for both input words.
2. **Semantic Field Retrieval**: Gets the semantic fields for both words.
3. **Similarity Calculation**: Computes the Wu-Palmer similarity scores across all semantic field combinations of the two words.
4. **Final Score**: Adjusts the final score based on similarity thresholds and returns the weighted score.

This method offers a nuanced way to compare word meanings based on their positions within a hierarchical structure like WordNet.


In [7]:
from nltk.corpus import wordnet

def wup_measure(a, b, similarity_threshold=0.925):
    r"""
    Returns the Wu-Palmer similarity score between the words `a` and `b`.
    Specifically, it computes:
    max_{x \in interp(a)} max_{y \in interp(b)} wup(x,y)
    where interp is a 'field of interpretation' (here: the WordNet noun synsets).

    :param a: First word.
    :param b: Second word.
    :param similarity_threshold: Scores below this threshold are down-weighted
        by a factor of 0.1; scores at or above it are kept unchanged.
    :return: The weighted similarity as a float. 1.0 for identical words, 0.0 if
        a word is empty or has no noun synset.
    """
    def get_semantic_field(word):
        """
        Retrieves the semantic field (noun synsets) for the given word,
        together with its weight (currently always 1.0).
        """
        semantic_field = wordnet.synsets(word, pos=wordnet.NOUN)
        weight = 1.0  # Initial weight is set to 1.0 for all words
        return semantic_field, weight

    def get_stem_word(word):
        r"""
        Sometimes, the answer is in the form word\d+:wordid.
        This is the hook where such a word would be stemmed and down-weighted;
        the current implementation returns the word unchanged with weight 1.0.
        """
        weight = 1.0  # No special weighting is applied
        return word, weight

    a, global_weight_a = get_stem_word(a)
    b, global_weight_b = get_stem_word(b)
    global_weight = min(global_weight_a, global_weight_b)  # Use the smaller of the two global weights

    if a == b:
        # The words are the same
        return 1.0 * global_weight

    if not a or not b:
        # If either word is empty there is nothing to compare
        return 0.0

    interp_a, weight_a = get_semantic_field(a)
    interp_b, weight_b = get_semantic_field(b)

    if not interp_a or not interp_b:
        # If either word has no semantic field, the words cannot be related
        return 0.0

    # Take the most optimistic interpretation: the best score over all synset
    # pairs. wup_similarity may return None, so falsy scores are skipped; the
    # leading 0.0 is the floor used when no pair yields a score.
    pair_scores = (x.wup_similarity(y) for x in interp_a for y in interp_b)
    global_max = max([0.0, *(score for score in pair_scores if score)])

    # Down-weight the score unless it is high enough to indicate near-synonyms
    interp_weight = 0.1 if global_max < similarity_threshold else 1.0

    final_score = global_max * weight_a * weight_b * interp_weight * global_weight
    return final_score


In [8]:
import numpy as np
# Assuming wup_measure is defined as shown earlier and answer_space is the previously loaded list of answer strings (list index = class label)

def batch_wup_measure(labels, preds):
    """
    Batch version of wup_measure.

    Each label / prediction index is translated back to its answer string via
    answer_space, the Wu-Palmer score of every (label, prediction) pair is
    computed, and the scores are averaged.

    :param labels: Indices of the correct answers.
    :param preds: Indices of the predicted answers.
    :return: The mean Wu-Palmer similarity score of the batch.
    """
    pair_scores = []
    for gold_idx, pred_idx in zip(labels, preds):
        gold_answer = answer_space[gold_idx]
        pred_answer = answer_space[pred_idx]
        pair_scores.append(wup_measure(gold_answer, pred_answer))
    return np.mean(pair_scores)


In [9]:
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np
from typing import Dict, Tuple

# Assuming batch_wup_measure is defined as shown previously

def compute_metrics(eval_tuple: Tuple[np.ndarray, np.ndarray]) -> Dict[str, float]:
    """
    Compute the evaluation metrics for one set of predictions.

    The predicted class is the argmax of the logits over the last axis. The
    result holds WUPS and accuracy as returned by their scorers, plus
    macro-averaged precision, recall and F1, each multiplied by 10.

    :param eval_tuple: A `(logits, labels)` pair.
    :return: A dict with the keys 'wups', 'accuracy', 'precision', 'recall', 'f1'.
    """
    logits, labels = eval_tuple
    preds = logits.argmax(axis=-1)

    metrics = {
        "wups": batch_wup_measure(labels, preds),
        "accuracy": accuracy_score(labels, preds),
    }

    # NOTE(review): the x10 factor puts these three metrics on a different
    # scale than wups / accuracy - confirm that this is intended.
    macro_scorers = (
        ("precision", precision_score),
        ("recall", recall_score),
        ("f1", f1_score),
    )
    for metric_name, scorer in macro_scorers:
        metrics[metric_name] = scorer(labels, preds, average='macro') * 10

    return metrics


## Training with Multimodal VQA Model

### Training Setup
- **Arguments**: Configurations for training include epochs, batch sizes, learning rate, and strategies for logging, evaluation, and saving checkpoints.
- **Collator & Model**: Initializes using `createMultimodalVQACollatorAndModel` to properly handle multimodal (text and image) inputs.
- **Trainer**: Manages training and evaluation processes, utilizing specified datasets, collator, and metrics calculation.

### Process Overview
1. **Training Loop**: Executes training for `num_epochs` epochs (3 in the code below), optimizing model weights based on training data.
2. **Model Saving**: Saves the final model weights to `multimodal_vqa_model.pth` once training has finished.
3. **Evaluation**: Evaluates the model on the validation set at the end of each epoch and reports the validation loss.


In [11]:
# Minimal training loop (training split only, no validation).
# `device`, `dataset` and `createMultimodalVQACollatorAndModel` come from the
# cells above. DataLoader and tqdm are imported here so this cell does not
# depend on a later cell having been executed first.
from torch.utils.data import DataLoader
from tqdm.auto import tqdm

# Initialize the collator and model. The factory already moves the model to
# `device`, so no additional .to(device) is needed.
collator, model = createMultimodalVQACollatorAndModel(text='dmis-lab/biobert-v1.1', image='microsoft/swin-tiny-patch4-window7-224')

# Worker processes are started with "spawn" on Windows and cannot import a
# collator class that is defined inside the notebook, which makes the loader
# hang. Load in the main process there; keep 8 workers elsewhere.
num_workers = 0 if os.name == "nt" else 8

# Prepare DataLoader. It must wrap the 'training' split: `dataset` itself is a
# DatasetDict whose items are the split names, not the samples.
train_dataloader = DataLoader(dataset['training'], batch_size=32, shuffle=True, collate_fn=collator, num_workers=num_workers)

# Define an optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

# Learning-rate schedule: multiply the LR by 0.1 every 3 scheduler steps
# (one step per epoch, see the end of the epoch loop).
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Define the number of epochs
num_epochs = 3

# Training loop
for epoch in range(num_epochs):
    model.train()
    running_loss = 0.0
    for batch in tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{num_epochs}"):
        # Move batch to device
        input_ids = batch['input_ids'].to(device)
        attention_mask = batch['attention_mask'].to(device)
        token_type_ids = batch['token_type_ids'].to(device)
        pixel_values = batch['pixel_values'].to(device)
        labels = batch['labels'].to(device)

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward pass; the model returns the loss because labels are passed
        outputs = model(input_ids=input_ids, attention_mask=attention_mask, token_type_ids=token_type_ids, pixel_values=pixel_values, labels=labels)
        loss = outputs['loss']
        running_loss += loss.item()

        # Backward pass and optimize
        loss.backward()
        optimizer.step()

    # Update the learning rate once per epoch
    scheduler.step()

    # Report the mean loss per batch for this epoch
    print(f"Epoch {epoch+1}/{num_epochs}, Loss: {running_loss/len(train_dataloader)}")

# Save the model weights
torch.save(model.state_dict(), 'multimodal_vqa_model.pth')


KeyboardInterrupt: 

In [1]:
from transformers import AutoModel, AutoTokenizer, AutoFeatureExtractor

# Test loading the text model
try:
    text_model = AutoModel.from_pretrained("dmis-lab/biobert-v1.1")
    print("Text model loaded successfully.")
except Exception as e:
    print(f"Failed to load text model: {e}")

# Test loading the image model
try:
    image_model = AutoFeatureExtractor.from_pretrained("microsoft/swin-tiny-patch4-window7-224")
    print("Image model loaded successfully.")
except Exception as e:
    print(f"Failed to load image model: {e}")


Text model loaded successfully.
Image model loaded successfully.




In [12]:
import os
import torch
from tqdm.auto import tqdm
from torch.utils.data import DataLoader
from datasets import load_dataset


In [13]:
# Training loop with a validation pass after every epoch.
# Relies on `dataset` and `createMultimodalVQACollatorAndModel` from the cells
# above and on the imports of the previous cell (os, torch, tqdm, DataLoader).

# Define training hyperparameters
num_epochs = 3
batch_size = 32

# Initialize device
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

# Worker processes are started with "spawn" on Windows and cannot import a
# collator class that is defined inside the notebook, which makes the loader
# hang. Load in the main process there; keep 8 workers elsewhere.
num_workers = 0 if os.name == "nt" else 8
# Pinned host memory only helps when batches are copied to a GPU.
pin_memory = device.type == "cuda"

# Initialize the collator and model. The factory already moves the model to
# `device`, so no additional .to(device) is needed.
collator, model = createMultimodalVQACollatorAndModel(
    text='dmis-lab/biobert-v1.1',
    image='microsoft/swin-tiny-patch4-window7-224'
)

# DataLoader for training and validation sets
train_dataloader = DataLoader(
    dataset['training'],
    batch_size=batch_size,
    shuffle=True,
    collate_fn=collator,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

valid_dataloader = DataLoader(
    dataset['validation'],
    batch_size=batch_size,
    shuffle=False,  # a fixed order keeps the validation pass reproducible
    collate_fn=collator,
    num_workers=num_workers,
    pin_memory=pin_memory,
)

# Define an optimizer
optimizer = torch.optim.Adam(model.parameters(), lr=5e-5)

# Learning-rate schedule: multiply the LR by 0.1 every 3 scheduler steps
# (one step per epoch). NOTE(review): with num_epochs = 3 the first decay
# happens only after the last epoch - confirm that this is intended.
scheduler = torch.optim.lr_scheduler.StepLR(optimizer, step_size=3, gamma=0.1)

# Training loop
for epoch in range(num_epochs):
    model.train()
    train_loss = 0.0
    for batch in tqdm(train_dataloader, desc=f"Epoch {epoch+1}/{num_epochs} Training"):
        # Move every tensor of the batch to the device
        batch = {k: v.to(device) for k, v in batch.items()}

        # Zero the parameter gradients
        optimizer.zero_grad()

        # Forward + backward + optimize; the batch keys match forward()'s
        # parameter names, and the loss is returned because 'labels' is present
        outputs = model(**batch)
        loss = outputs['loss']
        loss.backward()
        optimizer.step()

        train_loss += loss.item()

    scheduler.step()

    # Validation loop: eval mode (disables dropout) and no gradient tracking
    model.eval()
    valid_loss = 0.0
    with torch.no_grad():
        for batch in tqdm(valid_dataloader, desc=f"Epoch {epoch+1} Validation"):
            batch = {k: v.to(device) for k, v in batch.items()}
            outputs = model(**batch)
            loss = outputs['loss']
            valid_loss += loss.item()

    # Report the mean loss per batch for both splits
    print(f"Epoch {epoch+1}: Train Loss = {train_loss / len(train_dataloader)}, Validation Loss = {valid_loss / len(valid_dataloader)}")

# Save the model weights
torch.save(model.state_dict(), 'multimodal_vqa_model.pth')
