# IVA Arabic Visual Question Answering Deep Learning Model

A model based on the paper "Tips and Tricks for Visual Question Answering: Learnings from the 2017 Challenge": 
https://openaccess.thecvf.com/content_cvpr_2018/papers/Teney_Tips_and_Tricks_CVPR_2018_paper.pdf 

Trained on the VQA annotations, translated into Arabic with Google Translate, together with images from the COCO dataset: https://visualqa.org/vqa_v1_download.html

And using word embeddings of AraVec: https://github.com/bakrianoo/aravec

### First we import the required libraries

In [None]:
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
import csv
import sys
import base64
import nltk
from keras.utils import to_categorical
import keras.backend as KB
import pandas as pd
from keras.preprocessing import image
import gensim
import nltk
import pandas as pd
import tqdm 

### A helper function to download our data

In [None]:
def download(folder_name, origin):
    """Download and extract a zip archive into the current directory.

    Nothing is downloaded when a directory called ``folder_name`` already
    exists in the current working directory.

    Args:
        folder_name: Name of the directory that is checked for; the archive
            is stored as ``<folder_name>.zip`` while downloading.
        origin: URL of the zip archive.

    Returns:
        The path of the data directory as a string with a trailing ``/``.
    """
    base_dir = os.path.abspath('.')
    if not os.path.exists(base_dir + '/' + folder_name):
        file_zip = tf.keras.utils.get_file(folder_name + '.zip',
                                           cache_subdir=base_dir,
                                           origin=origin,
                                           extract=True)
        PATH = os.path.dirname(file_zip) + '/' + folder_name + '/'
    else:
        PATH = base_dir + '/' + folder_name + '/'

    # The archive is only present right after a fresh download, so a missing
    # zip (data already extracted on an earlier run) must not be an error.
    try:
        os.remove(folder_name + '.zip')
    except FileNotFoundError:
        pass
    return PATH

In [None]:
# Download the pre-trained AraVec word-embedding archive (the file name suggests
# a 300-dimensional CBOW model trained on Twitter data)
download('arabic', 'https://bakrianoo.s3-us-west-2.amazonaws.com/aravec/full_grams_cbow_300_twitter.zip')

In [None]:
# Download the COCO 2014 training and validation images.
# Make sure to download them only once and to have 20 GB of free space!
download('train2014', 'http://images.cocodataset.org/zips/train2014.zip')
download('val2014', 'http://images.cocodataset.org/zips/val2014.zip')

### Function to clean strings

In [None]:
# This function is used by AraVec to convert ى to ي and all types of أ آ to ا and similar stuff 
# to avoid errors resulting from misspelling

def clean_str(text):
    """Normalise an Arabic string in the AraVec style.

    Diacritics are removed, runs of a repeated character are squeezed,
    alef / yaa / taa-marbuta variants are unified, punctuation is dropped or
    padded with spaces, and the "ال" prefix is stripped from words that have
    at least two further characters.

    Args:
        text: The string to normalise.

    Returns:
        The normalised string.
    """
    # Diacritics (tashkeel) are dropped entirely.
    text = re.sub(r'[\u0617-\u061A\u064B-\u0652]', "", text)

    # Elongation: any run of one repeated character is cut down to two ...
    text = re.sub(r'(.)\1+', r"\1\1", text)

    # ... and a doubled long vowel is cut down to a single one.
    for doubled, single in (('وو', 'و'), ('يي', 'ي'), ('اا', 'ا')):
        text = text.replace(doubled, single)

    # Parallel lists: targets[k] is replaced by substitutes[k], in this order.
    targets = ["أ","إ","آ","ة","_","-","/",".","،"," و "," يا ",'"',"ـ","'","ى","\\",'\n', '\t','&quot;','?','؟','!']
    substitutes = ["ا","ا","ا","ه"," "," ","","",""," و"," يا","","","","ي","",' ', ' ',' ',' ? ',' ؟ ',' ! ']
    for old, new in zip(targets, substitutes):
        text = text.replace(old, new)

    text = str(text.strip())

    # Strip the definite article from the start of words.
    return re.sub(r'\bال(\w\w+)', r'\1', text)

In [None]:
# Load the pre-trained AraVec Word2Vec model from the downloaded .mdl file;
# its vectors are used further down to fill the embedding matrix
word2vec = gensim.models.Word2Vec.load('full_grams_cbow_300_twitter.mdl')

## Reading the data

