# Data preparation

In [1]:
import os
import tensorflow as tf
import numpy as np
import pandas as pd
import random

# Seed every random number generator used in the notebook so runs are repeatable.
# NumPy is seeded too: Keras' ImageDataGenerator draws its random augmentation
# parameters from the global NumPy generator.
SEED = 1234
tf.random.set_seed(SEED)
np.random.seed(SEED)
random.seed(SEED)

# Name of the experiment; used for the output directory and the checkpoint file name
test_name = 'test_2'

In [2]:
# Set the base directory for Colab and non Colab environment
if 'google.colab' in str(get_ipython()):
  from google.colab import drive
  drive.mount('/content/drive')
  base_dir = '/content/drive/My Drive/AN2DL/homework_3'
else:
  base_dir = os.getcwd()

# Unzip the data set
dataset_dir = 'VQA_Dataset'
if not os.path.exists(dataset_dir):
    !unzip '/content/drive/MyDrive/AN2DL/homework_3/anndl-2020-vqa.zip'

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
from tensorflow.keras.preprocessing.image import ImageDataGenerator

# Augmentation settings used for the training images.
# NOTE(review): horizontal/vertical flips change the meaning of answers such as
# 'left' and 'right' (both are classes of the label dictionary) — confirm that
# flipping is really wanted for this task.
train_augmentation_args = dict(
    rescale=1./255,
    rotation_range=10,
    width_shift_range=10,
    height_shift_range=10,
    zoom_range=0.3,
    horizontal_flip=True,
    vertical_flip=True,
    fill_mode='nearest',
)
train_img_data_gen = ImageDataGenerator(**train_augmentation_args)

# Validation images are only rescaled, never augmented
valid_img_data_gen = ImageDataGenerator(rescale=1./255)

In [4]:
import json
from PIL import Image
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences

