In [None]:
import os
import json
import numpy as np
from tqdm import tqdm
import pandas as pd
import nltk
nltk.download('punkt')
from nltk.tokenize import word_tokenize

import tensorflow as tf
from tensorflow import keras
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.layers.experimental.preprocessing import TextVectorization
from tensorflow.keras.preprocessing.image import ImageDataGenerator
from tensorflow.keras.preprocessing.text import Tokenizer
from tensorflow.keras.preprocessing.sequence import pad_sequences
from tensorflow.keras.layers import *
from tensorflow.keras import Model
from tensorflow.keras.applications.vgg16 import VGG16
from tensorflow.keras.preprocessing import image
from tensorflow.keras.optimizers import Adam

In [None]:
def gen_df(v_path, q_path, a_path, to_disk=None, img_prefix='COCO_train2014_'):
    """Build a DataFrame of (image path, question, answer) rows for yes/no questions.

    Args:
        v_path: Directory that contains the image files.
        q_path: Path to the questions JSON (a top-level ``'questions'`` list whose
            items carry ``'question_id'`` and ``'question'``).
        a_path: Path to the annotations JSON (a top-level ``'annotations'`` list
            whose items carry ``'image_id'``, ``'question_id'``, ``'answer_type'``
            and ``'multiple_choice_answer'``).
        to_disk: Optional path; when truthy the resulting frame is pickled there.
        img_prefix: File-name prefix placed before the zero-padded image id.
            The default matches the previously hard-coded training-split prefix.

    Returns:
        pandas.DataFrame with columns ``Image``, ``Question`` and ``Answer``,
        restricted to rows whose answer is literally ``'yes'`` or ``'no'`` and
        indexed 0..n-1.

    Raises:
        KeyError: if an annotation references a question id that is missing from
            the questions file.
    """
    # Keep only annotations typed as yes/no questions.
    with open(a_path, encoding='utf-8') as f:
        annotations = json.load(f)['annotations']
    id_tuples = [
        (ann['image_id'], ann['question_id'], ann['multiple_choice_answer'])
        for ann in annotations
        if ann['answer_type'] == 'yes/no'
    ]

    # question_id -> question text
    with open(q_path, encoding='utf-8') as f:
        questions = {q['question_id']: q['question'] for q in json.load(f)['questions']}

    train_data = []
    for image_id, question_id, answer in tqdm(id_tuples):
        # os.path.join works whether or not v_path ends with a separator.
        img = os.path.join(v_path, img_prefix + str(image_id).zfill(12) + '.jpg')
        train_data.append((img, questions[question_id], answer))

    df = pd.DataFrame(data=train_data, columns=['Image', 'Question', 'Answer'])
    # A 'yes/no' answer_type can still carry another answer string; drop those rows.
    # Reset the index afterwards so labels and positions agree (0..n-1); otherwise
    # label-based lookups would hit gaps left by the removed rows.
    df = df[df['Answer'].isin(['yes', 'no'])].reset_index(drop=True)
    if to_disk:
        df.to_pickle(to_disk)

    return df