In [None]:
# These lists hold all the data, training followed by validation.
# They are filled in lockstep, so the same index refers to the same sample in each.
all_questions = []
all_answers = []
all_images = []    # Image names as stored in the spreadsheets (no directory, no extension)
all_img_names = [] # Full path image name

In [None]:
# Load the training spreadsheet and pull out the three columns we need
train_data = pd.read_excel('data/train.xlsx', engine='openpyxl')
train_questions, train_answers, train_images = (
    train_data[column] for column in ('questions', 'answers', 'images')
)

# Every image name becomes a path relative to the COCO training folder
PATH = 'train2014/'
all_img_names.extend(PATH + names + '.jpg' for names in train_images)

train_data.head()

In [None]:
# Append the training columns to the general lists, keeping row order
all_questions.extend(train_questions)
all_answers.extend(train_answers)
all_images.extend(train_images)

In [None]:
# Sample: display one training image together with its question and answer
index = 100
plt.imshow(image.load_img(all_img_names[index]))
all_questions[index], ' ', all_answers[index]

###### One downside of the data is that the questions are open-ended rather than simple visual descriptions. Unfortunately about half of the data
###### is like this; the model could do a lot more to help visually-impaired people if it were trained on simpler samples. Deep learning is not that smart yet!

In [None]:
# The validation spreadsheet is read exactly like the training one, and its
# image paths are appended to the same general list
val_data = pd.read_excel('data/val.xlsx', engine='openpyxl')
val_questions, val_answers, val_images = (
    val_data[column] for column in ('questions', 'answers', 'images')
)

# Every image name becomes a path relative to the COCO validation folder
PATH = 'val2014/'
all_img_names.extend(PATH + names + '.jpg' for names in val_images)

In [None]:
# Append the validation columns after the training data, keeping row order
all_questions.extend(val_questions)
all_answers.extend(val_answers)
all_images.extend(val_images)

### Cleaning the strings in all questions and answers

In [None]:
for idx in range(len(all_answers)):
    all_questions[idx] = clean_str(all_questions[idx])
    try:
        all_answers[idx] = clean_str(all_answers[idx])
    except TypeError:
        # Answers that are not strings cannot be cleaned; they are kept as they are
        pass

In [None]:
# Optional: store the cleaned data so that it can be reloaded later without
# repeating the processing above

df = pd.DataFrame({
    'questions': all_questions,
    'answers': all_answers,
    'images': all_images,
})
df.to_excel('data/all_cleaned.xlsx', index=False)

#### The model doesn't do well with counting. Hence, this step is aimed at replacing all numbers greater than three with the word كثير

In [None]:
cnt = 0
for pos, raw_answer in enumerate(all_answers):
    try:
        numeric = int(raw_answer)
    except ValueError:
        # Not a number: the answer stays untouched
        continue
    if numeric > 3:
        all_answers[pos] = 'كثير'

## Data Processing

### Word tokenization

In [None]:
# Keep only the 40000 most frequent words of the vocabulary
top_k = 40000
# The tokenizer maps every word to an integer index; words it does not know
# are mapped to the "<unk>" token
ques_tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='؟!?"#$%&()*+.,-/:;=@[\\]^_`{|}~ ')
# Fit on the cleaned, combined questions: questions are cleaned with clean_str
# before being tokenized at prediction time, and samples are indexed over the
# combined train + validation lists
ques_tokenizer.fit_on_texts(all_questions)

# word_index is the dictionary (vocabulary) of words
ques_word_index = ques_tokenizer.word_index
ques_index_word = ques_tokenizer.index_word
print('Found %s unique tokens.' % len(ques_word_index))

# Index 0 is what pad_sequences fills with, so give it an explicit token
ques_tokenizer.word_index['<pad>'] = 0
ques_tokenizer.index_word[0] = '<pad>'

# Create the tokenized vectors, one row per entry of all_questions
question_seqs = ques_tokenizer.texts_to_sequences(all_questions)

# Pad or truncate every question at the end to exactly 15 tokens
question_vector = tf.keras.preprocessing.sequence.pad_sequences(question_seqs, padding='post', truncating='post', maxlen=15)

In [None]:
# Sample: a raw training question next to its padded token vector
ind = 1002
train_data['questions'][ind], '  ', question_vector[ind] 

