In [None]:
import math
from collections import Counter

import cv2
import torch
import torch.nn as nn
import torch.optim as optim
import torchvision.transforms as transforms
from datasets import load_dataset
from torchtext.vocab import Vocab
from torchvision.datasets import ImageFolder
from transformers import BertTokenizer, BertModel, CLIPModel, CLIPProcessor

In [None]:
# Load the dataset registered under the id "flaviagiammarino/vqa-rad" via
# `datasets.load_dataset`; the result is indexed by split name further down
# (e.g. dataset['train']).
dataset= load_dataset("flaviagiammarino/vqa-rad")

In [None]:
# Training split of the loaded dataset.
train_data = dataset['train']

# Pull the three parallel columns (image, question, answer) out of the split,
# in that order.
train_images, train_questions, train_answers = (
    train_data[column] for column in ('image', 'question', 'answer')
)

Image Encoder

The medical image is fed into the image encoder to generate the corresponding visual representation. The image is first resized to 224 × 224 × 3 pixels. Then, it is partitioned into 49 non-overlapping patches with a spatial dimension of 32 × 32 pixels. These patches are flattened into one-dimensional vectors and mapped by an image embedding layer to dimension 768 to match the encoder dimension. The positional encoding is combined with the patch representations and passed to the image encoder. The adopted image encoder is a ViT32 model. Typically, this model is composed of 12 identical layers. Each block is preceded by a normalization layer and has a residual connection to the next block. The multi-head self-attention (MSA) block in the encoder employs the self-attention mechanism, which is used to find correlations between different patches of the medical image. To determine this correlation, the embedded representation of the input image is transformed into three distinct matrices by using three linear layers. These resultant matrices are the query Q, the key K, and the value V matrices. The dot product of Q and K is computed, and the resulting value is divided by the square root of the dimension of K. The score is passed through a SoftMax operation to obtain the attention weights. Finally, the V matrix is multiplied by the output of the SoftMax to find the weighted input. This operation is expressed in the following formula: $\mathrm{Attention}(Q, K, V) = \mathrm{Softmax}\left(\frac{QK^{T}}{\sqrt{d}}\right)V$, where $d$ is the dimension of K. Multiple independent self-attention heads compute the scaled dot-product attention in the MSA block. The results of all the attention heads are concatenated together and then passed to the feed-forward network (FFN) block. The FFN consists of two fully connected layers with a Gaussian error linear unit (GELU) activation function applied in between. The encoded image representation obtained from the image encoder is subsequently projected into a vector of dimension 512 to match the dimension of the question representation. Thus, the resultant representation has a dimension of 49 × 512.

In [None]:
class ImageEncoder(nn.Module):
    """Encode an image into 49 patch tokens of width 512.

    Pipeline: resize to 224x224, cut into a 7x7 grid of 32x32 patches, embed
    each flattened patch (32*32*3 values) to 768 features, add a learnable
    positional encoding, run the ViT32 stack, then project 768 -> 512.
    """

    def __init__(self):
        super(ImageEncoder, self).__init__()
        self.resize = transforms.Resize((224, 224))
        self.patch_embedding = nn.Linear(32*32*3, 768)
        self.positional_encoding = nn.Parameter(torch.randn(1, 49, 768))
        self.encoder = ViT32()
        # The encoder works at width 768; a learned projection (not a view,
        # which cannot change the element count) brings it down to 512.
        self.projection = nn.Linear(768, 512)

    @staticmethod
    def _extract_patches(x):
        """Split (B, C, H, W) into flattened 32x32 patches: (B, num_patches, C*32*32)."""
        batch, channels = x.shape[0], x.shape[1]
        # (B, C, H/32, W/32, 32, 32)
        patches = x.unfold(2, 32, 32).unfold(3, 32, 32)
        # Move the channel axis next to the pixel axes so that one flattened
        # vector holds all channels of a single spatial patch.
        patches = patches.permute(0, 2, 3, 1, 4, 5)
        return patches.reshape(batch, -1, channels * 32 * 32)

    def forward(self, x):
        """Return patch features of shape (B, 49, 512).

        Args:
            x: image tensor shaped (B, C, H, W) or (C, H, W). A single-channel
               image is broadcast to three channels so that the flattened
               patch matches the 32*32*3 input of ``patch_embedding``.

        Raises:
            ValueError: if ``x`` is neither 3-D nor 4-D.
        """
        if x.dim() == 3:
            # Unbatched image: add the batch axis the unfold indices rely on.
            x = x.unsqueeze(0)
        if x.dim() != 4:
            raise ValueError(
                f"expected an image tensor of shape (B, C, H, W) or (C, H, W), got {tuple(x.shape)}"
            )
        if x.shape[1] == 1:
            x = x.expand(-1, 3, -1, -1)
        x = self.resize(x)
        patches = self._extract_patches(x)
        patches = self.patch_embedding(patches)
        patches = patches + self.positional_encoding
        encoded_image = self.encoder(patches)
        return self.projection(encoded_image)

