In [None]:
from IPython.core.interactiveshell import InteractiveShell
# Echo the value of every top-level expression in a cell, not only the last one.
InteractiveShell.ast_node_interactivity = "all"
import os
import tensorflow as tf
from tensorflow import keras
import numpy as np
import json
import cv2
from datetime import datetime
import PIL
from math import ceil, floor
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.text import Tokenizer
import tqdm.notebook as tq
from gensim.parsing.preprocessing import remove_stopwords
from nltk.stem import WordNetLemmatizer
import nltk
# Fetch only the WordNet corpus required by WordNetLemmatizer. Passing the
# package identifier skips the interactive downloader prompt, which would
# otherwise block an unattended "Restart & Run All".
nltk.download('wordnet')

NLTK Downloader
---------------------------------------------------------------------------
    d) Download   l) List    u) Update   c) Config   h) Help   q) Quit
---------------------------------------------------------------------------
Downloader> d

Download which package (l=list; x=cancel)?
  Identifier> wordnet
    Downloading package wordnet to /root/nltk_data...
      Unzipping corpora/wordnet.zip.

---------------------------------------------------------------------------
    d) Download   l) List    u) Update   c) Config   h) Help   q) Quit
---------------------------------------------------------------------------
Downloader> q


True

In [None]:
#### Constants ####
TRAIN, SEED = True, 1234

# Geometry of the images fed to the network: height, width, colour channels.
img_h, img_w, n_channels = 200, 350, 3

bs = 256           # batch size handed to the data generators
K = 8              # number of cross-validation folds
num_questions = 0  # placeholder; overwritten once the training JSON is loaded

# Dataset locations, all rooted in the same directory.
_dataset_root = '/content/VQA_Dataset'
imgs_path, train_json_path, test_json_path = (
    os.path.join(_dataset_root, leaf)
    for leaf in ('Images', 'train_questions_annotations.json', 'test_questions.json')
)

# Closed answer vocabulary. A label's class id is its position in this tuple,
# so the order of the entries must not be changed.
_ANSWER_LABELS = (
    '0', '1', '2', '3', '4', '5',
    'apple', 'baseball', 'bench', 'bike', 'bird', 'black', 'blanket', 'blue',
    'bone', 'book', 'boy', 'brown', 'cat', 'chair', 'couch', 'dog', 'floor',
    'food', 'football', 'girl', 'grass', 'gray', 'green', 'left', 'log', 'man',
    'monkey bars', 'no', 'nothing', 'orange', 'pie', 'plant', 'playing', 'red',
    'right', 'rug', 'sandbox', 'sitting', 'sleeping', 'soccer', 'squirrel',
    'standing', 'stool', 'sunny', 'table', 'tree', 'watermelon', 'white',
    'wine', 'woman', 'yellow', 'yes',
)

# answer label -> class id
dictionary = {label: class_id for class_id, label in enumerate(_ANSWER_LABELS)}
# class id -> answer label
inverse_dictionary = dict(enumerate(_ANSWER_LABELS))
N_CLASSES = len(dictionary)

# Seed TensorFlow's and NumPy's global random generators with the same SEED.
tf.random.set_seed(SEED)
np.random.seed(SEED)

# Working directory at the time this cell is executed.
cwd = os.getcwd()

In [None]:
# Mount Google Drive at /content/drive/ so the cells below can read and write it.
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


In [None]:
# Extract the zip archive stored on the mounted Drive into the current directory.
!unzip drive/MyDrive/Assignment_3/anndl-2020-vqa.zip -d ./

In [None]:
# Build the question vocabulary from the training annotations.
# A fresh Tokenizer is bound on every run of this cell, so re-running it never
# accumulates word counts from a previous fit (no explicit `del` is needed).
tokenizer = Tokenizer(num_words=500)
wordnet_lemmatizer = WordNetLemmatizer()

# Load the annotations and release the file handle before the fitting loop.
with open(train_json_path, 'r') as f:
  data = json.load(f)

