# Visual Question Answering model

Imports and definition of the global variables needed by the subsequent cells.

In [None]:
import json, os
import random
from matplotlib.image import imread
import numpy as np
import tensorflow as tf
from PIL import Image

# Answer label -> class index. The labels are listed in the order that
# defines their index (position in the tuple == class id).
classes = {
    label: index
    for index, label in enumerate(
        ('0', '1', '10', '2', '3', '4', '5', '6', '7', '8', '9', 'no', 'yes')
    )
}

# Batch sizes passed to the training and validation generators respectively.
bst = 150
bsv = 150

Definition of the **CustomDataGenerator** class: this generator reads each question and looks up the precomputed features of the corresponding image. For every sample it outputs the tokenized (padded) question and the image feature vector.

In [None]:
class CustomDataGenerator(tf.keras.utils.Sequence):
    """Keras ``Sequence`` producing batches of (image features, question) pairs.

    Each element of ``data`` is a dict read with the keys ``'image_filename'``,
    ``'question'`` and (unless ``test`` is set) ``'answer'``. For every batch the
    generator returns ``[image_features, padded_questions]`` and, in training /
    validation mode, the one-hot encoded answers.

    Args:
        data: indexable collection of sample dicts.
        batch_size: number of samples per batch.
        tokenizer: object exposing ``texts_to_sequences`` (a fitted Keras Tokenizer).
        featuresMap: mapping from image filename to its precomputed feature vector.
        seed: seed applied to ``random`` and ``numpy.random`` before shuffling.
        num_classes: size of the one-hot answer vector.
        shuffle: whether to reshuffle the sample order at the end of each epoch.
        test: when true, batches contain only the inputs (no answers).
        feature_dim: length of one image feature vector.
        max_question_len: length every tokenized question is padded/truncated to.
    """

    def __init__(self, data, batch_size, tokenizer, featuresMap, seed=1234, num_classes=13, shuffle=True, test = False,
                 feature_dim=512, max_question_len=41):
        # Newer Keras releases expect Sequence subclasses to initialise the base class.
        super().__init__()
        self.data = data  # samples to iterate over
        self.batch_size = batch_size  # batch size
        self.featuresMap = featuresMap  # features of the images obtained from a pretrained model
        self.seed = seed  # seed for the shuffle operations
        self.num_classes = num_classes  # number of classes (13 by default)
        self.test = test  # True -> yield inputs only
        self.shuffle = shuffle  # whether to shuffle the sample order every epoch
        self.tok = tokenizer
        self.feature_dim = feature_dim
        self.max_question_len = max_question_len
        # Seed BEFORE the first shuffle, otherwise the order of the first epoch
        # would not depend on the seed.
        random.seed(self.seed)
        np.random.seed(self.seed)
        # Build (and possibly shuffle) the initial index order.
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        return int(np.floor(len(self.data) / self.batch_size))

    def on_epoch_end(self):
        """Rebuild the sample index order, shuffling it when ``shuffle`` is enabled."""
        self.indexes = np.arange(len(self.data))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __getitem__(self, index):
        """Return batch ``index``: ``(X, y)`` normally, only ``X`` in test mode."""
        start = index * self.batch_size
        indexes = self.indexes[start: start + self.batch_size]
        data_temp = [self.data[k] for k in indexes]
        X = self._generate_X(data_temp)
        if self.test:
            return X
        return X, self._generate_y(data_temp)

    def _generate_X(self, data_temp):
        """Build ``[image_features, padded_questions]`` for the given samples."""
        # Size the arrays by the number of samples actually present so that a
        # short slice never leaves uninitialised rows in the output.
        n_samples = len(data_temp)
        img_array = np.empty((n_samples, self.feature_dim))
        for row, sample in enumerate(data_temp):
            features = np.array(self.featuresMap[sample.get('image_filename')])
            img_array[row,] = features.squeeze()

        # Tokenize and pad the whole batch in one call.
        questions = [sample.get('question') for sample in data_temp]
        sequences = self.tok.texts_to_sequences(questions)
        padded = tf.keras.preprocessing.sequence.pad_sequences(sequences, maxlen=self.max_question_len)
        # Keep the same floating dtype as the image array, as before.
        question_array = np.asarray(padded, dtype=img_array.dtype)
        return [img_array, question_array]

    def _generate_y(self, data_temp):
        """Return the one-hot encoded answers, one row per sample."""
        # `classes` is the module-level answer -> class index mapping.
        labels = [classes[sample.get('answer')] for sample in data_temp]
        return np.array(tf.keras.utils.to_categorical(labels, num_classes=self.num_classes))