# Define a custom dataset class extending Keras Sequence
class CustomDataset(tf.keras.utils.Sequence):
  """
    Question/image/answer dataset inheriting from tf.keras.utils.Sequence.

    The annotated items are split 80/20, in the order they are given, into a
    training and a validation part; one instance exposes one of the two parts.

    3 main methods:
      - __init__: split the items, tokenize the questions, save dataset params
      - __len__: return the total number of samples in the subset
      - __getitem__: return a single (not batched) sample from the subset
  """


  # Mapping from answer string to class index
  LABELS_DICT = {
    '0': 0,
    '1': 1,
    '2': 2,
    '3': 3,
    '4': 4,
    '5': 5,
    'apple': 6,
    'baseball': 7,
    'bench': 8,
    'bike': 9,
    'bird': 10,
    'black': 11,
    'blanket': 12,
    'blue': 13,
    'bone': 14,
    'book': 15,
    'boy': 16,
    'brown': 17,
    'cat': 18,
    'chair': 19,
    'couch': 20,
    'dog': 21,
    'floor': 22,
    'food': 23,
    'football': 24,
    'girl': 25,
    'grass': 26,
    'gray': 27,
    'green': 28,
    'left': 29,
    'log': 30,
    'man': 31,
    'monkey bars': 32,
    'no': 33,
    'nothing': 34,
    'orange': 35,
    'pie': 36,
    'plant': 37,
    'playing': 38,
    'red': 39,
    'right': 40,
    'rug': 41,
    'sandbox': 42,
    'sitting': 43,
    'sleeping': 44,
    'soccer': 45,
    'squirrel': 46,
    'standing': 47,
    'stool': 48,
    'sunny': 49,
    'table': 50,
    'tree': 51,
    'watermelon': 52,
    'white': 53,
    'wine': 54,
    'woman': 55,
    'yellow': 56,
    'yes': 57
  }


  def __init__(self, which_subset, dataset_items, tokenizer=None, max_question_length=None, img_generator=None, out_shape=None):
    """
      Initialize the object.

      Keyword arguments:
      which_subset -- 'training' for the training set, anything else selects the validation set
      dataset_items -- list of annotation dicts with the keys 'question', 'image_id' and 'answer';
                       the first 80% are the training items, the rest the validation items
      tokenizer -- fitted tokenizer object for the 'validation' set, in case of 'training' the object will be created
      max_question_length -- max length of a question of the training set (used for padding 'validation'), in case of 'training' the parameter will be calculated
      img_generator -- ImageDataGenerator object to apply to the images or None
      out_shape -- output size for the images as passed to PIL's Image.resize, i.e. a tuple (width, height), or None for original size

      Raises:
      ValueError -- if the validation subset is requested without a tokenizer
    """

    # Splitting training and validation: first 80% / remaining items
    training_count = int(0.8 * len(dataset_items))
    training_items = dataset_items[:training_count]
    validation_items = dataset_items[training_count:]

    if which_subset == 'training':
      items = training_items
      questions = [item['question'] for item in items]

      # Create the tokenizer and fit it on the training questions only
      tokenizer = Tokenizer()
      tokenizer.fit_on_texts(questions)
      tokenized_questions = tokenizer.texts_to_sequences(questions)

      # The longest training question defines the padded length
      max_question_length = max(len(question) for question in tokenized_questions)

    else:
      if tokenizer is None:
        raise ValueError("A fitted tokenizer is required for the validation subset")

      # Using tokenizer from parameters to tokenize validation questions
      items = validation_items
      questions = [item['question'] for item in items]
      tokenized_questions = tokenizer.texts_to_sequences(questions)

    question_inputs = pad_sequences(tokenized_questions, maxlen=max_question_length, padding='post')

    # Setting dimension of the dictionary
    dictionary_dim = len(tokenizer.word_index)

    # Set class properties
    # dataset_file used to be read straight from a notebook-level global, which
    # raised NameError when that global was not defined yet. The attribute is kept
    # for backward compatibility and is None when no such global exists.
    self.dataset_file = globals().get('dataset_file')
    self.which_subset = which_subset
    self.items = items
    self.question_inputs = question_inputs
    self.max_question_length = max_question_length
    self.dictionary_dim = dictionary_dim
    self.tokenizer = tokenizer
    self.img_generator = img_generator
    self.out_shape = out_shape

  def __len__(self):
    """
      Return the length of the dataset (number of samples, not of batches).
    """
    return len(self.items)

  def __getitem__(self, index):
    """
      Return an item from the set.

      The image is read from '<dataset_dir>/Images/<image_id>.png', where
      dataset_dir is a notebook-level variable.

      Keyword arguments:
      index -- index of the item to return

      Returns:
      ((padded tokenized question, image array divided by 255), one-hot answer)
    """

    # Read Image
    curr_item = self.items[index]
    img_id = curr_item['image_id']
    img = Image.open(os.path.join(dataset_dir, 'Images', img_id + '.png'))

    # Convert image from RGBA to RGB
    img = img.convert('RGB')

    # Resize image
    if self.out_shape is not None:
      img = img.resize(self.out_shape)

    img_arr = np.array(img)

    if self.img_generator is not None:
      # Perform data augmentation
      # Get a random transformation from the ImageDataGenerator and apply it to the image.
      # No seed is passed here: seeding every call with the same value would make
      # every image receive exactly the same transformation.
      img_t = self.img_generator.get_random_transform(img_arr.shape)
      img_arr = self.img_generator.apply_transform(img_arr, img_t)

    # Convert answer to one-hot
    answer = np.zeros(len(self.LABELS_DICT))
    answer[self.LABELS_DICT[curr_item['answer']]] = 1

    return (self.question_inputs[index], img_arr/255), answer

In [5]:
# Target image size: the 700x400 reference size scaled by the factor `red`
red = 0.8
img_w, img_h = int(700*red), int(400*red)