num_questions = len(data) # extracting num of questions for later

print("Fitting tokenizer on questions...")
for entry in tq.tqdm(data.values()):
  # Same normalisation as parseFold / parseTestJson: lower-case, strip "?" and
  # possessive "'s", split on single spaces, lemmatize every word as a verb.
  quest = entry['question'].lower().replace("?", "").replace("'s", "")
  out = [wordnet_lemmatizer.lemmatize(word, pos="v") for word in quest.split(" ")]

  # NOTE(review): `out` is a flat list of words, so every word is passed to
  # fit_on_texts as its own text, while the later texts_to_sequences calls
  # receive whole word lists. Tokens that still contain punctuation may
  # therefore be counted here but not found at lookup time -- confirm.
  tokenizer.fit_on_texts(out)

# +1 because Tokenizer word indices start at 1 (index 0 is never assigned).
# NOTE(review): no <eos> token is added to the vocabulary -- confirm intended.
words_number = len(tokenizer.word_index) + 1

print(words_number)

Fitting tokenizer on questions...


HBox(children=(FloatProgress(value=0.0, max=58832.0), HTML(value='')))


3557


In [None]:
class DataGenerator(tf.keras.utils.Sequence):
  """Keras Sequence that yields batches of [images, questions] for the VQA model.

  Args:
    answers: per-sample class ids (training) or any per-sample identifiers
      (inference); only read when `training` is True.
    imageIDs: per-sample image file names without the ".png" extension,
      resolved relative to the module-level `imgs_path`.
    input_questions: per-sample padded token sequences of length `max_length`.
    batch_size: maximum number of samples per batch.
    training: if True, `__getitem__` returns (inputs, one-hot targets);
      otherwise it returns the inputs only.
    max_length: length of every padded question sequence.
    shuffle: if True, the sample order is reshuffled at construction time and
      at the end of every epoch.
    img_h, img_w, channels: shape of the images placed in a batch.
    img_generator: optional ImageDataGenerator used for random augmentation.

  Every sample is served once per epoch: the last batch is smaller than
  `batch_size` when the dataset size is not a multiple of it.
  """

  def __init__(self, answers, imageIDs, input_questions, batch_size, training, max_length,
               shuffle=True, img_h=128, img_w=128, channels=3, img_generator=None):
    self.answers = answers
    self.imageIDs = imageIDs
    self.input_questions = input_questions
    self.batch_size = batch_size
    self.shuffle = shuffle
    self.indexes = np.arange(len(self.answers)) # list of indexes on complete dataset
    self.max_length = max_length
    self.training = training
    self.img_h = img_h
    self.img_w = img_w
    self.channels = channels # RGB image
    self.img_generator = img_generator
    self.on_epoch_end()

  def __len__(self):
    """Number of batches per epoch (rounded up so no trailing sample is dropped)."""
    return int(np.ceil(len(self.imageIDs) / self.batch_size))

  def __getitem__(self, index):
    """Return batch `index`: (inputs, targets) when training, inputs otherwise."""
    start = index * self.batch_size
    indexes = self.indexes[start:start + self.batch_size]

    input_x = self._generate_x(indexes)

    if self.training: # if training, return input and also ground truth
      return (input_x, self._generate_y(indexes))

    # if testing, return input only
    return input_x

  def on_epoch_end(self):
    """Reshuffle the sample order in place when shuffling is enabled."""
    if self.shuffle:
      np.random.shuffle(self.indexes)

  def _generate_x(self, indexes):
    """Load the images and questions of the given sample indexes into arrays."""
    # Size the buffers by the actual number of samples so that a short final
    # batch never exposes uninitialised np.empty rows.
    n_samples = len(indexes)
    RGBimages = np.empty((n_samples, self.img_h, self.img_w, self.channels))
    questions = np.empty((n_samples, self.max_length))

    for row, sample in enumerate(indexes):
      RGBimages[row] = self._load_image(self.imageIDs[sample], self.img_w, self.img_h)
      questions[row] = self.input_questions[sample]

    return [RGBimages, questions]

  def _generate_y(self, indexes):
    """One-hot encode the answers of the given sample indexes (integer array)."""
    labels = [self.answers[i] for i in indexes]
    return tf.keras.utils.to_categorical(labels, num_classes=N_CLASSES).astype(int)

  def _load_image(self, img_name, img_w, img_h):
    """Read `<imgs_path>/<img_name>.png` as RGB, resize it and scale it by 1/255."""
    # Import the submodule explicitly: a bare `import PIL` does not guarantee
    # that `PIL.Image` has been loaded.
    from PIL import Image

    # The context manager closes the underlying file once the pixels are copied.
    with Image.open(imgs_path + '/' + img_name + ".png") as source:
      rgb_image = np.array(source.convert('RGB'))

    image = cv2.resize(rgb_image, (img_w, img_h))
    if self.img_generator is not None:
      # Do not pass a seed here: a fixed seed would yield the identical
      # transform for every image and would also reseed NumPy's global RNG
      # (the one used by on_epoch_end) on every call.
      img_t = self.img_generator.get_random_transform(image.shape)
      image = self.img_generator.apply_transform(image, img_t)
    image = image / 255.
    return image