class ViT32(nn.Module):
    """Stack of 12 ``EncoderLayer`` blocks applied one after another."""

    def __init__(self):
        super().__init__()
        blocks = [EncoderLayer() for _ in range(12)]
        self.layers = nn.ModuleList(blocks)

    def forward(self, x):
        """Feed ``x`` through every block in order and return the final output."""
        hidden = x
        for block in self.layers:
            hidden = block(hidden)
        return hidden

class EncoderLayer(nn.Module):
    """Pre-norm transformer block: attention and feed-forward, each with a residual.

    Each sub-layer gets its own ``LayerNorm`` so the two normalisations learn
    independent scale/shift parameters. ``norm`` precedes the attention
    sub-layer and ``ffn_norm`` precedes the feed-forward sub-layer.
    """

    def __init__(self):
        super(EncoderLayer, self).__init__()
        self.norm = nn.LayerNorm(768)
        self.ffn_norm = nn.LayerNorm(768)
        self.msa = MultiHeadSelfAttention()
        self.ffn = FeedForwardNetwork()

    def forward(self, x):
        """Return ``x`` after the attention and feed-forward residual updates."""
        x = x + self.msa(self.norm(x))
        x = x + self.ffn(self.ffn_norm(x))
        return x

class MultiHeadSelfAttention(nn.Module):
    """Multi-head scaled dot-product self-attention.

    Q, K and V are produced by three linear layers, split into ``num_heads``
    heads, attended independently, and concatenated back to ``embed_dim``
    features per token.

    Args:
        embed_dim: token feature width (default 768).
        num_heads: number of attention heads; must divide ``embed_dim``.
        dropout: dropout probability applied to the attention weights.
    """

    def __init__(self, embed_dim=768, num_heads=12, dropout=0.1):
        super(MultiHeadSelfAttention, self).__init__()
        if embed_dim % num_heads != 0:
            raise ValueError(
                f"embed_dim ({embed_dim}) must be divisible by num_heads ({num_heads})"
            )
        self.embed_dim = embed_dim
        self.num_heads = num_heads
        self.head_dim = embed_dim // num_heads
        self.query_linear = nn.Linear(embed_dim, embed_dim)
        self.key_linear = nn.Linear(embed_dim, embed_dim)
        self.value_linear = nn.Linear(embed_dim, embed_dim)
        self.dropout = nn.Dropout(dropout)

    def _split_heads(self, t):
        """(B, N, E) -> (B, heads, N, head_dim)."""
        batch, tokens, _ = t.shape
        return t.view(batch, tokens, self.num_heads, self.head_dim).transpose(1, 2)

    def forward(self, x):
        """Attend over the token axis.

        Args:
            x: tensor shaped (B, N, embed_dim), or (N, embed_dim) for a single
               unbatched sequence.

        Returns:
            Tensor with the same shape as ``x``.
        """
        unbatched = x.dim() == 2
        if unbatched:
            x = x.unsqueeze(0)
        batch, tokens, _ = x.shape

        Q = self._split_heads(self.query_linear(x))
        K = self._split_heads(self.key_linear(x))
        V = self._split_heads(self.value_linear(x))

        # Transpose only the last two axes: `.T` on a >2-D tensor reverses
        # every axis, including batch and heads.
        attention_scores = torch.matmul(Q, K.transpose(-2, -1)) / math.sqrt(self.head_dim)
        attention_weights = self.dropout(attention_scores.softmax(dim=-1))

        # Weighted sum of the values is a matrix product, not an elementwise one.
        context = torch.matmul(attention_weights, V)
        # Concatenate the heads back into one feature vector per token.
        context = context.transpose(1, 2).reshape(batch, tokens, self.embed_dim)
        return context.squeeze(0) if unbatched else context

class FeedForwardNetwork(nn.Module):
    """Token-wise feed-forward block: Linear(768 -> 2048), GELU, Linear(2048 -> 768)."""

    def __init__(self):
        super().__init__()
        self.fc1 = nn.Linear(768, 2048)
        self.gelu = nn.GELU()
        self.fc2 = nn.Linear(2048, 768)

    def forward(self, x):
        """Expand, apply GELU, then contract back to the input width."""
        hidden = self.gelu(self.fc1(x))
        return self.fc2(hidden)

