In [None]:
import json
import os
import re
from collections import defaultdict
import glob

In [4]:
orig_path = "/content/drive/MyDrive/DL_Project/original"
dest_path = "/content/preprocessed"

# Maximum number of top answers to consider
max_answers = 1000

def generate_query_vocabulary():

    dataset_files = os.listdir(orig_path + '/questions')
    regex_pattern = re.compile(r'\\W+')

    query_words = []

    for file_name in dataset_files:
        file_path = os.path.join(orig_path, 'questions', file_name)

        try:
            with open(file_path, 'r') as file_obj:
                print(file_obj)
                question_data = json.load(file_obj)
                questions = question_data['questions']

        except (IOError, SyntaxError):
            break

        for idx, query in enumerate(questions):
            split_question = regex_pattern.split(query['question'].lower())
            cleaned_words = [word.strip() for word in split_question if len(word.strip()) > 0]

            query_words.extend(cleaned_words)

    # Remove duplicates and sort the vocabulary
    query_words = list(set(query_words))
    query_words.sort()

    # Add empty strings at the start of the vocabulary
    query_words.insert(0, '')
    query_words.insert(1, '')

    if not os.path.exists(dest_path):
        os.makedirs(dest_path)

    # Write the vocabulary to a file
    vocab_file_path = os.path.join(dest_path, 'Questions', 'question_vocabs.txt')
    with open(vocab_file_path, 'w') as file_obj:
        file_obj.writelines([word + '\\n' for word in query_words])

    print(f"Total words: {len(query_words)}")

generate_query_vocabulary()

<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_val2014_questions.json' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_train2014_questions.json' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_test2015_questions.json' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_test-dev2015_questions.json' mode='r' encoding='UTF-8'>
Total words: 331640


In [2]:
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [7]:
def create_answer_vocabulary(max_answers_count):

    answer_counts = defaultdict(int)
    annotation_files = os.listdir(orig_path + '/annotations')

    for file_name in annotation_files:
        file_path = os.path.join(orig_path, 'annotations', file_name)

        try:
            with open(file_path, 'r') as file_obj:
                data = json.load(file_obj)
        except (IOError, SyntaxError):
            break

        annotations = data['annotations']

        for entry in annotations:
            answer = entry['multiple_choice_answer']
            if re.search(r'[^\w\s]', answer):
                continue

            answer_counts[answer] += 1

    sorted_answers = sorted(answer_counts, key=answer_counts.get, reverse=True)
    top_answers = [''] + sorted_answers[:max_answers_count - 1]

    annotation_vocab_dir = os.path.join(dest_path, 'Annotations')
    if not os.path.exists(annotation_vocab_dir):
        os.makedirs(annotation_vocab_dir)

    vocab_file_path = os.path.join(annotation_vocab_dir, 'annotation_vocabs.txt')
    with open(vocab_file_path, 'w') as file_obj:
        file_obj.writelines([ans + '\n' for ans in top_answers])

    print(f'The num of total words of answers: {len(sorted_answers)}')
    print(f'Keep top {max_answers_count}')

create_answer_vocabulary(1000)

The number of total words of answers: 26480
Keep top 1000 answers into vocabulary


In [8]:
if __name__ == "__main__":
  
    generate_query_vocabulary()  # Generate the vocabulary for questions
    create_answer_vocabulary(max_answers_count=max_answers)  # Generate the vocabulary for answers

<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_val2014_questions.json' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_train2014_questions.json' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_test2015_questions.json' mode='r' encoding='UTF-8'>
<_io.TextIOWrapper name='/content/drive/MyDrive/Colab Notebooks/original/questions/v2_OpenEnded_mscoco_test-dev2015_questions.json' mode='r' encoding='UTF-8'>
Total words: 331640
The number of total words of answers: 26480
Keep top 1000 answers into vocabulary


In [9]:
output_path = "/user/dmandava/DL_Project1/preprocessed"

# Original questions directory path
questions_path = "/content/drive/MyDrive/DL_Project/original/questions"

# Original annotations directory path
annotations_path = "/content/drive/MyDrive/DL_Project/original/annotations"

In [10]:
def find_top_answer(annotation_answer):
    annotation_vocab_path = os.path.join(output_path, 'Annotations', 'annotation_vocabs.txt')

    with open(annotation_vocab_path, 'r') as file_obj:
        top_answers = [line.strip() for line in file_obj]

    if annotation_answer not in top_answers:
        annotation_answer = ''
        find_top_answer.unknown_answers_count += 1

    return annotation_answer

# Initialize the unknown answers count
find_top_answer.unknown_answers_count = 0

In [11]:
def preprocess_data(question_file, annotations_dir, is_labeled):

    with open(question_file, 'r') as file_obj:
        data = json.load(file_obj)

    questions = data['questions']

    if data['data_subtype'] == 'test-dev2015':
        file_prefix = 'test2015'
    else:
        file_prefix = data['data_subtype']

    if is_labeled:
        # For labeled data (training or validation set)
        annotation_pattern = os.path.join(annotations_dir, f'*{file_prefix}*.json')
        annotation_path = glob.glob(annotation_pattern)[0]

        with open(annotation_path, 'r') as file_obj:
            annotations = json.load(file_obj)['annotations']

        question_annotations = {ann['question_id']: ann for ann in annotations}

        find_top_answer.unknown_answers_count = 0

    dataset = [None] * len(questions)

    for idx, question in enumerate(questions):
        if (idx + 1) % 10000 == 0:
            print(f'Processing {data["data_subtype"]} data: {idx + 1}/{len(questions)}')

        question_id = question['question_id']
        question_sentence = question['question']
        image_id = question['image_id']
        image_name = f'COCO_{file_prefix}_{image_id:012d}.jpg'

        data_entry = [image_name, question_sentence]

        if is_labeled:
            annotation_answer = question_annotations[question_id]['multiple_choice_answer']
            answer = find_top_answer(annotation_answer)
            data_entry.append(answer)

        dataset[idx] = data_entry

    if is_labeled:
        print(f'Total {find_top_answer.unknown_answers_count} out of {len(questions)} answers are unknown')

    return dataset