In [None]:
# extracts (questions, imageIDs, answers) from training json
def parseFold(data, foldNum):
  """Split the training annotations into train/validation parts for one fold.

  Entries are visited in the iteration order of `data`. With
  fold_size = len(data) // K, the entries at positions
  [foldNum * fold_size, (foldNum + 1) * fold_size) form the validation part and
  all others the training part. When len(data) is not a multiple of K, the
  trailing len(data) % K entries are never used for validation.

  Args:
    data: dict of annotations; every value provides 'question', 'image_id'
      and 'answer'.
    foldNum: zero-based index of the fold held out for validation.

  Returns:
    Tuple (trainImageIDs, trainQuestions, trainAnswers,
    validImageIDs, validQuestions, validAnswers). Questions are lists of
    lemmatized words, answers are class ids looked up in `dictionary`.

  Raises:
    KeyError: if an answer is not a key of `dictionary`.
  """
  # Derive the fold size from the data actually passed in rather than from the
  # module-level `num_questions`, which is 0 until another cell has set it.
  fold_size = len(data) // K
  initFold = foldNum * fold_size
  endFold = initFold + fold_size

  trainImageIDs, trainQuestions, trainAnswers = [], [], []
  validImageIDs, validQuestions, validAnswers = [], [], []

  for position, entry in enumerate(data.values()):
    # Lower-case, strip "?" and possessive "'s", split on single spaces and
    # lemmatize every word as a verb.
    quest = entry['question'].lower().replace("?", "").replace("'s", "")
    out = [wordnet_lemmatizer.lemmatize(word, pos="v") for word in quest.split(" ")]
    imageID = entry['image_id']
    answer = dictionary[entry['answer']]

    if initFold <= position < endFold:
      validQuestions.append(out)
      validImageIDs.append(imageID)
      validAnswers.append(answer)
    else:
      trainQuestions.append(out)
      trainImageIDs.append(imageID)
      trainAnswers.append(answer)

  return trainImageIDs, trainQuestions, trainAnswers, validImageIDs, validQuestions, validAnswers

# extracts (questionIDs, questions, imageIDs) from test json
def parseTestJson(data):
  """Collect ids, preprocessed questions and image ids from the test annotations.

  Each question is lower-cased, stripped of "?" and "'s", split on single
  spaces and lemmatized word by word as a verb.

  Returns:
    Tuple (questionIDs, questions, imageIDs) of three parallel lists, in the
    iteration order of `data`.
  """
  lemmatize = wordnet_lemmatizer.lemmatize
  rows = []

  for question_id, entry in data.items():
    image_id = entry['image_id']
    cleaned = entry['question'].lower().replace("?", "").replace("'s", "")
    words = [lemmatize(token, pos="v") for token in cleaned.split(" ")]
    rows.append((question_id, words, image_id))

  questionIDs = [row[0] for row in rows]
  questions = [row[1] for row in rows]
  imageIDs = [row[2] for row in rows]
  return questionIDs, questions, imageIDs

