# Imports

In [1]:
import copy
import csv
import math
import matplotlib.pyplot as plt
import numpy as np
import os
import pandas as pd
import re
import sys
import torch
import torch.nn as nn
import torch.nn.functional as F
import json

from collections import Counter
from os import listdir
from os.path import isfile, join
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms
from torchvision.models import resnet152

# Images

In [2]:
class Dataset(Dataset):
    """Folder-of-images dataset yielding ``(file name, image)`` pairs.

    Every regular file found directly inside ``img_dir`` is treated as an
    image; sub-directories are skipped.

    NOTE: the class name shadows the imported ``torch.utils.data.Dataset``
    from this point on in the module.

    Args:
        img_dir (string): Directory holding the image files.
        transform (callable, optional): Applied to each RGB PIL image
            before it is returned.
    """

    def __init__(self, img_dir, transform=None):
        super().__init__()
        self.img_dir = img_dir
        self.transform = transform
        # Only bare file names are stored; the directory is joined back on access.
        self.image_names = []
        for entry in listdir(img_dir):
            if isfile(join(img_dir, entry)):
                self.image_names.append(entry)

    def __len__(self):
        """Number of image files discovered in ``img_dir``."""
        return len(self.image_names)

    def __getitem__(self, idx):
        """Return ``(file name, image)`` for position ``idx``."""
        file_name = self.image_names[idx]
        picture = Image.open(join(self.img_dir, file_name)).convert('RGB')
        if self.transform is None:
            return (file_name, picture)
        return (file_name, self.transform(picture))


class ResNet(torch.nn.Module):
    """ResNet-152 wrapper used to turn images into feature vectors.

    Args:
        pretrained (bool): When True (default) the ImageNet weights are
            loaded. Building the network without weights leaves it randomly
            initialised, so its outputs carry no visual information. Pass
            False to get the randomly initialised network (e.g. offline).
    """

    def __init__(self, pretrained=True):
        super().__init__()
        if pretrained:
            try:
                # torchvision >= 0.13 selects weights through an enum.
                from torchvision.models import ResNet152_Weights
            except ImportError:
                # Older torchvision releases only know the boolean flag.
                self.model = resnet152(pretrained=True)
            else:
                self.model = resnet152(weights=ResNet152_Weights.IMAGENET1K_V1)
        else:
            self.model = resnet152()
        # Pool every feature map down to 1x1 whatever the input resolution is.
        self.model.avgpool = torch.nn.AdaptiveAvgPool2d(output_size=1)

    def forward(self, x):
        """Return the wrapped network's output for the image batch ``x``."""
        return self.model(x)

In [3]:
# Extract one feature vector per training image and store it as <image name>.npy.
src_dir_train = '/content/drive/MyDrive/VisualQuestionAnswering/scene_img_abstract_v002_train2015'
dst_dir_train = '/content/drive/MyDrive/VisualQuestionAnswering/scene_img_feat_abstract_v002_train2015'

resize_dim = 448
batch_size = 6
num_workers = 2

# Resize every image to a fixed square, then convert it to a tensor.
transform = transforms.Compose([transforms.Resize((resize_dim, resize_dim)), transforms.ToTensor()])

dataset_train = Dataset(src_dir_train, transform=transform)

# Order does not matter: each output file is named after its source image.
loader_train = DataLoader(dataset_train, batch_size=batch_size, num_workers=num_workers, shuffle=False)

# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(dst_dir_train, exist_ok=True)

model = ResNet()
# Inference only: eval mode makes BatchNorm use its running statistics, so the
# features of an image no longer depend on which other images share its batch.
model.eval()

# no_grad: no autograd graph is needed for feature extraction.
with torch.no_grad():
    for i, (img_paths, images) in enumerate(loader_train):
        output = model(images)
        if i == 0:
            print(output.shape)

        for img_path, feat in zip(img_paths, output):
            feat_name = join(dst_dir_train, img_path.replace('.png', '.npy'))
            print(feat_name)
            np.save(feat_name, feat.numpy())

In [4]:
# Extract one feature vector per validation image and store it as <image name>.npy.
# Reuses `model`, `transform`, `batch_size` and `num_workers` defined earlier in the file.
src_dir_val = '/content/drive/MyDrive/VisualQuestionAnswering/scene_img_abstract_v002_val2015'
dst_dir_val = '/content/drive/MyDrive/VisualQuestionAnswering/scene_img_feat_abstract_v002_val2015'