# One output class per entry of the answer dictionary
num_classes = len(CustomDataset.LABELS_DICT)

# Set batch size
bs = 16

# Read the annotation file and keep only its values (one dict per question)
dataset_filepath = os.path.join(dataset_dir, 'train_questions_annotations.json')
with open(dataset_filepath, 'r') as annotations_fp:
  dataset_file = json.load(annotations_fp)

items = list(dataset_file.values())
random.shuffle(items)

# Create training and validation set generators.
# The validation set reuses the tokenizer and the question length of the training set.
dataset = CustomDataset(which_subset='training',
                        dataset_items=items,
                        img_generator=train_img_data_gen,
                        out_shape=(img_w, img_h))
dataset_valid = CustomDataset(which_subset='validation',
                              dataset_items=items,
                              tokenizer=dataset.tokenizer,
                              max_question_length=dataset.max_question_length,
                              img_generator=valid_img_data_gen,
                              out_shape=(img_w, img_h))

In [6]:
def _sequence_to_tf_dataset(sequence, batch_size):
  """
    Wrap a CustomDataset into a batched, endlessly repeating tf.data.Dataset.

    Keyword arguments:
    sequence -- CustomDataset yielding ((question, image), one-hot answer) samples
    batch_size -- number of samples per batch

    Returns:
    a tf.data.Dataset of batches that restarts from the first sample when exhausted
  """
  tf_dataset = tf.data.Dataset.from_generator(
      lambda: sequence,
      output_types=((np.int32, np.float32), np.int32),
      # Question length comes from the training set; the answer is a one-hot
      # vector with one entry per class.
      output_shapes=(([dataset.max_question_length], [img_h, img_w, 3]), [num_classes]))
  return tf_dataset.batch(batch_size).repeat()


# Create training data set from the generator
train_dataset = _sequence_to_tf_dataset(dataset, bs)

# Create validation data set from the generator (one sample per batch)
valid_dataset = _sequence_to_tf_dataset(dataset_valid, 1)

# Model

In [7]:
# Load GloVe embedding dictionary: word -> float32 vector
embeddings_index = {}
glove_path = os.path.join(base_dir, 'glove.6B', 'glove.6B.300d.txt')

# The encoding is stated explicitly so that loading does not depend on the
# platform default, and the context manager closes the file even if parsing fails.
with open(glove_path, encoding='utf-8') as glove_file:
    for line in glove_file:
        # Each line is "<word> <coef_1> ... <coef_n>"
        values = line.split()
        word = values[0]
        coefs = np.asarray(values[1:], dtype='float32')
        embeddings_index[word] = coefs

In [8]:
# Build the embedding matrix: row i holds the GloVe vector of the word whose
# tokenizer index is i. One extra row is allocated on top of the dictionary size.
embedding_matrix = np.zeros((dataset.dictionary_dim + 1, 300))
for token, token_index in dataset.tokenizer.word_index.items():
    glove_vector = embeddings_index.get(token)
    if glove_vector is None:
        # Words missing from the GloVe index keep their all-zeros row
        continue
    embedding_matrix[token_index] = glove_vector

