In [None]:
import nltk
# Download the NLTK resources needed by the WordNet-based similarity
# metric defined later in the notebook.
for resource in ("wordnet", "omw-1.4"):
    nltk.download(resource)

In [None]:
# Mount Google Drive at /content/drive so files stored there can be read
# through the regular filesystem (only works inside Google Colab).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
import os

# Switch into the dataset folder on the mounted Drive so that relative paths
# used below (e.g. 'VQA_QAEncoded.xlsx') resolve against it, then print the
# resulting working directory as a confirmation.
os.chdir('/content/drive/My Drive/Handxray_Dataset/handxray')
print("Current Working Directory:", os.getcwd())


In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms, models
from torch.nn.utils.rnn import pad_sequence
from PIL import Image
import pandas as pd
import ast

In [None]:
# Prefer the GPU when PyTorch can see one; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = torch.device("cuda")
else:
    device = torch.device("cpu")
print("Using device:", device)

In [None]:
# Load the DataFrame from the Excel workbook. The path is relative, so it is
# resolved against the current working directory.
# Later cells read the columns 'image_path', 'question_encoded' and
# 'answer_encoded' from this frame.
file_path = 'VQA_QAEncoded.xlsx'
df = pd.read_excel(file_path)

In [None]:
# Normalise the encoded columns: questions stored as strings are parsed back
# into Python literals, and answers are cast to integers.
def _parse_encoded_question(value):
    """Evaluate a stringified literal; return any non-string value unchanged."""
    if isinstance(value, str):
        return ast.literal_eval(value)
    return value

df['question_encoded'] = df['question_encoded'].apply(_parse_encoded_question)
df['answer_encoded'] = df['answer_encoded'].astype(int)


In [None]:
# Inspect the distinct answer labels. They are expected to be integers in the
# range [0, answer_vocab_size - 1]; this cell only prints them, it does not
# enforce the range.
unique_answers = df['answer_encoded'].unique()
print("Unique answer labels:", unique_answers)
print("Answer vocabulary size:", len(unique_answers))


In [None]:
# Build question_vocab from the unique tokens that occur in the questions.
# The tokens are sorted before being numbered so the token -> index mapping is
# stable and readable, instead of following set iteration order.
unique_question_tokens = sorted({token for q in df['question_encoded'] for token in q})
question_vocab = {token: idx for idx, token in enumerate(unique_question_tokens)}

In [None]:
# Print the number of distinct answer labels, for comparison with the output
# dimension configured on the model.
num_answer_labels = len(df['answer_encoded'].unique())
print("Answer vocab size used in model:", num_answer_labels)


In [None]:
# Define the custom dataset
class VQADataset(Dataset):
    """Dataset yielding (image, question, answer) triples from a DataFrame.

    Each row of ``dataframe`` must provide the columns ``image_path``,
    ``question_encoded`` and ``answer_encoded``.

    Args:
        dataframe: table with one VQA sample per row.
        transform: optional callable applied to the loaded RGB PIL image.
    """

    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        """Return the number of samples (rows of the DataFrame)."""
        return len(self.dataframe)

    def __getitem__(self, idx):
        """Load the sample at positional index ``idx``.

        Returns:
            A tuple ``(image, question, answer)``: the (optionally transformed)
            RGB image, the encoded question as a long tensor, and the encoded
            answer as a long tensor.
        """
        # Fetch the row once; every .iloc[idx] call builds a new Series.
        row = self.dataframe.iloc[idx]

        # Load Image. The context manager releases the file handle as soon as
        # convert() has produced an independent RGB copy.
        with Image.open(row['image_path']) as raw_image:
            image = raw_image.convert("RGB")
        if self.transform:
            image = self.transform(image)

        # Load Question and Answer
        question = torch.tensor(row['question_encoded'], dtype=torch.long)
        answer = torch.tensor(row['answer_encoded'], dtype=torch.long)

        return image, question, answer

In [None]:
# Sanity-check the label range and the number of distinct question tokens.
answer_labels = df['answer_encoded']
print("Answer range:", answer_labels.min(), answer_labels.max())
print("Question vocab size:", len(question_vocab))


In [None]:
# Show the full token -> index mapping built above.
print(question_vocab)

In [None]:
# Image preprocessing: resize to a fixed size, convert to a tensor, then
# normalise each channel.
IMAGE_SIZE = (224, 224)
# Presumably the standard ImageNet channel statistics expected by pretrained
# torchvision models -- TODO confirm.
NORMALIZE_MEAN = [0.485, 0.456, 0.406]
NORMALIZE_STD = [0.229, 0.224, 0.225]