Reading of the necessary files

In [None]:
# Locations of the input datasets (annotations, question JSON files and the
# precomputed image feature tensors).
cwd = os.getcwd()
datasetName = '../input/ann-and-dl-vqa/dataset_vqa'
jsonFiles = '../input/json-files'
tensors = '../input/vgg19-tensors-gap'
trainJsonName = 'train_data.json'
validJsonName = 'valid_data.json'

imagesPath = os.path.join(datasetName, 'train')
trainJsonPath = os.path.join(jsonFiles, trainJsonName)
validJsonPath = os.path.join(jsonFiles, validJsonName)

trainTensors = os.path.join(tensors, "train_tensors_VGG19_GAP.json")
validTensors = os.path.join(tensors, "valid_tensors_VGG19_GAP.json")

seed=1234

# Question entries for training and validation. The `with` statement closes
# the files on exit, so no explicit close() is needed.
with open(trainJsonPath, 'r', encoding='utf-8') as json_file_train, \
        open(validJsonPath, 'r', encoding='utf-8') as json_file_valid:
    data_train = json.load(json_file_train).get('questions')
    data_valid = json.load(json_file_valid).get('questions')

train_size = len(data_train)
valid_size = len(data_valid)

# Precomputed image features for the training and validation images.
with open(trainTensors, 'r', encoding='utf-8') as json_file_train, \
        open(validTensors, 'r', encoding='utf-8') as json_file_valid:
    train_features = json.load(json_file_train)
    valid_features = json.load(json_file_valid)

Creation of the Tokenizer and instantiation of two CustomDataGenerator objects, one for training and one for validation.

In [None]:
# Collect every training question and split each one into words.
questions = [entry['question'] for entry in data_train]
word_sequences = [tf.keras.preprocessing.text.text_to_word_sequence(text) for text in questions]

# Vocabulary (distinct words) and length of the longest question, in words.
words = set().union(*word_sequences)
maxLength = max(map(len, word_sequences), default=0)

# Vocabulary size.
n_words = len(words)

# Fit the tokenizer that maps words to integer indexes.
# NOTE(review): the Keras docs state that only the `num_words - 1` most common
# words are kept, so with num_words=n_words the least frequent word is dropped.
# Confirm whether n_words + 1 was intended (the Embedding size must then agree).
tok = tf.keras.preprocessing.text.Tokenizer(num_words=n_words)
tok.fit_on_texts(questions)

# Training batches are reshuffled every epoch; validation keeps a fixed order.
gen_train = CustomDataGenerator(
    data_train, bst, tok, train_features, seed,
    num_classes=13, shuffle=True, test=False,
)
gen_val = CustomDataGenerator(
    data_valid, bsv, tok, valid_features, seed,
    num_classes=13, shuffle=False, test=False,
)

Definition of the neural network architecture and training

In [None]:
# Two-branch model: an image-feature branch and a question (RNN) branch whose
# outputs are concatenated and classified into the 13 answer classes.

# Image branch: takes the precomputed 512-dimensional feature vector (no
# convolution happens here) and projects it with a dense layer.
# `shape` must be a tuple: `(512)` is just the int 512, `(512,)` is a 1-tuple.
inp1 = tf.keras.Input(shape=(512,))

dense1 = tf.keras.layers.Dense(units=256, activation= tf.keras.activations.relu, kernel_initializer = 'he_uniform')(inp1)