def gen_embedding_matrix(df, glove_path, to_disk=None, verbose=True):
    """Fit a text vectorizer on the questions and build a matching GloVe matrix.

    Args:
        df: DataFrame with a ``Question`` column of raw question strings.
        glove_path: Path to a GloVe text file (one token followed by its
            space-separated vector components per line).
        to_disk: Optional path handed to ``np.save`` for the matrix (NumPy
            appends ``.npy`` when the name does not already end with it).
        verbose: When true, show a progress bar and print hit/miss statistics.

    Returns:
        ``(embedding_matrix, vectorizer)`` where ``embedding_matrix`` has one row
        per vocabulary entry of the adapted vectorizer and one column per GloVe
        dimension, and ``vectorizer`` is the adapted ``TextVectorization`` layer
        (output sequence length 100).
    """
    vectorizer = TextVectorization(output_sequence_length=100)
    vectorizer.adapt(df['Question'].to_numpy())
    voc = vectorizer.get_vocabulary()

    # quoting=3 is csv.QUOTE_NONE: GloVe tokens may themselves be quote characters.
    # keep_default_na=False stops pandas from turning tokens such as "null",
    # "nan" or "NA" into NaN index entries, which would make them unmatchable.
    glove_df = pd.read_csv(
        glove_path,
        sep=" ",
        quoting=3,
        header=None,
        index_col=0,
        keep_default_na=False,
    )
    # Take the dimensionality from the file instead of assuming 300.
    embedding_dim = glove_df.shape[1]
    embeddings_index = dict(zip(glove_df.index, glove_df.to_numpy()))

    # Rows stay all-zero for words that are absent from the embedding index;
    # this includes the representation for "padding" and "OOV".
    embedding_matrix = np.zeros((len(voc), embedding_dim))

    hits = 0
    misses = 0
    for i, word in enumerate(tqdm(voc, desc="Generating matrix", disable=not verbose)):
        embedding_vector = embeddings_index.get(word)
        if embedding_vector is None:
            misses += 1
            continue
        embedding_matrix[i] = embedding_vector
        hits += 1

    if verbose:
        print("Converted %d words (%d misses)" % (hits, misses))
        print("Shape of the embedding matrix is: ", end='')
        print(embedding_matrix.shape)
    if to_disk:
        np.save(to_disk, embedding_matrix)

    return embedding_matrix, vectorizer

def gen_data_iterator(df, vectorizer, img_target_size, batch_size):
    """Yield ``((images, vectorized_questions), answers)`` batches indefinitely.

    Images and binary labels come from a Keras ``flow_from_dataframe`` iterator
    over ``df`` (unshuffled, pixel values rescaled by 1/255); the questions for
    each batch are taken from the same row positions of ``df`` and passed
    through ``vectorizer``.

    Args:
        df: DataFrame with ``Image``, ``Question`` and ``Answer`` columns.
        vectorizer: Callable applied to the array of question strings of a batch.
        img_target_size: ``(height, width)`` the images are resized to.
        batch_size: Number of rows per batch.

    Yields:
        ``((images, questions), answers)`` tuples.

    Raises:
        ValueError: if the image iterator kept fewer rows than ``df`` has, since
            row positions could then no longer be used to pair questions with
            images.
    """
    datagen = ImageDataGenerator(rescale=1./255)
    generator = datagen.flow_from_dataframe(
        dataframe=df,
        directory='.',
        x_col='Image',
        y_col='Answer',
        target_size=img_target_size,
        class_mode='binary',
        batch_size=batch_size,
        shuffle=False
    )
    # Rows dropped by the image iterator (e.g. unreadable file names) would shift
    # every later image relative to its question, so fail loudly instead.
    if generator.samples != len(df):
        raise ValueError(
            "Image iterator kept %d of %d rows; questions can no longer be "
            "aligned with images" % (generator.samples, len(df))
        )

    questions = df['Question'].to_numpy()
    n_batches = len(generator)
    if n_batches == 0:
        return

    # The image iterator restarts from the first row after its last batch, so the
    # question window has to wrap the same way; an ever-growing step counter would
    # slice past the end of `questions` on the second pass.
    for step, (images, answers) in enumerate(generator):
        start = (step % n_batches) * generator.batch_size
        batch_questions = vectorizer(questions[start:start + generator.batch_size])
        yield (images, batch_questions), answers

# Assemble the yes/no training table from the VQA question/annotation files.
train_df = gen_df(
    v_path='./VQA/Images/train2014/',
    q_path='./VQA/Questions/v2_OpenEnded_mscoco_train2014_questions.json',
    a_path='./VQA/Annotations/v2_mscoco_train2014_annotations.json',
)
train_df.head()

# Adapt the vectorizer to the questions and cache the GloVe matrix on disk.
mtx, vectorizer = gen_embedding_matrix(
    train_df,
    'glove.6B.300d.txt',
    to_disk='./embedding_matrix',
    verbose=True,
)
print(mtx.shape)