preprocessing_steps = [
    transforms.Resize(IMAGE_SIZE),
    transforms.ToTensor(),
    transforms.Normalize(mean=NORMALIZE_MEAN, std=NORMALIZE_STD),
]
transform = transforms.Compose(preprocessing_steps)

In [None]:
# Wrap the DataFrame in the dataset, applying the preprocessing transform to
# every image. The DataLoaders are created in a later cell.
dataset = VQADataset(df, transform=transform)

# Collate function for padding questions
def collate_fn(batch):
    """Merge a list of (image, question, answer) samples into batched tensors.

    Args:
        batch: sequence of samples; element 0 is the image tensor, element 1
            the 1-D question tensor and element 2 the answer.

    Returns:
        ``(images, questions, answers)`` where images are stacked along a new
        first dimension, questions are right-padded with 0 to the longest
        question in the batch, and answers form a 1-D long tensor.
    """
    image_list = [sample[0] for sample in batch]
    question_list = [sample[1] for sample in batch]
    answer_list = [sample[2] for sample in batch]

    # Images share one shape and can be stacked directly; questions vary in
    # length and therefore need padding.
    stacked_images = torch.stack(image_list)
    padded_questions = pad_sequence(question_list, batch_first=True, padding_value=0)
    answer_tensor = torch.tensor(answer_list, dtype=torch.long)

    return stacked_images, padded_questions, answer_tensor

#dataloader = DataLoader(dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)

In [None]:
from torch.utils.data import DataLoader, random_split

# Split dataset (80% train, 20% test).
# The split is drawn from a seeded generator so it is identical on every run;
# without it, a model reloaded from disk could be evaluated on samples it was
# trained on in an earlier session.
SPLIT_SEED = 42
train_size = int(0.8 * len(dataset))
test_size = len(dataset) - train_size
train_dataset, test_dataset = random_split(
    dataset,
    [train_size, test_size],
    generator=torch.Generator().manual_seed(SPLIT_SEED),
)

# Define DataLoaders (only the training loader is shuffled)
train_loader = DataLoader(train_dataset, batch_size=32, shuffle=True, collate_fn=collate_fn)
test_loader = DataLoader(test_dataset, batch_size=32, shuffle=False, collate_fn=collate_fn)

In [None]:
import torch
import torch.nn as nn
import torchvision.models as models

class VQAModel(nn.Module):
    """VQA classifier combining ResNet image features with an LSTM question encoding.

    NOTE: a later cell defines another class with the same name, which replaces
    this one when the notebook is run top to bottom.

    Args:
        question_vocab_size: number of rows in the question embedding table.
        hidden_size: size of the image feature vector, the token embeddings
            and the LSTM hidden state.
        answer_vocab_size: number of answer classes (output logits).
        resnet_type: name of a constructor in ``torchvision.models``
            (e.g. ``'resnet50'``); the model must expose an ``fc`` layer.
        debug: when True, print intermediate tensor shapes on every forward
            pass. Off by default because it floods the output during training.
    """

    def __init__(self, question_vocab_size, hidden_size, answer_vocab_size, resnet_type='resnet152', debug=False):
        super(VQAModel, self).__init__()
        self.debug = debug

        # Load the specified ResNet model
        self.resnet = self._get_resnet_model(resnet_type, hidden_size)

        # Embedding for questions
        self.embedding = nn.Embedding(question_vocab_size, hidden_size)

        # LSTM for question processing
        self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, batch_first=True)

        # Final fully connected layer to predict the answer
        # (input is the image and question features concatenated)
        self.fc = nn.Linear(hidden_size * 2, answer_vocab_size)

        print(f"[INFO] Initialized VQAModel with {resnet_type}, hidden size {hidden_size}")

    def _get_resnet_model(self, resnet_type, hidden_size):
        """Build the pretrained backbone named ``resnet_type`` with an ``fc`` head of size ``hidden_size``."""
        # Dynamically fetch the ResNet model
        resnet_constructor = getattr(models, resnet_type)
        resnet = resnet_constructor(pretrained=True)

        # Replace the last fully connected layer to output hidden_size
        resnet.fc = nn.Linear(resnet.fc.in_features, hidden_size)
        return resnet

    def _debug_print(self, message):
        """Print ``message`` only when debug output was requested."""
        if self.debug:
            print(message)

    def forward(self, images, questions):
        """Return answer logits for a batch of images and encoded questions.

        Args:
            images: image batch accepted by the ResNet backbone.
            questions: long tensor of token indices, batch first.

        Returns:
            Tensor with one row of ``answer_vocab_size`` logits per sample.
        """
        self._debug_print("Forward pass started.")
        # Get image features
        image_features = self.resnet(images)

        # Process questions
        question_embedding = self.embedding(questions)
        self._debug_print(f"Question embedding shape: {question_embedding.shape}")

        # Only the final hidden state of the LSTM is kept.
        # NOTE(review): questions are right-padded by collate_fn, so this state
        # is taken after the padding steps -- consider packed sequences.
        _, (question_features, _) = self.lstm(question_embedding)
        self._debug_print(f"Question features from LSTM shape: {question_features.shape}")

        # Concatenate image and question features (last LSTM layer only)
        combined_features = torch.cat((image_features, question_features[-1]), dim=1)
        self._debug_print(f"Combined features shape: {combined_features.shape}")

        # Get the final output (answer prediction)
        output = self.fc(combined_features)
        self._debug_print(f"Model output shape: {output.shape}")

        return output


