# VISUAL QUESTION ANSWERING
We started from a base model comprising a convolutional network in parallel with a recurrent network. The convolutional branch handled the images, while the recurrent branch dealt with the questions (natural language).  
Approaches:
- Base ensemble model: we trained 3 different networks for each question category:
  - yes/no
  - counting
  - other
  on top of this we trained a classifier which would recognise the class to which each question belonged. The classifier was a recurrent network with Bidirectional GRU.
  Accuracy 0.034 (probably due to an error at the prediction phase)
- Transfer Model: we used NasNetMobile (Large) as the convolutional network and left GRUs on the recurrent network. We left the whole transfer model frozen and then we ran another training round with the model unfrozen and a low learning rate (fine tuning)
Accuracy 0.363 (not fine tuned)
Accuracy 0.365 (fine tuned)
- Transfer Model V2: on top of the transfer model we also pre-loaded glove weights on the embedding layer and marked it as non trainable. 
Accuracy 0.25 (not fine tuned)
- Transfer Model V2 (ensemble): we followed the same procedure of the base ensemble model but we used the transfer model v2 as a base, moreover we tweaked for each of the 3 question models regularization in order to better fit the different problems
Accuracy 0.12


In [None]:
%tensorflow_version 2.x
from google.colab import drive
drive.mount('/content/drive')
!cp "/content/drive/My Drive/VQA_Dataset.zip" .
!mkdir Results
!unzip -q VQA_Dataset.zip

Mounted at /content/drive


In [None]:
!pip install focal-loss

In [None]:
# Cell output set up for Jupyter
from pathlib import Path
from IPython.core.interactiveshell import InteractiveShell
InteractiveShell.ast_node_interactivity = "all"

In [None]:
import tensorflow as tf
import json
import random 
import math

# Single seed shared by the random number generators used in this notebook.
SEED = 1234
tf.random.set_seed(SEED)
# The train/validation splits draw from Python's `random` module, which
# tf.random.set_seed does not affect; seed it as well so splits are repeatable.
random.seed(SEED)


# Config

In [None]:
from dataclasses import dataclass
from typing import Dict, Union, Optional, Callable

@dataclass
class Config:
  """Notebook-wide settings shared by the dataset, model and prediction cells."""
  # Padded question length; assigned by encode_input().
  max_length: Optional[int] = None
  batch_size: int = 64
  # Fraction of the annotated questions used for training.
  split: float = 0.8
  dataset_name: Path = Path("VQA_Dataset")
  # Keyword arguments forwarded to ImageDataGenerator.
  augmentation: Optional[Dict[str, Union[bool,int,Optional[Callable],str]]] = None
  # 2 in case of yes_or_no, 6 in case of counting, 50 in case of other, 58 in case of all
  num_classes: int = 58
  img_w: int = 350
  img_h: int = 200
  # Vocabulary size (len(tokenizer.word_index)); assigned by encode_input().
  wtoi: Optional[int] = None
  #Questions can be 'all','yes_or_no','counting','other'
  questions: str = 'all'
  # Tokenizer most recently used by encode_input(). Declared as a real field
  # under the name the rest of the notebook reads (config.tokenizer).
  tokenizer: Optional[object] = None
config = Config()

In [None]:
# Maps every answer string to its integer class id (0-57): the counting
# answers '0'-'5' first, then the remaining answers in alphabetical order.
# The next cell filters and renumbers this dict in place according to
# config.questions.
labels_dict = {
        '0': 0,
        '1': 1,
        '2': 2,
        '3': 3,
        '4': 4,
        '5': 5,
        'apple': 6,
        'baseball': 7,
        'bench': 8,
        'bike': 9,
        'bird': 10,
        'black': 11,
        'blanket': 12,
        'blue': 13,
        'bone': 14,
        'book': 15,
        'boy': 16,
        'brown': 17,
        'cat': 18,
        'chair': 19,
        'couch': 20,
        'dog': 21,
        'floor': 22,
        'food': 23,
        'football': 24,
        'girl': 25,
        'grass': 26,
        'gray': 27,
        'green': 28,
        'left': 29,
        'log': 30,
        'man': 31,
        'monkey bars': 32,
        'no': 33,
        'nothing': 34,
        'orange': 35,
        'pie': 36,
        'plant': 37,
        'playing': 38,
        'red': 39,
        'right': 40,
        'rug': 41,
        'sandbox': 42,
        'sitting': 43,
        'sleeping': 44,
        'soccer': 45,
        'squirrel': 46,
        'standing': 47,
        'stool': 48,
        'sunny': 49,
        'table': 50,
        'tree': 51,
        'watermelon': 52,
        'white': 53,
        'wine': 54,
        'woman': 55,
        'yellow': 56,
        'yes': 57
}

In [None]:
# Drop the answers that do not belong to the configured question category,
# then renumber the surviving labels consecutively from zero.
_binary_answers = {'yes','no'}
_count_answers = {'0','1','2','3','4','5'}
if config.questions == 'yes_or_no':
  _discarded = set(labels_dict) - _binary_answers