dataset_val = Dataset(src_dir_val, transform=transform)

# Order does not matter: each output file is named after its source image.
loader_val = DataLoader(dataset_val, batch_size=batch_size, num_workers=num_workers, shuffle=False)

# exist_ok avoids the check-then-create race of a separate os.path.exists() test.
os.makedirs(dst_dir_val, exist_ok=True)

# Inference only: eval mode makes BatchNorm use its running statistics, so the
# features of an image no longer depend on which other images share its batch.
model.eval()

# no_grad: no autograd graph is needed for feature extraction.
with torch.no_grad():
    for i, (img_paths, images) in enumerate(loader_val):
        output = model(images)
        if i == 0:
            print(output.shape)

        for img_path, feat in zip(img_paths, output):
            feat_name = join(dst_dir_val, img_path.replace('.png', '.npy'))
            print(feat_name)
            np.save(feat_name, feat.numpy())

# Questions and Annotations

In [5]:
# Interface for accessing the VQA dataset.

# This code is based on the code available at the following link: https://github.com/GT-Vision-Lab/VQA/blob/master/PythonHelperTools/vqaTools/vqa.py.


class VQA:
    """Small interface for reading VQA question and annotation files.

    Attributes:
        dataset: Parsed annotation JSON (expects an ``'annotations'`` list).
        questions: Parsed question JSON (expects a ``'questions'`` list).
        qa: Maps ``question_id`` to its annotation.
        qqa: Maps ``question_id`` to its question record.
        imgToQA: Maps ``image_id`` to the list of annotations for that image.
    """

    def __init__(self, annotation_file=None, question_file=None):
        """Load both JSON files and build the lookup tables.

        The instance is left empty unless BOTH paths are given.

        Args:
            annotation_file (string, optional): Path to the annotation JSON.
            question_file (string, optional): Path to the question JSON.
        """
        self.dataset = {}
        self.questions = {}
        self.qa = {}
        self.qqa = {}
        self.imgToQA = {}
        if annotation_file is not None and question_file is not None:
            # Context managers close the files even if parsing fails.
            with open(annotation_file, 'r') as ann_fp:
                dataset = json.load(ann_fp)
            with open(question_file, 'r') as ques_fp:
                questions = json.load(ques_fp)
            self.dataset = dataset
            self.questions = questions
            self.createIndex()

    def createIndex(self):
        """Build ``qa``, ``qqa`` and ``imgToQA`` from the loaded JSON data."""
        imgToQA = {}
        qa = {}
        qqa = {}
        for ann in self.dataset['annotations']:
            imgToQA.setdefault(ann['image_id'], []).append(ann)
            qa[ann['question_id']] = ann
            # Annotated questions without a question record map to an empty list.
            qqa.setdefault(ann['question_id'], [])
        for ques in self.questions['questions']:
            qqa[ques['question_id']] = ques

        # create class members
        self.qa = qa
        self.qqa = qqa
        self.imgToQA = imgToQA

In [6]:
# Number words and the digit strings they are normalised to.
manualMap = {'none': '0', 'zero': '0', 'one': '1', 'two': '2', 'three': '3', 'four': '4', 'five': '5', 'six': '6', 'seven': '7', 'eight': '8', 'nine': '9', 'ten': '10'}

# Words dropped from questions and answers.
articles = ['a', 'an', 'the']


def processDigitArticle(inText):
    """Lowercase ``inText``, turn number words into digits and drop articles.

    Args:
        inText (string): Raw question or answer text.

    Returns:
        string: The remaining words joined by single spaces.
    """
    outText = []
    for word in inText.lower().split():
        # dict.get, not setdefault: a lookup must never insert unknown words
        # into the shared manualMap.
        word = manualMap.get(word, word)
        if word not in articles:
            outText.append(word)
    return ' '.join(outText)


# Punctuation marks handled by processPunctuation().
punct = [';', "/", '[', ']', '"', '{', '}', '(', ')', '=', '+', '\\', '_', '-', '>', '<', '@', '`', ',', '?', '!']