In [None]:
# 1) Parse the training annotations once per fold into train/validation parts.
fold_fields = ('trainImageIDs', 'trainQuestions', 'trainAnswers',
               'validImageIDs', 'validQuestions', 'validAnswers')
inputData = []
with open(train_json_path, 'r') as f:
  data = json.load(f)
  for fold_idx in tq.tqdm(range(K)):
    inputData.append(dict(zip(fold_fields, parseFold(data, fold_idx))))

# 2) Turn word lists into index sequences and track the longest one overall.
max_length = 0
for fold in tq.tqdm(inputData):
  fold['trainQuestions'] = tokenizer.texts_to_sequences(fold['trainQuestions'])
  fold['validQuestions'] = tokenizer.texts_to_sequences(fold['validQuestions'])
  longest_train = max(len(sequence) for sequence in fold['trainQuestions'])
  longest_valid = max(len(sequence) for sequence in fold['validQuestions'])
  max_length = max(max_length, longest_train, longest_valid)

# 3) Pad every sequence to max_length and wrap each fold in a generator pair.
# Arguments shared by all generators; only the data and `shuffle` differ.
shared_generator_args = dict(batch_size=bs,
                             training=True,
                             img_h=img_h,
                             img_w=img_w,
                             channels=n_channels,
                             max_length=max_length)
generators = []
for fold in tq.tqdm(inputData):
  fold['trainQuestions'] = pad_sequences(fold['trainQuestions'], maxlen=max_length)
  fold['validQuestions'] = pad_sequences(fold['validQuestions'], maxlen=max_length)
  generators.append({
      'train_generator': DataGenerator(answers=fold['trainAnswers'],
                                       imageIDs=fold['trainImageIDs'],
                                       input_questions=fold['trainQuestions'],
                                       shuffle=True,
                                       **shared_generator_args),
      'valid_generator': DataGenerator(answers=fold['validAnswers'],
                                       imageIDs=fold['validImageIDs'],
                                       input_questions=fold['validQuestions'],
                                       shuffle=False,
                                       **shared_generator_args),
  })

HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




HBox(children=(FloatProgress(value=0.0, max=8.0), HTML(value='')))




In [None]:
def CNN(out_dim, drop_rate):
  """Build the image branch: three conv blocks, global pooling and a dense head.

  Each conv block is Conv2D(3x3, 'same', he_uniform) -> BatchNormalization ->
  ReLU -> MaxPool(2x2) -> Dropout, with 64, 128 and 256 filters respectively.
  The head is GlobalAveragePooling2D -> Dense(out_dim) -> BatchNormalization ->
  ReLU -> Dropout. The input shape is (img_h, img_w, n_channels).

  Args:
    out_dim: number of units of the final dense layer.
    drop_rate: rate used by every Dropout layer.

  Returns:
    The tf.keras.Sequential model.
  """
  layers = tf.keras.layers
  cnn_model = tf.keras.Sequential()

  for block_no, n_filters in enumerate((64, 128, 256)):
    conv_args = dict(filters=n_filters, kernel_size=(3, 3), padding='same',
                     kernel_initializer='he_uniform')
    if block_no == 0:
      # Only the first layer declares the input shape.
      conv_args['input_shape'] = (img_h, img_w, n_channels)
    cnn_model.add(layers.Conv2D(**conv_args))
    cnn_model.add(layers.BatchNormalization())
    cnn_model.add(layers.Activation('relu'))
    cnn_model.add(layers.MaxPool2D(pool_size=(2, 2)))
    cnn_model.add(layers.Dropout(drop_rate, seed=SEED))

  # Dense head on top of the globally pooled feature maps.
  cnn_model.add(layers.GlobalAveragePooling2D())
  cnn_model.add(layers.Dense(units=out_dim, kernel_initializer='he_uniform'))
  cnn_model.add(layers.BatchNormalization())
  cnn_model.add(layers.Activation('relu'))
  cnn_model.add(layers.Dropout(drop_rate, seed=SEED))

  return cnn_model