In [None]:
# We save the tokenizer to use it later in testing or when creating an API.
# NOTE(review): to_json() appears to return a JSON string already, so json.dumps
# encodes it a second time; code that reloads this file has to account for that —
# confirm against the loader before changing.
tokenizer_json = ques_tokenizer.to_json()
with open('data/vqa_tokenizer.json', 'w', encoding='utf-8') as f:
    f.write(json.dumps(tokenizer_json, ensure_ascii=False))

### Creating the embedding matrix

In [None]:
# The file-level `import tqdm` only binds the module, which cannot be called
from tqdm import tqdm

EMBEDDING_DIM = 300
# The trained vectors live on the model's `wv` attribute
word_vectors = word2vec.wv
# One row per token index; rows are looked up by the tokenizer's word index
embedding_matrix = np.zeros((len(ques_word_index) + 1, EMBEDDING_DIM))
for word, i in tqdm(ques_word_index.items()):
    if word in word_vectors:
        embedding_matrix[i] = word_vectors[word]
    # Words missing from the embeddings keep their all-zero row

#### Creating a dictionary holding the frequency of each answer, and then sorting the dictionary to take only the most common 1000 answers
This approach gave the best results. Keeping only the questions whose answer is among these 1000
excludes about 30,000 of the 440,000 samples.

In [None]:
from collections import Counter

# Count how often each answer occurs. The cleaned, combined answers are used
# so that the counts reflect clean_str and the replacement of large numbers
ans_freq = Counter(all_answers)

In [None]:
# Rebuild the dictionary with the most frequent answers first
ans_freq = dict(sorted(ans_freq.items(), key=lambda pair: pair[1], reverse=True))

In [None]:
# Manual tokenization of the answers, limited to the first 1000 entries of ans_freq:
# topAnsIndexWord[i] is the answer encoded as i,
# topAnsWordIndex[answer] is the encoded value of that answer

topAnsIndexWord = list(ans_freq.keys())[:1000]
topAnsWordIndex = {ans: position for position, ans in enumerate(topAnsIndexWord)}

# Number of answers that were kept
cnt = len(topAnsIndexWord)

In [None]:
# We save this token list to use it later when testing the model or when creating an API.
# The context manager closes the file even if pickling fails.
with open('data/topAnsIndexWord.pkl', 'wb') as f:
    pickle.dump(topAnsIndexWord, f)

In [None]:
# Target of the model: the index token of each sample's answer.
# For example, if the answer of the first question is "نعم" and the manual tokenization
# assigned it the value 2, then ans_encoded[0] is 2. Answers outside the kept set get -1.
#
# The encoding runs over all_answers so that it has one entry per entry of
# all_images, which is the range the samples are later selected from.
ans_encoded = [topAnsWordIndex.get(answer, -1) for answer in all_answers]

### Image Feature Extraction

Various image models were tried; Xception gave the best outcome by a small margin.

In [None]:
# Xception with ImageNet weights and without its classification head
image_model = tf.keras.applications.xception.Xception(include_top=False, weights='imagenet')
new_input = image_model.input
# The output of the last remaining layer is what we use as image features
hidden_layer = image_model.layers[-1].output

# Model that maps an input image batch to those features
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

In [None]:
def load_image(image_path):
    """Read a JPEG file and prepare it as Xception input.

    Args:
        image_path: Path of the JPEG file.

    Returns:
        A pair of the 3-channel image, resized to 299x299 and passed through
        Xception's ``preprocess_input``, and the unchanged ``image_path``.
    """
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(decoded, (299, 299))
    return tf.keras.applications.xception.preprocess_input(resized), image_path

In [None]:
# Get unique images feature matrix, make sure to have at least 70 GB of free space!

from tqdm import tqdm

# np.save does not create missing directories
os.makedirs('features', exist_ok=True)

encode = sorted(set(all_img_names))
image_dataset = tf.data.Dataset.from_tensor_slices(encode)

# Feel free to change the batch size (64) according to your system configuration
image_dataset = image_dataset.map(
  load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(64)

for img, path in tqdm(image_dataset):
    batch_features = image_features_extract_model(img)
    # Merge the two spatial axes into a single axis of image regions
    batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))

    for bf, p in zip(batch_features, path):
        path_of_feature = p.numpy().decode("utf-8")
        # Take the file name from the path itself: the folder prefixes have
        # different lengths ('train2014/' vs 'val2014/'), so a fixed-width
        # slice would cut into the validation image names
        image_name = os.path.splitext(os.path.basename(path_of_feature))[0]
        np.save(os.path.join('features', image_name), bf.numpy())

