In [1]:
import tensorflow as tf
import os
import collections
import time
import random
import re
import json
import numpy as np
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import pickle
import math

from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
from glob import glob
from PIL import Image
from tqdm import tqdm

#Custom Classes
from prep import Preparation
from gru_encoder import GRU_Encoder
from cnn_encoder import CNN_Encoder

In [117]:
image_model = tf.keras.applications.ResNet50(include_top=False, weights='imagenet')
image_input = image_model.input
hidden_layer = image_model.layers[-1].output
image_features_extract_model = tf.keras.Model(image_input, hidden_layer)

prep = Preparation()

In [118]:
#Import the pre-trained listener (Works with 97%+ Accuracy)
gru_encoder = GRU_Encoder(512, 512, 46)
encoder = CNN_Encoder(512)
optimizer_l = tf.keras.optimizers.Adam()
loss_object_l = tf.keras.losses.CategoricalCrossentropy()

loss_plot_l = []

#Load Pre-Trained Listener Model
l_checkpoint_path = "./checkpointslistener/train"
l_ckpt = tf.train.Checkpoint(encoder=encoder,
                             gru_encoder=gru_encoder,
                             optimizer_l=optimizer_l)
l_ckpt_manager = tf.train.CheckpointManager(l_ckpt, l_checkpoint_path, max_to_keep=30)
l_ckpt.restore(l_ckpt_manager.latest_checkpoint)

<tensorflow.python.training.tracking.util.CheckpointLoadStatus at 0x7faa18016810>

In [2]:
## RANDOM ##
#Import all the captions
with open("../dataset/captions.json", 'r') as jf:
    data_all = json.loads(jf.read())
data_all.pop('lstm_labels')
img_paths = list(data_all.keys())

edited_captions = []

for img_path in img_paths:
    captions = data_all[img_path]
    for c in captions:
        ed = f'<start> {c} <end>'
        edited_captions.append(ed)

#Tokenize
top_v = 45

tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_v,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(edited_captions)
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Create the tokenized vectors
train_seqs = tokenizer.texts_to_sequences(edited_captions)

cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

In [10]:
def map_func_test(img_name, img_name_2):
  img_tensor = np.load('../dataset/prep_data/'+img_name.decode('utf-8')+'.jpg.npy')
  img_tensor_2 = np.load('../dataset/prep_data/'+img_name_2.decode('utf-8')+'.jpg.npy')
  uno = int(img_name)

  return img_tensor, img_tensor_2, uno

def map_func_disc(img_name, img_name_2, cap):
  img_tensor = np.load('../dataset/prep_data/'+img_name.decode('utf-8')+'.jpg.npy')
  img_tensor_2 = np.load('../dataset/prep_data/'+img_name_2.decode('utf-8')+'.jpg.npy')

  return img_tensor, img_tensor_2, cap

In [11]:
BATCH_SIZE = 32
BUFFER_SIZE = 1000
vocab_size = top_v + 1

In [12]:
## BEST CAPT ##

with open("../dataset/best_captions.json", "r") as jfec:
    data = json.loads(jfec.read())

target_data = data['target_paths']
distractor_data = data['distractor_paths']
captions_data = data['best_captions']
edited_captions_data = []

for c in captions_data:
    edited_captions_data.append('<start> {c} <end>')

test_slice_index = int(len(target_data)*0.9)

targ_test = target_data[test_slice_index:]
dis_test = distractor_data[test_slice_index:]
cap_test = edited_captions_data[test_slice_index:]

train_seqs_2 = tokenizer.texts_to_sequences(cap_test)
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs_2, padding='post')

testset = tf.data.Dataset.from_tensor_slices((targ_test, dis_test, cap_vector))
testset = testset.map(lambda item1, item2, item3: tf.numpy_function(
          map_func_disc, [item1, item2, item3], [tf.float32, tf.float32, tf.int32]))

testset = testset.shuffle(BUFFER_SIZE).batch(1)

In [112]:
with open("../dataset/best_captions.json", "r") as jfec:
    data = json.loads(jfec.read())

target_data = data['target_paths']
distractor_data = data['distractor_paths']

val_slice_index = int(len(target_data)*0.8)
test_slice_index = int(len(target_data)*0.9)

targ_test = target_data[test_slice_index:]

dis_test = distractor_data[test_slice_index:]

testset = tf.data.Dataset.from_tensor_slices((targ_test, dis_test))
testset = testset.map(lambda item1, item2: tf.numpy_function(
          map_func_test, [item1, item2], [tf.float32, tf.float32, tf.int64]))

testset = testset.shuffle(BUFFER_SIZE).batch(1)

with open("../dataset/easy_captions.json", "r") as jfec:
    easy_data = json.loads(jfec.read())

target_easy_data = easy_data['target_paths']
distractor_easy_data = easy_data['distractor_paths']

val_slice_index = int(len(target_easy_data)*0.8)
test_slice_index = int(len(target_easy_data)*0.9)

targ_easy_test = target_easy_data[test_slice_index:]

dis_easy_test = distractor_easy_data[test_slice_index:]

testset_easy = tf.data.Dataset.from_tensor_slices((targ_easy_test, dis_easy_test))
testset_easy = testset_easy.map(lambda item1, item2: tf.numpy_function(
          map_func_test, [item1, item2], [tf.float32, tf.float32, tf.int64]))

testset_easy = testset_easy.shuffle(BUFFER_SIZE).batch(1)