In [None]:
# model = VQAModel(
#     question_vocab_size=5000,
#     hidden_size=512,
#     answer_vocab_size=1000,
#     resnet_type='resnet101'  # You can change this to resnet18, resnet34, etc.
# )


In [None]:
class VQAModel(nn.Module):
    """VQA classifier combining ResNet image features with an LSTM question encoding.

    Args:
        question_vocab_size: number of rows in the question embedding table.
        hidden_size: size of the image feature vector, the token embeddings
            and the LSTM hidden state.
        answer_vocab_size: number of answer classes (output logits).
        resnet_type: name of a constructor in ``torchvision.models``
            (e.g. ``'resnet50'``, ``'resnet152'``). Previously this argument
            was accepted but ignored and ResNet-50 was always used; it is now
            honoured, with ``'resnet50'`` as the default.
        debug: when True, print intermediate tensor shapes on every forward
            pass. Off by default because it floods the output during training.

    Raises:
        ValueError: if ``resnet_type`` is not an attribute of ``torchvision.models``.
    """

    def __init__(self, question_vocab_size, hidden_size, answer_vocab_size, resnet_type='resnet50', debug=False):
        super(VQAModel, self).__init__()
        self.debug = debug

        # Build the requested pretrained ResNet for image feature extraction
        resnet_constructor = getattr(models, resnet_type, None)
        if resnet_constructor is None:
            raise ValueError(f"Unknown resnet_type: {resnet_type!r}")
        self.resnet = resnet_constructor(pretrained=True)
        # Replace the classification head so the backbone emits hidden_size features
        self.resnet.fc = nn.Linear(self.resnet.fc.in_features, hidden_size)

        # Embedding for questions
        self.embedding = nn.Embedding(question_vocab_size, hidden_size)

        # LSTM for question processing
        self.lstm = nn.LSTM(input_size=hidden_size, hidden_size=hidden_size, batch_first=True)

        # Final fully connected layer to predict the answer
        # (input is the image and question features concatenated)
        self.fc = nn.Linear(hidden_size * 2, answer_vocab_size)

        # Report the embedding layer dimensions once at construction time
        print(f"Embedding layer initialized with vocab size {question_vocab_size} and hidden size {hidden_size}")

    def _debug_print(self, message):
        """Print ``message`` only when debug output was requested."""
        if self.debug:
            print(message)

    def forward(self, images, questions):
        """Return answer logits for a batch of images and encoded questions.

        Args:
            images: image batch accepted by the ResNet backbone.
            questions: long tensor of token indices, batch first.

        Returns:
            Tensor with one row of ``answer_vocab_size`` logits per sample.
        """
        self._debug_print("Forward pass started.")
        # Get image features
        image_features = self.resnet(images)

        # Process questions
        question_embedding = self.embedding(questions)
        self._debug_print(f"Question embedding shape: {question_embedding.shape}")

        # Only the final hidden state of the LSTM is kept.
        # NOTE(review): questions are right-padded by collate_fn, so this state
        # is taken after the padding steps -- consider packed sequences.
        _, (question_features, _) = self.lstm(question_embedding)
        self._debug_print(f"Question features from LSTM shape: {question_features.shape}")

        # Concatenate image and question features (last LSTM layer only)
        combined_features = torch.cat((image_features, question_features[-1]), dim=1)
        self._debug_print(f"Combined features shape: {combined_features.shape}")

        # Get the final output (answer prediction)
        output = self.fc(combined_features)
        self._debug_print(f"Model output shape: {output.shape}")

        return output


