In [None]:
import tensorflow as tf

# You'll generate plots of attention in order to see which parts of an image
# our model focuses on during captioning
import matplotlib.pyplot as plt

# Scikit-learn includes many helpful utilities
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle

import re
import numpy as np
import os
import time
import json
import csv
from glob import glob
from PIL import Image
import pickle

## PART 1

In this part we use the provided code to generate the captions. At the end, a captions file is created.

In [None]:
# Download the COCO 2014 caption annotations unless they are already on disk.
annotation_folder = '/annotations/'
if not os.path.exists(os.path.abspath('.') + annotation_folder):
  annotation_zip = tf.keras.utils.get_file('captions.zip',
                                          cache_subdir=os.path.abspath('.'),
                                          origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                          extract = True)
  annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'
  # The archive has been extracted; drop the zip to save disk space.
  os.remove(annotation_zip)
else:
  # The folder already exists: still define annotation_file so later cells
  # do not hit a NameError on a second run.
  annotation_file = os.path.abspath('.') + annotation_folder + 'captions_train2014.json'

# Download image files
image_folder = '/train2014/'
if not os.path.exists(os.path.abspath('.') + image_folder):
  image_zip = tf.keras.utils.get_file('train2014.zip',
                                      cache_subdir=os.path.abspath('.'),
                                      origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                      extract = True)
  PATH = os.path.dirname(image_zip) + image_folder
  os.remove(image_zip)
else:
  PATH = os.path.abspath('.') + image_folder


Downloading data from http://images.cocodataset.org/annotations/annotations_trainval2014.zip
Downloading data from http://images.cocodataset.org/zips/train2014.zip


In [None]:
# Fixed locations of the extracted COCO 2014 annotations and training images.
annotation_file = './annotations/captions_train2014.json'
PATH = './train2014/'

with open(annotation_file, 'r') as f:
    annotations = json.load(f)

# One entry per annotation: the caption wrapped in <start>/<end> markers and
# the path of the image it describes (image ids are zero-padded to 12 digits).
all_captions = ['<start> ' + annot['caption'] + ' <end>'
                for annot in annotations['annotations']]
all_img_name_vector = [PATH + 'COCO_train2014_' + '%012d.jpg' % (annot['image_id'])
                       for annot in annotations['annotations']]

# Shuffle captions and image names together with a fixed seed so the
# caption/image pairing is preserved and the subset is reproducible.
train_captions, img_name_vector_jnk = shuffle(all_captions,
                                              all_img_name_vector,
                                              random_state=1)

# Keep only the first 30000 shuffled pairs.
num_examples = 30000
train_captions = train_captions[:num_examples]
img_name_vector = img_name_vector_jnk[:num_examples]