Question Encoder

The question encoder uses a BERT-like architecture to generate the question’s textual features. Similar to the image encoder, the question encoder consists of a stack of 12 identical layers. The first step in encoding the question is tokenization, in which the question is split into a sequence of word tokens. Two special tokens, `<start>` and `<end>`, are appended to the sequence to mark its beginning and end, respectively. The encoder uses a sequence with a fixed length of 77 tokens and a vocabulary size of 49,408 words. The word embedding layer embeds the sequence of question tokens into features of dimension 512. A learnable positional embedding is added to the sequence to provide information about the order of each word. The final representation is generated by feeding the initial representation through the 12 layers of the encoder. Analogously to the image encoder, the question encoder employs the MSA block to capture dependencies within the question tokens. The model also uses normalization layers and skip connections, but unlike the image encoder, the normalization layers come after the MSA and FFN. The output of the question encoder is the question feature representation of size 77 × 512. This representation holds information about the semantics of the question and the relationships between words.

In [None]:
class QuestionEncoder(nn.Module):
    """Encode a question string into 77 token features of width 512.

    The question is tokenized with the ``bert-base-uncased`` tokenizer
    (truncated/padded to exactly 77 tokens), passed through the BERT encoder,
    and the 768-wide output is projected to 512 features per token.
    """

    def __init__(self):
        super(QuestionEncoder, self).__init__()
        self.tokenizer = BertTokenizer.from_pretrained('bert-base-uncased')
        self.bert = BertModel.from_pretrained('bert-base-uncased')
        self.positional_embedding = nn.Parameter(torch.randn(1, 77, 512))
        # BERT-base emits 768-wide token features; project them to 512.
        self.projection = nn.Linear(768, 512)

    def forward(self, question):
        """Return question features of shape (B, 77, 512).

        Args:
            question: a question string, or a list of question strings.
        """
        # Let the tokenizer do both truncation and padding so every sequence
        # has exactly 77 ids, and return an attention mask that marks padding.
        encoded = self.tokenizer(
            question,
            padding='max_length',
            truncation=True,
            max_length=77,
            return_tensors='pt',
        )
        # Token ids are created on CPU; move them to wherever the module lives.
        device = self.positional_embedding.device
        input_ids = encoded['input_ids'].to(device)
        attention_mask = encoded['attention_mask'].to(device)

        # Pass integer ids by keyword. BERT adds its own 768-d position
        # embeddings internally, so the 512-d learnable term cannot be added
        # at the input; it is added after projection, in the 512-d space.
        hidden = self.bert(input_ids=input_ids, attention_mask=attention_mask)[0]
        output = self.projection(hidden) + self.positional_embedding

        return output

Multi modal Representation

A VQA model is expected to receive a question and look at the given image to find the information relevant for generating the correct answer. To model this, the image features $f_i^{x} \in \mathbb{R}^{49 \times 512}$ obtained from the image encoder and the question features $f_i^{q} \in \mathbb{R}^{77 \times 512}$ obtained from the question encoder are concatenated to form the joint representation $f_i = f_i^{x} \oplus f_i^{q}$. Here, $\oplus$ is the concatenation operator. The representation $f_i$, which aggregates the relevant information from the two modalities, is supplied as input to the answer generator, which decodes it into an answer. Since the VQA task requires encoding both the question and the image, we leverage the rich semantic embedding of the contrastive language–image pre-training (CLIP) model [52] and use it as a backbone. CLIP is built on dual transformers that have been optimized by contrastive learning to match a large batch of image–text pairs. Specifically, CLIP learns a multi-modal embedding space by jointly training an image encoder and a text encoder on a corpus of 400 M image–text pairs. The contrastive learning used by the CLIP model aims at maximizing the similarity of truly corresponding image–text pairs while minimizing the similarity of mismatched image–text pairs.