# Batches of 64 samples with images resized to 128x128.
data_gen = gen_data_iterator(train_df, vectorizer, (128, 128), 64)


In [None]:
import matplotlib.pyplot as plt

# Pull one batch from the training iterator and show a random sample from it.
(i, q), a = next(data_gen)
# Draw the position from the actual batch length rather than a fixed 64, so a
# batch shorter than 64 cannot cause an out-of-range index.
b_idx = np.random.randint(len(a))
print(q[b_idx], a[b_idx])
plt.imshow(i[b_idx])

In [None]:
img_input_shape = (128, 128, 3)

# Frozen embedding layer initialised from the matrix cached on disk.
embedding_matrix = np.load('./embedding_matrix.npy')
print(embedding_matrix.shape)
embedding_layer = Embedding(
    embedding_matrix.shape[0],
    embedding_matrix.shape[1],
    embeddings_initializer=keras.initializers.Constant(embedding_matrix),
    trainable=False,
    # Questions are zero-padded to a fixed length by the vectorizer; masking
    # index 0 lets the LSTM stop at the last real token instead of running
    # over the padding.
    mask_zero=True,
)

# Image branch: frozen VGG16 convolutional base followed by a small dense head.
vgg = VGG16(
    include_top=False,
    weights='imagenet',
    input_shape=img_input_shape,
    # `pooling` accepts None, 'avg' or 'max'. None keeps the 4D feature map,
    # which is what the Flatten layer below expects.
    pooling=None
)

for layer in vgg.layers:
    layer.trainable = False

img_x = vgg.output
img_x = Flatten()(img_x)
img_x = BatchNormalization()(img_x)
img_x = Dense(1024, activation='relu')(img_x)
img_x = BatchNormalization()(img_x)
img_output = Dense(1024, activation='relu')(img_x)

# Question branch: token ids -> embeddings -> LSTM -> dense head.
qstn_input = Input(shape=(None,), dtype="int64")
qstn_x = embedding_layer(qstn_input)
qstn_x = LSTM(64, activation='tanh')(qstn_x)
qstn_x = BatchNormalization()(qstn_x)
qstn_x = Dense(1024, activation='relu')(qstn_x)
qstn_x = BatchNormalization()(qstn_x)
qstn_output = Dense(1024, activation='relu')(qstn_x)

# Fusion: concatenate both 1024-d representations and classify yes/no.
concat = Concatenate(axis=1)([img_output, qstn_output])
x = Dense(1024, activation='relu')(concat)
x = BatchNormalization()(x)
x = Dense(512, activation='relu')(x)
x = BatchNormalization()(x)
x = Dense(512, activation='relu')(x)
x = BatchNormalization()(x)
output = Dense(1, activation='sigmoid')(x)

model = Model(
    inputs=[vgg.input, qstn_input],
    outputs=output,
    name='BiModal_VQA'
)

model.summary()

In [None]:
model.compile(
    # `lr` is a deprecated alias; `learning_rate` is the supported argument name.
    optimizer=Adam(learning_rate=1e-6),
    loss='binary_crossentropy',
    metrics=['accuracy']
)
# data_gen is a plain Python generator with no length, so Keras cannot tell where
# an epoch ends; bound it explicitly. 64 must match the batch size that data_gen
# was created with. Only full batches are counted.
model.fit(
    x=data_gen,
    epochs=1,
    steps_per_epoch=max(1, len(train_df) // 64),
)

In [None]:
# Spot-check one sample: label from the batch fetched earlier, plus a question.
# NOTE(review): this assumes `a` is the first batch drawn from data_gen, so that
# position `haha` in the batch equals row position `haha` in train_df - confirm.
haha = 18
print(a[haha])
# Positional lookup: train_df was filtered, so its index labels may have gaps
# and a label-based `[haha]` could raise or return a different row.
print(train_df['Question'].iloc[haha])
print(type(a))