In [None]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [None]:
!touch '/content/drive/MyDrive/Deep_learning/epoch_statistics_baseline.csv'

In [None]:
!mkdir '/content/drive/MyDrive/Deep_learning/model_states'
!mkdir '/content/drive/MyDrive/Deep_learning/data'

mkdir: cannot create directory ‘/content/drive/MyDrive/Deep_learning/model_states’: File exists


In [None]:
!unzip '/content/drive/MyDrive/Deep_learning/data.zip'

## Imports

In [None]:
import torch
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
import torchvision.transforms as transforms
import numpy as np
import matplotlib.pyplot as plt
import json
import random
import os
from PIL import Image
from sklearn.preprocessing import LabelEncoder
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)

cuda


# Prepare Dataset and Dataloaders

## Load data

In [None]:
# Load training questions and annotations
with open("data/v2_OpenEnded_mscoco_train2014_questions.json", "r") as file:
    train_questions_json = json.load(file)
    train_questions = [item["question"] for item in train_questions_json["questions"]]

with open("data/v2_mscoco_train2014_annotations.json", "r") as file:
    train_annotations_json = json.load(file)
    train_answers = [
        item["multiple_choice_answer"] for item in train_annotations_json["annotations"]
    ]

# Image paths for training
train_image_dir = "data/train2014/"
train_images = [
    os.path.join(train_image_dir, "COCO_train2014_{:012d}.jpg".format(item["image_id"]))
    for item in train_annotations_json["annotations"]
]

# Load validation questions and annotations
with open("data/v2_OpenEnded_mscoco_val2014_questions.json", "r") as file:
    val_questions_json = json.load(file)
    val_questions = [item["question"] for item in val_questions_json["questions"]]

with open("data/v2_mscoco_val2014_annotations.json", "r") as file:
    val_annotations_json = json.load(file)
    val_answers = [
        item["multiple_choice_answer"] for item in val_annotations_json["annotations"]
    ]

# Image paths for validation
val_image_dir = "data/val2014/"
val_images = [
    os.path.join(val_image_dir, "COCO_val2014_{:012d}.jpg".format(item["image_id"]))
    for item in val_annotations_json["annotations"]
]

## Prepare images, questions, answers, and tokenizer

In [None]:
SAMPLE_SIZE_TRAIN = 15000
train_indices = random.sample(range(len(train_images)), SAMPLE_SIZE_TRAIN)
#train_indices = list(range(0, SAMPLE_SIZE_TRAIN-1))
train_selected_images = [train_images[i] for i in train_indices]
train_selected_questions = [train_questions[i] for i in train_indices]
train_selected_answers = [train_answers[i] for i in train_indices]