In [None]:
# Initialize model
hidden_size = 256
# Derive both vocabulary sizes from the data instead of hand-copying them:
# nn.Embedding and nn.CrossEntropyLoss both require every index to be strictly
# smaller than the configured size, i.e. size >= largest index + 1.
answer_vocab_size = int(df['answer_encoded'].max()) + 1
question_vocab_size = int(max(max(q) for q in df['question_encoded'] if len(q) > 0)) + 1
model = VQAModel(
    question_vocab_size=question_vocab_size,
    hidden_size=hidden_size,
    answer_vocab_size=answer_vocab_size,
    resnet_type='resnet152',
).to(device)

# Define optimizer (Adam) and loss function (unweighted cross-entropy)
optimizer = optim.Adam(model.parameters(), lr=0.001)
criterion = nn.CrossEntropyLoss()


In [None]:
# Smoke-test the input pipeline on a single batch. Iterating the whole loader
# here would load every training image only to discard it.
for images, questions, answers in train_loader:
    images, questions, answers = images.to(device), questions.to(device), answers.to(device)
    print("Batch shapes:", tuple(images.shape), tuple(questions.shape), tuple(answers.shape))
    break


In [None]:
from nltk.corpus import wordnet
import numpy as np

def wup_similarity(pred, gt, threshold=0.8):
    """Score a predicted answer against the ground truth with Wu-Palmer similarity.

    Every WordNet synset of ``pred`` is compared with every synset of ``gt``
    and the best similarity is kept.

    Args:
        pred: predicted answer word.
        gt: ground-truth answer word.
        threshold: similarity at or above which the score is rounded up to 1.

    Returns:
        0 if either word has no synsets, 1 if the best similarity reaches
        ``threshold``, otherwise the best similarity itself.
    """
    candidates = wordnet.synsets(pred)
    references = wordnet.synsets(gt)

    # Without synsets on both sides there is nothing to compare.
    if not (candidates and references):
        return 0

    best = None
    for candidate in candidates:
        for reference in references:
            # wup_similarity may return None; treat that as zero similarity.
            score = wordnet.wup_similarity(candidate, reference) or 0
            if best is None or score > best:
                best = score

    if best >= threshold:
        return 1
    return best

In [None]:
import torch
import matplotlib.pyplot as plt

def train_and_evaluate(model, train_loader, test_loader, criterion, optimizer, num_epochs, device, save_path):
    """Train ``model`` for ``num_epochs`` epochs, evaluating on the test loader after each.

    After the last epoch the model's ``state_dict`` is saved to ``save_path``
    and the per-epoch training and testing losses are plotted.

    Args:
        model: module called as ``model(images, questions)``.
        train_loader: iterable of ``(images, questions, answers)`` batches used for training.
        test_loader: iterable of ``(images, questions, answers)`` batches used for evaluation.
        criterion: loss called as ``criterion(outputs, answers)``.
        optimizer: optimizer that updates the model parameters.
        num_epochs: number of passes over ``train_loader``.
        device: device every batch is moved to.
        save_path: destination file for the saved ``state_dict``.
    """

    def mean_batch_loss(loader, update_weights):
        """Average the per-batch loss over ``loader``, optionally stepping the optimizer."""
        running_loss = 0
        for images, questions, answers in loader:
            images = images.to(device)
            questions = questions.to(device)
            answers = answers.to(device)

            if update_weights:
                optimizer.zero_grad()
            outputs = model(images, questions)
            loss = criterion(outputs, answers)
            if update_weights:
                loss.backward()
                optimizer.step()

            running_loss += loss.item()
        return running_loss / len(loader)

    history = {"train": [], "test": []}

    for epoch_idx in range(num_epochs):
        # Training pass
        model.train()
        epoch_train_loss = mean_batch_loss(train_loader, update_weights=True)
        history["train"].append(epoch_train_loss)

        # Evaluation pass on the test data (no gradients, no weight updates)
        model.eval()
        with torch.no_grad():
            epoch_test_loss = mean_batch_loss(test_loader, update_weights=False)
        history["test"].append(epoch_test_loss)

        print(f"Epoch {epoch_idx+1}/{num_epochs} | Train Loss: {epoch_train_loss:.4f} | Test Loss: {epoch_test_loss:.4f}")

    # Persist the trained weights
    torch.save(model.state_dict(), save_path)
    print(f"Model saved to {save_path}")

    # Plot the training and testing loss curves against the 1-based epoch number
    epoch_axis = range(1, num_epochs+1)
    plt.figure(figsize=(8,6))
    plt.plot(epoch_axis, history["train"], label="Training Loss", marker='o')
    plt.plot(epoch_axis, history["test"], label="Testing Loss", marker='s')
    plt.xlabel("Epochs")
    plt.ylabel("Loss")
    plt.title("Training vs. Testing Loss")
    plt.legend()
    plt.grid(True)
    plt.show()

