In [1]:
import numpy as np
import pandas
import matplotlib.pyplot as plt
import os
import json
import random
import cv2
from json import JSONEncoder
import shutil
import skimage

# VizWiz

[Data for quality issues](https://vizwiz.org/tasks-and-datasets/image-quality-issues/)  
[Data for text presence](https://vizwiz.org/tasks-and-datasets/image-captioning/)  
[Data for VQA](https://vizwiz.org/tasks-and-datasets/vqa/)  

## Data for binary classification model

In [2]:
# Annotations for text presence in image
annots_txtp_dir = '/media/arnau/SSD/VizWiz/data/captioning/annotations/'

# Annotattions for image quality issues
annots_qi_dir = '/media/arnau/SSD/VizWiz/data/quality_issues/annotations/'

# Annotations for VQA data (questions, answers...)
annots_vqa_dir = '/media/arnau/SSD/VizWiz/data/vqa/annotations/'

In [3]:
# Images dirs
train_dir = '/media/arnau/SSD/VizWiz/data/captioning/train/'
val_dir = '/media/arnau/SSD/VizWiz/data/captioning/val/'
test_dir = '/media/arnau/SSD/VizWiz/data/captioning/test/'

Train / Val / Test splits images **annotations**. Used to retrieve if there is **text present** in images

In [4]:
with open(annots_txtp_dir + "train.json", encoding='UTF-8') as tr_json_file:
    train_data = json.load(tr_json_file)
    train_annots = train_data['images']

with open(annots_txtp_dir + "val.json", encoding='UTF-8') as v_json_file:
    val_data = json.load(v_json_file)
    val_annots = val_data['images']

with open(annots_txtp_dir + "test.json", encoding='UTF-8') as ts_json_file:
    test_data = json.load(ts_json_file)
    test_annots = test_data['images']

In [5]:
# Load custom test data to ensure that val images are not in test set
test_vqa_data = '/media/arnau/SSD/VizWiz/models/hf_model_test_res.json'

with open(test_vqa_data) as user_file:
    test_data = json.load(user_file)

model_test_imgs = list(test_data.keys())

Images containing text for each split

In [6]:
train_imgs_w_text = []
val_imgs_w_text = []
test_imgs_w_text = []

annots = {'train': train_annots, 
        'val' : val_annots,
        'test' : test_annots}

for split, data in annots.items():
    for d in data:
        if d["text_detected"] == True and split == 'train':
            train_imgs_w_text .append(d["file_name"])
        elif d["text_detected"] == True and split == 'val':
            val_imgs_w_text .append(d["file_name"])
        elif d["text_detected"] == True and split == 'test':
            test_imgs_w_text .append(d["file_name"])

print(f"{len(train_imgs_w_text)} training images containing text")
print(f"{len(val_imgs_w_text)} training images containing text")
print(f"{len(test_imgs_w_text)} training images containing text")

14701 training images containing text
5018 training images containing text
5093 training images containing text


Train / Val / Test splits images **quality annotations**. Used to retrieve **quality flaws** in images

In [7]:
with open(annots_qi_dir + "train.json", encoding='UTF-8') as tqif:
    train_qi_data = json.load(tqif)
    
with open(annots_qi_dir + "val.json", encoding='UTF-8') as vqif:
    val_qi_data = json.load(vqif)
    
with open(annots_qi_dir + "test.json", encoding='UTF-8') as tsqif:
    test_qi_data = json.load(tsqif)

Train / Val / Test splits images **VQA annotations**. Used to retrieve **unanswerability** in images

In [8]:
with open(annots_vqa_dir + "train.json", encoding='UTF-8') as tqif:
    train_vqa_data = json.load(tqif)
    
with open(annots_vqa_dir + "val.json", encoding='UTF-8') as vqif:
    val_vqa_data = json.load(vqif)
    
with open(annots_vqa_dir + "test.json", encoding='UTF-8') as tsqif:
    test_vqa_data = json.load(tsqif)

**Specify flaw:**

In [9]:
flaw = "FRM"
lvl = 3

In [10]:
# If flaw == FRM we need unanswerability data
train_unanswerable_images = [data["image"] for data in train_vqa_data if data['answerable'] == 0]
val_unanswerable_images = [data["image"] for data in val_vqa_data if data['answerable'] == 0]

In [11]:
flawed_train_images_with_text = []
clear_train_images_with_text = []

flawed_val_images_with_text = []
clear_val_images_with_text = []

flawed_test_images_with_text = []
clear_test_images_with_text = []

annots_qi = {'train': train_qi_data, 
            'val' : val_qi_data}

for split, data in annots_qi.items():
    for d in data:
        # if image has text
        if d["image"] in train_imgs_w_text:
            # if flaw is out of frame (and unanswerable)
            if flaw == "FRM" and d["flaws"][flaw] >= lvl:
                if d["image"] in train_unanswerable_images:
                    flawed_train_images_with_text.append(d["image"])
            # if image is blurred
            elif flaw == "BLR" and d["flaws"][flaw] >= lvl:
                flawed_train_images_with_text.append(d["image"])
            # if image is clear
            elif d["flaws"]["NON"] >= lvl: 
                clear_train_images_with_text.append(d["image"])
        
        # Since we use a custom test set (extracted from val set) that also contains 
        # images from VizWiz, we have to check that the validation images are not in the test set. 
        # More on why we do this in vqa_hf notebook
        elif d["image"] in val_imgs_w_text and d["image"] not in model_test_imgs:
            # if image is out of frame (and unanswerable)
            if flaw == "FRM" and d["flaws"][flaw] >= lvl:
                if d["image"] in val_unanswerable_images:
                    flawed_val_images_with_text.append(d["image"])
            # if image is blurred
            elif flaw == "BLR" and d["flaws"][flaw] >= lvl:
                flawed_val_images_with_text.append(d["image"])
            # if image is clear
            elif d["flaws"]["NON"] >= lvl: 
                clear_val_images_with_text.append(d["image"])
            
            
print(f"{len(flawed_train_images_with_text)} training images {flaw} with text")
print(f"{len(clear_train_images_with_text)} training images clear with text")

print(f"{len(flawed_val_images_with_text)} val images {flaw} with text")
print(f"{len(clear_val_images_with_text)} val images clear with text")

1617 training images FRM with text
4496 training images clear with text
462 val images FRM with text
1049 val images clear with text


In [12]:
def balance(l):  
    """
    Balance a given list to have the same proportion of elements
    for each class
    """
    
    n = len(l) // 2
    arr = np.array(l, dtype=object)
    zeros = arr[arr[:,1] == 0]
    ones = arr[arr[:,1] == 1]
    np.random.shuffle(zeros)
    np.random.shuffle(ones)
    final_arr = np.concatenate((zeros[:n], ones[:n]))
    final_list = list(map(tuple, final_arr))
    
    return final_list

### Train data

Train set containing flawed and non-flawed (clear) images

In [13]:
flawed_train_dataset = np.asarray(list(map(lambda im : (im, 1), flawed_train_images_with_text)), dtype=object)
clear_train_dataset = np.asarray(list(map(lambda im : (im, 0), clear_train_images_with_text)), dtype=object)

vw_train_set = np.asarray(balance(
                                np.vstack((flawed_train_dataset, clear_train_dataset)
                                         )), 
                          dtype=object)

print(f"VizWiz TRAIN set size {vw_train_set.shape[0]}")

VizWiz TRAIN set size 4673


## Val + Test

In [14]:
flawed_val_dataset = np.asarray(list(map(lambda im : (im, 1), flawed_val_images_with_text)), dtype=object)
clear_val_dataset = np.asarray(list(map(lambda im : (im, 0), clear_val_images_with_text)), dtype=object)

test_n_val_set = np.asarray(balance(
                                np.vstack((flawed_val_dataset, clear_val_dataset))
                        ), 
                        dtype=object)

In [15]:
np.random.shuffle(test_n_val_set)
# Half data for val set, half data for test set
test_set = test_n_val_set[: len(test_n_val_set) // 2]
val_set = test_n_val_set[len(test_n_val_set) // 2: ]

In [16]:
vw_test_set = np.asarray(balance(test_set), dtype=object)
vw_val_set = np.asarray(balance(val_set), dtype=object)

print(f"VizWiz TEST set size {vw_test_set.shape[0]}")
print(f"VizWiz VAL set size {vw_val_set.shape[0]}")

VizWiz TEST set size 544
VizWiz VAL set size 526


In [17]:
# Ensure there is not test data in val set
for img in list(vw_test_set[:, 0]):
    assert img not in list(vw_val_set[:, 0]), "ERROR"

Save results

In [18]:
total_size = len(vw_test_set) + len(vw_val_set) + len(vw_train_set)

np.random.shuffle(vw_test_set)
np.random.shuffle(vw_train_set)
np.random.shuffle(vw_val_set)

vw_test_set = vw_test_set[: int(total_size * 0.1)]
vw_val_set = vw_val_set[: int(total_size * 0.1)]
vw_train_set = vw_train_set[: int(total_size * 0.8)]

vw_data = {'train' : vw_train_set,
           'val' : vw_val_set,
           'test' : vw_test_set,
          }

class NumpyArrayEncoder(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return JSONEncoder.default(self, obj)

with open(f'/home/arnau/tfg/GED-TFG/data/vw_{flaw}_dataset.json', 'w') as outfile:
    json.dump(vw_data, outfile, cls=NumpyArrayEncoder)
print("Files saved")

Files saved


## Multiclass data

In [19]:
blurred_train_images_with_text = []
blurred_val_images_with_text = []

clean_train_images_with_text = []
clean_val_images_with_text = []

frm_train_images_with_text = []
frm_val_images_with_text = []

frm_blr_train_images_with_text = []
frm_blr_val_images_with_text = []


annots_qi = {'train': train_qi_data, 
            'val' : val_qi_data}

for split, data in annots_qi.items():
    for d in data:
        # if image has text
        if d["image"] in train_imgs_w_text: 
            # if image is blur (and not out of frame)
            if d["flaws"]["BLR"] >= lvl and d["flaws"]["FRM"] <= lvl: 
                data = [d["image"], 0, 1, 0] # [img, clean, blr, frm]
                blurred_train_images_with_text.append(data) 
            # if image is clear 
            elif d["flaws"]["NON"] >= lvl: 
                data = [d["image"], 1, 0, 0]
                clean_train_images_with_text.append(data)
            # if image is out of frame and unanswerable
            elif d["flaws"]["FRM"] >= lvl and d["flaws"]["BLR"] <= lvl:
                if d["image"] in train_unanswerable_images:
                    data = [d["image"], 0, 0, 1]
                    frm_train_images_with_text.append(data)

        elif d["image"] in val_imgs_w_text and d["image"] not in model_test_imgs: 
            if d["flaws"]["BLR"] >= lvl and d["flaws"]["FRM"] <= lvl:
                data = [d["image"], 0, 1, 0]
                blurred_val_images_with_text.append(data)
            elif d["flaws"]["NON"] >= lvl: 
                data = [d["image"], 1, 0, 0]
                clean_val_images_with_text.append(data)
            elif d["flaws"]["FRM"] >= lvl and d["flaws"]["BLR"] <= lvl:
                if d["image"] in val_unanswerable_images:
                    data = [d["image"],0, 0, 1]
                    frm_val_images_with_text.append(data)
                

In [20]:
def undersample(arr, min_len_array):
    to_remove = (len(arr) - min_len_array) // 2
    
    idx_delete_0 = np.where(arr[:, 1] == 0)[0][: to_remove]
    idx_delete_1 = np.where(arr[:, 1] == 1)[0][: to_remove]
    idx_to_remove = np.hstack([idx_delete_0, idx_delete_1])
    res = np.delete(arr, idx_to_remove, axis=0)
    
    # Equate shapes removing random element
    if len(res) != min_len_array:
        diff = len(res) - min_len_array 
        rand_idx_remove = random.sample(range(0, len(res)), diff)
        res = np.delete(res, rand_idx_remove, axis=0)  
    
    return np.array(res, dtype=object)

In [21]:
train_data = np.vstack([np.array(blurred_train_images_with_text, dtype=object),
                        np.array(frm_train_images_with_text, dtype=object),
                        np.array(clean_train_images_with_text, dtype=object)])

val_data = np.vstack([np.array(blurred_val_images_with_text[: len(blurred_val_images_with_text) // 2], dtype=object),
                        np.array(frm_val_images_with_text[: len(frm_val_images_with_text) // 2], dtype=object),
                        np.array(clean_val_images_with_text[: len(clean_val_images_with_text) // 2], dtype=object)])

test_data = np.vstack([np.array(blurred_val_images_with_text[len(blurred_val_images_with_text) // 2:], dtype=object),
                        np.array(frm_val_images_with_text[len(frm_val_images_with_text) // 2:], dtype=object),
                        np.array(clean_val_images_with_text[len(clean_val_images_with_text) // 2:], dtype=object)])


Number of samples per class

In [22]:
classess = {1 : "clean", 2 : "BLR", 3 : "FRM"}

# data augmentation taking clean class as reference
print("TRAIN")
for cidx, classname in classess.items():
    class_samples = train_data[np.where(train_data[:, cidx] == 1)]
    print(f"{classname} -- n_samples = {class_samples.shape[0]}")
    
print("\nVAL")
for cidx, classname in classess.items():
    class_samples = val_data[np.where(val_data[:, cidx] == 1)]
    print(f"{classname} -- n_samples = {class_samples.shape[0]}")
    
print("\nTEST")
for cidx, classname in classess.items():
    class_samples = test_data[np.where(test_data[:, cidx] == 1)]
    print(f"{classname} -- n_samples = {class_samples.shape[0]}")

TRAIN
clean -- n_samples = 4496
BLR -- n_samples = 3218
FRM -- n_samples = 990

VAL
clean -- n_samples = 524
BLR -- n_samples = 475
FRM -- n_samples = 126

TEST
clean -- n_samples = 525
BLR -- n_samples = 476
FRM -- n_samples = 126


In [23]:
for split in ["train", "val", "test"]:
    if split == "train":
        data = train_data
        direc = train_dir
    elif split == "val":
        data = val_data
        direc = val_dir
    elif split == "test":
        data = test_data
        direc = val_dir # *!*
        
    for img_data in data:
        img_name = img_data[0]
        source_file = direc + img_name
        destination_folder = f'/media/arnau/SSD/VizWiz/models/multiclass/{split}/'
    
        if img_name not in os.listdir(destination_folder):
            shutil.copy2(source_file, destination_folder)
        

In [24]:
total_size = len(train_data) + len(val_data) + len(test_data)

np.random.shuffle(train_data)
np.random.shuffle(val_data)
np.random.shuffle(test_data)

# **No need to split (80/10/10) since it is alredy that proportion**

vw_mc_data = {'train' : train_data,
           'val' : val_data,
           'test' : test_data,
          }

class NumpyArrayEncoder(JSONEncoder):
    def default(self, obj):
        if isinstance(obj, np.ndarray):
            return obj.tolist()
        return JSONEncoder.default(self, obj)

In [25]:
with open(f'/home/arnau/tfg/GED-TFG/data/vw_MC_dataset.json', 'w') as outfile:
    json.dump(vw_mc_data, outfile, cls=NumpyArrayEncoder)
print("Files saved")

Files saved