In [None]:
class MultimodalFusion(nn.Module):
    """Build the joint image+question representation by token-wise concatenation.

    Image tokens and question tokens are concatenated along the sequence axis
    (image first), so inputs of shape (B, Ni, D) and (B, Nq, D) give
    (B, Ni + Nq, D).

    NOTE(review): ``clip_model`` is loaded but not called in ``forward``.
    ``CLIPModel.forward`` takes token ids and pixel values, not pre-computed
    feature tensors, so the fused features cannot be passed through it.
    Confirm whether the CLIP weights are meant to initialise the encoders.
    """

    def __init__(self):
        super(MultimodalFusion, self).__init__()
        self.question_encoder = QuestionEncoder()
        self.image_encoder = ImageEncoder()
        # 'clip' is not a resolvable checkpoint id; use the public ViT-B/32 one.
        self.clip_model = CLIPModel.from_pretrained('openai/clip-vit-base-patch32')

    def forward(self, question, image):
        """Return the fused representation of shape (B, Ni + Nq, D).

        Raises:
            ValueError: if the two encoders disagree on batch size or feature width.
        """
        # Encode the question and image
        question_features = self.question_encoder(question)
        image_features = self.image_encoder(image)

        if (image_features.shape[0] != question_features.shape[0]
                or image_features.shape[-1] != question_features.shape[-1]):
            raise ValueError(
                "image and question features must share batch size and feature width, "
                f"got {tuple(image_features.shape)} and {tuple(question_features.shape)}"
            )

        # Concatenate along the token axis so each token keeps its own
        # feature vector (no flattening).
        fused_features = torch.cat([image_features, question_features], dim=1)

        return fused_features

Answer Decoder
The decoder is modeled as a generative model. It generates the answer one word at a time in an autoregressive manner. When a word is predicted, it is added to the input sequence, which then serves as the model’s new input in the next time step. The decoder architecture consists of two identical layers. Similar to the question encoding, the input answer is first tokenized into words and trimmed or padded to the maximum length of 77 words. The two special tokens `<start>` and `<end>` are appended to the sequence, and each word is represented as a word embedding. The positional information is added, and the word is fed into the first layer of the decoder. The decoder layer is composed of the same MSA and FFN blocks present in the encoder. However, the decoder uses a masked self-attention block that learns the dependencies within the answer tokens without considering future tokens. This helps the model to make a prediction about the next word based on the sequence of the previous tokens. Another difference in the decoder is the multi-head cross-attention block, which is designed to capture the interdependencies between two different inputs, as opposed to the self-attention mechanism employed by the image and question encoders, which derives Q, K, and V from the same modality. As shown in Figure 4, the cross-attention mechanism in each decoder layer uses Q derived from the multi-modal representation, and K and V derived from the answer. This helps the model to detect the correlation between the different data modalities involved in the VQA task.

In [None]:
class AnswerDecoder(nn.Module):
    """Autoregressive answer decoder: masked self-attention, FFN, cross-attention.

    Args:
        embedding_size: width of token embeddings and of the attention blocks.
        hidden_size: stored on the instance; not used by this block.
        num_layers: stored on the instance; not used by this block.
        vocab_size: number of output classes / embedding rows.

    All tensors are batch-first: token ids are (B, L) with L <= 77 and the
    multimodal memory is (B, S, embedding_size).
    """

    def __init__(self, embedding_size, hidden_size, num_layers, vocab_size):
        super(AnswerDecoder, self).__init__()
        self.embedding_size = embedding_size
        self.hidden_size = hidden_size
        self.num_layers = num_layers
        self.vocab_size = vocab_size

        self.embedding = nn.Embedding(vocab_size, embedding_size)
        self.positional_embedding = nn.Parameter(torch.randn(1, 77, embedding_size))

        # batch_first=True: inputs are (B, L, E), whereas nn.MultiheadAttention
        # defaults to (L, B, E).
        self.msa = nn.MultiheadAttention(embedding_size, num_heads=8, batch_first=True)
        self.ffn = nn.Sequential(
            nn.Linear(embedding_size, 4 * embedding_size),
            nn.ReLU(),
            nn.Linear(4 * embedding_size, embedding_size)
        )

        self.cross_msa = nn.MultiheadAttention(embedding_size, num_heads=8, batch_first=True)

        self.dropout = nn.Dropout(0.1)

        self.output_layer = nn.Linear(embedding_size, vocab_size)

    def forward(self, input_sequence, multimodal_representation):
        """Return next-token logits of shape (B, L, vocab_size).

        Args:
            input_sequence: long tensor of token ids, shape (B, L).
            multimodal_representation: float tensor (B, S, embedding_size)
                used as key/value of the cross-attention.

        Raises:
            ValueError: if L exceeds the positional table length (77).
        """
        seq_len = input_sequence.shape[1]
        max_len = self.positional_embedding.shape[1]
        if seq_len > max_len:
            raise ValueError(
                f"input_sequence length {seq_len} exceeds the maximum of {max_len} positions"
            )

        # Use only the first `seq_len` positions so partial sequences (as
        # produced step by step during generation) work.
        input_embeddings = self.embedding(input_sequence) + self.positional_embedding[:, :seq_len]

        # Boolean causal mask, True = "may not attend": position i cannot see
        # positions j > i. Built on the input's device.
        mask = torch.triu(
            torch.ones(seq_len, seq_len, dtype=torch.bool, device=input_sequence.device),
            diagonal=1,
        )
        masked_msa_output = self.msa(
            input_embeddings, input_embeddings, input_embeddings,
            attn_mask=mask, need_weights=False,
        )[0]
        ffn_output = self.ffn(masked_msa_output)

        # Cross-attention: queries come from the answer tokens, keys/values
        # from the multimodal memory, so there is one output per answer token.
        cross_msa_output = self.cross_msa(
            ffn_output, multimodal_representation, multimodal_representation,
            need_weights=False,
        )[0]

        # Output layer
        output = self.output_layer(cross_msa_output)

        return output