In [None]:
 def load_image(image_path):
    """Read a JPEG from disk and prepare it for InceptionV3.

    The image is decoded as 3-channel, resized to 299x299 and passed through
    the InceptionV3 preprocessing function.

    Returns:
        A tuple (preprocessed image tensor, image_path).
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    pixels = tf.image.resize(pixels, (299, 299))
    return tf.keras.applications.inception_v3.preprocess_input(pixels), image_path

# InceptionV3 pretrained on ImageNet, without its classification head.
image_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
new_input = image_model.input
# Output of the last remaining layer is used as the image feature map.
hidden_layer = image_model.layers[-1].output

# Model mapping a preprocessed image batch to that feature map.
image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


In [None]:
# De-duplicate the image paths: an image is encoded once even if several of
# the selected captions refer to it.
encode_train = sorted(set(img_name_vector))

# Feel free to change batch_size according to your system configuration
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(
    load_image,
    num_parallel_calls=tf.data.experimental.AUTOTUNE)
image_dataset = image_dataset.batch(16)

In [None]:
# Run every image through the feature extractor once and cache the result on
# disk next to the image.
for image_batch, path_batch in image_dataset:
  feature_maps = image_features_extract_model(image_batch)
  # Collapse the two spatial axes into one: (batch, h, w, c) -> (batch, h*w, c).
  batch_features = tf.reshape(
      feature_maps,
      (feature_maps.shape[0], -1, feature_maps.shape[3]))

  for bf, p in zip(batch_features, path_batch):
    # np.save appends ".npy" to the image path; map_func reads it back the same way.
    path_of_feature = p.numpy().decode("utf-8")
    np.save(path_of_feature, bf.numpy())

In [None]:
def calc_max_length(tensor):
    """Return the length of the longest sequence contained in ``tensor``."""
    return max(map(len, tensor))

# Keep only the top_k most frequent words; all other words map to "<unk>".
top_k = 5000
# The filter list deliberately omits '<' and '>' so the <start>/<end> markers
# survive tokenization. The backslash is escaped to avoid an invalid-escape
# warning; the resulting string is unchanged.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unk>",
                                                  filters='!"#$%&()*+.,-/:;=?@[\\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)

# Reserve index 0 for padding.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

# Create the tokenized vectors (done once; the earlier duplicate call whose
# result was immediately overwritten has been removed).
train_seqs = tokenizer.texts_to_sequences(train_captions)
# Pad every caption at the end, up to the length of the longest one.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')
max_length = calc_max_length(train_seqs)

In [None]:
# Hold out ~3.33% of the (image path, caption vector) pairs for validation,
# with a fixed seed for reproducibility.
# NOTE(review): the split is per caption, and img_name_vector has one entry per
# caption, so an image with several captions may end up on both sides — confirm
# this is acceptable.
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,
                                                                    cap_vector,
                                                                    test_size=0.0333,
                                                                    random_state=0)

In [None]:
def map_func(img_name, cap):
  """Load the cached feature array for an image.

  Args:
    img_name: image path as UTF-8 bytes; the features are read from
      "<img_name>.npy".
    cap: caption, passed through unchanged.

  Returns:
    A tuple (feature array, cap).
  """
  feature_path = img_name.decode('utf-8') + '.npy'
  return np.load(feature_path), cap

class BahdanauAttention(tf.keras.Model):
  """Additive attention over the image feature locations.

  Scores every feature location against the decoder hidden state and returns
  the attention-weighted sum of the features together with the weights.
  """

  def __init__(self, units):
    super(BahdanauAttention, self).__init__()
    # W1 projects the image features and W2 the decoder hidden state; V maps
    # the combined activation to one scalar score per location.
    self.W1 = tf.keras.layers.Dense(units)
    self.W2 = tf.keras.layers.Dense(units)
    self.V = tf.keras.layers.Dense(1)

  def call(self, features, hidden):
    """Return (context_vector, attention_weights) for one decoding step."""
    # features(CNN_encoder output) shape == (batch_size, 64, embedding_dim)

    # hidden shape == (batch_size, hidden_size)
    # hidden_with_time_axis shape == (batch_size, 1, hidden_size)
    hidden_with_time_axis = tf.expand_dims(hidden, 1)

    # score shape == (batch_size, 64, units); the hidden projection is
    # broadcast over the 64 locations
    score = tf.nn.tanh(self.W1(features) + self.W2(hidden_with_time_axis))

    # attention_weights shape == (batch_size, 64, 1)
    # you get 1 at the last axis because you are applying score to self.V;
    # the softmax normalizes over the location axis
    attention_weights = tf.nn.softmax(self.V(score), axis=1)

    # context_vector shape after sum == (batch_size, embedding_dim), i.e. the
    # last axis of `features`
    context_vector = attention_weights * features
    context_vector = tf.reduce_sum(context_vector, axis=1)

    return context_vector, attention_weights

class CNN_Encoder(tf.keras.Model):
    """Projects pre-extracted image features to the embedding dimension."""
    # The image features were already extracted and cached as .npy files,
    # so this encoder only passes them through a fully connected layer.
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        # shape after fc == (batch_size, 64, embedding_dim)
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Apply the dense projection followed by ReLU."""
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x