def processPunctuation(inText):
    """Remove the punctuation marks listed in ``punct`` from ``inText``.

    A mark that appears next to a space anywhere in the ORIGINAL text is
    deleted everywhere; any other mark is replaced by a single space, so
    words it glued together stay separated.

    Args:
        inText (string): Raw question or answer text.

    Returns:
        string: The text without the listed punctuation marks.
    """
    cleaned = inText
    for mark in punct:
        # The adjacency test looks at the untouched input, not the partial result.
        touches_space = (mark + ' ') in inText or (' ' + mark) in inText
        replacement = '' if touches_space else ' '
        cleaned = cleaned.replace(mark, replacement)
    return cleaned

In [7]:
# This code is inspired by the code available at the following link: https://github.com/Cyanogenoid/pytorch-vqa/blob/master/data.py.


class VqaTrainDataset(Dataset):

    """ Load the VQA training split using the VQA class and build the vocabularies. """

    def __init__(self, question_json_file_path, annotation_json_file_path, image_filename_pattern, img_features_dir, vocab_json_filename):

        """
        Args:
            question_json_file_path (string): Path to the json file containing the questions
            annotation_json_file_path (string): Path to the json file containing the annotations
            image_filename_pattern (string): Pattern used by the filenames of the images in this dataset (eg "abstract_v002_train2015_{}.png")
            img_features_dir (string): Path to the directory with image features
            vocab_json_filename (string): Path the question/answer vocabulary is written to.
        """

        vqa_db = VQA(annotation_file=annotation_json_file_path, question_file=question_json_file_path)

        self.max_words_in_ques = -1
        self.dataset = []

        ques_list = []
        ans_list = []

        for q_id, annotation in vqa_db.qa.items():
            question = processDigitArticle(processPunctuation(vqa_db.qqa[q_id]['question']))
            words = question.split(' ')
            self.max_words_in_ques = max(self.max_words_in_ques, len(words))
            ques_list.extend(words)

            possible_answers = [processDigitArticle(processPunctuation(a['answer'])) for a in annotation['answers']]
            ans_list.extend(possible_answers)

            img_full_idx = "%012d" % annotation['image_id']  # image id zero-padded to 12 digits
            img_name = image_filename_pattern.replace('{}', img_full_idx)

            self.dataset.append({
                'ques': words,
                'possible_answers': possible_answers,
                'img_feat_loc': os.path.join(img_features_dir, img_name.replace('.png', '.npy')),
            })

        vocab = {'q': self.build_vocab(ques_list), 'a': self.build_vocab(ans_list)}

        # Write to the path passed by the caller (not a module-level global),
        # and let the context manager close the file.
        with open(vocab_json_filename, "w") as f:
            json.dump(vocab, f)

        self.q_vocab = vocab['q']
        self.q_vocab_size = len(self.q_vocab)

        self.a_vocab = vocab['a']
        self.a_vocab_size = len(self.a_vocab)

    def build_vocab(self, data):
        """ Map every distinct token to an index; most frequent first, ties in reverse alphabetical order. """
        counter = Counter(data)
        tokens = sorted(counter, key=lambda x: (counter[x], x), reverse=True)  # reverse=True: sorts in a descending order.
        return {t: i for i, t in enumerate(tokens)}

    def _get_q_encoding(self, questions):
        """ Return (count vector over the question vocabulary, number of words); unknown words are skipped. """
        vec = torch.zeros(self.q_vocab_size)
        for question in questions:
            if question in self.q_vocab:
                vec[self.q_vocab[question]] += 1
        return vec, len(questions)

    def _get_a_encoding(self, answers):
        """ Return a count vector over the answer vocabulary; unknown answers are skipped. """
        vec = torch.zeros(self.a_vocab_size)
        for answer in answers:
            if answer in self.a_vocab:
                vec[self.a_vocab[answer]] += 1
        return vec

    def __getitem__(self, idx):
        """ Return the image features, question encoding, question length and answer encoding of sample idx. """
        entry = self.dataset[idx]

        image_encoding = np.load(entry['img_feat_loc'])

        ques_encoding, ques_len = self._get_q_encoding(entry['ques'])
        ans_encoding = self._get_a_encoding(entry['possible_answers'])

        return {'image_enc': image_encoding, 'ques_enc': ques_encoding, 'ques_len': ques_len, 'ans_enc': ans_encoding}

    def __len__(self):
        return len(self.dataset)

In [8]:
# This code is inspired by the code available at the following link: https://github.com/Cyanogenoid/pytorch-vqa/blob/master/data.py.