# Select a subset for validation
SAMPLE_SIZE_VAL = 3000
eval_indices = random.sample(range(len(val_images)), SAMPLE_SIZE_VAL*2)
val_indices = eval_indices[:len(eval_indices)//2]
test_indices = eval_indices[len(eval_indices)//2:]

val_selected_images = [val_images[i] for i in val_indices]
val_selected_questions = [val_questions[i] for i in val_indices]
val_selected_answers = [val_answers[i] for i in val_indices]

test_selected_images = [val_images[i] for i in test_indices]
test_selected_questions = [val_questions[i] for i in test_indices]
test_selected_answers = [val_answers[i] for i in test_indices]

# Find training classes
train_classes = set(train_selected_answers)
print(f"Training classes: {train_classes}")
print(f"Number of training classes: {len(train_classes)}")

# Filter validation answers to include only those present in the training set
val_filtered_indices = [i for i, answer in enumerate(val_selected_answers) if answer in train_classes]
test_filtered_indices = [i for i, answer in enumerate(test_selected_answers) if answer in train_classes]

print(f'Number of validation samples: {len(val_filtered_indices)}')
print(f'Number of test samples: {len(test_filtered_indices)}')

val_filtered_images = [val_selected_images[i] for i in val_filtered_indices]
val_filtered_questions = [val_selected_questions[i] for i in val_filtered_indices]
val_filtered_answers = [val_selected_answers[i] for i in val_filtered_indices]

test_filtered_images = [test_selected_images[i] for i in test_filtered_indices]
test_filtered_questions = [test_selected_questions[i] for i in test_filtered_indices]
test_filtered_answers = [test_selected_answers[i] for i in test_filtered_indices]

# Merge training and validation questions
combined_questions = train_selected_questions + val_filtered_questions + test_filtered_questions

# Fit tokenizer on the combined set of questions
tokenizer = Tokenizer()
tokenizer.fit_on_texts(combined_questions)
tokenized_combined_questions = tokenizer.texts_to_sequences(combined_questions)
max_question_length = max(len(seq) for seq in tokenized_combined_questions)

# Tokenize and pad training questions
train_tokenized_questions = tokenizer.texts_to_sequences(train_selected_questions)
train_padded_questions = pad_sequences(train_tokenized_questions, maxlen=max_question_length)

# Tokenize and pad validation questions
val_tokenized_questions = tokenizer.texts_to_sequences(val_filtered_questions)
val_padded_questions = pad_sequences(val_tokenized_questions, maxlen=max_question_length)

# Tokenize and pad validation questions
test_tokenized_questions = tokenizer.texts_to_sequences(test_filtered_questions)
test_padded_questions = pad_sequences(test_tokenized_questions, maxlen=max_question_length)

# Convert answers to classes
label_encoder = LabelEncoder()
train_answer_classes = label_encoder.fit_transform(train_selected_answers)

# Convert filtered eval answers to classes using label encoder
val_answer_classes = label_encoder.transform(val_filtered_answers)
test_answer_classes = label_encoder.transform(test_filtered_answers)

Number of training classes: 2522
Number of validation samples: 2663
Number of test samples: 2660


## Augmentation

In [None]:
# Image transformations
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]

normalize = transforms.Normalize(mean=mean, std=std)
#image_transforms = transforms.Compose(
#    [transforms.Resize((224, 224)), transforms.ToTensor(), normalize]
#)
eval_transforms = transforms.Compose(
    [
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ]
)

image_transforms = transforms.Compose(
    [
        transforms.RandomHorizontalFlip(p=0.5),
        transforms.RandomRotation(degrees=15),
        transforms.ColorJitter(brightness=0.2, contrast=0.2, saturation=0.2),
        transforms.Resize((224, 224)),
        transforms.ToTensor(),
        normalize,
    ]
)

## Prepare dataset and dataloader

In [None]:
class VQADataset(Dataset):
    def __init__(self, images, questions, answers, transform=None):
        self.images = images
        self.questions = questions
        self.answers = answers
        self.transform = transform

    def __len__(self):
        return len(self.images)

    def __getitem__(self, idx):
        image = Image.open(self.images[idx]).convert("RGB")
        if self.transform:
            image = self.transform(image)
        question = torch.tensor(self.questions[idx], dtype=torch.long)
        answer = torch.tensor(self.answers[idx], dtype=torch.long)
        return image, question, answer

train_dataset = VQADataset(
    train_selected_images,
    train_padded_questions,
    train_answer_classes,
    transform=image_transforms,
)

val_dataset = VQADataset(
    val_filtered_images,
    val_padded_questions,
    val_answer_classes,
    transform=eval_transforms,
)

test_dataset = VQADataset(
    test_filtered_images,
    test_padded_questions,
    test_answer_classes,
    transform=eval_transforms,
)

train_data_loader = DataLoader(train_dataset, batch_size=64, shuffle=True)
val_data_loader = DataLoader(val_dataset, batch_size=64, shuffle=True)
test_data_loader = DataLoader(val_dataset, batch_size=64, shuffle=True)

# Model architecture

In [None]:
import torchvision.models as models
import torch.nn as nn
import torch.nn.functional as F
from torchvision.models.detection import fasterrcnn_resnet50_fpn

class Attention(nn.Module):
    def __init__(self, image_dim, text_dim, hidden_dim):
        super(Attention, self).__init__()
        self.image_att = nn.Linear(image_dim, hidden_dim)
        self.text_att = nn.Linear(text_dim, hidden_dim)
        self.final_att = nn.Linear(hidden_dim, 1)
        self.dropout = nn.Dropout(p=0.1)  # Dropout after attention layer

    def forward(self, image_feats, text_feats):
        img_att = self.image_att(image_feats)  # [batch_size, num_pixels, hidden_dim]
        txt_att = self.text_att(text_feats)    # [batch_size, hidden_dim]
        combined_att = F.relu(img_att + txt_att.unsqueeze(1))  # Add text feats to each image feat
        #combined_att = self.dropout(combined_att)  # Dropout
        att_scores = F.softmax(self.final_att(combined_att), dim=1)  # [batch_size, num_pixels, 1]
        weighted_feats = (image_feats * att_scores).sum(dim=1)  # [batch_size, image_dim]
        return weighted_feats

class VQAModel(nn.Module):
    def __init__(self, vocab_size, embed_size, hidden_size, num_answers, image_feature_dim):
        super(VQAModel, self).__init__()

        # Initialize the ResNet model
        self.resnet = models.resnet50(pretrained=True)
        self.resnet = nn.Sequential(*list(self.resnet.children())[:-2])  # Use layers up to the last convolutional layer

        # Additional layer to pool image features
        self.avgpool = nn.AdaptiveAvgPool2d((1, 1))  # Pool to 1x1 feature per channel

        # Attention Layer
        self.attention = Attention(image_dim=image_feature_dim, text_dim=hidden_size, hidden_dim=512)

        # Text processing layers
        self.embedding = nn.Embedding(vocab_size, embed_size)
        self.lstm = nn.LSTM(embed_size, hidden_size, batch_first=True)
        self.dropout_lstm = nn.Dropout(p=0.5)  # Dropout after LSTM

        # Classifier
        self.fc1 = nn.Linear(hidden_size + image_feature_dim, 512)
        self.dropout = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(512, num_answers)

    def forward(self, images, questions):
        # Extract image features
        img_features = self.resnet(images)  # [batch_size, channels, height, width]
        img_features = self.avgpool(img_features)  # Pool features
        img_features = img_features.view(img_features.size(0), -1)  # Flatten to [batch_size, channels]

        # Process the text
        embedded = self.embedding(questions)
        lstm_out, _ = self.lstm(embedded)
        lstm_out = self.dropout_lstm(lstm_out)
        question_repr = lstm_out[:, -1]

        # Apply attention
        attended_img_feats = self.attention(img_features, question_repr)

        # Classifier
        combined = torch.cat([attended_img_feats, question_repr], dim=1)
        x = self.fc1(combined)
        x = F.relu(x)
        x = self.dropout(x)
        x = self.fc2(x)
        return x

# Model initialization
VQA_NET = VQAModel(
    vocab_size=len(tokenizer.word_index) + 1,
    embed_size=256,
    hidden_size=512,
    num_answers=len(train_classes),
    image_feature_dim=2048
)



In [None]:
print(VQA_NET)

VQAModel(
  (resnet): Sequential(
    (0): Conv2d(3, 64, kernel_size=(7, 7), stride=(2, 2), padding=(3, 3), bias=False)
    (1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (2): ReLU(inplace=True)
    (3): MaxPool2d(kernel_size=3, stride=2, padding=1, dilation=1, ceil_mode=False)
    (4): Sequential(
      (0): Bottleneck(
        (conv1): Conv2d(64, 64, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn1): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1), bias=False)
        (bn2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (conv3): Conv2d(64, 256, kernel_size=(1, 1), stride=(1, 1), bias=False)
        (bn3): BatchNorm2d(256, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
        (relu): ReLU(inplace=True)
        (downsample): Sequential(
          (0): Conv2d(64, 

# Training and Validation

In [None]:
import matplotlib.pyplot as plt
import numpy as np
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score, confusion_matrix

question_types = ['what', 'where', 'how many', 'which', 'is there', 'does the', 'are there']

def identify_question_type(question):
    for q_type in question_types:
        if question.lower().startswith(q_type):
            return q_type
    return 'other'

# Function for Gradient Clipping
def clip_gradients(model, clip_value):
    parameters = [p for p in model.parameters() if p.grad is not None]
    for p in parameters:
        p.grad.data.clamp_(-clip_value, clip_value)

# Function for Learning Rate Decay
def adjust_learning_rate(optimizer, epoch, decay_rate, initial_lr):
    lr = initial_lr * (decay_rate ** epoch)
    for param_group in optimizer.param_groups:
        param_group['lr'] = lr

def calculate_metrics(true_labels, predicted_labels):
    accuracy = accuracy_score(true_labels, predicted_labels)
    precision = precision_score(true_labels, predicted_labels, average="weighted")
    recall = recall_score(true_labels, predicted_labels, average="weighted", labels=np.unique(predicted_labels))
    f1 = f1_score(true_labels, predicted_labels, average="weighted", labels=np.unique(predicted_labels))
    return accuracy, precision, recall, f1

# Function to convert tensor to image
def tensor_to_image(tensor):
    tensor = tensor.cpu().clone()
    tensor = tensor.squeeze(0)
    tensor = unnormalize(tensor)
    tensor = tensor.numpy().transpose(1, 2, 0)
    tensor = np.clip(tensor, 0, 1)
    return tensor

# Unnormalize function
def unnormalize(tensor):
    for t, m, s in zip(tensor, mean, std):
        t.mul_(s).add_(m)    # unnormalize
    return tensor

## Single split training/validation

In [None]:
import warnings
import sklearn.exceptions
from torch.optim.lr_scheduler import ReduceLROnPlateau
import csv

warnings.filterwarnings("ignore", category=sklearn.exceptions.UndefinedMetricWarning)

# Training Parameters
EPOCHS = 100
LEARNING_RATE = 0.001
DECAY_FACTOR = 0.999
WEIGHT_DECAY = 0.001
CRITERION = nn.CrossEntropyLoss()
OPTIMIZER = optim.Adam(VQA_NET.parameters(), lr=LEARNING_RATE, weight_decay=WEIGHT_DECAY)
SCHEDULER = ReduceLROnPlateau(OPTIMIZER, mode='min', factor=0.1, patience=3, verbose=True)
VQA_NET.to(device)

best_val_loss = float('inf')
patience = 5
wait = 0

# Initialize storage for predictions and labels by category
category_predictions = {q_type: [] for q_type in question_types}
category_true_labels = {q_type: [] for q_type in question_types}

for epoch in range(1,EPOCHS+1):

    # Training phase
    VQA_NET.train()
    adjust_learning_rate(OPTIMIZER, epoch, DECAY_FACTOR, LEARNING_RATE)
    total_loss = 0.0
    for i, (images, questions, labels) in enumerate(train_data_loader):
        images, questions, labels = images.to(device), questions.to(device), labels.to(device)
        OPTIMIZER.zero_grad()
        outputs = VQA_NET(images, questions)
        loss = CRITERION(outputs, labels)
        loss.backward()
        #clip_gradients(VQA_NET, 10)  # Gradient clipping
        OPTIMIZER.step()
        total_loss += loss.item()

    # Avg train loss
    avg_loss = total_loss / len(train_data_loader)

    # Validation phase
    VQA_NET.eval()

    all_pred_labels = []
    all_true_labels = []
    total_val_loss = 0.0
    correct_answers = 0
    total_answers = 0

    with torch.no_grad():
        for images, questions, labels in val_data_loader:
            images, questions, labels = images.to(device), questions.to(device), labels.to(device)
            outputs = VQA_NET(images, questions)
            loss = CRITERION(outputs, labels)
            total_val_loss += loss.item()

            _, predicted = torch.max(outputs, 1)
            correct_answers += (predicted == labels).sum().item()
            total_answers += labels.size(0)
            _, predicted_labels = torch.max(outputs, dim=1)
            all_pred_labels.extend(predicted_labels.cpu().numpy())
            all_true_labels.extend(labels.cpu().numpy())

    # Evaluate validation performance after each epoch
    current_val_loss = total_val_loss
    if current_val_loss < best_val_loss:
        best_val_loss = current_val_loss
        best_epoch = epoch
        wait = 0  # Reset wait time if there's an improvement
        # Save the model checkpoint if this is the best model so far
        torch.save(VQA_NET.state_dict(), f'/content/drive/MyDrive/Deep_learning/model_states/new_model_baseline{epoch}.pth')
    else:
        wait += 1  # Increment wait time if no improvement

    # Scheduler update
    SCHEDULER.step(current_val_loss)

    # Compute and display metrics
    accuracy, precision, recall, f1 = calculate_metrics(all_true_labels, all_pred_labels)
    print(f"Epoch [{epoch}/{EPOCHS}] | Training Loss: {avg_loss:.4f}, Validation Loss: {current_val_loss:.4f}")
    print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

    # Write results to file for analysis later
    with open('/content/drive/MyDrive/Deep_learning/epoch_statistics_baseline.csv', 'a', newline='') as file:
        writer = csv.writer(file)
        writer.writerow([epoch, avg_loss, current_val_loss, accuracy, precision, recall, f1])

    # Early stopping check
    if wait >= patience:
        print(f"Stopping early at epoch {epoch}")
        break

# Load in best model
if current_val_loss!=best_val_loss:
    print(f'Loading model from epoch: {best_epoch}.')
    VQA_NET.load_state_dict(torch.load(f'/content/drive/MyDrive/Deep_learning/model_states/new_model_baseline{best_epoch}.pth'))

Epoch [1/100] | Training Loss: 4.3391, Validation Loss: 136.0140
Accuracy: 0.2561, Precision: 0.1182, Recall: 0.5194, F1 Score: 0.3044
Epoch [2/100] | Training Loss: 3.8428, Validation Loss: 132.4094
Accuracy: 0.2730, Precision: 0.1151, Recall: 0.5215, F1 Score: 0.3034
Epoch [3/100] | Training Loss: 3.7455, Validation Loss: 128.7316
Accuracy: 0.2749, Precision: 0.2294, Recall: 0.4738, F1 Score: 0.3202
Epoch [4/100] | Training Loss: 3.6637, Validation Loss: 125.2263
Accuracy: 0.2918, Precision: 0.1246, Recall: 0.7961, F1 Score: 0.4669
Epoch [5/100] | Training Loss: 3.5741, Validation Loss: 123.1397
Accuracy: 0.2843, Precision: 0.1187, Recall: 0.5160, F1 Score: 0.2972
Epoch [6/100] | Training Loss: 3.5142, Validation Loss: 120.4520
Accuracy: 0.2903, Precision: 0.2313, Recall: 0.4763, F1 Score: 0.3891
Epoch [7/100] | Training Loss: 3.4602, Validation Loss: 119.9377
Accuracy: 0.2884, Precision: 0.1578, Recall: 0.4964, F1 Score: 0.2876
Epoch [8/100] | Training Loss: 3.4153, Validation Loss:

FileNotFoundError: ignored

In [None]:
VQA_NET.load_state_dict(torch.load(f'/content/drive/MyDrive/Deep_learning/model_states/new_model_rcnn{29}.pth'))

<All keys matched successfully>

# Evaluation

## Metrics for best model

In [None]:
# Calculate and print the metrics for the best model
accuracy, precision, recall, f1 = calculate_metrics(all_true_labels, all_pred_labels)
print(f"Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")

Accuracy: 0.2985, Precision: 0.2510, Recall: 0.4499, F1 Score: 0.3507


## Evaluation analysis on test data

In [None]:
import matplotlib.pyplot as plt

def categorize_answer(answer):
    if answer.lower() in ["yes", "no"]:
        return "yes/no"
    if answer.isdigit():
        return "numbers"
    return "others"

# Initialize storage for predictions and labels by category
category_metrics = {
    "yes/no": {"true_labels": [], "predicted_labels": []},
    "numbers": {"true_labels": [], "predicted_labels": []},
    "others": {"true_labels": [], "predicted_labels": []},
    "overall": {"true_labels": [], "predicted_labels": []}
}

# Example usage
num_samples_to_display = 15
samples_displayed = 0

test_pred_labels = []
test_true_labels = []
total_test_loss = 0.0
correct_answers = 0
total_answers = 0

VQA_NET.eval()
with torch.no_grad():
    for images, questions, labels in test_data_loader:
        images, questions, labels = images.to(device), questions.to(device), labels.to(device)
        outputs = VQA_NET(images, questions)
        loss = CRITERION(outputs, labels)
        total_test_loss += loss.item()

        _, predicted = torch.max(outputs, 1)
        correct_answers += (predicted == labels).sum().item()
        total_answers += labels.size(0)
        _, predicted_labels = torch.max(outputs, dim=1)
        test_pred_labels.extend(predicted_labels.cpu().numpy())
        test_true_labels.extend(labels.cpu().numpy())

        # Categorize answers and store results
        for true_label, predicted_label in zip(labels.cpu().numpy(), predicted_labels):
            true_answer = label_encoder.inverse_transform([true_label])[0]
            predicted_answer = label_encoder.inverse_transform([predicted_label.cpu()])[0]
            category = categorize_answer(true_answer)
            category_metrics[category]["true_labels"].append(true_label)
            category_metrics[category]["predicted_labels"].append(predicted_label)
            category_metrics["overall"]["true_labels"].append(true_label)
            category_metrics["overall"]["predicted_labels"].append(predicted_label)

        """
        for i in range(images.size(0)):
            if samples_displayed >= num_samples_to_display:
                break

            image = tensor_to_image(images[i])
            question_text = ' '.join([tokenizer.index_word.get(idx, '?') for idx in questions[i].tolist() if idx != 0])
            true_answer_text = label_encoder.inverse_transform([labels[i].item()])[0]
            predicted_answer_text = label_encoder.inverse_transform([predicted[i].item()])[0]

            ""Display the image with the question and predicted answer.""
            plt.imshow(image)
            plt.axis('off')
            plt.title(f"Q: {question_text}\nTrue: {true_answer_text}, Predicted: {predicted_answer_text}")
            plt.show()

            samples_displayed += 1
"""

# Compute and display metrics for each category
for category, data in category_metrics.items():
    print(category)
    #print(data['true_labels'])
    # Convert each tensor in the list to a NumPy array
    predicted_labels_cpu = [label.cpu().numpy() for label in data['predicted_labels']]
    # Flatten the list of arrays and then apply inverse_transform
    #print(label_encoder.inverse_transform(predicted_labels_cpu))

    accuracy, precision, recall, f1 = calculate_metrics(data["true_labels"], predicted_labels_cpu)
    print(f"Category: {category} - Accuracy: {accuracy:.4f}, Precision: {precision:.4f}, Recall: {recall:.4f}, F1 Score: {f1:.4f}")



yes/no
Category: yes/no - Accuracy: 0.5071, Precision: 0.2603, Recall: 0.5071, F1 Score: 0.3440
numbers
Category: numbers - Accuracy: 0.2278, Precision: 0.2370, Recall: 0.3083, F1 Score: 0.2062
others
Category: others - Accuracy: 0.1300, Precision: 0.0803, Recall: 0.2779, F1 Score: 0.1754
overall
Category: overall - Accuracy: 0.3038, Precision: 0.1694, Recall: 0.4147, F1 Score: 0.2752