class RNN_Decoder(tf.keras.Model):
  """GRU decoder that attends over image features to predict the next token."""

  def __init__(self, embedding_dim, units, vocab_size):
    super(RNN_Decoder, self).__init__()
    self.units = units

    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(self.units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')
    self.fc1 = tf.keras.layers.Dense(self.units)
    # fc2 produces one logit per vocabulary entry.
    self.fc2 = tf.keras.layers.Dense(vocab_size)

    self.attention = BahdanauAttention(self.units)

  def call(self, x, features, hidden):
    """Run one decoding step.

    Args:
      x: token ids fed to the embedding layer.
      features: encoder output the attention layer attends over.
      hidden: previous decoder state, used as the attention query.

    Returns:
      A tuple (logits over the vocabulary, new GRU state, attention weights).
    """
    # defining attention as a separate model
    context_vector, attention_weights = self.attention(features, hidden)

    # x shape after passing through embedding == (batch_size, 1, embedding_dim)
    x = self.embedding(x)

    # x shape after concatenation == (batch_size, 1, embedding_dim + context size)
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)

    # passing the concatenated vector to the GRU
    # NOTE(review): `hidden` is only used by the attention layer; the GRU is
    # called without an initial state — confirm this is intended.
    output, state = self.gru(x)

    # shape == (batch_size, time steps, units)
    x = self.fc1(output)

    # x shape == (batch_size * time steps, units)
    x = tf.reshape(x, (-1, x.shape[2]))

    # output shape == (batch_size * time steps, vocab)
    x = self.fc2(x)

    return x, state, attention_weights

  def reset_state(self, batch_size):
    """Return an all-zero hidden state of shape (batch_size, units)."""
    return tf.zeros((batch_size, self.units))



Model

In [None]:
# Training hyperparameters.
BATCH_SIZE = 64
# Size of the shuffle buffer used by the training dataset.
BUFFER_SIZE = 1000
embedding_dim = 256
units = 512
# One more than the tokenizer's num_words limit.
vocab_size = top_k + 1
# Number of full batches per epoch; used to average the epoch loss.
num_steps = len(img_name_train) // BATCH_SIZE
# Shape of the vector extracted from InceptionV3 is (64, 2048)
# These two variables represent that vector shape
features_shape = 2048
attention_features_shape = 64

In [None]:
def _load_cached_features(item1, item2):
  # Wrap the NumPy loader so it can run inside the tf.data pipeline.
  return tf.numpy_function(map_func, [item1, item2], [tf.float32, tf.int32])

dataset = tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))

# Use map to load the numpy files in parallel
dataset = dataset.map(_load_cached_features,
                      num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Shuffle, batch, and prefetch so loading overlaps with training.
dataset = (dataset
           .shuffle(BUFFER_SIZE)
           .batch(BATCH_SIZE)
           .prefetch(buffer_size=tf.data.experimental.AUTOTUNE))

In [None]:
# Instantiate the two trainable models.
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)

optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so padding can be masked
# out in loss_function; from_logits=True because the decoder outputs raw logits.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
  """Cross-entropy with padding positions (token id 0) zeroed out.

  The per-example losses are multiplied by a 0/1 mask and then averaged over
  all examples, masked ones included.
  """
  per_example = loss_object(real, pred)
  keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
  return tf.reduce_mean(per_example * keep)

In [None]:
# Checkpoints track the encoder, decoder and optimizer; only the 5 most
# recent ones are kept.
checkpoint_path = "/content/drive/My Drive/Google TC/checkpoint"
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

start_epoch = 0
if ckpt_manager.latest_checkpoint:
  # The numeric suffix of the checkpoint name is used as the epoch to resume at.
  # NOTE(review): this matches the number of completed epochs only if
  # checkpoints are numbered by epoch — confirm against the save call.
  start_epoch = int(ckpt_manager.latest_checkpoint.split('-')[-1])
  # restoring the latest checkpoint in checkpoint_path
  ckpt.restore(ckpt_manager.latest_checkpoint)

Training


In [None]:
# Per-epoch loss history; kept outside the training cell so re-running that
# cell does not reset it.
loss_plot = []
@tf.function
def train_step(img_tensor, target):
  """Run one teacher-forced optimization step on a batch.

  Args:
    img_tensor: batch of cached image features fed to the encoder.
    target: batch of padded caption token ids.

  Returns:
    A tuple (loss summed over time steps, that loss divided by the caption
    length).
  """
  loss = 0

  # initializing the hidden state for each batch
  # because the captions are not related from image to image
  hidden = decoder.reset_state(batch_size=target.shape[0])

  # Every caption starts with the <start> token.
  dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * target.shape[0], 1)

  with tf.GradientTape() as tape:
      features = encoder(img_tensor)

      for i in range(1, target.shape[1]):
          # passing the features through the decoder
          predictions, hidden, _ = decoder(dec_input, features, hidden)

          loss += loss_function(target[:, i], predictions)

          # using teacher forcing
          dec_input = tf.expand_dims(target[:, i], 1)

  total_loss = (loss / int(target.shape[1]))

  trainable_variables = encoder.trainable_variables + decoder.trainable_variables

  # Gradients are taken of the summed loss, not the length-normalized one.
  gradients = tape.gradient(loss, trainable_variables)

  optimizer.apply_gradients(zip(gradients, trainable_variables))

  return loss, total_loss

  