In [None]:
# Load one saved feature file to inspect the type and shape of the Xception features
sample = np.load('features/COCO_train2014_000000000009.npy')
print(type(sample))
sample.shape

## The Model

This is the layout of the model used, with small modifications to the model from the referenced paper:

![title](data/model_layout.jpg)

In [None]:
seq_length = 15        # Longest question length, in tokens, fed to the model
embedding_dim = 300    # The dimension of our embedding matrix
hidden = 1024          # Number of hidden units
K = 100                # Number of image regions per sample (second-to-last axis of the feature sample above) — TODO confirm against its printed shape
v_dim = 2048           # Length of the feature vector of one image region
OutputDim = len(topAnsWordIndex)  # Output layer dimension: one unit per kept answer

A data generator class to use batch training:

In [None]:
class DataGenerator(tf.keras.utils.Sequence):
    """Batch generator that feeds image features, questions and answers to Keras.

    Every sample is identified by an ID that can be converted with ``int`` and
    is used as an index into ``all_images``, ``question_vector`` and
    ``ans_encoded``. Image features are read from ``features/<name>.npy``.
    """

    def __init__(self, list_IDs, batch_size=128, dim1=(K, v_dim), dim2=(seq_length,), dim3=(OutputDim,),
                 n_classes=OutputDim, shuffle=True):
        """Store the configuration and build the first epoch's sample order.

        Args:
            list_IDs: IDs of the samples this generator serves.
            batch_size: Number of samples per batch.
            dim1: Shape of one image-feature sample.
            dim2: Shape of one tokenized question.
            dim3: Shape of one one-hot answer.
            n_classes: Number of answer classes.
            shuffle: Whether to reshuffle the sample order after every epoch.
        """
        self.dim1 = dim1
        self.dim2 = dim2
        self.dim3 = dim3
        self.batch_size = batch_size
        self.list_IDs = list_IDs
        self.n_classes = n_classes
        self.shuffle = shuffle
        self.on_epoch_end()

    def __len__(self):
        """Return the number of full batches per epoch (a trailing partial batch is dropped)."""
        return int(np.floor(len(self.list_IDs) / self.batch_size))

    def __getitem__(self, index):
        """Return batch number ``index`` as ``([image_features, questions], answers)``."""
        # Positions of this batch within the (possibly shuffled) sample order
        indexes = self.indexes[index*self.batch_size:(index+1)*self.batch_size]

        # Translate positions into sample IDs
        list_IDs_temp = [self.list_IDs[k] for k in indexes]

        X1, X2, y = self.__data_generation(list_IDs_temp)

        return [X1, X2], y

    def on_epoch_end(self):
        """Rebuild the sample order, shuffling it when ``shuffle`` is set."""
        self.indexes = np.arange(len(self.list_IDs))
        if self.shuffle:
            np.random.shuffle(self.indexes)

    def __data_generation(self, list_IDs_temp):
        """Load the data of the given sample IDs into ``batch_size``-sized arrays."""
        X1 = np.empty((self.batch_size, *self.dim1))
        X2 = np.empty((self.batch_size, *self.dim2))
        y = np.empty((self.batch_size, *self.dim3), dtype=int)

        for i, ID in enumerate(list_IDs_temp):
            sample_index = int(ID)
            # Feature files are named after the image names held in all_images
            X1[i, :, :] = np.load('features/' + all_images[sample_index] + '.npy')
            X2[i,] = question_vector[sample_index]
            # Answers are stored as single class indices; expand to one-hot
            y[i,] = to_categorical(ans_encoded[sample_index], self.n_classes)

        return X1, X2, y

In [None]:
# We only want samples whose answer is among the kept top answers, so we build a
# list with the (string) indices of exactly those samples to use in training
ALL = len(all_images)
lst = [str(i) for i in range(ALL) if ans_encoded[i] != -1]
# From here on ALL is the number of usable samples
ALL = len(lst)
ALL

In [None]:
# Creating batch generators for training and validation. This split is independent of
# the train and val files we read: everything was stacked together, and the last
# 30000 usable samples are held out as validation

training = DataGenerator(lst[:ALL-30000])
validation = DataGenerator(lst[ALL-30000:])

#### The model architecture implementation

In [None]:
# Inputs: K region features per image and a padded sequence of question tokens
image_input = tf.keras.layers.Input(shape=(K, v_dim), name='image_input')