class VqaValDataset(Dataset):

    """
    Load the VQA validation split using the VQA class and an existing vocabulary file.
    """

    def __init__(self, question_json_file_path, annotation_json_file_path, image_filename_pattern, img_features_dir, vocab_json_filename):

        """
        Args:
            question_json_file_path (string): Path to the json file containing the questions
            annotation_json_file_path (string): Path to the json file containing the annotations
            image_filename_pattern (string): Pattern used by the filenames of the images in this dataset (eg "abstract_v002_val2015_{}.png")
            img_features_dir (string): Path to the directory with image features
            vocab_json_filename (string): Path of the vocabulary json file to read (keys 'q' and 'a').
        """

        vqa_db = VQA(annotation_file=annotation_json_file_path, question_file=question_json_file_path)

        self.max_words_in_ques = -1

        self.dataset = []

        for q_id, annotation in vqa_db.qa.items():
            question = processDigitArticle(processPunctuation(vqa_db.qqa[q_id]['question']))
            words = question.split(' ')
            self.max_words_in_ques = max(self.max_words_in_ques, len(words))

            possible_answers = [processDigitArticle(processPunctuation(a['answer'])) for a in annotation['answers']]

            img_full_idx = "%012d" % annotation['image_id']  # image id zero-padded to 12 digits
            img_name = image_filename_pattern.replace('{}', img_full_idx)

            self.dataset.append({
                'ques': words,
                'possible_answers': possible_answers,
                'img_feat_loc': os.path.join(img_features_dir, img_name.replace('.png', '.npy')),
            })

        # The context manager closes the vocabulary file once it is parsed.
        with open(vocab_json_filename, "r") as f:
            vocab = json.load(f)

        self.q_vocab = vocab['q']
        self.q_vocab_size = len(self.q_vocab)

        self.a_vocab = vocab['a']
        self.a_vocab_size = len(self.a_vocab)

    def _get_q_encoding(self, questions):
        """ Return (count vector over the question vocabulary, number of words); unknown words are skipped. """
        vec = torch.zeros(self.q_vocab_size)
        for question in questions:
            if question in self.q_vocab:
                vec[self.q_vocab[question]] += 1
        return vec, len(questions)

    def _get_a_encoding(self, answers):
        """ Return a count vector over the answer vocabulary; unknown answers are skipped. """
        vec = torch.zeros(self.a_vocab_size)
        for answer in answers:
            if answer in self.a_vocab:
                vec[self.a_vocab[answer]] += 1
        return vec

    def __getitem__(self, idx):
        """ Return the image features, question encoding, question length and answer encoding of sample idx. """
        entry = self.dataset[idx]

        image_encoding = np.load(entry['img_feat_loc'])

        ques_encoding, ques_len = self._get_q_encoding(entry['ques'])
        ans_encoding = self._get_a_encoding(entry['possible_answers'])

        return {'image_enc': image_encoding, 'ques_enc': ques_encoding, 'ques_len': ques_len, 'ans_enc': ans_encoding}

    def __len__(self):
        return len(self.dataset)

# Models

## SimpleBaselineNet

In [9]:
class SimpleBaselineNet(nn.Module):

    """ Predicts an answer to a question about an image using the Simple Baseline for Visual Question Answering (Zhou et al, 2017) paper. """

    def __init__(self, img_feat_size, q_vocab_size, a_vocab_size):
        """
        Args:
            img_feat_size (int): Length of the flattened image feature vector.
            q_vocab_size (int): Length of the question encoding vector.
            a_vocab_size (int): Number of answer classes (size of the output).
        """
        super().__init__()

        self.img_feat_size = img_feat_size
        self.q_vocab_size = q_vocab_size
        self.a_vocab_size = a_vocab_size
        self.q_embedding_size = 1024

        # Both layers are pure linear maps y = xA^T (bias=False, so there is no offset term).
        self.linear_layer = nn.Linear(self.q_vocab_size, self.q_embedding_size, bias=False)
        self.classifier = nn.Linear(self.img_feat_size + self.q_embedding_size, self.a_vocab_size, bias=False)

    def forward(self, image_encoding, question_encoding, question_length=None):
        """
        Args:
            image_encoding (Tensor): Image features; flattened to (batch, -1).
            question_encoding (Tensor): Question vectors of shape (batch, q_vocab_size).
            question_length: Accepted and ignored, so this model can be called
                with the same three arguments as MyNet.

        Returns:
            Tensor: Answer scores of shape (batch, a_vocab_size).
        """
        image_encoding = image_encoding.view(image_encoding.shape[0], -1)

        question_embedding = self.linear_layer(question_encoding)

        # Join image features and question embedding along the feature dimension.
        x = torch.cat((image_encoding, question_embedding), dim=-1)

        return self.classifier(x)