In [None]:
EPOCHS = 20

# Resume from start_epoch, which the checkpoint cell derives from the numeric
# suffix of the latest checkpoint.
for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0

    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss

        if batch % 100 == 0:
            print ('Epoch {} Batch {} Loss {:.4f}'.format(
              epoch + 1, batch, batch_loss.numpy() / int(target.shape[1])))
    # storing the epoch end loss value to plot later
    loss_plot.append(total_loss / num_steps)

    if epoch % 5 == 0:
      # Number the checkpoint by completed epochs. The default save counter
      # only advances once per save (every 5th epoch), which would make a
      # resumed run restart at the wrong epoch.
      ckpt_manager.save(checkpoint_number=epoch + 1)

    print ('Epoch {} Loss {:.6f}'.format(epoch + 1,
                                         total_loss/num_steps))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

Epoch 1 Batch 0 Loss 2.0136
Epoch 1 Batch 100 Loss 1.2058
Epoch 1 Batch 200 Loss 0.9206
Epoch 1 Batch 300 Loss 0.9265
Epoch 1 Batch 400 Loss 0.8730
Epoch 1 Loss 1.004596
Time taken for 1 epoch 490.27657866477966 sec

Epoch 2 Batch 0 Loss 0.8471
Epoch 2 Batch 100 Loss 0.8239
Epoch 2 Batch 200 Loss 0.7927
Epoch 2 Batch 300 Loss 0.7866
Epoch 2 Batch 400 Loss 0.8002
Epoch 2 Loss 0.775938
Time taken for 1 epoch 427.7195129394531 sec

Epoch 3 Batch 0 Loss 0.7639
Epoch 3 Batch 100 Loss 0.7760
Epoch 3 Batch 200 Loss 0.6795
Epoch 3 Batch 300 Loss 0.7339
Epoch 3 Batch 400 Loss 0.6436
Epoch 3 Loss 0.710464
Time taken for 1 epoch 446.94011759757996 sec

Epoch 4 Batch 0 Loss 0.7000
Epoch 4 Batch 100 Loss 0.6170
Epoch 4 Batch 200 Loss 0.6576
Epoch 4 Batch 300 Loss 0.7046
Epoch 4 Batch 400 Loss 0.6604
Epoch 4 Loss 0.668746
Time taken for 1 epoch 448.7081129550934 sec

Epoch 5 Batch 0 Loss 0.6978
Epoch 5 Batch 100 Loss 0.7047
Epoch 5 Batch 200 Loss 0.6620
Epoch 5 Batch 300 Loss 0.6342
Epoch 5 Batch 40

In [None]:
def evaluate(image):
    """Generate a caption for an image and record the attention weights.

    Tokens are sampled from the decoder's output distribution until '<end>' is
    produced or max_length tokens have been generated.

    Args:
        image: path of the image to caption.

    Returns:
        A tuple (result, attention_plot): the list of generated tokens (the
        final '<end>' included when one was produced) and an array with one
        row of attention weights per generated token.
    """
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        # Sample the next token rather than taking the argmax.
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        predicted_word = tokenizer.index_word[predicted_id]
        result.append(predicted_word)

        # Stop at '<end>' but fall through to the truncation below; the
        # previous early return handed back the untruncated array.
        if predicted_word == '<end>':
            break

        dec_input = tf.expand_dims([predicted_id], 0)

    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot


def evaluate_without_plot(image):
    """Generate a caption for an image without returning attention weights.

    Delegates to evaluate() instead of duplicating its decoding loop, so the
    two functions cannot drift apart.

    Args:
        image: path of the image to caption.

    Returns:
        The list of generated tokens (the final '<end>' included when one was
        produced).
    """
    return evaluate(image)[0]

In [None]:
real_captions = []
pred_captions = []