In [113]:
def check_understanding_random(data):
    total_right = 0
    total_wrong = 0

    for (batch, (targ_tensor, dis_tensor, targ_name)) in enumerate(data):
        hidden_l = gru_encoder.reset_state(batch_size=targ_tensor.shape[0])

        #Find a random caption associated with the target
        targ_name = targ_name.numpy()[0]
        captions = data_all[str(targ_name)]
        seed = random.randint(0, len(captions)-1)
        caption = f"<start> {captions[seed]} <end>"
        m = tokenizer.texts_to_sequences([caption])
        m = tf.one_hot(m, vocab_size)

        features_t = encoder_l(targ_tensor)
        features_d = encoder_l(dis_tensor)

        left = features_t
        right = features_d

        v = gru_encoder(m, hidden_l)

        x = tf.norm(tf.keras.layers.dot([left, v],axes=2,normalize=True),axis=(1,2))
        y = tf.norm(tf.keras.layers.dot([right, v],axes=2,normalize=True),axis=(1,2))

        mask = tf.math.greater(x, y)
        total_right += np.sum(mask.numpy())
        total_wrong += np.sum(mask.numpy()==False)

    total = total_right + total_wrong

    acc = total_right / total

    return acc, total_right, total_wrong

In [13]:
def check_understanding_disc(data):
    total_right = 0
    total_wrong = 0

    for (batch, (targ_tensor, dis_tensor, cap)) in enumerate(data):
        hidden_l = gru_encoder.reset_state(batch_size=targ_tensor.shape[0])

        m = tf.one_hot(cap, vocab_size)

        features_t = encoder(targ_tensor)
        features_d = encoder(dis_tensor)

        left = features_t
        right = features_d

        v = gru_encoder(m, hidden_l)

        x = tf.norm(tf.keras.layers.dot([left, v],axes=2,normalize=True),axis=(1,2))
        y = tf.norm(tf.keras.layers.dot([right, v],axes=2,normalize=True),axis=(1,2))

        mask = tf.math.greater(x, y)
        total_right += np.sum(mask.numpy())
        total_wrong += np.sum(mask.numpy()==False)

    total = total_right + total_wrong

    acc = total_right / total

    return acc, total_right, total_wrong

In [16]:
## HUMAN EVALUATION RANDOM ##
with open('../human_evaluation/x.json', 'r') as jf:
    X = json.loads(jf.read())

def show_test_random(num, save):
    captions = data_all[str(target_data[num])]
    rand = random.randint(0, len(captions)-1)
    phrase = f"Utterance: {captions[rand]}"
    img_A = mpimg.imread(f"../dataset/prep_data/{target_data[num]}.jpg")
    img_B = mpimg.imread(f"../dataset/prep_data/{distractor_data[num]}.jpg")

    rand_n = random.uniform(0,1)

    if rand_n < 0.5:
        t = 0
        d = 1
    else:
        t = 1
        d = 0
    fig, ax = plt.subplots(1,2)
    ax[t].imshow(img_A)
    ax[d].imshow(img_B)
    fig.suptitle(phrase)
    plt.savefig(f'../human_evaluation/random/{save}.png')
    plt.clf()
    return t

correct = {}
for i in tqdm(range(len(X))):
    curr_ind = X[i]
    t = show_test_random(curr_ind, i)
    correct[i] = t

with open("../human_evaluation/random/correct.json", "w") as wj:
    json.dump(correct, wj)

100%|██████████| 100/100 [01:08<00:00,  1.47it/s]


<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

<Figure size 432x288 with 0 Axes>

In [24]:
## HUMAN EVALUATION BEST CAPTION ##
with open('../human_evaluation/x.json', 'r') as jf:
    X = json.loads(jf.read())

def show_test_discriminative(num, save):
    phrase = f"Utterance: {captions_data[num]}"
    img_A = mpimg.imread(f"../dataset/prep_data/{target_data[num]}.jpg")
    img_B = mpimg.imread(f"../dataset/prep_data/{distractor_data[num]}.jpg")

    rand_n = random.uniform(0,1)

    if rand_n < 0.5:
        t = 0
        d = 1
    else:
        t = 1
        d = 0
    fig, ax = plt.subplots(1,2)
    ax[t].imshow(img_A)
    ax[d].imshow(img_B)
    fig.suptitle(phrase)
    plt.savefig(f'../human_evaluation/discriminative/{save}.png')
    plt.clf()
    return t

correct = {}
for i in tqdm(range(len(X))):
    curr_ind = X[i]
    t = show_test_discriminative(curr_ind, i)
    correct[i] = t

with open("../human_evaluation/discriminative/correct.json", "w") as wj:
    json.dump(correct, wj)

<Figure size 432x288 with 0 Axes>

In [None]:
## HUMAN EVALUATION BEST CAPTION ##
with open('../human_evaluation/x.json', 'r') as jf:
    X = json.loads(jf.read())

def show_test_report(num, save):
    phrase = f"Utterance: {captions_data[num]}"
    img_A = mpimg.imread(f"../dataset/prep_data/{target_data[num]}.jpg")
    img_B = mpimg.imread(f"../dataset/prep_data/{distractor_data[num]}.jpg")

    rand_n = random.uniform(0,1)

    # if rand_n < 0.5:
    t = 0
    d = 1
    # else:
    #     t = 1
    #     d = 0
    fig, ax = plt.subplots(1,2)
    ax[t].imshow(img_A)
    ax[t].set_title("Target Image")
    ax[t].axis('off')
    ax[d].imshow(img_B)
    ax[d].set_title("Distractor Image")
    ax[d].axis('off')
    plt.rcParams['font.family'] = 'serif'
    # fig.suptitle(phrase)
    plt.savefig(f'../human_evaluation/base/{save}.png')
    plt.clf()
    return t

correct = {}

curr_ind = X[96]
t = show_test_report(curr_ind, 96)