# Question branch: embedding followed by two stacked bidirectional LSTMs.
# The input is a padded sequence of 41 token indexes.
inp2 = tf.keras.Input(name='input_LSTM', shape=(41,))
# NOTE(review): input_dim=71 is hard-coded; it must be larger than the highest
# token index produced by the tokenizer — confirm against the vocabulary size.
r = tf.keras.layers.Embedding(input_dim=71, output_dim=32)(inp2)
r = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=256, return_sequences=True))(r)
r = tf.keras.layers.Bidirectional(tf.keras.layers.LSTM(units=256, return_sequences=False))(r)
dense2 = tf.keras.layers.Dense(units=256, activation = tf.keras.activations.relu, kernel_initializer = 'he_uniform')(r)

# Fusion of the two branches and classification head.
conc = tf.keras.layers.Concatenate()([dense1, dense2])
d = tf.keras.layers.Dense(units=1024, activation=tf.keras.activations.relu, kernel_initializer = 'he_uniform')(conc)
d = tf.keras.layers.Dropout(0.3)(d)
d = tf.keras.layers.Dense(units=1024, activation=tf.keras.activations.relu, kernel_initializer = 'he_uniform')(d)
d = tf.keras.layers.Dropout(0.3)(d)
out = tf.keras.layers.Dense(units=13, activation=tf.keras.activations.softmax)(d)

model = tf.keras.Model([inp1, inp2], out)
model.summary()

# One-hot targets + softmax output -> categorical cross-entropy.
loss = tf.keras.losses.CategoricalCrossentropy()
optimizer = tf.keras.optimizers.Adam(learning_rate = 1e-3)
metrics = ['accuracy']

model.compile(metrics=metrics, optimizer=optimizer, loss=loss)

# NOTE(review): `fit_generator` is deprecated in newer TensorFlow releases in
# favour of `fit`; it is kept here to preserve behaviour on the TensorFlow
# version this notebook was written for.
model.fit_generator(gen_train, steps_per_epoch=len(gen_train), validation_data=gen_val, validation_steps=len(gen_val), epochs = 10 , workers = 4)

from datetime import datetime

# Save the trained model under a timestamped file name.
modelName = 'model_' + str(datetime.now().strftime('%b%d_%H-%M-%S')) + '.h5'
model.save(modelName)

Testing phase of the neural network and creation of the .csv file for submission

In [None]:
def create_csv(results, results_dir='./'):
    """Write ``results`` to a timestamped CSV file inside ``results_dir``.

    The file is named ``results_<timestamp>.csv`` and contains the header
    ``Id,Category`` followed by one ``key,value`` line per entry of
    ``results``, in the mapping's iteration order.
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    target_path = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(target_path, 'w') as f:
        f.write('Id,Category\n')
        f.writelines(
            ','.join((str(key), str(value))) + '\n'
            for key, value in results.items()
        )

# Paths of the test questions and of the precomputed test image features.
testJsonName = 'test_data.json'

testJsonPath = os.path.join(datasetName, testJsonName)

testTensors = os.path.join(tensors, "test_tensors_VGG19_GAP.json")
seed=1234

# The `with` statement closes each file on exit; no explicit close() needed.
with open(testJsonPath, 'r', encoding='utf-8') as json_file_test:
    data_test = json.load(json_file_test).get('questions')

with open(testTensors, 'r', encoding='utf-8') as json_file_test:
    test_features = json.load(json_file_test)

print('Test set length:' + str(len(data_test)))
# Batch size 1 and no shuffling: one prediction per question, in file order.
test_gen = CustomDataGenerator(data_test,1,tok,test_features, seed = seed, shuffle=False,test = True)

predictions = model.predict_generator(test_gen)
print('Predictions vector length:' + str(len(predictions)))

# Predicted class index for every question, computed in one vectorized call.
work_pr = list(np.argmax(predictions, axis=-1))

# Fail loudly instead of writing a submission that misses or misaligns rows.
if len(work_pr) != len(data_test):
    raise ValueError(
        'Got ' + str(len(work_pr)) + ' predictions for ' + str(len(data_test)) + ' test questions'
    )

# question_id -> predicted class index.
results = {
    question.get('question_id'): predicted
    for question, predicted in zip(data_test, work_pr)
}

create_csv(results)
print('CSV written!')