# newline='' is what the csv module expects for files it writes; without it
# some platforms emit an extra blank line after each row.
with open('/content/drive/My Drive/Google TC/all_captions.csv', 'w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["true_caption", "pred_caption"])

    for idx in range(len(img_name_val)):
        # Drop padding (id 0), then the leading <start> and trailing <end>.
        r_cap = [tokenizer.index_word[i] for i in cap_val[idx] if i != 0][1:-1]

        # Strip the last token only when it really is '<end>'; a caption that
        # hit max_length without one keeps its final word.
        p_cap = evaluate_without_plot(img_name_val[idx])
        if p_cap and p_cap[-1] == '<end>':
            p_cap = p_cap[:-1]

        real_captions.append(r_cap)
        pred_captions.append(p_cap)

        writer.writerow([' '.join(r_cap), ' '.join(p_cap)])

# PART 2


In this part of the code we import the captions file from the first part and work with different embedding and similarity techniques.
First we try a $\texttt{tfidf}$ embedding and then several kinds of sentence embeddings.






In [9]:
import pandas as pd
import numpy as np
import nltk
import csv
import cv2

In [10]:
# import the captions
# import the captions
# keep_default_na=False: an empty predicted caption must be read back as ''
# rather than NaN, otherwise the .split() calls below fail on a float.
all_captions = pd.read_csv("/content/drive/My Drive/Google TC/all_captions.csv",
                           sep=',',
                           keep_default_na=False)

# nested list of captions (list of words lists)
real_captions = [x.split() for x in all_captions['true_caption'].tolist()]
pred_captions = [x.split() for x in all_captions['pred_caption'].tolist()]

# list of captions (list of strings)
pred_captions_list = list(all_captions['pred_caption'].values)
real_captions_list = list(all_captions['true_caption'].values)

#### TFIDF and cosine similarity


Preprocessing step that lemmatizes the words of the captions


In [None]:
from nltk.tokenize import word_tokenize
from nltk.corpus import wordnet
from nltk.stem import WordNetLemmatizer 
lemmatizer = WordNetLemmatizer()
import nltk
nltk.download('punkt')
nltk.download('wordnet')
nltk.download('averaged_perceptron_tagger')


# Lemmatize with POS Tag

def get_wordnet_pos(word):
    """Map the POS tag of a single word to the matching WordNet constant.

    The word is tagged on its own; the first letter of the tag selects
    adjective, verb or adverb, and anything else falls back to noun.
    """
    initial = nltk.pos_tag([word])[0][1][0].upper()

    if initial == "J":
        return wordnet.ADJ
    if initial == "V":
        return wordnet.VERB
    if initial == "R":
        return wordnet.ADV
    return wordnet.NOUN

def clean_text(sentence):
  """Tokenize a sentence and return its lemmatized words joined by spaces."""
  lemmas = []
  for token in word_tokenize(sentence):
    lemmas.append(lemmatizer.lemmatize(token, get_wordnet_pos(token)))

  return ' '.join(lemmas)


# Lemmatize both caption lists in place (slice assignment keeps the same list
# objects, as the element-wise loops did).
pred_captions_list[:] = [clean_text(caption) for caption in pred_captions_list]
real_captions_list[:] = [clean_text(caption) for caption in real_captions_list]

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger to
[nltk_data]     /root/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger is already up-to-
[nltk_data]       date!


TfidfVectorizer transforms the captions to the tfidf embedding space

In [None]:
from sklearn.feature_extraction.text import TfidfVectorizer

# The vocabulary and idf weights are fitted on the predicted captions only;
# the real captions are projected into that same space.
word_vect = TfidfVectorizer(stop_words='english',
                            strip_accents='ascii',
                            sublinear_tf=True)
word_vect.fit(pred_captions_list)

# Convert the sparse matrices to dense arrays for the dot products below.
pred_tfidf = word_vect.transform(pred_captions_list).toarray()
real_tfidf = word_vect.transform(real_captions_list).toarray()

Function that computes the cosine similarity. Since the tf-idf vectors are already normalized, the dot product alone is sufficient.

In [None]:
def tfidf_score(idx, real_tfidf, pred_tfidf):
    """Rank all predicted captions against one real caption by dot product.

    Args:
        idx: row index of the target caption in ``real_tfidf``.
        real_tfidf: 2-D array of tf-idf vectors of the real captions.
        pred_tfidf: 2-D array of tf-idf vectors of the predicted captions.

    Returns:
        A list of (candidate_index, score) tuples covering every row of
        ``pred_tfidf``, sorted by descending score.
    """
    target = real_tfidf[idx]
    # Iterate over the actual number of candidates instead of a hard-coded
    # 1000, and use a separate loop variable so `idx` is not shadowed.
    idx_score = [(cand, np.dot(target, pred_tfidf[cand]))
                 for cand in range(len(pred_tfidf))]
    idx_score.sort(key=lambda x: x[1], reverse=True)
    return idx_score

#### Word2Vec and WMD

Download the model and the nltk stopwords

In [None]:
from nltk.corpus import stopwords
from nltk import download
import gensim.downloader as api

# Load the 'word2vec-google-news-300' vectors through the gensim downloader.
# NOTE(review): the name `model` is rebound to the Universal Sentence Encoder
# in a later cell; cells that need word2vec must run before that one.
model = api.load('word2vec-google-news-300')
download('stopwords')  # Download stopwords list.
stop_words = stopwords.words('english')

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


Remove stopwords

In [None]:
# Lower-case and whitespace-split every caption, dropping stop words. Both
# lists are indexed over the number of predicted captions.
caption_count = len(pred_captions_list)

pred_captions_list_sw = [
    [w for w in pred_captions_list[i].lower().split() if w not in stop_words]
    for i in range(caption_count)
]
real_captions_list_sw = [
    [w for w in real_captions_list[i].lower().split() if w not in stop_words]
    for i in range(caption_count)
]

Function that returns the dissimilarity based on the Word Mover's Distance (WMD)

In [None]:
# NOTE(review): init_sims is deprecated in recent gensim releases — confirm
# against the installed version.
model.init_sims(replace=True)

# Keep a dedicated reference to the word2vec vectors: the global name `model`
# is rebound to the Universal Sentence Encoder in a later cell, which would
# otherwise break wmd_score once that cell has run.
w2v_model = model

def wmd_score(idx, real_captions, pred_captions):
    """Rank all predicted captions against one real caption by WM distance.

    Args:
        idx: index of the target caption in ``real_captions``.
        real_captions: list of tokenized real captions (lists of words).
        pred_captions: list of tokenized predicted captions.

    Returns:
        A list of (candidate_index, distance) tuples covering every entry of
        ``pred_captions``, sorted by ascending distance.
    """
    target = real_captions[idx]
    # Cover the actual number of candidates instead of a hard-coded 1000, and
    # avoid shadowing the `idx` parameter.
    idx_score = [(cand, w2v_model.wmdistance(target, pred_captions[cand]))
                 for cand in range(len(pred_captions))]
    idx_score.sort(key=lambda x: x[1])
    return idx_score

#### BERT

In [None]:
pip install sentence-transformers

In [None]:
from sentence_transformers import SentenceTransformer
# Load the pretrained 'bert-base-nli-mean-tokens' sentence-embedding model
# (downloaded on first use).
sbert_model = SentenceTransformer('bert-base-nli-mean-tokens')

100%|██████████| 405M/405M [00:05<00:00, 78.7MB/s]


Encode the captions using BERT

In [None]:
# One sentence embedding per caption, in the same order as the caption lists.
pred_bert = sbert_model.encode(pred_captions_list)
real_bert = sbert_model.encode(real_captions_list)

Functions that return the score given by the cosine similarity or the l2 norm

In [19]:
def cos_sim(u, v):
    """Return the cosine similarity of two vectors.

    Returns 0.0 when either vector has zero norm instead of dividing by zero,
    which would produce NaN and make the sorted rankings unreliable.
    """
    denom = np.linalg.norm(u) * np.linalg.norm(v)
    if denom == 0:
        return 0.0
    return np.dot(u, v) / denom
def l2_norm(u,v):
    """Return the Euclidean distance between two vectors."""
    difference = u - v
    return np.linalg.norm(difference)
def bert_score(idx, real_captions, pred_captions):
    """Rank all predicted captions against one real caption by cosine similarity.

    Args:
        idx: index of the target embedding in ``real_captions``.
        real_captions: embeddings of the real captions.
        pred_captions: embeddings of the predicted captions.

    Returns:
        A list of (candidate_index, similarity) tuples covering every entry of
        ``pred_captions``, sorted by descending similarity.
    """
    target = real_captions[idx]
    # Cover the actual number of candidates instead of a hard-coded 1000, and
    # avoid shadowing the `idx` parameter.
    idx_score = [(cand, cos_sim(target, pred_captions[cand]))
                 for cand in range(len(pred_captions))]
    idx_score.sort(key=lambda x: x[1], reverse=True)
    return idx_score

#### USE

In [13]:
import tensorflow_hub as hub

# Universal Sentence Encoder v4 from TensorFlow Hub.
module_url = "https://tfhub.dev/google/universal-sentence-encoder/4" 
# NOTE(review): this rebinds `model`, which earlier held the word2vec vectors —
# confirm nothing that still needs word2vec runs after this cell.
model = hub.load(module_url)

In [23]:
# text preprocessing: remove stopwords and apply stemming (currently disabled)

'''

nltk.download('punkt', quiet=True)
nltk.download('stopwords', quiet=True)

from nltk.corpus import stopwords
from nltk.stem import PorterStemmer

stop_words = stopwords.words('english')
stemmer = PorterStemmer()

def string_processing (string):
    return stemmer.stem(' '.join([word for word in string.split() if word not in stop_words]))

for i in range(len(pred_captions_list)):
    pred_captions_list[i] = string_processing(pred_captions_list[i])

for i in range(len(real_captions_list)):
    real_captions_list[i] = string_processing(real_captions_list[i])

'''

Embedding of the captions with USE

In [15]:
# Embed every caption with the USE module loaded above; rows follow the order
# of the caption lists.
pred_use = model(pred_captions_list)
real_use = model(real_captions_list)

Functions that compute the scores based on cosine similarity and on Euclidean distance


In [16]:
def use_score(idx, real_captions, pred_captions):
    """Rank all predicted captions against one real caption by cosine similarity.

    Args:
        idx: index of the target embedding in ``real_captions``.
        real_captions: embeddings of the real captions.
        pred_captions: embeddings of the predicted captions.

    Returns:
        A list of (candidate_index, similarity) tuples covering every entry of
        ``pred_captions``, sorted by descending similarity.
    """
    target = real_captions[idx]
    # Cover the actual number of candidates instead of a hard-coded 1000, and
    # avoid shadowing the `idx` parameter.
    idx_score = [(cand, cos_sim(target, pred_captions[cand]))
                 for cand in range(len(pred_captions))]
    idx_score.sort(key=lambda x: x[1], reverse=True)
    return idx_score
def use_score_euc(idx, real_captions, pred_captions):
    """Rank all predicted captions against one real caption by Euclidean distance.

    Args:
        idx: index of the target embedding in ``real_captions``.
        real_captions: embeddings of the real captions.
        pred_captions: embeddings of the predicted captions.

    Returns:
        A list of (candidate_index, distance) tuples covering every entry of
        ``pred_captions``, sorted by ascending distance.
    """
    target = real_captions[idx]
    # Cover the actual number of candidates instead of a hard-coded 1000, and
    # avoid shadowing the `idx` parameter.
    idx_score = [(cand, l2_norm(target, pred_captions[cand]))
                 for cand in range(len(pred_captions))]
    idx_score.sort(key=lambda x: x[1])
    return idx_score

#### SUBMISSION

Function that creates the submission file. The score to use must be specified inside the function body. The code below uses $\texttt{use_score}$ (cosine similarity on the USE embeddings).

In [25]:
def create_submission_file(real_captions, pred_captions):
    """Write the ranked-candidate submission CSV.

    For every real caption, the candidates are ranked with use_score on the
    module-level ``real_use`` / ``pred_use`` embeddings, and one row is
    written with the caption text and the space-separated candidate indices
    (at most 5000 per row).

    Args:
        real_captions: nested list of real captions (lists of words); one
            output row is written per entry.
        pred_captions: unused; kept so existing calls keep working.
    """
    # newline='' is what the csv module expects for files it writes.
    with open('/content/drive/My Drive/Google TC/submission_use_stem.csv', 'w', newline='') as file:
        writer = csv.writer(file)
        writer.writerow(["caption", "image_list"])

        # One row per real caption, rather than a hard-coded 1000.
        for idx in range(len(real_captions)):

            b_score_res = use_score(idx, real_use, pred_use)

            ranked_ids = ' '.join(str(entry[0]) for entry in b_score_res[:5000])
            writer.writerow([' '.join(real_captions[idx]), ranked_ids])
            if (idx+1) % 100 ==0:
                # Report the number of rows written so far (idx is zero-based).
                print('rows written ',idx+1)

In [None]:
# Write the submission CSV for the captions loaded at the top of Part 2.
create_submission_file(real_captions, pred_captions)