## My Net

In [10]:
class QuestionProcessor(nn.Module):
    """Encode a batch of questions with an embedding layer and a 2-layer LSTM.

    Args:
        vocab_size (int): Number of rows in the embedding table.
        embedding_size (int): Hidden size of the LSTM, i.e. the size of the
            returned question vector.
    """

    def __init__(self, vocab_size, embedding_size):
        super(QuestionProcessor, self).__init__()

        self.word_embedding = 300

        self.embedding = nn.Embedding(vocab_size, self.word_embedding, padding_idx=0)
        self.lstm = nn.LSTM(input_size=self.word_embedding, hidden_size=embedding_size, num_layers=2)

    def forward(self, encoding, length):
        """Return one vector per question, shape (batch, embedding_size).

        Args:
            encoding (Tensor): (batch, seq) tensor whose values are used as
                embedding indices after conversion to long.
            length: Valid length of each row (1-D CPU tensor or list).
        """
        # NOTE(review): nn.Embedding looks entries up by index, but the dataset
        # classes in this file return a bag-of-words COUNT vector as 'ques_enc'.
        # Confirm the caller passes token indices.
        embedding = self.embedding(encoding.long())

        # pack_padded_sequence requires the lengths on the CPU.
        lengths = length.cpu() if torch.is_tensor(length) else length
        # enforce_sorted=False: the batch does not have to be sorted by length.
        packed = nn.utils.rnn.pack_padded_sequence(embedding, lengths, batch_first=True, enforce_sorted=False)

        _, (h_n, _) = self.lstm(packed)
        # h_n is (num_layers, batch, hidden). Take the LAST layer: index 0 is
        # the first layer and would leave the second layer unused.
        return h_n[-1]

class MyNet(nn.Module):
    """Answer classifier joining image features with a QuestionProcessor embedding.

    The concatenated vector goes through linear_1, a ReLU and linear_2.

    Args:
        img_feat_size (int): Length of the flattened image feature vector.
        q_vocab_size (int): Vocabulary size handed to the QuestionProcessor.
        a_vocab_size (int): Number of answer classes (size of the output).
    """

    def __init__(self, img_feat_size, q_vocab_size, a_vocab_size):
        super().__init__()

        self.img_feat_size = img_feat_size
        self.q_vocab_size = q_vocab_size
        self.a_vocab_size = a_vocab_size
        self.q_embedding_size = 1024

        self.question_processor = QuestionProcessor(q_vocab_size, self.q_embedding_size)

        # Width of the concatenated [image features | question embedding] vector.
        fused_size = img_feat_size + self.q_embedding_size
        self.linear_1 = nn.Linear(fused_size, img_feat_size, bias=False)
        self.linear_2 = nn.Linear(img_feat_size, a_vocab_size, bias=False)

        self.activation = nn.ReLU()

    def forward(self, image_encoding, question_encoding, question_length):
        """Return answer scores of shape (batch, a_vocab_size)."""
        n_samples = image_encoding.shape[0]
        flat_image = image_encoding.view(n_samples, -1)  # one feature row per sample
        question_embedding = self.question_processor(question_encoding, question_length)

        # Join both modalities along the feature dimension.
        fused = torch.cat((flat_image, question_embedding), dim=-1)

        hidden = self.linear_1(fused)
        hidden = self.activation(hidden)
        return self.linear_2(hidden)

# Train and Validate 

In [15]:
# All question, annotation, feature, vocabulary and result files share this root directory.
_VQA_ROOT = '/content/drive/MyDrive/VisualQuestionAnswering'

# Training split.
train_question_path = _VQA_ROOT + '/OpenEnded_abstract_train2015_questions.json'
train_annotation_path = _VQA_ROOT + '/abstract_train2015_annotations.json'
train_img_feat_path = _VQA_ROOT + '/scene_img_feat_abstract_v002_train2015'

# Validation split.
val_question_path = _VQA_ROOT + '/OpenEnded_abstract_val2015_questions.json'
val_annotation_path = _VQA_ROOT + '/abstract_val2015_annotations.json'
val_img_feat_path = _VQA_ROOT + '/scene_img_feat_abstract_v002_val2015'