# Training configuration and launch
num_epochs = 10
save_path = "vqa_LSTM.pth"
train_and_evaluate(
    model=model,
    train_loader=train_loader,
    test_loader=test_loader,
    criterion=criterion,
    optimizer=optimizer,
    num_epochs=num_epochs,
    device=device,
    save_path=save_path,
)


In [None]:

import torch
from torch.nn.functional import softmax
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import numpy as np

def evaluate_model(model, dataloader, device, threshold=0.8, idx_to_answer=None):
    """Evaluate ``model`` on ``dataloader`` and report classification and WUPS metrics.

    Args:
        model: module called as ``model(images, questions)`` that returns class logits.
        dataloader: iterable of batches with either 3 elements
            ``(images, questions, answers)`` or 4 elements
            ``(images, questions, attention_masks, answers)``; the attention
            masks are not used.
        device: device every batch is moved to.
        threshold: threshold forwarded to ``wup_similarity``.
        idx_to_answer: optional mapping (dict or sequence) from class index to
            answer text. When given, WUPS is computed on the decoded answer
            words. When omitted, the stringified class indices are compared as
            before, which says little about semantic similarity of the answers.

    Returns:
        Dict with ``accuracy``, ``precision``, ``recall``, ``f1`` (weighted
        averages), ``wups`` (mean per-sample score) and ``results_per_image``
        (one record per sample).

    Raises:
        ValueError: if a batch has neither 3 nor 4 elements.
    """

    def _answer_text(label):
        """Convert a class index to the text handed to the WUPS metric."""
        if idx_to_answer is None:
            return str(label)
        return str(idx_to_answer[int(label)])

    model.eval()
    all_results = []
    all_preds, all_labels, all_wups = [], [], []

    with torch.no_grad():
        for batch in dataloader:
            if len(batch) == 3:  # (images, questions, answers)
                images, questions, answers = batch
            elif len(batch) == 4:  # (images, questions, attention_masks, answers)
                images, questions, _attention_masks, answers = batch
            else:
                raise ValueError(f"Unexpected batch size: {len(batch)} elements")

            # Move tensors to GPU/CPU
            images, questions, answers = images.to(device), questions.to(device), answers.to(device)

            # Get model outputs, class probabilities and the most likely class
            outputs = model(images, questions)
            probabilities = softmax(outputs, dim=1).cpu().numpy()
            preds = np.argmax(probabilities, axis=1)

            labels = answers.cpu().numpy()

            # Compute WUPS for each prediction
            batch_wups = [
                wup_similarity(_answer_text(p), _answer_text(l), threshold)
                for p, l in zip(preds, labels)
            ]
            all_wups.extend(batch_wups)

            # Store results per image; image_index counts across batches
            for i in range(len(images)):
                result = {
                    "image_index": len(all_results),
                    "true_label": labels[i],
                    "predicted_label": preds[i],
                    "class_probabilities": probabilities[i].tolist(),
                    "wups_score": batch_wups[i]
                }
                all_results.append(result)
                all_preds.append(preds[i])
                all_labels.append(labels[i])

    # Compute Metrics (weighted averages; undefined values count as 0)
    accuracy = accuracy_score(all_labels, all_preds)
    precision = precision_score(all_labels, all_preds, average="weighted", zero_division=0)
    recall = recall_score(all_labels, all_preds, average="weighted", zero_division=0)
    f1 = f1_score(all_labels, all_preds, average="weighted", zero_division=0)
    wups_score = np.mean(all_wups)

    # Print Metrics
    print(f"Accuracy: {accuracy:.4f}")
    print(f"Precision: {precision:.4f}")
    print(f"Recall: {recall:.4f}")
    print(f"F1 Score: {f1:.4f}")
    print(f"WUPS Score (Threshold {threshold}): {wups_score:.4f}")

    return {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1,
        "wups": wups_score,
        "results_per_image": all_results,
    }

# Score the model on the test loader and keep the returned metrics dict
metrics = evaluate_model(model=model, dataloader=test_loader, device=device)


In [None]:
import pandas as pd

# Tabulate the per-sample evaluation records and print the whole table
per_image_records = metrics["results_per_image"]
df_results = pd.DataFrame(per_image_records)
print(df_results.to_string(index=False))