elif config.questions == 'counting':
  _discarded = set(labels_dict) - _count_answers
elif config.questions != 'all':
  # Any other value is treated as the 'other' category.
  _discarded = _binary_answers | _count_answers
else:
  _discarded = set()
for _answer in _discarded:
  del labels_dict[_answer]
# Only values are reassigned, so iterating the dict while updating it is safe.
for _new_id, _answer in enumerate(labels_dict):
  labels_dict[_answer] = _new_id
labels_dict

{'0': 0,
 '1': 1,
 '2': 2,
 '3': 3,
 '4': 4,
 '5': 5,
 'apple': 6,
 'baseball': 7,
 'bench': 8,
 'bike': 9,
 'bird': 10,
 'black': 11,
 'blanket': 12,
 'blue': 13,
 'bone': 14,
 'book': 15,
 'boy': 16,
 'brown': 17,
 'cat': 18,
 'chair': 19,
 'couch': 20,
 'dog': 21,
 'floor': 22,
 'food': 23,
 'football': 24,
 'girl': 25,
 'grass': 26,
 'gray': 27,
 'green': 28,
 'left': 29,
 'log': 30,
 'man': 31,
 'monkey bars': 32,
 'no': 33,
 'nothing': 34,
 'orange': 35,
 'pie': 36,
 'plant': 37,
 'playing': 38,
 'red': 39,
 'right': 40,
 'rug': 41,
 'sandbox': 42,
 'sitting': 43,
 'sleeping': 44,
 'soccer': 45,
 'squirrel': 46,
 'standing': 47,
 'stool': 48,
 'sunny': 49,
 'table': 50,
 'tree': 51,
 'watermelon': 52,
 'white': 53,
 'wine': 54,
 'woman': 55,
 'yellow': 56,
 'yes': 57}

# Dataset Split


In [None]:
def question_split():
  """Split the training annotations into one JSON file per question category.

  Reads train_questions_annotations.json from config.dataset_name and writes,
  in the same directory:
    - yes_or_no.json: questions whose answer is 'yes' or 'no'
    - counting.json: questions whose answer is '0'..'5'
    - other.json: every remaining question
    - class_occurrences.json: number of questions per answer
  """
  dataset_dir = config.dataset_name
  with dataset_dir.joinpath("train_questions_annotations.json").open() as src:
    annotations = json.load(src)

  yes_or_no_annotations = {}
  counting_annotations = {}
  other_annotations = {}
  # Pre-fill with the known labels so answers that never occur are reported as 0.
  class_occurrences = {label: 0 for label in labels_dict}

  for k,v in annotations.items():
    answer = v['answer']
    # labels_dict may have been narrowed to a single question category, so an
    # answer can be absent from the pre-filled counters; count it anyway
    # instead of failing with KeyError.
    class_occurrences[answer] = class_occurrences.get(answer, 0) + 1
    if answer in {'yes','no'}:
      dict_to_write = yes_or_no_annotations
    elif answer in {'0','1','2','3','4','5'}:
      dict_to_write = counting_annotations
    else:
      dict_to_write = other_annotations
    dict_to_write[k] = v

  outputs = {
    "yes_or_no.json": yes_or_no_annotations,
    "counting.json": counting_annotations,
    "other.json": other_annotations,
    "class_occurrences.json": class_occurrences,
  }
  for file_name, payload in outputs.items():
    # Context manager guarantees the file is flushed and closed.
    with dataset_dir.joinpath(file_name).open("w") as dst:
      json.dump(payload, dst)
question_split()




In [None]:

def split(config : Config,to_split : str):
  """Randomly split an annotation file into train.json and validation.json.

  Args:
    config: provides the dataset directory (dataset_name) and the training
      fraction (split).
    to_split: name of the annotation JSON file inside the dataset directory.

  A floor(config.split * N) share of the N questions goes to train.json, the
  rest to validation.json; both files are written to the dataset directory.
  """
  with config.dataset_name.joinpath(to_split).open() as src:
    train_validation_dict = json.load(src)
  question_num = len(train_validation_dict)
  question_num_train = math.floor(config.split * question_num)
  question_num_val = question_num - question_num_train

  # One uniform draw without replacement, instead of rebuilding the key and
  # value lists for every validation sample.
  validation_keys = random.sample(list(train_validation_dict), question_num_val)
  validation_dict = {key: train_validation_dict.pop(key) for key in validation_keys}

  # Whatever was not moved to validation is the training set.
  train_dict = train_validation_dict
  with config.dataset_name.joinpath("train.json").open("w") as dst:
    json.dump(train_dict, dst)
  with config.dataset_name.joinpath("validation.json").open("w") as dst:
    json.dump(validation_dict, dst)

# Annotation file for each question category; any other value means 'other'.
_split_sources = {
  'all': "train_questions_annotations.json",
  'yes_or_no': "yes_or_no.json",
  'counting': "counting.json",
}
path = _split_sources.get(config.questions, 'other.json')
split(config,path)
  

