In [1]:
from tensorflow.keras.layers import Input, Conv2D, MaxPooling2D, Flatten, Dense
from easy_vqa import get_train_questions, get_test_questions, get_answers, get_train_image_paths, get_test_image_paths
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.models import Model
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.preprocessing.image import load_img, img_to_array
from tensorflow.keras.utils import to_categorical
from tensorflow.keras.callbacks import ModelCheckpoint
import numpy as np

os.environ['TF_FORCE_GPU_ALLOW_GROWTH'] = 'true'

In [2]:
# The CNN
im_input = Input(shape=(64, 64, 3))
x1 = Conv2D(8, 3, padding='same')(im_input)
x1 = MaxPooling2D()(x1)
x1 = Conv2D(16, 3, padding='same')(x1)
x1 = MaxPooling2D()(x1)
x1 = Flatten()(x1)
# Add a final fully-connected layer after the CNN for good measure
x1 = Dense(32, activation='tanh')(x1)

In [3]:
# Read questions
# train_qs and test_qs are just arrays of question strings
# (we'll use the other variables later)
train_qs, train_answers, train_image_ids = get_train_questions()
test_qs, test_answers, test_image_ids = get_test_questions()

all_answers = get_answers()
num_answers = len(all_answers)


# Fit tokenizer on the training questions
tokenizer = Tokenizer()
tokenizer.fit_on_texts(train_qs)

# Convert questions to BOW
train_X_seqs = tokenizer.texts_to_matrix(train_qs)
test_X_seqs = tokenizer.texts_to_matrix(test_qs)

# Example BOW:
# [0 0 1 1 0 1 0 0 1 0 0 1 1 0 0 0 0 0 0 0 0 0 0 0 0 0 0]
print(train_X_seqs[0])

[0. 1. 1. 1. 0. 0. 0. 0. 1. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0. 0.
 0. 1. 0.]


In [4]:
# We add one because the Keras Tokenizer reserves index 0 and never uses it.
vocab_size = len(tokenizer.word_index) + 1

# The question network
q_input = Input(shape=(vocab_size,))
x2 = Dense(32, activation='tanh')(q_input)
x2 = Dense(32, activation='tanh')(x2)

In [5]:
from tensorflow.keras.layers import Multiply
out = Multiply()([x1, x2])

In [6]:
out = Dense(32, activation='tanh')(out)
# num_answers will be defined below
out = Dense(num_answers, activation='softmax')(out)

In [7]:
model = Model(inputs=[im_input, q_input], outputs=out)
model.compile(
  Adam(lr=2e-4), # somewhat arbitrarily chosen
  loss='categorical_crossentropy',
  metrics=['accuracy'],
)

In [8]:
def load_and_proccess_image(image_path):
  # Load image, then scale and shift pixel values to [-0.5, 0.5]
  im = img_to_array(load_img(image_path))
  return im / 255 - 0.5

def read_images(paths):
  # paths is a dict mapping image ID to image path
  # Returns a dict mapping image ID to the processed image
  ims = {}
  for image_id, image_path in paths.items():
    ims[image_id] = load_and_proccess_image(image_path)
  return ims

train_ims = read_images(get_train_image_paths())
test_ims = read_images(get_test_image_paths())

In [9]:
# Create model input images
train_X_ims = [train_ims[id] for id in train_image_ids]
test_X_ims = [test_ims[id] for id in test_image_ids]

train_X_ims = np.array(train_X_ims)
test_X_ims = np.array(test_X_ims)

# Create model outputs
train_answer_indices = [all_answers.index(a) for a in train_answers]
test_answer_indices = [all_answers.index(a) for a in test_answers]
train_Y = to_categorical(train_answer_indices)
test_Y = to_categorical(test_answer_indices)

train_Y = np.array(train_Y)
test_Y = np.array(test_Y)

In [10]:
checkpoint = ModelCheckpoint('model.h5', save_best_only=True)

In [12]:
# Train the model!
model.fit(
  
  [train_X_ims, train_X_seqs],
  train_Y,
  validation_data=([test_X_ims, test_X_seqs], test_Y),
  shuffle=True,
  epochs=100,
  callbacks=[checkpoint],
)