# Vocabulary file written by the training dataset and read by the validation dataset.
vocab_path = _VQA_ROOT + '/vocab.json'

# Per-epoch validation accuracy log; pick the line matching the model being trained.
# results = _VQA_ROOT + '/SimpleBaselineNet.txt'
results = _VQA_ROOT + '/MyNet.txt'

# Training schedule and data loading.
num_epochs = 15
batch_size = 6
num_data_loader_workers = 2

# Length of one stored image feature vector.
img_feat_size = 1000

In [12]:
def optimize(criterion, predicted_answer, optimizer, ground_truth_answer):
    """Perform one optimisation step and return the loss tensor.

    The target class of each sample is the index of the largest entry of
    ``ground_truth_answer`` along its last dimension.

    Args:
        criterion: Loss callable taking (model output, target indices).
        predicted_answer (Tensor): Model output, still attached to the graph.
        optimizer: Optimizer holding the parameters to update.
        ground_truth_answer (Tensor): Per-sample score/count vector over the answers.
    """
    # Clear gradients left over from the previous step before accumulating new ones.
    optimizer.zero_grad()

    target = ground_truth_answer.argmax(dim=-1)
    loss = criterion(predicted_answer, target)

    loss.backward()   # fill parameter.grad
    optimizer.step()  # update the parameters from parameter.grad
    return loss


def validate(model, val_dataset_loader):
    """Return the accuracy (in percent) of ``model`` on the validation loader.

    Each prediction scores ``min(count / 3, 1)``, where ``count`` is the
    entry of the sample's ``'ans_enc'`` vector at the predicted answer.

    Args:
        model: Callable taking (image_enc, ques_enc, ques_len) and returning
            one score per answer for every sample.
        val_dataset_loader: Iterable of batches, each a dict with the keys
            'image_enc', 'ques_enc', 'ques_len' and 'ans_enc'.

    Returns:
        float: Mean score times 100; 0.0 when the loader yields no samples.
    """
    correct_answers = 0.0
    total_answers = 0

    # Evaluation needs no gradients; skipping the autograd graph saves memory.
    with torch.no_grad():
        for batch_data in val_dataset_loader:
            image_encoding = batch_data['image_enc']
            question_encoding = batch_data['ques_enc']
            question_length = batch_data['ques_len']
            ground_truth_answer = batch_data['ans_enc']

            n_samples = ground_truth_answer.shape[0]

            logits = model(image_encoding, question_encoding, question_length)

            # softmax is monotonic, so argmax over the raw logits picks the same answer.
            predicted_answer = torch.argmax(logits, dim=-1)

            # Pick, for every sample, the count stored at its predicted answer.
            rows = torch.arange(n_samples, device=ground_truth_answer.device)
            counts = ground_truth_answer[rows, predicted_answer]
            correct_answers += float(torch.clamp(counts / 3, max=1.0).sum())

            total_answers += n_samples

    if total_answers == 0:
        # An empty loader would otherwise raise ZeroDivisionError.
        return 0.0
    return (correct_answers / total_answers) * 100

In [None]:
# Build the datasets; the training dataset also writes the vocabulary file
# that the validation dataset reads, so it must be constructed first.
train_dataset = VqaTrainDataset(question_json_file_path=train_question_path,
                                annotation_json_file_path=train_annotation_path,
                                image_filename_pattern="abstract_v002_train2015_{}.png",
                                img_features_dir=train_img_feat_path,
                                vocab_json_filename=vocab_path)

val_dataset = VqaValDataset(question_json_file_path=val_question_path,
                            annotation_json_file_path=val_annotation_path,
                            image_filename_pattern="abstract_v002_val2015_{}.png",
                            img_features_dir=val_img_feat_path,
                            vocab_json_filename=vocab_path)


train_dataset_loader = DataLoader(train_dataset,
                                  batch_size=batch_size,  # batch_size: represents how many samples per batch to load.
                                  shuffle=True,
                                  num_workers=num_data_loader_workers)  # num_workers: represents how many subprocesses to use for loading data.

val_dataset_loader = DataLoader(val_dataset,
                                batch_size=batch_size,
                                shuffle=False,
                                num_workers=num_data_loader_workers)


q_vocab_size = train_dataset.q_vocab_size
a_vocab_size = train_dataset.a_vocab_size

# model = SimpleBaselineNet(img_feat_size, q_vocab_size, a_vocab_size)
model = MyNet(img_feat_size, q_vocab_size, a_vocab_size)