# VQA Custom Dataset


In [None]:
from PIL import Image
from tensorflow.keras.preprocessing.image import ImageDataGenerator
import string
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from collections import OrderedDict

def encode_input(data_dict, tokenizer=None):
  """Turn the questions in ``data_dict`` into padded integer sequences.

  Args:
    data_dict: mapping whose values are dicts with a 'question' string.
    tokenizer: optional, already fitted Keras Tokenizer. When given, it is
      reused as is, so the word ids match the data it was fitted on. When
      omitted, a new Tokenizer is fitted on the questions of ``data_dict``.

  Returns:
    The result of pad_sequences: one row per question, in the iteration order
    of ``data_dict``, each of length config.max_length.

  Side effects:
    Stores the tokenizer in config.tokenizer, its vocabulary size in
    config.wtoi, and sets config.max_length to 14.
  """
  question_list = [v['question'].strip() for v in data_dict.values()]
  if tokenizer is None:
    # No `num_words` cap. The original passed the length of the longest
    # question there, but `num_words` limits the vocabulary to the most
    # frequent words, so all but a handful of words were dropped from the
    # encoded questions.
    tokenizer = Tokenizer()
    tokenizer.fit_on_texts(question_list)
  config.tokenizer = tokenizer
  tokenized = tokenizer.texts_to_sequences(question_list)
  # The embedding layers are sized as config.wtoi + 1 (index 0 is padding).
  config.wtoi = len(tokenizer.word_index)
  config.max_length = 14
  return pad_sequences(tokenized,maxlen=config.max_length)

class CustomDataset(tf.keras.utils.Sequence):
  """Per-sample access to the VQA questions of one subset.

  Each item is ``({'image': array, 'question': encoded question}, target)``,
  where the image array has shape (img_w, img_h, 3) and ``target`` is the
  integer label of the answer taken from labels_dict.
  """

  def __init__(self,config : Config, which_subset : str, image_generator=None):
    """Load the subset's annotations and encode its questions.

    Args:
      config: supplies the dataset directory and the image size.
      which_subset: 'training' (reads train.json) or 'validation'
        (reads validation.json).
      image_generator: optional ImageDataGenerator used to augment training
        images; ignored for the validation subset.

    Raises:
      ValueError: if ``which_subset`` is neither 'training' nor 'validation'.
    """
    self.config = config
    self.image_generator = image_generator
    self.which_subset = which_subset
    subset_files = {'training': "train.json", 'validation': "validation.json"}
    if which_subset not in subset_files:
      raise ValueError("Unsupported which subset: "+ str(which_subset))
    with Path(config.dataset_name).joinpath(subset_files[which_subset]).open() as src:
      self.data_dict = json.load(src,object_pairs_hook=OrderedDict)
    # Cache the key order once; rebuilding the list on every __getitem__ made
    # a full pass over the data quadratic in the number of questions.
    self._keys = list(self.data_dict)
    # NOTE(review): encode_input() as called here fits a new Tokenizer on this
    # subset only, so training and validation word ids are not guaranteed to
    # agree - confirm whether one shared tokenizer is intended.
    self.encoder_inputs = encode_input(self.data_dict)

  def __len__(self):
    """Return the number of questions (single samples, not batches)."""
    return len(self.data_dict)

  def __getitem__(self, index):
    """Return the ``index``-th sample as (inputs dict, integer target)."""
    item_key = self._keys[index]
    item_value = self.data_dict[item_key]
    image_path = Path(self.config.dataset_name).joinpath("Images",item_value['image_id'] + ".png")
    with Image.open(image_path) as raw_image:
      item_image = raw_image.convert("RGB")
    # Use the instance's config rather than the module-level one.
    item_image = item_image.resize((self.config.img_w,self.config.img_h))
    # PIL yields (height, width, channels); the model expects (width, height, channels).
    item_image_arr = np.array(item_image).transpose((1, 0, 2))
    if self.which_subset == 'training' and self.image_generator is not None:
      # No per-call seed: passing the same seed on every call made every image
      # receive the exact same "random" transform.
      transform = self.image_generator.get_random_transform(item_image_arr.shape)
      item_image_arr = self.image_generator.apply_transform(item_image_arr, transform)
    target = int(labels_dict[item_value['answer']])
    return {'image':item_image_arr , 'question':self.encoder_inputs[index]} , target

In [None]:
# Augmentation settings forwarded verbatim to ImageDataGenerator.
# NOTE(review): CustomDataset only calls get_random_transform/apply_transform
# on the generator; per the Keras docs `rescale` is applied by `standardize`,
# so the 1/255 rescale below looks unused - confirm.
config.augmentation = {
  'rotation_range':20,
  'width_shift_range':0.1,
  'height_shift_range':0.1,
  'zoom_range':0.1,
  'horizontal_flip':True,
  'fill_mode':"nearest",
  'rescale':1./255,
  'preprocessing_function': None
}

image_generator = ImageDataGenerator(**config.augmentation)