In [9]:
class TransformerEncoder(tf.keras.layers.Layer):
    """
    Encoder block: multi-head attention and a feed-forward network, each followed
    by a residual connection and layer normalization.

    Arguments:
    embed_dim -- size of the block output; also used as the per-head key size of the attention
    num_heads -- number of attention heads
    ff_dim -- hidden size of the feed-forward network
    rate -- dropout rate used after the attention and inside the feed-forward network
    """

    def __init__(self, embed_dim, num_heads, ff_dim, rate=0.1):
        super(TransformerEncoder, self).__init__()
        self.att = tf.keras.layers.MultiHeadAttention(num_heads, embed_dim)
        self.ffn = tf.keras.Sequential(
            [tf.keras.layers.Dense(ff_dim, activation="relu"),
             tf.keras.layers.Dropout(rate),
             tf.keras.layers.Dense(embed_dim)]
        )
        self.layernorm1 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.layernorm2 = tf.keras.layers.LayerNormalization(epsilon=1e-6)
        self.dropout1 = tf.keras.layers.Dropout(rate)

    def call(self, inputs, training=True):
        """
        Apply the encoder block.

        Arguments:
        inputs -- a tensor, or a list/tuple of 1 to 3 tensors passed to the attention
                  layer positionally: [x] attends x to itself, [a, b] uses b for
                  the second and third argument, [a, b, c] passes all three
        training -- whether dropout is active

        Raises:
        ValueError -- if the list does not contain 1, 2 or 3 tensors
        """
        # Accept a bare tensor as well as a list/tuple of tensors
        if not isinstance(inputs, (list, tuple)):
            inputs = [inputs]

        n_inputs = len(inputs)
        if n_inputs == 3:
            first, second, third = inputs
        elif n_inputs == 2:
            first, second = inputs
            third = second
        elif n_inputs == 1:
            first = second = third = inputs[0]
        else:
            # Previously this case failed later with an unbound local variable
            raise ValueError(
                "TransformerEncoder expects 1, 2 or 3 input tensors, got %d" % n_inputs)

        attn_output = self.att(first, second, third)
        attn_output = self.dropout1(attn_output, training=training)
        # Residual connection around the attention
        out1 = self.layernorm1(first + attn_output)

        # Residual connection around the feed-forward network
        ffn_output = self.ffn(out1, training=training)
        return self.layernorm2(out1 + ffn_output)

In [10]:
class TokenAndPositionEmbedding(tf.keras.layers.Layer):
    """
    Sum of a frozen, pre-trained token embedding and a learned position embedding.

    Arguments:
    maxlen -- number of positions the position embedding can represent
    vocab_size -- number of rows of the token embedding
    embed_dim -- size of both embeddings
    embedding_matrix -- initial (non-trainable) token embedding values; expected
                        to have shape (vocab_size, embed_dim)
    """

    def __init__(self, maxlen, vocab_size, embed_dim, embedding_matrix):
        super(TokenAndPositionEmbedding, self).__init__()
        # The pre-trained vectors are supplied through a constant initializer
        # rather than the legacy `weights=` constructor argument.
        self.token_emb = tf.keras.layers.Embedding(
            input_dim=vocab_size,
            output_dim=embed_dim,
            embeddings_initializer=tf.keras.initializers.Constant(embedding_matrix),
            trainable=False)
        self.pos_emb = tf.keras.layers.Embedding(input_dim=maxlen, output_dim=embed_dim)

    def call(self, x):
        """Embed the token ids in x and add the embedding of each position."""
        # Positions 0 .. sequence_length-1, taken from the last axis of x
        maxlen = tf.shape(x)[-1]
        positions = tf.range(start=0, limit=maxlen, delta=1)
        positions = self.pos_emb(positions)
        x = self.token_emb(x)
        return x + positions

In [11]:
class SelfAttention(tf.keras.layers.Layer):
    """
    Self-attention block: the input attends to itself, the result goes through a
    dense + dropout stack and is added back to the input before layer normalization.

    Arguments:
    ff_dim -- units of the dense layer; the residual sum requires it to match the
              last dimension of the input
    rate -- dropout rate applied after the dense layer
    """

    def __init__(self, ff_dim, rate=0.2):
        super(SelfAttention, self).__init__()
        self.att = tf.keras.layers.Attention()
        feed_forward_layers = [
            tf.keras.layers.Dense(ff_dim, activation="relu"),
            tf.keras.layers.Dropout(rate),
        ]
        self.ffn = tf.keras.Sequential(feed_forward_layers)
        self.layernorm = tf.keras.layers.LayerNormalization()

    def call(self, inputs, training=True):
        """Return layernorm(inputs + ffn(attention(inputs, inputs)))."""
        # The same tensor is passed twice to the attention layer
        attended = self.att([inputs, inputs])
        residual_branch = self.ffn(attended)
        return self.layernorm(inputs + residual_branch)