question_input = tf.keras.layers.Input(shape=(seq_length,), dtype='int32', name='question_input')
embedding = tf.keras.layers.Embedding(embedding_matrix.shape[0], embedding_dim, weights=[embedding_matrix],
                                     input_length=seq_length, trainable=True, name='embedding')(question_input)

# Question encoding
rnn = tf.keras.layers.LSTM(units=hidden, name='rnn')(embedding)

# Attention: pair the question encoding with each of the K image regions
tiled_question = tf.keras.layers.RepeatVector(K, name='tile_question')(rnn)
concat = tf.keras.layers.concatenate([image_input, tiled_question], name='concat')
fc1 = tf.keras.layers.Dense(hidden, activation='relu', name='fc1')(concat)
# One raw score per region. The softmax has to run across the regions (axis 1):
# applied to the size-1 last axis it would turn every weight into 1.0
fc2 = tf.keras.layers.Dense(1, name='fc2')(fc1)
attention = tf.keras.layers.Softmax(axis=1, name='attention')(fc2)

# Attention-weighted sum of the region features, then drop the singleton axis
add = tf.keras.layers.Dot(axes=1, name='add')([attention, image_input])
add = tf.keras.layers.Flatten(name='attended_image')(add)
drop1 = tf.keras.layers.Dropout(0.3, name='drop1')(add)
fc3 = tf.keras.layers.Dense(hidden, activation='relu', name='fc3')(drop1)
fc4 = tf.keras.layers.Dense(hidden, activation='relu', name='fc4')(rnn)

# Fuse image and question by element-wise product, then classify over the answers
mul = tf.keras.layers.Multiply()([fc3, fc4])
fc5 = tf.keras.layers.Dense(hidden+hidden, activation='relu', name='fc5')(mul)
drop = tf.keras.layers.Dropout(0.5, name='dropout')(fc5)
fc6 = tf.keras.layers.Dense(OutputDim, activation='softmax', name='fc6')(drop)

model = tf.keras.models.Model(inputs=[image_input, question_input], outputs=fc6)
model.summary()

In [None]:
# Categorical cross-entropy is used because the data generator yields one-hot answers
loss_object = tf.keras.losses.CategoricalCrossentropy()
model.compile(optimizer=tf.keras.optimizers.Adamax(), loss=loss_object, metrics=['accuracy'])

### *** It is recommended to use cloud resources for training as it will take a long time and can cause damage to even a good laptop, be careful. ***

In [None]:
# Train for 10 epochs on the training generator, reporting metrics on the validation generator
model.fit(training, validation_data=validation, epochs=10, verbose=1)

In [None]:
# Save the trained model to an HDF5 file so it can be reused without retraining
model.save('data/themodel.h5')

## Testing the Model

In [None]:
def test(img_path, question, top_n=3, min_prob=0.01):
    """Show an image and print the model's most likely answers to a question.

    Args:
        img_path: Path of the image file.
        question: The question about the image; it is cleaned with
            ``clean_str`` before being tokenized.
        top_n: Number of highest-probability answers to consider.
        min_prob: Answers among the ``top_n`` are printed only when their
            probability is above this value.
    """
    plt.imshow(image.load_img(img_path))
    img = image.load_img(img_path, target_size=(299, 299))

    # Extract the image features with the same image model used for training
    x = image.img_to_array(img)
    x = np.expand_dims(x, axis=0)
    x = tf.keras.applications.xception.preprocess_input(x)

    features = image_model.predict(x)
    # Merge the spatial axes into one axis of regions, keeping the channel axis
    X1 = features.reshape((1, -1, features.shape[-1]))

    # Tokenize the question exactly like the training questions
    ques = clean_str(question)
    X2 = ques_tokenizer.texts_to_sequences([ques])
    X2 = tf.keras.preprocessing.sequence.pad_sequences(X2, padding='post',truncating='post',maxlen=15)

    pred = model.predict([X1, X2])
    print('Question: ', question)

    # Indices of the top_n answers, highest probability first
    ranked = pred[0].argsort()[::-1][:top_n]
    print('Probability: \t\t Answer:')
    for i in ranked:
        if pred[0][i] > min_prob:
            print(pred[0][i], '\t\t', topAnsIndexWord[i])

In [None]:
# Try the model on the test image with the question "What is in the background?"
test('data/test_image.jpg', 'ماذا في الخلفية؟')

### Hope you found this useful and hope you can improve on it in the future!