In [None]:
class VQAModel(nn.Module):
    """Combine an image encoder, a question encoder and an answer decoder.

    Args:
        image_encoder: module mapping an image to features (B, Ni, D).
        question_encoder: module mapping a question to features (B, Nq, D).
        answer_decoder: module called as ``answer_decoder(answer, fused_features)``.
    """

    def __init__(self, image_encoder, question_encoder, answer_decoder):
        super(VQAModel, self).__init__()
        self.image_encoder = image_encoder
        self.question_encoder = question_encoder
        self.answer_decoder = answer_decoder

    def forward(self, image, question, answer=None):
        """Return decoder output when ``answer`` is given, else the fused features.

        The fused features are the image tokens followed by the question
        tokens, shape (B, Ni + Nq, D).
        """
        image_features = self.image_encoder(image)
        question_features = self.question_encoder(question)
        # Concatenate along the token axis: the two sequences have different
        # lengths but the same feature width, so dim=-1 cannot be used.
        fused_features = torch.cat([image_features, question_features], dim=1)
        if answer is not None:
            output = self.answer_decoder(answer, fused_features)
            return output
        else:
            return fused_features

class VQATrainer:
    """Run single training steps and sampled answer generation for a VQA model.

    The start-of-answer id is ``vocab_size - 1`` and the end-of-answer id is
    ``vocab_size - 2``, where ``vocab_size`` is read from
    ``model.answer_decoder``.
    """

    def __init__(self, model, criterion, optimizer, device):
        self.model = model
        self.criterion = criterion
        self.optimizer = optimizer
        self.device = device

    def _special_ids(self):
        """Return ``(start_id, end_id)`` derived from the decoder vocabulary size."""
        vocab_size = self.model.answer_decoder.vocab_size
        return vocab_size - 1, vocab_size - 2

    def _answer_to_ids(self, answer, max_length=77):
        """Convert ``answer`` to a (B, L) long tensor on ``self.device``.

        A string is tokenized with ``model.question_encoder.tokenizer`` and
        framed as ``[start] + ids + [end]`` (truncated to ``max_length``).
        A tensor is assumed to be framed already and is used as given.
        """
        if isinstance(answer, str):
            encoder = getattr(self.model, 'question_encoder', None)
            tokenizer = getattr(encoder, 'tokenizer', None)
            if tokenizer is None:
                raise TypeError(
                    "string answers need model.question_encoder.tokenizer; "
                    "pass a tensor of token ids instead"
                )
            start_id, end_id = self._special_ids()
            ids = tokenizer.encode(answer, add_special_tokens=False)[:max_length - 2]
            answer = torch.tensor([[start_id] + list(ids) + [end_id]], dtype=torch.long)
        elif not torch.is_tensor(answer):
            raise TypeError(
                f"answer must be a str or a tensor of token ids, got {type(answer).__name__}"
            )
        if answer.dim() == 1:
            answer = answer.unsqueeze(0)
        return answer.to(self.device)

    def train(self, image, question, answer):
        """Run one optimisation step and return the loss as a float.

        Uses teacher forcing: the decoder sees ``tokens[:, :-1]`` and is
        scored against ``tokens[:, 1:]``, so each position predicts the next
        token rather than itself.

        Raises:
            TypeError: if ``answer`` is neither a string nor a tensor.
            ValueError: if the answer has fewer than two tokens.
        """
        self.model.train()
        self.optimizer.zero_grad()
        image = image.to(self.device)
        tokens = self._answer_to_ids(answer)
        if tokens.shape[1] < 2:
            raise ValueError("answer needs at least two tokens for next-token training")
        decoder_input = tokens[:, :-1]
        target = tokens[:, 1:]
        output = self.model(image, question, decoder_input)
        loss = self.criterion(output.reshape(-1, output.shape[-1]), target.reshape(-1))
        loss.backward()
        self.optimizer.step()
        return loss.item()

    def generate_answer(self, image, question, max_length=77):
        """Sample an answer token by token; return the list of generated ids.

        Generation stops when the end id is sampled or after ``max_length``
        steps. The start and end ids are not included in the result.
        """
        self.model.eval()
        image = image.to(self.device)
        if torch.is_tensor(question):
            # Text questions are plain strings and have no `.to`.
            question = question.to(self.device)
        start_id, end_id = self._special_ids()
        answer = []
        with torch.no_grad():
            # The image/question encoding does not change between steps, so
            # compute it once and call the decoder directly.
            fused_features = self.model(image, question)
            input_sequence = torch.tensor([[start_id]], device=self.device)
            for _ in range(max_length):
                output = self.model.answer_decoder(input_sequence, fused_features)
                probabilities = nn.functional.softmax(output[:, -1], dim=-1)
                next_token = torch.multinomial(probabilities, 1)
                if next_token.item() == end_id:
                    break
                answer.append(next_token.item())
                input_sequence = torch.cat([input_sequence, next_token], dim=-1)
        return answer