In [None]:
# Free up RAM in case the model definition cells were run multiple times
tf.keras.backend.clear_session()

input_length = dataset.max_question_length
vocab_size = dataset.dictionary_dim + 1
embed_dim = 300  # Embedding size for each token
num_heads = 8  # Number of attention heads
encode_dim = 300
ff_dim = 600  # Hidden layer size in feed forward network inside transformer
num_encoder_blocks = 4  # Number of stacked transformer encoder blocks

# CNN for extracting image features
# NOTE(review): the images fed to the network are scaled to [0, 1]; the ImageNet
# weights of ResNet101 presumably expect tf.keras.applications.resnet.preprocess_input
# — confirm which preprocessing is intended.
image_input = tf.keras.layers.Input(shape=(img_h, img_w, 3))
image_enc = tf.keras.applications.ResNet101(input_tensor=image_input, include_top=False, weights='imagenet').output

# Pool the feature map into one vector and view it as a sequence of 8 tokens of size 256
image_enc = tf.keras.layers.GlobalAveragePooling2D()(image_enc)
image_enc = tf.keras.layers.Dropout(0.2)(image_enc)
image_enc = tf.keras.layers.Reshape((8,256))(image_enc)

image_enc = SelfAttention(256)(image_enc)
image_enc = tf.keras.layers.Dense(128, activation="relu")(image_enc)
image_enc = SelfAttention(128)(image_enc)
image_enc = tf.keras.layers.Dense(300, activation="relu")(image_enc)
image_enc = SelfAttention(300)(image_enc)

# Question embedding
# The shape is an explicit 1-tuple: `(input_length)` without the trailing comma is just an int
question_input = tf.keras.layers.Input(shape=(input_length,), dtype='int32')
question_embedding = TokenAndPositionEmbedding(maxlen=input_length,
                                               vocab_size=vocab_size,
                                               embed_dim=embed_dim,
                                               embedding_matrix=embedding_matrix)(question_input)

# Stack of transformer encoder blocks, each attending the question to itself
question_enc = question_embedding
for _ in range(num_encoder_blocks):
    question_enc = TransformerEncoder(encode_dim, num_heads, ff_dim)([question_enc])

# Cross attention in both directions between question tokens and image tokens
attention_vector_1 = tf.keras.layers.Attention()([question_enc, image_enc])
attention_vector_2 = tf.keras.layers.Attention()([image_enc, question_enc])

# Average over the sequence axis to get one vector per direction
attention_vector_1 = tf.keras.layers.GlobalAveragePooling1D()(attention_vector_1)
attention_vector_2 = tf.keras.layers.GlobalAveragePooling1D()(attention_vector_2)

merged = tf.keras.layers.Concatenate()([attention_vector_1, attention_vector_2])
merged = tf.keras.layers.Dropout(0.5)(merged)
output = tf.keras.layers.Dense(units=num_classes, activation='softmax')(merged)
model = tf.keras.models.Model(inputs=[question_input, image_input], outputs=output)

# Visualize created model
model.summary()

# Training

In [14]:
# Training hyper-parameters: learning rate and maximum number of epochs
lr = 1e-4
epochs = 100

# Categorical cross-entropy on the one-hot answers, optimized with Adam
loss = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Metrics reported during training and validation
metrics = ['accuracy']

# Compile Model
model.compile(loss=loss, optimizer=optimizer, metrics=metrics)

In [16]:
# Output locations of this experiment: checkpoints and TensorBoard logs live
# in sub-directories of the experiment directory
exp_dir = os.path.join(base_dir, test_name)
ckpt_dir = os.path.join(exp_dir, 'checkpoints')
tb_dir = os.path.join(exp_dir, 'tb_logs')
for output_dir in (exp_dir, ckpt_dir, tb_dir):
  if not os.path.exists(output_dir):
    os.makedirs(output_dir)