def RNN(words_number, embed_dim, max_length, drop_rate, out_dim):
  """Build the question branch: embedding, two stacked LSTMs and a tanh dense layer.

  Args:
    words_number: size of the embedding vocabulary (input_dim).
    embed_dim: dimensionality of the word embeddings.
    max_length: length of the padded input sequences.
    drop_rate: rate of the Dropout layer following each LSTM.
    out_dim: units of the final dense layer; each LSTM has out_dim // 2 units.

  Returns:
    The tf.keras.Sequential model.
  """
  layers = tf.keras.layers
  lstm_units = out_dim // 2

  rnn_model = tf.keras.Sequential()
  rnn_model.add(layers.Embedding(input_dim=words_number, output_dim=embed_dim, input_length=max_length))

  # The first LSTM emits the full sequence for the second one to consume;
  # the second LSTM emits only its last state.
  for emit_sequence in (True, False):
    rnn_model.add(layers.LSTM(lstm_units, return_sequences=emit_sequence))
    rnn_model.add(layers.Dropout(drop_rate, seed=SEED))

  rnn_model.add(layers.Dense(out_dim, activation='tanh'))
  return rnn_model

def VQA(out_dim = 1024, drop_rate = 0.5, vocab_size = 500, embed_dim = 512):
  """Build the full VQA model by fusing the image and question branches.

  The outputs of CNN(...) and RNN(...) (both of size `out_dim`) are multiplied
  element-wise, then passed through Dense(out_dim) -> BatchNormalization ->
  ReLU -> Dropout -> Dense(N_CLASSES, softmax). The question length is taken
  from the module-level `max_length`.

  Args:
    out_dim: width of both branches and of the fusion dense layer.
    drop_rate: dropout rate used in both branches and in the fusion head.
    vocab_size: input_dim of the question embedding. It must be greater than
      the largest token index fed to the model; the default matches a
      Tokenizer created with num_words=500.
    embed_dim: dimensionality of the word embeddings.

  Returns:
    An uncompiled tf.keras Model taking [images, questions] as inputs.
  """
  CNN_net = CNN(out_dim=out_dim, drop_rate=drop_rate)
  RNN_net = RNN(words_number=vocab_size, embed_dim=embed_dim,
                max_length=max_length, drop_rate=drop_rate,
                out_dim=out_dim)

  # Element-wise product fuses the two equally sized feature vectors.
  merge = tf.keras.layers.Multiply()([CNN_net.output, RNN_net.output])
  dense = tf.keras.layers.Dense(units=out_dim, kernel_initializer='he_uniform')(merge)
  batch = tf.keras.layers.BatchNormalization()(dense)
  act = tf.keras.layers.Activation('relu')(batch)
  drop = tf.keras.layers.Dropout(drop_rate, seed=SEED)(act)
  out = tf.keras.layers.Dense(N_CLASSES, activation='softmax')(drop)
  VQA_model = tf.keras.models.Model(inputs=[CNN_net.input, RNN_net.input], outputs=out)

  return VQA_model

**Perform K-Fold Cross Validation**

