In [1]:
import numpy as np
import json, nltk, keras
from tqdm import tqdm

Using TensorFlow backend.


In [2]:
# Load the VQA annotations: one JSON file per split, indexed by image name.
# Each record keeps the question text; for train/val the `answerable` label
# is stored as a float as well (it is never read from the test file).
vqa_annot = {name: {} for name in ('train', 'val', 'test')}
for split, records in vqa_annot.items():
    has_labels = split != 'test'
    with open('vqa_annotations/{}.json'.format(split), encoding="utf8") as fh:
        for item in json.load(fh):
            record = {'question': item['question']}
            if has_labels:
                record['answerable'] = float(item['answerable'])
            # a repeated image name overwrites the earlier record
            records[item['image']] = record

In [3]:
# A flaw value (and the `unrecognizable` value) at or above THRESHOLD is
# treated as a positive label.
THRESHOLD = 2

# Load the quality annotations: one JSON file per split, indexed by image name.
# Train/val records get
#   'flaws'        - boolean numpy vector, one element per flaw key
#   'recognizable' - 1.0 when `unrecognizable` is below THRESHOLD, else 0.0
# Test records are stored as empty dicts (no labels are read for that split).
quality_annot = {'train': {}, 'val': {}, 'test': {}}

# Column order of the flaw vector. It is taken from the first labelled entry
# and then applied to every other entry, so the vector layout no longer
# depends on the key order each JSON object happens to use.
flaw_names = None

for split in ['train', 'val', 'test']:
    # JSON is UTF-8; be explicit (as the VQA loader is) rather than relying on
    # the platform's default encoding.
    with open('quality_annotations/{}.json'.format(split), encoding="utf8") as file:
        annot = json.load(file)

    for entry in annot:
        temp = {}
        if split != 'test':
            flaws = entry['flaws']
            if flaw_names is None:
                flaw_names = list(flaws)
            elif set(flaws) != set(flaw_names):
                # A different key set would silently produce misaligned or
                # ragged flaw vectors; fail loudly instead.
                raise ValueError(
                    'flaw keys of {!r} ({}) differ from the expected keys ({})'.format(
                        entry['image'], sorted(flaws), sorted(flaw_names)))
            temp['flaws'] = np.array([flaws[name] for name in flaw_names]) >= THRESHOLD
            temp['recognizable'] = float(1 - (entry['unrecognizable'] >= THRESHOLD))
        quality_annot[split][entry['image']] = temp

In [4]:
# Merge the VQA and quality annotations image by image.
# The VQA dataset is the smaller of the two, and:
#   1. the VQA training set is NOT exactly a subset of the quality training set
#   2. the VQA validation set is a subset of the quality validation set
#   3. the VQA testing set is the same as the quality testing set
# For train/val, a VQA image is kept only when it has a non-empty quality
# record; the quality fields are layered on top of the VQA fields.
merged_annot = {'train': {}, 'val': {}, 'test': {}}
for split in ('train', 'val'):
    quality_records = quality_annot[split]
    merged_records = merged_annot[split]
    for fname, vqa_record in vqa_annot[split].items():
        quality_record = quality_records.get(fname)
        if not quality_record:
            # missing (or empty) quality record: leave this image out
            continue
        combined = dict(vqa_record)
        combined.update(quality_record)
        merged_records[fname] = combined
# the test split carries VQA fields only (shallow copy of the VQA test records)
merged_annot['test'] = dict(vqa_annot['test'])

In [5]:
# Convert quality_annot/merged_annot from dictionaries to arrays for further use.
# First, we need a function to numerically encode questions.

# Word -> index vocabulary used to encode questions. It is read inside a
# `with` block so the file handle is closed as soon as parsing finishes, and
# decoded explicitly as UTF-8 like the other JSON inputs.
with open('./utils/word2vocab_vizwiz.json', encoding="utf8") as vocab_file:
    vocab = json.load(vocab_file)
def encode_sentence(sentence, vocab=vocab, max_len = 14):
    """Encode a sentence as a fixed-length sequence of vocabulary ids.

    The sentence is lower-cased and tokenized with ``nltk.word_tokenize``.
    Each token is looked up in ``vocab``; tokens that are not present use the
    entry stored under ``'<UNK>'``. Every id is shifted up by one so that 0
    is left free for padding.

    Args:
        sentence: text to encode.
        vocab: mapping from token to integer id; must contain ``'<UNK>'``
            whenever the sentence yields at least one token.
        max_len: length of the returned sequence. Shorter sequences are
            zero-padded at the end, longer ones are cut at the end.

    Returns:
        The single padded row produced by
        ``keras.preprocessing.sequence.pad_sequences``.
    """
    words = nltk.word_tokenize(sentence.lower())

    ids = []
    for word in words:
        # + 1 keeps id 0 reserved for the zero padding added below
        ids.append(vocab.get(word, vocab['<UNK>']) + 1)

    # pad_sequences works on a batch, so wrap the ids in a one-element list
    batch = keras.preprocessing.sequence.pad_sequences(
        [ids], maxlen=max_len, padding='post', truncating='post')
    return batch[0]

In [6]:
# Flatten quality_annot into parallel per-split lists and save them as JSON.
# 'flaws' and 'recognizable' stay empty for the test split.
quality_annot_array = {}
for split in ('train', 'val', 'test'):
    images, flaws, recognizable = [], [], []
    for fname, record in quality_annot[split].items():
        images.append(fname)
        if split == 'test':
            continue
        # numpy vector -> plain list so json can serialize it
        flaws.append(record.get('flaws').tolist())
        # wrap the scalar label in a one-element list
        recognizable.append([record.get('recognizable')])
    quality_annot_array[split] = {
        'image': images,
        'flaws': flaws,
        'recognizable': recognizable,
    }

with open('data/quality.json', 'w') as fh:
    json.dump(quality_annot_array, fh)

In [7]:
# Flatten merged_annot into parallel per-split lists and save them as JSON.
# Questions are encoded for every split; 'answerable', 'flaws' and
# 'recognizable' stay empty for the test split.
merged_annot_array = {}
for split in ('train', 'val', 'test'):
    images, answerable, flaws, questions, recognizable = [], [], [], [], []
    for fname, record in merged_annot[split].items():
        images.append(fname)
        # encoded id sequence -> plain list so json can serialize it
        questions.append(encode_sentence(record.get('question')).tolist())
        if split == 'test':
            continue
        # scalar labels are wrapped in one-element lists
        answerable.append([record.get('answerable')])
        flaws.append(record.get('flaws').tolist())
        recognizable.append([record.get('recognizable')])
    merged_annot_array[split] = {
        'image': images,
        'answerable': answerable,
        'flaws': flaws,
        'question': questions,
        'recognizable': recognizable,
    }

with open('data/vqa_quality_merger.json', 'w') as fh:
    json.dump(merged_annot_array, fh)