# Only the training subset receives the generator, so only training images are augmented.
train_dataset = CustomDataset(config,"training",image_generator=image_generator)
val_dataset = CustomDataset(config,"validation")

In [None]:
import numpy as np
# Every sample is ({'image', 'question'}, label); both pipelines share this spec.
_vqa_sample_spec = (
    {'image': tf.TensorSpec(shape=(config.img_w, config.img_h, 3), dtype=np.uint8),
     'question': tf.TensorSpec(shape=(config.max_length), dtype=np.int32)},
    tf.TensorSpec(shape=(), dtype=np.int32),
)

# Training stream: batched and repeated indefinitely (steps are set in fit()).
train_gen = (
    tf.data.Dataset.from_generator(lambda: train_dataset, output_signature=_vqa_sample_spec)
    .batch(config.batch_size)
    .repeat()
)

# Validation stream, built the same way from the validation subset.
valid_gen = (
    tf.data.Dataset.from_generator(lambda: val_dataset, output_signature=_vqa_sample_spec)
    .batch(config.batch_size)
    .repeat()
)

# Pre-Trained Embedding

In [None]:
!wget https://nlp.stanford.edu/data/wordvecs/glove.840B.300d.zip

In [None]:
!unzip glove.840B.300d.zip

In [None]:
%%capture
from pathlib import Path
import numpy as np
# Width of the GloVe vectors in glove.840B.300d.txt and of embedding_matrix.
EMBEDDING_DIM = 300

# word -> GloVe vector lookup built from the downloaded text file.
embeddings_index = dict()
# Explicit encoding, and a context manager so the file is closed even if
# parsing fails part-way through.
with Path('./glove.840B.300d.txt').open(encoding='utf-8') as f:
    for line in f:
        values = line.split()
        if not values:
            # Blank line: nothing to parse.
            continue
        word = values[0]
        try:
            coefs = np.asarray(values[1:], dtype='float32')
        except ValueError:
            # Non-numeric fields after the first token (presumably tokens that
            # themselves contain spaces); skip the line as the original did.
            continue
        if coefs.shape[0] != EMBEDDING_DIM:
            # A vector of the wrong length cannot be stored in the matrix below.
            continue
        embeddings_index[word] = coefs

# Row i holds the vector of the word with tokenizer index i; row 0 (padding)
# and words without a GloVe vector stay all-zero.
size_of_vocabulary = config.wtoi+1
embedding_matrix = np.zeros((size_of_vocabulary, EMBEDDING_DIM))

for word, i in config.tokenizer.word_index.items():
    embedding_vector = embeddings_index.get(word)
    if embedding_vector is not None:
        embedding_matrix[i] = embedding_vector

# VQA Base Model

In [None]:
# Import Keras 
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, GlobalAveragePooling2D , Input, LSTM, Embedding, Dense, Activation, BatchNormalization
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.constraints import MaxNorm
# L2 weight penalty and max-norm constraint shared by every kernel below.
regularizer = tf.keras.regularizers.l2(1e-4)
constraint = MaxNorm(3)

def _add_conv_unit(network, filters, **conv_kwargs):
  """Append a bias-free 3x3 Conv2D -> BatchNormalization -> ReLU unit to `network`."""
  network.add(Conv2D(filters, (3, 3), kernel_regularizer=regularizer, use_bias=False,
                     kernel_constraint=constraint, **conv_kwargs))
  network.add(BatchNormalization())
  network.add(Activation('relu'))

# Image branch: three convolutional stages (64, 128, 256 filters) followed by
# global average pooling.
vision_model = Sequential()
_add_conv_unit(vision_model, 64, padding='same', input_shape=(config.img_w, config.img_h, 3))
_add_conv_unit(vision_model, 64)
vision_model.add(MaxPooling2D((2, 2)))
_add_conv_unit(vision_model, 128, padding='same')
_add_conv_unit(vision_model, 128)
vision_model.add(MaxPooling2D((2, 2)))
_add_conv_unit(vision_model, 256, padding='same')
_add_conv_unit(vision_model, 256)
_add_conv_unit(vision_model, 256)
vision_model.add(GlobalAveragePooling2D())

image_input = Input(shape=(config.img_w, config.img_h, 3),name='image')
encoded_image = vision_model(image_input)

# Question branch: learned embedding followed by an LSTM encoder.
question_input = Input(shape=(config.max_length), dtype='int32',name='question')
embedded_question = Embedding(input_dim=config.wtoi + 1, output_dim=256)(question_input)
encoded_question = LSTM(256,
                        kernel_constraint=constraint,
                        recurrent_constraint=constraint)(embedded_question)

# Fuse both encodings and classify over the answer labels.
merged = tf.keras.layers.concatenate([encoded_question, encoded_image])
output = Dense(config.num_classes,
               activation='softmax',
               kernel_regularizer=regularizer,
               use_bias=False,
               kernel_constraint=constraint)(merged)
model = Model(inputs=[image_input, question_input], outputs=output)
model.summary()