In [None]:
# Train one model per cross-validation fold and save each of them to Drive.
networks = []
# Iterate over K (not a literal 8) so the loop always matches the number of
# entries in `generators`.
for i in range(K):
  net = VQA()
  loss = tf.keras.losses.CategoricalCrossentropy()
  optimizer = tf.keras.optimizers.Adam(learning_rate=1e-3)
  metrics = ['accuracy']
  net.compile(optimizer=optimizer, loss=loss, metrics=metrics)

  # NOTE(review): both callbacks monitor val_loss with patience=3, so early
  # stopping will usually fire in the same epoch as the first learning-rate
  # reduction -- consider a larger EarlyStopping patience.
  callbacks = [
      tf.keras.callbacks.ReduceLROnPlateau(monitor='val_loss', factor=0.1, patience=3, min_lr=1e-7, verbose=1, cooldown=0),
      tf.keras.callbacks.EarlyStopping(monitor='val_loss', patience=3, restore_best_weights=True),
  ]

  fold_generators = generators[i]
  net.fit(x=fold_generators['train_generator'],
            epochs=30,
            steps_per_epoch=len(fold_generators['train_generator']),
            validation_data=fold_generators['valid_generator'],
            validation_steps=len(fold_generators['valid_generator']),
            callbacks=callbacks)

  # Saved under the name that the model-loading cell reads back (VQA_<fold>).
  name = "VQA_" + str(i)
  net.save(os.path.join('/content/drive/My Drive/ANN/Models/VQA', name))
  networks.append(net)

**LOAD THE MODELS TO PERFORM PREDICTIONS**

In [None]:
# Parse the test annotations into parallel lists of ids, questions and images.
with open(test_json_path, 'r') as f:
  test_data = json.load(f)
  test_questionIDs, test_questions, test_imageIDs = parseTestJson(test_data)

# Encode the questions with the fitted tokenizer and pad them to max_length.
test_questions_tokenized = tokenizer.texts_to_sequences(test_questions)
test_input_questions = pad_sequences(test_questions_tokenized, maxlen=max_length)

# One sample per batch, fixed order, inputs only. The question ids are passed
# as `answers` so they stay available on the generator.
test_generator_args = dict(answers=test_questionIDs,
                           imageIDs=test_imageIDs,
                           input_questions=test_input_questions,
                           batch_size=1,
                           shuffle=False,
                           training=False,
                           img_h=img_h,
                           img_w=img_w,
                           channels=n_channels,
                           max_length=max_length)
test_generator = DataGenerator(**test_generator_args)

In [None]:
# Reload the K saved fold models (VQA_0 ... VQA_<K-1>) from Drive.
models = []
for fold_idx in tq.tqdm(range(K)):
  model_path = f'/content/drive/My Drive/ANN/Models/VQA/VQA_{fold_idx}'
  models.append(tf.keras.models.load_model(model_path))

In [None]:
def create_csv(results, results_dir='/content/drive/My Drive/ANN'):
    """Write `results` to a timestamped CSV file inside `results_dir`.

    The file is named results_<Mon><day>_<H>-<M>-<S>.csv and holds an
    'Id,Category' header followed by one 'key,value' line per entry.

    Args:
        results: dict mapping string ids to values; values are written via str().
        results_dir: directory in which the file is created.
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    target_path = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(target_path, 'w') as f:
        f.write('Id,Category\n')
        f.writelines(key + ',' + str(value) + '\n' for key, value in results.items())

In [None]:
# Ensemble prediction: average the class probabilities of all fold models and
# write the arg-max class of every test question to a CSV file.
results = {}

fold_predictions = []
for model_idx, network in enumerate(tq.tqdm(models)):
  print('Model ' + str(model_idx))
  # Predict batch by batch; test_generator keeps a fixed order (shuffle=False),
  # so row i of the concatenated output belongs to test sample i.
  pred = [network.predict(batch) for batch in tq.tqdm(test_generator)]
  fold_predictions.append(np.concatenate(pred))

# Shape (num_test_samples, N_CLASSES): mean probability over the models.
predictions = np.mean(fold_predictions, axis = 0)

# The question ids were handed to the generator as `answers`; DataGenerator
# has no `list_IDs` attribute. Iterate over samples, not over batches.
question_ids = test_generator.answers
for i in tq.tqdm(range(len(predictions))):
  results[question_ids[i]] = int(np.argmax(predictions[i]))

create_csv(results)