# Loss and optimizer are created once, not once per batch.
criterion = torch.nn.CrossEntropyLoss()

# optimizer = torch.optim.SGD([{'params': model.linear_layer.parameters(), 'lr': 0.8}, {'params': model.classifier.parameters(), 'lr': 0.01}])  # for SimpleBaselineNet
# Optimise ALL parameters of the model: listing only linear_1/linear_2 would
# leave the question processor (embedding + LSTM) at its random initialisation.
optimizer = torch.optim.SGD(model.parameters(), lr=0.01)

# The context manager closes the results file even if training is interrupted.
with open(results, "w") as out_filename:
    for epoch in range(num_epochs):
        model.train()  # Set the model to train mode

        for batch_data in train_dataset_loader:
            image_encoding = batch_data['image_enc']
            question_encoding = batch_data['ques_enc']
            question_length = batch_data['ques_len']
            ground_truth_answer = batch_data['ans_enc']

            predicted_answer = model(image_encoding, question_encoding, question_length)

            loss = optimize(criterion, predicted_answer, optimizer, ground_truth_answer)

        # Validate once per epoch, after the last training batch.
        model.eval()  # Set the model to eval mode
        val_accuracy = validate(model, val_dataset_loader)
        print("Epoch {} has val accuracy: {}".format(epoch, val_accuracy))
        out_filename.write("Epoch {} has val accuracy: {}\n".format(epoch, val_accuracy))
        out_filename.flush()  # keep finished epochs on disk during long runs

In [None]:
def _load_accuracy_log(path):
    """Parse 'Epoch <n> has val accuracy: <acc>' lines into (epochs, accuracies)."""
    epochs = []
    accuracies = []
    with open(path, 'r') as f:
        for line in f:
            fields = line.strip().split(" ")
            if len(fields) < 6:
                # Skip blank or incomplete lines rather than relying on the
                # file ending in exactly one newline.
                continue
            epochs.append(int(fields[1]))
            accuracies.append(float(fields[5]))
    return epochs, accuracies


figure = plt.figure(figsize=(20, 15))

# Simple Baseline Net
plt.subplot(2, 2, 1)
epoch_list_SimpleBaselineNet, accuracy_list_SimpleBaselineNet = _load_accuracy_log('/content/drive/MyDrive/VisualQuestionAnswering/SimpleBaselineNet.txt')

plt.plot(epoch_list_SimpleBaselineNet, accuracy_list_SimpleBaselineNet, label="Simple Baseline Net", color='blue', linewidth=3.0)
plt.xlabel('Epochs', fontsize=15)
plt.ylabel('Accuracy', fontsize=15)
plt.legend(loc='lower right', prop={'size': 15})
# Saved before the second subplot is drawn, so this image holds only the first curve.
plt.savefig('/content/drive/MyDrive/VisualQuestionAnswering/SimpleBaselineNet.png')


# My Net
plt.subplot(2, 2, 2)
epoch_list_MyNet, accuracy_list_MyNet = _load_accuracy_log('/content/drive/MyDrive/VisualQuestionAnswering/MyNet.txt')

plt.plot(epoch_list_MyNet, accuracy_list_MyNet, label="My Net", color='red', linewidth=3.0)
plt.xlabel('Epochs', fontsize=15)
plt.ylabel('Accuracy', fontsize=15)
plt.legend(loc='lower right', prop={'size': 15})
# The whole figure is saved, so this image contains both subplots.
plt.savefig('/content/drive/MyDrive/VisualQuestionAnswering/MyNet.png')

In [None]:
# Summary table: one row per network, with the number of logged epochs
# (last epoch index + 1) and the accuracy of the last logged epoch.
summary = {
    "Net": ["Simple Baseline Net", "My Net"],
    "Epochs": [epoch_list_SimpleBaselineNet[-1]+1, epoch_list_MyNet[-1]+1],
    "Accuracy": [accuracy_list_SimpleBaselineNet[-1], accuracy_list_MyNet[-1]],
}
df = pd.DataFrame(summary)

# NOTE(review): this path is under 'EasyVisualQuestionAnswering', while every
# other path in the file uses 'VisualQuestionAnswering' — confirm it is intended.
df.to_csv('/content/drive/MyDrive/EasyVisualQuestionAnswering/results.csv', index=False, encoding='utf-8')

print(df.to_string(index=False))