Model: "model_1"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
question (InputLayer)           [(None, 14)]         0                                            
__________________________________________________________________________________________________
embedding_1 (Embedding)         (None, 14, 256)      608768      question[0][0]                   
__________________________________________________________________________________________________
image (InputLayer)              [(None, 350, 200, 3) 0                                            
__________________________________________________________________________________________________
lstm_1 (LSTM)                   (None, 256)          525312      embedding_1[0][0]                
____________________________________________________________________________________________

# Visual Transfer model

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, GlobalAveragePooling2D , Input, LSTM, Embedding, Dense, Activation, BatchNormalization
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.constraints import MaxNorm
# L2 weight penalty and max-norm constraint for the layers trained from scratch.
regularizer = tf.keras.regularizers.l2(1e-4)
constraint = MaxNorm(3)

# Image branch: MobileNetV3Large backbone with average pooling on top.
vision_model = tf.keras.applications.MobileNetV3Large(
    include_top=False,
    input_shape=(config.img_w, config.img_h, 3),
    pooling="avg",
)
# Freeze the whole backbone, then re-enable training for its last six layers.
for frozen_layer in vision_model.layers:
  frozen_layer.trainable = False
for tuned_layer in vision_model.layers[-6:]:
  tuned_layer.trainable = True

image_input = Input(shape=(config.img_w, config.img_h, 3),name='image')
encoded_image = vision_model(image_input)

# Question branch: learned embedding followed by an LSTM encoder.
question_input = Input(shape=(config.max_length), dtype='int32',name='question')
embedded_question = Embedding(input_dim=config.wtoi + 1, output_dim=256)(question_input)
encoded_question = LSTM(256,
                        kernel_constraint=constraint,
                        recurrent_constraint=constraint)(embedded_question)

# Fuse both encodings and classify over the answer labels.
merged = tf.keras.layers.concatenate([encoded_question, encoded_image])
output = Dense(config.num_classes,
               activation='softmax',
               kernel_regularizer=regularizer,
               use_bias=False,
               kernel_constraint=constraint)(merged)
model = Model(inputs=[image_input, question_input], outputs=output)
model.summary()

# Visual and Text Transfer Learning model

In [None]:
# Vision Model comes from: https://towardsdatascience.com/metastasis-detection-using-cnns-transfer-learning-and-data-augmentation-684761347b59
from tensorflow.keras.layers import GRU, Dropout, Conv2D, MaxPooling2D, Flatten, GlobalAveragePooling2D , GlobalMaxPooling2D, Input, LSTM, Embedding, Dense, Activation, BatchNormalization, Bidirectional
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.constraints import MaxNorm
# Regularization and constraints are switched off for this variant.
regularizer = None
constraint = None

# Image branch: fully frozen MobileNetV3Large backbone without built-in pooling.
vision_model = tf.keras.applications.MobileNetV3Large(
    include_top=False,
    input_shape=(config.img_w, config.img_h, 3),
)
for backbone_layer in vision_model.layers:
  backbone_layer.trainable = False

image_input = Input(shape=(config.img_w, config.img_h, 3),name='image')
encoded_image = vision_model(image_input)
# Summarize the feature map with both average and max pooling, then apply dropout.
pool_1 = GlobalAveragePooling2D()(encoded_image)
pool_2 = GlobalMaxPooling2D()(encoded_image)
concat = tf.keras.layers.concatenate([pool_1, pool_2])
encoded_image = Dropout(0.3)(concat)

# Question branch: frozen pre-trained embedding followed by a bidirectional GRU.
question_input = Input(shape=(config.max_length), dtype='int32',name='question')
embedded_question = Embedding(
    input_dim=config.wtoi + 1,
    weights=[embedding_matrix],
    output_dim=300,
    trainable=False,
)(question_input)
_gru_options = dict(
    kernel_constraint=constraint,
    recurrent_constraint=constraint,
    bias_constraint=constraint,
    kernel_regularizer=regularizer,
    activity_regularizer=regularizer,
    recurrent_regularizer=regularizer,
)
encoded_question = Bidirectional(GRU(256, **_gru_options))(embedded_question)

# Fuse both encodings, pass them through one hidden layer and classify.
merged = tf.keras.layers.concatenate([encoded_question, encoded_image])
x = Dense(256, kernel_regularizer=regularizer, kernel_constraint=constraint)(merged)
x = BatchNormalization()(x)
x = Activation("relu")(x)
output = Dense(config.num_classes,
               activation='softmax',
               kernel_regularizer=regularizer,
               use_bias=False,
               kernel_constraint=constraint)(x)
model = Model(inputs=[image_input, question_input], outputs=output)
model.summary()

Model: "model_5"
__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
image (InputLayer)              [(None, 350, 200, 3) 0                                            
__________________________________________________________________________________________________
MobilenetV3large (Functional)   (None, 11, 7, 1280)  4226432     image[0][0]                      
__________________________________________________________________________________________________
question (InputLayer)           [(None, 14)]         0                                            
__________________________________________________________________________________________________
global_average_pooling2d_7 (Glo (None, 1280)         0           MobilenetV3large[0][0]           
____________________________________________________________________________________________

# Questions Classification Custom Dataset

In [None]:
#yes_or_no is class 0, counting is class 1, other is class 2

from collections import defaultdict
import random

def create_class_questions_dict():
  """Label every annotated question with its question category.

  Reads train_questions_annotations.json from config.dataset_name.

  Returns:
    A defaultdict keyed by a running integer (file order). Each value is
    ``{"question": <text>, "target": <category>}`` where the category is
    0 for yes/no answers, 1 for counting answers ('0'..'5') and 2 otherwise.
  """
  with config.dataset_name.joinpath("train_questions_annotations.json").open() as src:
    train_validation_dict = json.load(src)
  class_questions_dict = defaultdict(dict)
  for counter, v in enumerate(train_validation_dict.values()):
    answer = v['answer']
    if answer in {'yes','no'}:
      target = 0
    elif answer in {'0','1','2','3','4','5'}:
      target = 1
    else:
      target = 2
    class_questions_dict[counter] = {"question": v['question'], "target": target}

  return class_questions_dict

#We need to split it and then to take from validation and training

def split_class_questions(config : Config):
  """Split the category-labelled questions into training and validation dicts.

  Args:
    config: supplies the training fraction (config.split).

  Returns:
    ``(training_dict, validation_dict)``, each mapping the integer keys of
    create_class_questions_dict() to their question/target entries.
  """
  questions = create_class_questions_dict()
  keys = list(questions.keys())
  # Dedicated fixed-seed generator: this function is called once per dataset
  # object, and an unseeded shuffle gave each call a different partition, so
  # the training and validation subsets overlapped.
  random.Random(SEED).shuffle(keys)
  training_samples = int(len(keys) * config.split)
  training_dict = {k: questions[k] for k in keys[:training_samples]}
  validation_dict = {k: questions[k] for k in keys[training_samples:]}

  return training_dict,validation_dict



In [None]:
class CustomDatasetQuestions(tf.keras.utils.Sequence):
  """Per-sample access to (encoded question, question category) pairs."""

  def __init__(self,config : Config,which_subset : str):
    """Build the requested subset and encode its questions.

    Args:
      config: forwarded to split_class_questions().
      which_subset: 'training' selects the training part; any other value
        selects the validation part.
    """
    self.config = config
    # NOTE(review): the split is recomputed for every instance; training and
    # validation only stay disjoint if split_class_questions() returns the
    # same partition on each call - confirm.
    training_dict, validation_dict = split_class_questions(config)
    self.data_dict = training_dict if which_subset == 'training' else validation_dict
    # Cache the key order once instead of rebuilding the list on every
    # __getitem__ call.
    self._keys = list(self.data_dict)
    self.encoder_inputs = encode_input(self.data_dict)
    # Show a few samples as a sanity check.
    print(list(self.data_dict.items())[0:20])

  def __len__(self):
    """Return the number of questions (single samples, not batches)."""
    return len(self.data_dict)

  def __getitem__(self, index):
    """Return the ``index``-th (encoded question, category) pair."""
    item_key = self._keys[index]
    target = self.data_dict[item_key]["target"]
    return self.encoder_inputs[index] , target

In [None]:
# Question-only datasets used to train the question-type classifier.
train_dataset = CustomDatasetQuestions(config,"training")
val_dataset = CustomDatasetQuestions(config,"validation")

import numpy as np

# Every sample is (encoded question, category); both pipelines share this spec.
_question_sample_spec = (
    tf.TensorSpec(shape=(config.max_length), dtype=np.int32),
    tf.TensorSpec(shape=(), dtype=np.int32),
)

# Training stream: batched and repeated indefinitely (steps are set in fit()).
train_gen = (
    tf.data.Dataset.from_generator(lambda: train_dataset, output_signature=_question_sample_spec)
    .batch(config.batch_size)
    .repeat()
)

# Validation stream, built the same way from the validation subset.
valid_gen = (
    tf.data.Dataset.from_generator(lambda: val_dataset, output_signature=_question_sample_spec)
    .batch(config.batch_size)
    .repeat()
)

[(24453, {'question': 'Is the man scared?', 'target': 0}), (56254, {'question': 'What kind of ball is this?', 'target': 2}), (8605, {'question': 'Are all the plants the same height?', 'target': 0}), (13510, {'question': 'Is it sunny?', 'target': 0}), (27204, {'question': 'Is it possible that the man is in danger?', 'target': 0}), (50584, {'question': 'What color is the couch?', 'target': 2}), (39959, {'question': 'What is the weather outside?', 'target': 2}), (19790, {'question': 'Is the woman feeling lonely?', 'target': 0}), (17273, {'question': 'Is the girl falling?', 'target': 0}), (26327, {'question': 'What colors are in the area rug?', 'target': 2}), (36199, {'question': 'What is the old man holding?', 'target': 2}), (57715, {'question': 'What is next to the dog?', 'target': 2}), (24570, {'question': 'Is the woman alone?', 'target': 0}), (51807, {'question': 'Is there a person on the table top?', 'target': 0}), (48578, {'question': 'Are all of the individuals featured in this pict

# Question Classification Model

In [None]:
from tensorflow.keras.layers import Conv2D, MaxPooling2D, Flatten, GlobalAveragePooling2D , Input, LSTM, Embedding, Dense, Activation, BatchNormalization, Bidirectional,GRU
from tensorflow.keras.models import Model, Sequential
from tensorflow.keras.constraints import MaxNorm

# Max-norm constraint and combined L1/L2 penalty applied throughout the classifier.
constraint = MaxNorm(3)
regularizer = tf.keras.regularizers.l1_l2(1e-2)

question_input = Input(shape=(config.max_length), dtype='int32',name='question')
embedded_question = Embedding(input_dim=config.wtoi + 1, output_dim=256)(question_input)

# Small bidirectional GRU with every weight group constrained and regularized.
_gru_options = dict(
    kernel_constraint=constraint,
    recurrent_constraint=constraint,
    bias_constraint=constraint,
    bias_regularizer=regularizer,
    recurrent_regularizer=regularizer,
    kernel_regularizer=regularizer,
    activity_regularizer=regularizer,
)
encoded_question = Bidirectional(GRU(16, **_gru_options))(embedded_question)

# Three outputs: yes/no, counting, other.
output = Dense(3,
               activation='softmax',
               kernel_regularizer=regularizer,
               use_bias=False,
               kernel_constraint=constraint)(encoded_question)
model = Model(inputs=question_input, outputs=output)
model.summary()

Model: "model_7"
_________________________________________________________________
Layer (type)                 Output Shape              Param #   
question (InputLayer)        [(None, 14)]              0         
_________________________________________________________________
embedding_7 (Embedding)      (None, 14, 256)           627456    
_________________________________________________________________
bidirectional_5 (Bidirection (None, 32)                26304     
_________________________________________________________________
dense_7 (Dense)              (None, 3)                 96        
Total params: 653,856
Trainable params: 653,856
Non-trainable params: 0
_________________________________________________________________


# Compilation

In [None]:
from focal_loss import SparseCategoricalFocalLoss

In [None]:
from focal_loss import SparseCategoricalFocalLoss
# Training setup
# --------------

# Adam with a fixed initial learning rate (the plateau callback may lower it).
lr = 1e-3
optimizer = tf.keras.optimizers.Adam(learning_rate=lr)

# Focal loss on integer (sparse) class labels.
loss = SparseCategoricalFocalLoss(gamma=2)

# Metric reported for both training and validation.
metrics = ['sparse_categorical_accuracy']

# Attach optimizer, loss and metrics to the most recently built model.
model.compile(loss=loss, optimizer=optimizer, metrics=metrics)

# Callback

In [None]:
from datetime import datetime
from tensorflow.keras.callbacks import ReduceLROnPlateau 
import os 

# Root folder on the mounted drive that collects every experiment.
exps_dir = Path("/") / "content" / "drive" / "MyDrive" / "Colab Notebooks" / "Homework3" / "Results"
exps_dir.mkdir(parents=True, exist_ok=True)

# One timestamped sub-folder per run.
now = datetime.now().strftime('%b%d_%H-%M-%S')
model_name = 'NaiveModel_' + config.questions
exp_dir = exps_dir / (model_name + '_' + str(now))
exp_dir.mkdir(parents=True, exist_ok=True)

callbacks = []

# Checkpoints: weights only, one file per epoch.
ckpt_dir = exp_dir / 'ckpts'
ckpt_dir.mkdir(parents=True, exist_ok=True)
ckpt_callback = tf.keras.callbacks.ModelCheckpoint(
    filepath=os.path.join(ckpt_dir, 'cp_{epoch:02d}.ckpt'),
    save_weights_only=True,  # False would save the whole model
)
callbacks.append(ckpt_callback)

# TensorBoard logs (profiling and weight histograms disabled).
tb_dir = exp_dir / 'tb_logs'
tb_dir.mkdir(parents=True, exist_ok=True)
tb_callback = tf.keras.callbacks.TensorBoard(
    log_dir=tb_dir,
    profile_batch=0,
    histogram_freq=0,  # 1 would log weight histograms
)
callbacks.append(tb_callback)

# Early stopping on the validation loss, restoring the best weights.
early_stop = True
if early_stop:
    es_callback = tf.keras.callbacks.EarlyStopping(
        monitor='val_loss',
        patience=5,
        restore_best_weights=True,
    )
    callbacks.append(es_callback)

# Learning-rate annealing: halve the rate when validation accuracy plateaus.
learning_rate_reduction = ReduceLROnPlateau(
    monitor='val_sparse_categorical_accuracy',
    patience=3,
    verbose=1,
    factor=0.5,
    min_lr=1e-6,
)
lr_scheduling = True
if lr_scheduling:
    callbacks.append(learning_rate_reduction)

# Model Fit

In [None]:

# The datasets report single samples, so whole batches per pass = samples // batch size.
steps_train = len(train_dataset) // config.batch_size
steps_val = len(val_dataset) // config.batch_size

# Both streams repeat forever, hence the explicit step counts.
_fit_options = dict(
    x=train_gen,
    epochs=100,
    steps_per_epoch=steps_train,
    validation_data=valid_gen,
    validation_steps=steps_val,
    callbacks=callbacks,
)
model.fit(**_fit_options)

# Model prediction

In [None]:
import os
from datetime import datetime
import numpy as np


def create_csv(results, results_dir='./'):
    """Write ``results`` to a timestamped ``results_<time>.csv`` in ``results_dir``.

    The file starts with the header ``Id,Category`` and then holds one
    ``key,value`` line per entry of ``results``, in iteration order.
    """
    timestamp = datetime.now().strftime('%b%d_%H-%M-%S')
    target_path = os.path.join(results_dir, 'results_' + timestamp + '.csv')

    with open(target_path, 'w') as out:
        out.write('Id,Category\n')
        out.writelines(
            key + ',' + str(value) + '\n' for key, value in results.items()
        )

def prediction(model):
  """Predict an answer id for every test question and write them to a CSV.

  Args:
    model: Keras model taking ``{'image', 'question'}`` inputs.

  Reads test_questions.json and the images from config.dataset_name, then
  hands ``{question id: predicted class index}`` to create_csv().
  """
  with config.dataset_name.joinpath("test_questions.json").open() as src:
    test_questions = json.load(src)
  # NOTE(review): encode_input() as called here fits a new Tokenizer on the
  # test questions, so word ids are not guaranteed to match the ids the model
  # was trained with - confirm and reuse the training tokenizer if needed.
  questions = encode_input(test_questions)
  results = {}
  for (k, v), question in zip(test_questions.items(), questions):
    image_path = Path(config.dataset_name).joinpath("Images",v["image_id"] + ".png")
    with Image.open(image_path) as image:
      item_image = image.convert("RGB")
    item_image = item_image.resize((config.img_w,config.img_h))
    # Keep raw 0-255 pixel values: the training/validation pipelines declare
    # images as uint8 and never rescale them, so dividing by 255 here fed the
    # model inputs on a different scale than it was trained on.
    item_image_arr = np.float32(np.array(item_image).transpose((1, 0, 2)))
    # Add the batch dimension expected by predict().
    input_dict = {'image':tf.expand_dims(item_image_arr,axis=0) , 'question':tf.expand_dims(question,axis=0)}
    results[k] = model.predict(input_dict).argmax(axis=-1)[0]
  create_csv(results)

def prediction_ensemble(config,yes_or_no_model,counting_model,other_model,classifier_model):
  """Predict test answers with one specialist model per question category.

  Args:
    config: supplies the dataset directory and the image size.
    yes_or_no_model: model used when the classifier predicts category 0.
    counting_model: model used when the classifier predicts category 1.
    other_model: model used for every other predicted category.
    classifier_model: question-only model that predicts the category.

  Hands ``{question id: predicted class index}`` to create_csv().
  """
  with config.dataset_name.joinpath("test_questions.json").open() as src:
    test_questions = json.load(src)
  # NOTE(review): encode_input() as called here fits a new Tokenizer on the
  # test questions, so word ids are not guaranteed to match the ids the models
  # were trained with - confirm and reuse the training tokenizer if needed.
  questions = encode_input(test_questions)
  results = {}
  for (k, v), question in zip(test_questions.items(), questions):
    image_path = Path(config.dataset_name).joinpath("Images",v["image_id"]+".png")
    with Image.open(image_path) as image:
      item_image = image.convert("RGB")
    item_image = item_image.resize((config.img_w,config.img_h))
    # Keep raw 0-255 pixel values: the training/validation pipelines declare
    # images as uint8 and never rescale them.
    item_image_arr = np.float32(np.array(item_image).transpose((1, 0, 2)))
    # The classifier needs a batch dimension too; passing the bare sequence
    # made predict() treat each token as a separate sample.
    question_batch = tf.expand_dims(question,axis=0)
    question_type = classifier_model.predict(question_batch).argmax(axis=-1)[0]
    input_dict = {'image':tf.expand_dims(item_image_arr,axis=0) , 'question':question_batch}
    if question_type == 0:
      specialist = yes_or_no_model
    elif question_type == 1:
      specialist = counting_model
    else:
      specialist = other_model
    # NOTE(review): each specialist returns an index in its own label space
    # (labels are renumbered from 0 per category); presumably these must be
    # mapped back to the global answer ids before submission - confirm.
    results[k] = specialist.predict(input_dict).argmax(axis=-1)[0]
  create_csv(results)



In [None]:
# Run the ensemble on the test set and write the results CSV.
# NOTE(review): the four model variables passed here are not defined in the
# visible part of the notebook - presumably kept from earlier training runs; confirm.
prediction_ensemble(config,yes_or_no_model,model_counting,model_other,question_classifier)