In [13]:
#All imports
import numpy as np
from mpl_toolkits.mplot3d import Axes3D
import matplotlib.pyplot as plt
import collections
from google.colab import drive
import os

#Load the dataset
drive.mount('/content/drive',force_remount=True)
directory_path = "/content/drive/My Drive/image_captions"

caption_path = os.path.join(directory_path, 'captions.txt')
caption_dict = {}
image_file_names = []
with open(caption_path, 'r') as file:
  lines = file.readlines()
  # because first line only contains heading image,caption
  # taking only 100 pics as whole set
  for line in lines[1:101]:
    line = line.strip()
    image_path, caption = line.split(',', maxsplit=1)
    caption_dict[image_path.split('.')[0]] = caption.split(';')
    image_file_names.append(image_path.split('.')[0])

print(caption_dict)
print(image_file_names)

Mounted at /content/drive
{'1000268201_693b08cb0e': ['A little girl in a pink dress going into a wooden cabin .'], '1001773457_577c3a7d70': ['Two dogs on pavement moving toward each other .'], '1002674143_1b742ab4b8': ['Young girl with pigtails painting outside in the grass .'], '1003163366_44323f5815': ['man laying on bench holding leash of dog sitting on ground'], '1007129816_e794419615': ['The man with pierced ears is wearing glasses and an orange hat .'], '1007320043_627395c3d8': ['The small child climbs on a red ropes on a playground .'], '1009434119_febe49276a': ['A dog runs on the green grass near a wooden fence .'], '1012212859_01547e3f17': ['White dog with brown ears standing near water with head turned to one side .'], '1015118661_980735411b': ['Smiling boy in white shirt and blue jeans in front of rock wall with man in overalls behind him .'], '1015584366_dfcec3c85a': ['The black dog jumped the tree stump .'], '101654506_8eb26cfb60': ['The white and brown dog is running over

In [10]:
import random

indices = list(range(len(image_file_names)))
random.shuffle(indices)
train_index = int(0.6*len(image_file_names))
validation_index = int(0.8*len(image_file_names))

# Spitting data
training_file_names = [image_file_names[i] for i in indices[: train_index]]
validation_file_names = [image_file_names[i] for i in indices[train_index : validation_index]]
testing_file_names = [image_file_names[i] for i in indices[validation_index:]]

In [11]:
import numpy as np
import tensorflow as tf

def load_image(image_path):
    img = tf.io.read_file(image_path)
    img = tf.image.decode_jpeg(img, channels=3)
    img = tf.image.resize(img, (299, 299))
    img = tf.keras.applications.inception_v3.preprocess_input(img)
    return img, image_path

image_model = tf.keras.applications.InceptionV3(include_top=False, weights='imagenet')
new_input = image_model.input
hidden_layer = image_model.layers[-1].output

image_features_extract_model = tf.keras.Model(new_input, hidden_layer)

from tqdm import tqdm

image_dir = os.path.join(directory_path, 'Images/')
#because need both for training
training_image_paths = [image_dir + name + '.jpg' for name in (training_file_names+ validation_file_names)]

# Get unique images
encode_train = sorted(set(training_image_paths))

# Feel free to change batch_size according to your system configuration
image_dataset = tf.data.Dataset.from_tensor_slices(encode_train)
image_dataset = image_dataset.map(
  load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE).batch(16)

for img, path in tqdm(image_dataset):
  batch_features = image_features_extract_model(img)
  batch_features = tf.reshape(batch_features,
                              (batch_features.shape[0], -1, batch_features.shape[3]))

  for bf, p in zip(batch_features, path):
    path_of_feature = p.numpy().decode("utf-8")
    np.save(path_of_feature, bf.numpy())

100%|██████████| 2/2 [00:08<00:00,  4.10s/it]


In [14]:
from keras.preprocessing.text import Tokenizer
from keras.preprocessing.sequence import pad_sequences
import re

# Clean the captions data
#    Convert all words to lowercase.
#    Remove all punctuation.
#    Remove all words that are one character or less in length (e.g. ‘a’).
#    Remove all words with numbers in them.
def captions_clean (image_dict):
  for key, captions in image_dict.items():
    for i, caption in enumerate (captions):
      # Convert the caption to lowercase, and then removing all special characters from it
      caption_nopunct = re.sub(r"[^a-zA-Z0-9]+", ' ', caption.lower())
      # Split the caption into separate words, and collect all words which are more than
      # one character and which contain only alphabets (ie. discard words with mixed alpha-numerics)
      clean_words = [word for word in caption_nopunct.split() if ((len(word) > 1) and (word.isalpha()))]
      # Join those words into a string
      caption_new = ' '.join(clean_words)
      # Replace the old caption in the captions list with this new cleaned caption
      captions[i] = caption_new

# Add two tokens, 'startseq' and 'endseq' at the beginning and end respectively, of every caption
def add_token (captions):
  for i, caption in enumerate (captions):
    captions[i] = 'startseq ' + caption + ' endseq'
  return (captions)

# Given a set of training, validation or testing image names, return a dictionary
# containing the corresponding subset from the full dictionary of images with captions
# This returned subset has the same structure as the full dictionary
def subset_data_dict (image_dict, image_names):
  dict = { image_name:add_token(captions) for image_name,captions in image_dict.items() if image_name in image_names}
  return (dict)

def subset_data_dict_notoken (image_dict, image_names):
  dict = { image_name:captions for image_name,captions in image_dict.items() if image_name in image_names}
  return (dict)

# Flat list of all captions
def all_captions (data_dict):
  return ([caption for key, captions in data_dict.items() for caption in captions])

# Calculate the word-length of the caption with the most words
def max_caption_length(captions):
  return max(len(caption.split()) for caption in captions)

# Fitting a Keras tokenizer given caption descriptions
# The tokenizer uses the captions to learn a mapping from words to numeric word indices
def create_tokenizer(data_dict):
  captions = all_captions(data_dict)
  max_caption_words = max_caption_length(captions)
  # Initialise a Keras Tokenizer
  tokenizer = Tokenizer()
  # Fit it on the captions so that it prepares a vocabulary of all words
  tokenizer.fit_on_texts(captions)
  # Get the size of the vocabulary
  vocab_size = len(tokenizer.word_index) + 1
  return (tokenizer, vocab_size, max_caption_words)

print(caption_dict)
captions_clean(caption_dict)
print(caption_dict)
training_dict = subset_data_dict (caption_dict, training_file_names)
testing_dict = subset_data_dict_notoken (caption_dict, validation_file_names)
print(caption_dict)

# Prepare tokenizer
tokenizer, vocab_size, max_caption_words = create_tokenizer(training_dict)
print(vocab_size, max_caption_words)
print(training_dict)
print(testing_dict)

{'1000268201_693b08cb0e': ['A little girl in a pink dress going into a wooden cabin .'], '1001773457_577c3a7d70': ['Two dogs on pavement moving toward each other .'], '1002674143_1b742ab4b8': ['Young girl with pigtails painting outside in the grass .'], '1003163366_44323f5815': ['man laying on bench holding leash of dog sitting on ground'], '1007129816_e794419615': ['The man with pierced ears is wearing glasses and an orange hat .'], '1007320043_627395c3d8': ['The small child climbs on a red ropes on a playground .'], '1009434119_febe49276a': ['A dog runs on the green grass near a wooden fence .'], '1012212859_01547e3f17': ['White dog with brown ears standing near water with head turned to one side .'], '1015118661_980735411b': ['Smiling boy in white shirt and blue jeans in front of rock wall with man in overalls behind him .'], '1015584366_dfcec3c85a': ['The black dog jumped the tree stump .'], '101654506_8eb26cfb60': ['The white and brown dog is running over the surface of the snow .

In [15]:
print("Example tokens:")
for word, index in list(tokenizer.word_index.items())[:10]:
    print(f"{word}: {index}")
example_caption = next(iter(training_dict.values()))[0]  # Get the first caption from the training set
encoded_example_caption = tokenizer.texts_to_sequences([example_caption])[0]
print("Example caption:", example_caption)
print("Encoded example caption:", encoded_example_caption)

Example tokens:
startseq: 1
endseq: 2
the: 3
in: 4
dog: 5
on: 6
with: 7
man: 8
and: 9
white: 10
Example caption: startseq little girl in pink dress going into wooden cabin endseq
Encoded example caption: [1, 30, 19, 4, 31, 32, 33, 34, 20, 35, 2]


In [16]:
# Extend a list of text indices to a given fixed length
def pad_text (text, max_length):
  text = pad_sequences([text], maxlen=max_length, padding='post')[0]
  return (text)

def data_prep(data_dict, tokenizer, max_length, vocab_size):
  X, y = list(), list()

  # For each image and list of captions
  for image_name, captions in data_dict.items():
    image_name = image_dir + image_name + '.jpg'
    # For each caption in the list of captions
    for caption in captions:
      # Convert the caption words into a list of word indices
      word_idxs = tokenizer.texts_to_sequences([caption])[0]
      # Pad the input text to the same fixed length
      pad_idxs = pad_text(word_idxs, max_length)
      X.append(image_name)
      y.append(pad_idxs)
  return np.array(X), np.array(y)

train_X, train_y = data_prep(training_dict, tokenizer, max_caption_words, vocab_size)
print(train_X, train_y)
test_X, test_y = data_prep(testing_dict, tokenizer, max_caption_words, vocab_size)
print(test_X, test_y)

['/content/drive/My Drive/image_captions/Images/1000268201_693b08cb0e.jpg'
 '/content/drive/My Drive/image_captions/Images/1001773457_577c3a7d70.jpg'
 '/content/drive/My Drive/image_captions/Images/1002674143_1b742ab4b8.jpg'
 '/content/drive/My Drive/image_captions/Images/1003163366_44323f5815.jpg'
 '/content/drive/My Drive/image_captions/Images/1007129816_e794419615.jpg'
 '/content/drive/My Drive/image_captions/Images/1007320043_627395c3d8.jpg'
 '/content/drive/My Drive/image_captions/Images/1009434119_febe49276a.jpg'
 '/content/drive/My Drive/image_captions/Images/1012212859_01547e3f17.jpg'
 '/content/drive/My Drive/image_captions/Images/1015118661_980735411b.jpg'
 '/content/drive/My Drive/image_captions/Images/101654506_8eb26cfb60.jpg'
 '/content/drive/My Drive/image_captions/Images/101669240_b2d3e7f17b.jpg'
 '/content/drive/My Drive/image_captions/Images/1016887272_03199f49c4.jpg'
 '/content/drive/My Drive/image_captions/Images/1019077836_6fc9b15408.jpg'
 '/content/drive/My Drive/i

In [20]:
#BATCH_SIZE = 64
BATCH_SIZE = 10
BUFFER_SIZE = 1000

# Load the numpy files
def map_func(img_name, cap):
   img_tensor = np.load(img_name.decode('utf-8')+'.npy')
   return img_tensor, cap

dataset = tf.data.Dataset.from_tensor_slices((train_X, train_y))
# Use map to load the numpy files in parallel
dataset = dataset.map(lambda item1, item2: tf.numpy_function(map_func, [item1, item2], [tf.float32, tf.int32]),num_parallel_calls=tf.data.experimental.AUTOTUNE)
# Shuffle and batch
dataset = dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
dataset = dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

test_dataset = tf.data.Dataset.from_tensor_slices((test_X, test_y))
# Use map to load the numpy files in parallel
test_dataset = test_dataset.map(lambda item1, item2: tf.numpy_function(map_func, [item1, item2], [tf.float32, tf.int32]),num_parallel_calls=tf.data.experimental.AUTOTUNE)
# Shuffle and batch
test_dataset = test_dataset.shuffle(BUFFER_SIZE).batch(BATCH_SIZE)
test_dataset = test_dataset.prefetch(buffer_size=tf.data.experimental.AUTOTUNE)

In [21]:
class BahdanauAttention(tf.keras.Model):
  def __init__(self, units):
    super(BahdanauAttention, self).__init__()
    self.W1 = tf.keras.layers.Dense(units)
    self.W2 = tf.keras.layers.Dense(units)
    self.V = tf.keras.layers.Dense(1)

  def call(self, features, hidden):
    # features(CNN_encoder output) shape == (batch_size, 64, embedding_dim)
    # hidden shape == (batch_size, hidden_size)
    # hidden_with_time_axis shape == (batch_size, 1, hidden_size)
    hidden_with_time_axis = tf.expand_dims(hidden, 1)
    # attention_hidden_layer shape == (batch_size, 64, units)
    attention_hidden_layer = (tf.nn.tanh(self.W1(features) +
                                         self.W2(hidden_with_time_axis)))
    # score shape == (batch_size, 64, 1)
    # This gives you an unnormalized score for each image feature.
    score = self.V(attention_hidden_layer)
    # attention_weights shape == (batch_size, 64, 1)
    attention_weights = tf.nn.softmax(score, axis=1)
    # context_vector shape after sum == (batch_size, hidden_size)
    context_vector = attention_weights * features
    context_vector = tf.reduce_sum(context_vector, axis=1)
    return context_vector, attention_weights

class CNN_Encoder(tf.keras.Model):
    # Since you have already extracted the features and dumped it
    # This encoder passes those features through a Fully connected layer
    def __init__(self, embedding_dim):
        super(CNN_Encoder, self).__init__()
        # shape after fc == (batch_size, 64, embedding_dim)
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        x = self.fc(x)
        x = tf.nn.relu(x)
        return x

class RNN_Decoder(tf.keras.Model):
  def __init__(self, embedding_dim, units, vocab_size):
    super(RNN_Decoder, self).__init__()
    self.units = units
    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(self.units,
                                   return_sequences=True,
                                   return_state=True,
                                   recurrent_initializer='glorot_uniform')
    self.fc1 = tf.keras.layers.Dense(self.units)
    self.fc2 = tf.keras.layers.Dense(vocab_size)
    self.attention = BahdanauAttention(self.units)

  def call(self, x, features, hidden):
    # defining attention as a separate model
    context_vector, attention_weights = self.attention(features, hidden)
    # x shape after passing through embedding == (batch_size, 1, embedding_dim)
    x = self.embedding(x)
    # x shape after concatenation == (batch_size, 1, embedding_dim + hidden_size)
    x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
    # passing the concatenated vector to the GRU
    output, state = self.gru(x)
    # shape == (batch_size, max_length, hidden_size)
    x = self.fc1(output)
    # x shape == (batch_size * max_length, hidden_size)
    x = tf.reshape(x, (-1, x.shape[2]))
    # output shape == (batch_size * max_length, vocab)
    x = self.fc2(x)
    return x, state, attention_weights

  def reset_state(self, batch_size):
    return tf.zeros((batch_size, self.units))

In [22]:
embedding_dim = 256
units = 512
vocab_size = vocab_size
num_steps = len(train_X) // BATCH_SIZE
num_val_steps = len(test_X) // BATCH_SIZE
# Shape of the vector extracted from InceptionV3 is (64, 2048)
# These two variables represent that vector shape
features_shape = 2048
attention_features_shape = 64
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(from_logits=True, reduction='none')

def loss_function(real, pred):
  mask = tf.math.logical_not(tf.math.equal(real, 0))
  loss_ = loss_object(real, pred)
  mask = tf.cast(mask, dtype=loss_.dtype)
  loss_ *= mask
  return tf.reduce_mean(loss_)

loss_plot = []
@tf.function
def train_step(img_tensor, target):
  loss = 0
  # initializing the hidden state for each batch
  # because the captions are not related from image to image
  hidden = decoder.reset_state(batch_size=target.shape[0])
  dec_input = tf.expand_dims([tokenizer.word_index['startseq']] * target.shape[0], 1)
  with tf.GradientTape() as tape:
      features = encoder(img_tensor)
      for i in range(1, target.shape[1]):
          # passing the features through the decoder
          predictions, hidden, _ = decoder(dec_input, features, hidden)
          loss += loss_function(target[:, i], predictions)
          # using teacher forcing
          dec_input = tf.expand_dims(target[:, i], 1)
  total_loss = (loss / int(target.shape[1]))
  trainable_variables = encoder.trainable_variables + decoder.trainable_variables
  gradients = tape.gradient(loss, trainable_variables)
  optimizer.apply_gradients(zip(gradients, trainable_variables))
  return loss, total_loss

@tf.function
def validation_step(img_tensor, target):
    loss = 0
    hidden = decoder.reset_state(batch_size=target.shape[0])
    dec_input = tf.expand_dims([tokenizer.word_index['startseq']] * target.shape[0], 1)
    features = encoder(img_tensor)
    for i in range(1, target.shape[1]):
        predictions, hidden, _ = decoder(dec_input, features, hidden)
        loss += loss_function(target[:, i], predictions)
        dec_input = tf.expand_dims(target[:, i], 1)
    total_loss = (loss / int(target.shape[1]))
    return loss, total_loss

import time
start_epoch = 0
best_val_loss = float('inf')
patience = 3
patience_counter = 0
EPOCHS = 20

for epoch in range(start_epoch, EPOCHS):
    start = time.time()
    total_loss = 0
    for (batch, (img_tensor, target)) in enumerate(dataset):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss
        if batch % 50 == 0:
            average_batch_loss = batch_loss.numpy()/int(target.shape[1])
            print(f'Epoch {epoch+1} Batch {batch} Loss {average_batch_loss:.4f}')
    # storing the epoch end loss value to plot later
    loss_plot.append(total_loss / num_steps)

    val_total_loss = 0
    for (batch, (val_img_tensor, val_target)) in enumerate(test_dataset):
        val_batch_loss, val_t_loss = validation_step(val_img_tensor, val_target)
        val_total_loss += val_t_loss
    val_loss = val_total_loss / num_val_steps

    print(f'Epoch {epoch+1} Loss {total_loss/num_steps:.6f}')
    print(f'Time taken for 1 epoch {time.time()-start:.2f} sec')
    print(f'Epoch {epoch+1} Validation Loss {val_loss:.6f}\n')

    if val_loss < best_val_loss:
        best_val_loss = val_loss
        patience_counter = 0
    else:
        patience_counter += 1
        if patience_counter >= patience:
            print(f'Early stopping at epoch {epoch+1}')
            break

Epoch 1 Batch 0 Loss 2.8128
Epoch 1 Loss 5.395481
Time taken for 1 epoch 98.49 sec
Epoch 1 Validation Loss 5.266945

Epoch 2 Batch 0 Loss 2.3928
Epoch 2 Loss 5.210311
Time taken for 1 epoch 2.26 sec
Epoch 2 Validation Loss 5.429208

Epoch 3 Batch 0 Loss 2.5126
Epoch 3 Loss 5.089406
Time taken for 1 epoch 1.55 sec
Epoch 3 Validation Loss 4.909826

Epoch 4 Batch 0 Loss 2.4645
Epoch 4 Loss 5.017128
Time taken for 1 epoch 1.54 sec
Epoch 4 Validation Loss 4.518634

Epoch 5 Batch 0 Loss 2.2680
Epoch 5 Loss 4.877752
Time taken for 1 epoch 1.42 sec
Epoch 5 Validation Loss 4.473413

Epoch 6 Batch 0 Loss 2.4586
Epoch 6 Loss 4.658760
Time taken for 1 epoch 1.32 sec
Epoch 6 Validation Loss 4.198400

Epoch 7 Batch 0 Loss 2.2107
Epoch 7 Loss 4.598165
Time taken for 1 epoch 1.38 sec
Epoch 7 Validation Loss 3.970387

Epoch 8 Batch 0 Loss 2.1075
Epoch 8 Loss 4.414136
Time taken for 1 epoch 1.63 sec
Epoch 8 Validation Loss 3.851125

Epoch 9 Batch 0 Loss 2.0547
Epoch 9 Loss 4.174045
Time taken for 1 epoc

In [23]:
!pip install nltk rouge-score
import nltk
from nltk.translate.bleu_score import sentence_bleu, SmoothingFunction
from nltk.translate.meteor_score import meteor_score
from rouge_score import rouge_scorer

def calculate_bleu(reference, candidate):
    smoothie = SmoothingFunction().method4
    return sentence_bleu([reference], candidate, smoothing_function=smoothie)

def evaluate(image, max_length):
    attention_plot = np.zeros((max_length, attention_features_shape))
    hidden = decoder.reset_state(batch_size=1)
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0],-1,img_tensor_val.shape[3]))
    features = encoder(img_tensor_val)
    dec_input = tf.expand_dims([tokenizer.word_index['startseq']], 0)
    result = []
    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input,features,hidden)
        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        if (predicted_id == 0):
            return result, attention_plot
        if (tokenizer.index_word[predicted_id] == 'endseq'):
            return result, attention_plot
        result.append(tokenizer.index_word[predicted_id])
        dec_input = tf.expand_dims([predicted_id], 0)
    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot

def check_test(test_image_names, image_dict, image_dir, max_caption_words):
  bleu_scores = []
  for rid in range(0, len(test_image_names)):
    image_name = test_image_names[rid]
    real_caption = ' '.join([caption.replace('startseq ', '').replace(' endseq', '') for caption in image_dict[image_name]])
    image_path = image_dir + image_name + '.jpg'
    result, attention_plot = evaluate(image_path, max_caption_words)
    #from IPython.display import Image, display
    #display(Image(image_path))
    print('Image Name: ', image_name)
    print('Real Caption:', real_caption)
    print('Prediction Caption:', ' '.join(result))
    bleu = calculate_bleu(real_caption, ' '.join(result))
    bleu_scores.append(bleu)
    print()
  return bleu_scores

bleu_scores = check_test(testing_file_names, caption_dict, image_dir, max_caption_words)
print(bleu_scores)
print(f'Average BLEU score :{sum(bleu_scores)/len(bleu_scores):.6f}')

Image Name:  101669240_b2d3e7f17b
Real Caption: man on skis looking at artwork for sale in the snow
Prediction Caption: in skis sale dog looking at looking for sale sale for skis man on looking skis artwork looking at for looking

Image Name:  1015118661_980735411b
Real Caption: smiling boy in white shirt and blue jeans in front of rock wall with man in overalls behind him
Prediction Caption: smiling behind boy in man shirt him him

Image Name:  101654506_8eb26cfb60
Real Caption: the white and brown dog is running over the surface of the snow
Prediction Caption: the brown over brown over the white brown over brown surface running is brown brown the over and snow and the

Image Name:  1015584366_dfcec3c85a
Real Caption: the black dog jumped the tree stump
Prediction Caption: the large

Image Name:  1007320043_627395c3d8
Real Caption: the small child climbs on red ropes on playground
Prediction Caption: the on the toward on the on small on climbs on on red child small on red climbs on re