# Encoders for the two input modalities.
image_encoder = ImageEncoder()
question_encoder = QuestionEncoder()


# Answer decoder hyper-parameters and instance.
embedding_size = hidden_size = 512
num_layers = 2
vocab_size = 49408
answer_decoder = AnswerDecoder(
    embedding_size=embedding_size,
    hidden_size=hidden_size,
    num_layers=num_layers,
    vocab_size=vocab_size,
)

# Full VQA model assembled from the three parts.
model = VQAModel(
    image_encoder=image_encoder,
    question_encoder=question_encoder,
    answer_decoder=answer_decoder,
)

# Loss, optimizer and target device; the model is moved to the device in place.
if torch.cuda.is_available():
    device = torch.device('cuda')
else:
    device = torch.device('cpu')
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=1e-5)
model.to(device)

# Trainer wrapping the model together with its loss, optimizer and device.
trainer = VQATrainer(model=model, criterion=criterion, optimizer=optimizer, device=device)

In [None]:

num_epochs = 35
# Define a transform to convert images to tensors
transform = transforms.Compose([transforms.ToTensor()])
# NOTE(review): `embedding` is not used anywhere in this cell; kept in case
# another cell relies on it.
embedding = nn.Embedding(num_embeddings=10000, embedding_dim=32)

for epoch in range(num_epochs):
    for i in range(len(train_images)):
        # The image encoder's patch embedding is nn.Linear(32*32*3, 768), so
        # every image must have exactly three channels: force RGB before
        # converting to a tensor.
        image_tensor = transform(train_images[i].convert('RGB'))
        # Add the batch axis: the encoder unfolds axes 2 and 3, i.e. it
        # expects (B, C, H, W), while ToTensor yields (C, H, W).
        image = image_tensor.unsqueeze(0)
        question = train_questions[i]
        answer = train_answers[i]
        loss = trainer.train(image, question, answer)
        print(f'Epoch {epoch + 1}/{num_epochs}, Step {i + 1}/{len(train_images)}, Loss {loss:.4f}')


In [None]:
# Test the model
# Raw string: in a normal string literal the "\U" of "C:\Users" starts a
# unicode escape and is a syntax error.
test_image_path = r"C:\Users\Ayushi\Desktop\Visual Question Answer- using medical imaging\Test_image"
# NOTE(review): ImageFolder expects the images to sit in at least one
# sub-directory of the root — TODO confirm the folder layout.
test_dataset = ImageFolder(test_image_path, transform=transform)
# Take the first image (the class label is ignored) and add the batch axis.
test_image, _ = test_dataset[0]
test_image = test_image.unsqueeze(0)
test_question ="What abnormalities are present in the lungs"
answer = trainer.generate_answer(test_image, test_question)
print(answer)

In [None]:
# Write the model's state_dict (parameters and buffers only) to disk.
checkpoint_path = "C:/Users/Ayushi/Desktop/Visual Question Answer- using medical imaging/vqa_model.pth"
torch.save(model.state_dict(), checkpoint_path)