# Checkpoint callback: a single weights-only checkpoint, overwritten only when
# an epoch reaches a lower validation loss
checkpoint = os.path.join(ckpt_dir, 'checkpoint_' + test_name + '.ckpt')
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(filepath=checkpoint,
                                                   save_weights_only=True,
                                                   monitor='val_loss',
                                                   mode='min',
                                                   save_best_only=True)

# Callback for tensorboard logs
tb_callback = tf.keras.callbacks.TensorBoard(log_dir=tb_dir,
                                             profile_batch=0,
                                             histogram_freq=1)

# Callback for early stopping: stop when the validation loss has not improved for 5 epochs
es_callback = tf.keras.callbacks.EarlyStopping(monitor='val_loss', mode='min', patience=5)

callbacks = [ckpt_callback, tb_callback, es_callback]

In [None]:
# Load the TensorBoard notebook extension and open it on the base directory,
# which is the parent of the per-experiment 'tb_logs' directories.
%load_ext tensorboard
%tensorboard --logdir '$base_dir'

In [None]:
# Optionally restore an existing checkpoint, then optionally train.
# - load:  if True and a checkpoint exists, its weights are loaded first
# - train: if True the model is trained; combined with load=True the training
#          continues from the checkpoint

train = True
load = False

if os.path.exists(os.path.join(ckpt_dir, 'checkpoint')) and load:
  model.load_weights(checkpoint)

if train:
  # steps_per_epoch must be an integer: use the number of batches needed to see
  # every training sample once (the last batch may hold fewer than bs samples).
  model.fit(x=train_dataset,
            epochs=epochs,
            steps_per_epoch=int(np.ceil(len(dataset) / bs)),
            validation_data=valid_dataset,
            validation_steps=len(dataset_valid),
            callbacks=callbacks)
  
  # Reload the weights to load the best checkpoint
  model.load_weights(checkpoint)

  # Save model as h5
  # model_file = os.path.join(exp_dir, test_name + '.h5')
  # model.save(model_file, include_optimizer=False)

# Prediction

In [None]:
import json

# Test questions file path
test_filepath = os.path.join(dataset_dir, 'test_questions.json')

# Load test dictionary
with open(test_filepath, 'r') as f:
  test_items = json.load(f)

# Initialize submission dict: question key -> predicted class index
submission_dict = {}

for key, item in test_items.items():
  # Open test image
  img = Image.open(os.path.join(dataset_dir, 'Images', item['image_id'] + '.png'))

  # Convert image to RGB and resize; same size and scaling as the training images
  img = img.convert('RGB')
  img = img.resize((img_w, img_h))
  img_arr = np.array(img)/255

  # Tokenize question with the tokenizer fitted on the training questions.
  # A dedicated name is used so the model's `question_input` tensor is not overwritten.
  question = item['question']
  tokenized_question = dataset.tokenizer.texts_to_sequences([question])
  question_batch = pad_sequences(tokenized_question, maxlen=dataset.max_question_length, padding='post')

  # Prediction: softmax probabilities of the single sample in the batch
  class_probabilities = model.predict([question_batch, np.expand_dims(img_arr, axis=0)], verbose=0)[0]

  # Store the most likely class as a plain Python int
  prediction = int(np.argmax(class_probabilities, axis=-1))

  submission_dict[key] = prediction

In [None]:
import os

csv_fname = 'submission.csv'

# Write submission dict as csv file: a header row followed by one
# "<question key>,<predicted class>" row per test question
submission_rows = ['Id,Category\n']
submission_rows.extend(key + ',' + str(value) + '\n'
                       for key, value in submission_dict.items())

with open(os.path.join(exp_dir, csv_fname), 'w') as f:
    f.writelines(submission_rows)