In [12]:
def preprocess_data(question_file, annotations_dir, is_labeled):

    with open(question_file, 'r') as file_obj:
        data = json.load(file_obj)

    questions = data['questions']

    if data['data_subtype'] == 'test-dev2015':
        file_prefix = 'test2015'
    else:
        file_prefix = data['data_subtype']

    if is_labeled:
        # For labeled data (training or validation set)
        annotation_pattern = os.path.join(annotations_dir, f'*{file_prefix}*.json')
        annotation_path = glob.glob(annotation_pattern)[0]

        with open(annotation_path, 'r') as file_obj:
            annotations = json.load(file_obj)['annotations']

        question_annotations = {ann['question_id']: ann for ann in annotations}

        find_top_answer.unknown_answers_count = 0

    dataset = [None] * len(questions)

    for idx, question in enumerate(questions):
        if (idx + 1) % 10000 == 0:
            print(f'Processing {data["data_subtype"]} data: {idx + 1}/{len(questions)}')

        question_id = question['question_id']
        question_sentence = question['question']
        image_id = question['image_id']
        image_name = f'COCO_{file_prefix}_{image_id:012d}.jpg'

        data_entry = [image_name, question_sentence]

        if is_labeled:
            annotation_answer = question_annotations[question_id]['multiple_choice_answer']
            answer = find_top_answer(annotation_answer)
            data_entry.append(answer)

        dataset[idx] = data_entry

    if is_labeled:
        print(f'Total {find_top_answer.unknown_answers_count} out of {len(questions)} answers are unknown')

    return dataset

In [13]:
def process_question_files(questions_path, annotations_path):

    preprocessed_data = {}

    for file_name in os.listdir(questions_path):
        try:
            data_type = file_name[20:-19]
            print(data_type)

            is_labeled = "test" not in data_type

            question_file = os.path.join(questions_path, file_name)

            preprocessed_data[data_type] = preprocess_data(question_file, annotations_path, is_labeled)

        except (IOError, SyntaxError):
            pass

    print(preprocessed_data['train'][:3])
    return preprocessed_data

# Call the function to process question files
processed_data = process_question_files(questions_path, annotations_path)

val
Processing val2014 data: 10000/214354
Processing val2014 data: 20000/214354
Processing val2014 data: 30000/214354
Processing val2014 data: 40000/214354
Processing val2014 data: 50000/214354
Processing val2014 data: 60000/214354
Processing val2014 data: 70000/214354
Processing val2014 data: 80000/214354
Processing val2014 data: 90000/214354
Processing val2014 data: 100000/214354
Processing val2014 data: 110000/214354
Processing val2014 data: 120000/214354
Processing val2014 data: 130000/214354
Processing val2014 data: 140000/214354
Processing val2014 data: 150000/214354
Processing val2014 data: 160000/214354
Processing val2014 data: 170000/214354
Processing val2014 data: 180000/214354
Processing val2014 data: 190000/214354
Processing val2014 data: 200000/214354
Processing val2014 data: 210000/214354
Total 27454 out of 214354 answers are unknown
train
Processing train2014 data: 10000/443757
Processing train2014 data: 20000/443757
Processing train2014 data: 30000/443757
Processing tra

In [14]:
import numpy as np

def save_processed_data(processed_data, output_path):

    for data_type, data_entries in processed_data.items():
        # Convert the list of data entries to a NumPy array
        data_array = np.array(data_entries)
        output_file_path = os.path.join(output_path, f'{data_type}.npy')

        # Save the NumPy array to the output file
        np.save(output_file_path, data_array)

save_processed_data(processed_data, output_path)

In [15]:
import os
import shutil

preprocessed_data_path = "/content/drive/MyDrive/DL_Project/preprocessed"
vqa_preprocessed_path = "/content/drive/MyDrive/DL_Project/VQA_preprocessed"

def copy_preprocessed_data(src_dir, dest_dir):
    os.makedirs(dest_dir, exist_ok=True)

    # List of file names to copy
    file_names = ["test-dev.npy", "test.npy", "train.npy", "val.npy"]

    # Copy the preprocessed data files
    for file_name in file_names:
        src_path = os.path.join(src_dir, file_name)
        dest_path = os.path.join(dest_dir, file_name)
        shutil.copy(src_path, dest_path)

    # Copy the vocabulary files
    questions_dir = os.path.join(src_dir, "Questions")
    annotations_dir = os.path.join(src_dir, "Annotations")

    shutil.copy(os.path.join(questions_dir, "question_vocabs.txt"), dest_dir)
    shutil.copy(os.path.join(annotations_dir, "annotation_vocabs.txt"), dest_dir)

copy_preprocessed_data(preprocessed_data_path, vqa_preprocessed_path)