In [15]:
import os
import torch
import random
import gensim
import requests
import html2text
import wikipedia
import smart_open

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

from PIL import Image
from transformers import (
    VisionEncoderDecoderModel,
    ViTFeatureExtractor,
    AutoTokenizer,
    AutoModelForQuestionAnswering,
    pipeline,
)
from nltk import pos_tag, word_tokenize
from sklearn.feature_extraction.text import TfidfVectorizer
from IPython.display import HTML
from tqdm import tqdm

data_path = "../../data"

test_data_dir = os.path.join(gensim.__path__[0], 'test', 'test_data')
lee_train_file = os.path.join(test_data_dir, 'lee_background.cor')
lee_test_file = os.path.join(test_data_dir, 'lee.cor')

In [2]:
root_path = os.path.join(data_path, "okvqa")

test_questions_path = os.path.join(root_path, "OpenEnded_mscoco_val2014_questions.json")
test_annotations_path = os.path.join(root_path, "mscoco_val2014_annotations.json")
test_image_path = os.path.join(root_path, "val2014")
test_image_name_prefix = "COCO_val2014_000000"

with open(test_questions_path, "r") as f:
    test_questions_df = pd.DataFrame(json.load(f)["questions"])
    
with open(test_annotations_path, "r") as f:
    test_annotations_df = pd.DataFrame(json.load(f)["annotations"])
    
test_df = test_questions_df.merge(test_annotations_df)
test_df["image_path"] = test_df["image_id"].map(lambda image_id: os.path.join(test_image_path, f"{test_image_name_prefix}{image_id:06d}.jpg"))

In [3]:
qa_model = AutoModelForQuestionAnswering.from_pretrained("deepset/roberta-base-squad2")
qa_tokenizer = AutoTokenizer.from_pretrained("deepset/roberta-base-squad2")

ic_model = VisionEncoderDecoderModel.from_pretrained("nlpconnect/vit-gpt2-image-captioning")
ic_feature_extractor = ViTFeatureExtractor.from_pretrained("nlpconnect/vit-gpt2-image-captioning")
ic_tokenizer = AutoTokenizer.from_pretrained("nlpconnect/vit-gpt2-image-captioning")

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
qa_model = qa_model.to(device)
ic_model = ic_model.to(device)

In [37]:
def read_image(image_path):
    i_image = Image.open(image_path)
    if i_image.mode != "RGB":
        i_image = i_image.convert(mode="RGB")

    return i_image

def get_wikipedia_page(string):
    try:
        p = wikipedia.summary(string, auto_suggest=False)
    except wikipedia.DisambiguationError as e:
        s = e.options[0]
        p = wikipedia.summary(s, auto_suggest=False)

    return p

def get_context(caption, use_wikipedia=True):
    if use_wikipedia:
        words = word_tokenize(caption)
        tags = pos_tag(words)

        tags = [w for (w, t) in tags if t[0] == "N"]

        options = list(set([o for tag in tags for o in wikipedia.search(tag, results=1)]))
        pages = [caption] + [get_wikipedia_page(option) for option in options]
        
        return "\n".join(pages)
    
    return caption

def get_caption(
    ic_model,
    ic_feature_extractor,
    ic_tokenizer,
    image,
    max_length=16,
    num_beams=4,
):
    pixel_values = ic_feature_extractor(
        images=[image], return_tensors="pt"
    ).pixel_values
    pixel_values = pixel_values.to(ic_model.device)

    output_ids = ic_model.generate(
        pixel_values, max_length=max_length, num_beams=num_beams
    )

    preds = ic_tokenizer.batch_decode(output_ids, skip_special_tokens=True)
    preds = [pred.strip() for pred in preds]
    return preds[0]

def get_answer(qa_model, qa_tokenizer, question, context):
    qa_pipeline = pipeline("question-answering", model=qa_model, tokenizer=qa_tokenizer)
    result = qa_pipeline(question, context)

    return result

def run(
    ic_feature_extractor,
    ic_model,
    ic_tokenizer,
    qa_model,
    qa_tokenizer,
    image,
    question,
    max_length=16,
    num_beams=4,
    use_wikipedia=True
):
    caption = get_caption(
        ic_model,
        ic_feature_extractor,
        ic_tokenizer,
        image,
        max_length=max_length,
        num_beams=num_beams,
    )
    context = get_context(caption, use_wikipedia=use_wikipedia)
    answer = get_answer(qa_model, qa_tokenizer, question, context)

    return caption, answer, context

def mk_predictions(index, use_wikipedia=True):
    question, image_path, answers = test_df[["question", "image_path", "answers"]].iloc[index]
    image = read_image(image_path)
    caption, answer, context = run(ic_feature_extractor, ic_model, ic_tokenizer, qa_model, 
                                   qa_tokenizer, image, question, use_wikipedia=use_wikipedia)
    
    return answer, answers

def get_similarity(doc2vec, pair):
    answer, answers = pair
    
    result = 0
    for ans in list(set([a['answer'] for a in answers])):
        r = doc2vec.similarity_unseen_docs(word_tokenize(answer["answer"]), word_tokenize(ans))
        if r > result:
            result = r
    
    return result

def compute_score(doc2vec, predictions, threshold=0.5):
    total = len(predictions)
    count = 0
    
    for pair in predictions:
        if get_similarity(doc2vec, pair) >= threshold:
            count += 1
            
    return count / total

def compute_compare_score(doc2vec, predictions1, predictions2):
    total = len(predictions)
    count = 0
    
    for (answer1, answers), (answer2, answers) in zip(predictions1, predictions2):
        result1 = get_similarity(doc2vec, (answer1, answers))
        result2 = get_similarity(doc2vec, (answer2, answers))
        
        if result1 >= result2:
            count += 1
            
    return count / total

def read_corpus(fname, tokens_only=False):
    with smart_open.open(fname, encoding="iso-8859-1") as f:
        for i, line in enumerate(f):
            tokens = gensim.utils.simple_preprocess(line)
            if tokens_only:
                yield tokens
            else:
                # For training data, add tags
                yield gensim.models.doc2vec.TaggedDocument(tokens, [i])

In [5]:
train_corpus = list(read_corpus(lee_train_file)) + list(read_corpus(lee_test_file))

doc2vec = gensim.models.doc2vec.Doc2Vec(vector_size=50, min_count=2, epochs=40)
doc2vec.build_vocab(train_corpus)
doc2vec.train(train_corpus, total_examples=doc2vec.corpus_count, epochs=doc2vec.epochs)

In [21]:
predictions = []

for i in tqdm(range(100)):
    predictions.append(mk_predictions(i))

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [09:04<00:00,  5.44s/it]


In [22]:
compute_score(doc2vec, predictions)

0.3

In [23]:
compute_score(doc2vec, predictions, 0.8)

0.11

In [24]:
predictions_2 = []

for i in tqdm(range(100)):
    predictions_2.append(mk_predictions(i, use_wikipedia=False))

100%|███████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████████| 100/100 [04:58<00:00,  2.99s/it]


In [25]:
compute_score(doc2vec, predictions_2)

0.4

In [26]:
compute_score(doc2vec, predictions_2, 0.8)

0.14

In [38]:
compute_compare_score(doc2vec, predictions, predictions_2)

0.57