<a href="https://colab.research.google.com/github/errpv78/Scene-Depiction/blob/master/Image_captioning_attention.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Image Captioning with Visual Attention Model

The model architecture is similar to [Show, Attend and Tell: Neural Image Caption Generation with Visual Attention](https://arxiv.org/abs/1502.03044).

Dataset: [MS-COCO](http://cocodataset.org/#home) dataset.<br>
Steps: Preprocess and cache a subset of images using Inception V3, train an encoder-decoder model, and generate captions on new images using the trained model.


## Show, Attend and Tell: Neural Image Caption Generation with Visual Attention



In [None]:
# Clearing Some Disk Space
# Lists the working directory, prints the size of `sample_data`, deletes that
# folder recursively, then lists again to confirm it is gone.
!ls
!du -sh sample_data
!rm -R sample_data
!ls

sample_data
55M	sample_data


## Importing modules and packages

In [None]:
# Importing Necessary Packages
import tensorflow as tf
import matplotlib.pyplot as plt
from sklearn.model_selection import train_test_split
from sklearn.utils import shuffle
import re
import numpy as np
import os
import time
import json
from glob import glob
from PIL import Image
import pickle
from tqdm.notebook import tqdm

## Downloading and preparing the MS-COCO dataset

Training set: 13GB file

In [None]:
# Downloading Data
#
# tf.keras.utils.get_file arguments used below:
#   fname:        Name of file. If absolute path /path/to/file.txt is specified the
#                 file will be saved at that location.
#   origin:       Original URL of the file.
#   cache_subdir: Subdirectory under the Keras cache dir where the file is saved. If an
#                 absolute path /path/to/folder is specified the file will be saved at
#                 that location.
#   extract:      True tries extracting the file as an Archive, like tar or zip.
#
# Each download is skipped when its extracted folder already exists, so the cell can
# be re-run without fetching the archives again and without referencing an
# `annotation_zip` / `image_zip` variable that was never assigned.

# Downloading Captions
annotation_folder = '/annotations/'
if not os.path.exists(os.path.abspath('.') + annotation_folder):
    annotation_zip = tf.keras.utils.get_file('captions.zip',
                                        cache_subdir=os.path.abspath('.'),
                                        origin = 'http://images.cocodataset.org/annotations/annotations_trainval2014.zip',
                                        extract = True)
    annotation_file = os.path.dirname(annotation_zip)+'/annotations/captions_train2014.json'

    # Removing Zip File After Extraction
    os.remove(annotation_zip)
else:
    # Folder already extracted by an earlier run: point at the existing file.
    annotation_file = os.path.abspath('.') + '/annotations/captions_train2014.json'

# Downloading Images
image_folder = '/train2014/'
if not os.path.exists(os.path.abspath('.') + image_folder):
    image_zip = tf.keras.utils.get_file('train2014.zip',
                                        cache_subdir=os.path.abspath('.'),
                                        origin = 'http://images.cocodataset.org/zips/train2014.zip',
                                        extract = True)
    PATH = os.path.dirname(image_zip) + image_folder

    # Removing Zip File After Extraction
    os.remove(image_zip)
else:
    # Images already extracted by an earlier run: reuse them.
    PATH = os.path.abspath('.') + image_folder

Downloading data from http://images.cocodataset.org/annotations/annotations_trainval2014.zip
Downloading data from http://images.cocodataset.org/zips/train2014.zip


In [None]:
# Checking Directory Contents And Dataset Size
# `du -sh` prints one human-readable total for each of the two extracted folders.
!ls
!du -sh train2014
!du -sh annotations

annotations  train2014
13G	train2014
806M	annotations


In [None]:
# Show size / used / available space of the filesystem holding the current directory.
!df -h .

Filesystem      Size  Used Avail Use% Mounted on
overlay          69G   44G   22G  68% /


## Limiting the size of the training set 
Current data size = 50000


In [None]:
# Reading Json File For Captions
with open(annotation_file, 'r') as json_handle:
    annotations = json.load(json_handle)

# Storing Captions And Image Names In Vectors
# Every caption is wrapped in start/end tokens so the decoder knows where a
# caption begins and where it stops.
all_captions = [
    '<start> ' + record['caption'] + ' <end>'
    for record in annotations['annotations']
]

# One image path per caption (same order as all_captions); the numeric image id
# is zero-padded to 12 digits to form the file name.
all_img_name_vector = [
    PATH + 'COCO_train2014_' + '%012d.jpg' % (record['image_id'])
    for record in annotations['annotations']
]

# Shuffling Captions And Image Names Together (fixed seed keeps the pairs aligned
# and the subset reproducible), Then Selecting First 50000 From Shuffled Set
num_examples = 50000
shuffled_captions, shuffled_img_names = shuffle(all_captions,
                                                all_img_name_vector,
                                                random_state=78)
train_captions = shuffled_captions[:num_examples]
img_name_vector = shuffled_img_names[:num_examples]

In [None]:
# Checking Sample Data Size
# Prints how many caption/image pairs were kept versus the total available.
print(len(train_captions), "out of", len(all_captions), "example.")

50000 out of 414113 example.


## Preprocessing the images using InceptionV3
Extracting image features using InceptionV3 (which is pretrained on ImageNet) from its last convolutional layer.

Input format for InceptionV3
* Image size to 299px by 299px
* Normalize the image so that it contains pixels in the range of -1 to 1, which matches the format of the images used to train InceptionV3.

In [None]:
# Making Images Compatible To InceptionV3 Input Format
def load_image(image_path):
    """Read a JPEG file and convert it to the InceptionV3 input format.

    1. Resizes the image to 299px by 299px.
    2. Preprocesses the image using InceptionV3's preprocess_input method to
       normalize it so that it contains pixels in the range of -1 to 1, which
       matches the format of the images used to train InceptionV3.

    Returns a pair: the preprocessed image tensor and `image_path` unchanged
    (so the path travels with the image through the dataset pipeline).
    """
    # Entire file contents -> 3-channel uint8 tensor.
    raw_bytes = tf.io.read_file(image_path)
    decoded = tf.image.decode_jpeg(raw_bytes, channels=3)

    # Inception-compatible spatial size, then Inception-specific normalization.
    resized = tf.image.resize(decoded, (299, 299))
    preprocessed = tf.keras.applications.inception_v3.preprocess_input(resized)
    return preprocessed, image_path

## Initializing InceptionV3 and loading pretrained Imagenet weights

Steps:
* Forwarding each image through the network and storing resulting vector in dictionary (image_name --> feature_vector).

* After all images are passed through network, dictionary is pickled and saved to disk.


In [None]:
# Loading InceptionV3 Model
#   include_top: Boolean, whether to include the fully-connected layer at the top,
#                as last layer of network. Default to True.
#   weights:     One of None (random initialization), imagenet (pre-training on
#                ImageNet), or path to weights file to be loaded. Default to imagenet.
image_model = tf.keras.applications.InceptionV3(weights='imagenet',
                                                include_top=False)

# Feature extractor: same input as InceptionV3, output taken from its final layer.
image_features_extract_model = tf.keras.Model(image_model.input,
                                              image_model.layers[-1].output)

Downloading data from https://storage.googleapis.com/tensorflow/keras-applications/inception_v3/inception_v3_weights_tf_dim_ordering_tf_kernels_notop.h5


## Caching the features extracted from InceptionV3

In [None]:
# Getting Unique Images From 50,000 Data, Sorted By Name
encode_train = sorted(set(img_name_vector))

# tf.data pipeline:
#   1. from_tensor_slices creates one dataset element per image path,
#   2. map applies load_image to the elements in parallel,
#   3. batch groups the results 32 at a time.
image_dataset = (
    tf.data.Dataset.from_tensor_slices(encode_train)
    .map(load_image, num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .batch(32)
)

for image_batch, path_batch in tqdm(image_dataset):
    feature_maps = image_features_extract_model(image_batch)

    # Collapse the two middle axes into one, keeping the first and last axes.
    flat_features = tf.reshape(
        feature_maps, (feature_maps.shape[0], -1, feature_maps.shape[3]))

    # One file per image, saved under the image's own path; map_func later reads
    # it back from "<image path>.npy".
    for image_features, path_tensor in zip(flat_features, path_batch):
        np.save(path_tensor.numpy().decode("utf-8"), image_features.numpy())

HBox(children=(FloatProgress(value=0.0, max=1228.0), HTML(value='')))




## Preprocessing and tokenizing the captions

* Tokenizing: Assigning tokens to each unique word
* Limiting size of vocab to 5000 words
* Replacing other words with unknown 
* Creating word-to-index and index-to-word mappings.
* Padding all sequences to make them of same length as the longest one.

In [None]:
# Finding Maximum Length Of Any Caption In Dataset
def calc_max_length(tensor):
    """Return the length of the longest element of `tensor`.

    `tensor` is any iterable of sized items (here: the tokenized captions).
    Raises ValueError when `tensor` is empty, as `max` has nothing to compare.
    """
    return max(map(len, tensor))

In [None]:
# Choosing Top 5000 Words From Vocabulary By Frequency Count
top_k = 5000

# Text Tokenization
# Tokenizer arguments:
#   num_words: maximum number of words to keep, based on word frequency. Only the
#              most common `num_words-1` words will be kept.
#   filters:   string where each element is a character that will be filtered from
#              texts. The string used here does not contain `<`, `>` or `'`, so the
#              `<start>` / `<end>` markers added to the captions are left intact.
#   oov_token: if given, it will be added to word_index and used to replace
#              out-of-vocabulary words during text_to_sequence calls.
#
# The filter is written as a raw string: the backslash is one of the characters to
# strip, and in a normal string literal "\]" is an invalid escape sequence that
# newer Python versions warn about. The resulting value is unchanged.
tokenizer = tf.keras.preprocessing.text.Tokenizer(num_words=top_k,
                                                  oov_token="<unknown>",
                                                  filters=r'!"#$%&()*+.,-/:;=?@[\]^_`{|}~ ')
tokenizer.fit_on_texts(train_captions)
train_seqs = tokenizer.texts_to_sequences(train_captions)

In [None]:
# Adding Token 0
# Registers '<pad>' under index 0 in both lookup directions, so that index 0
# can be mapped to a word and back.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [None]:
# Creating Tokenized Vectors
# Converts every caption string into its list of integer word indices.
train_seqs = tokenizer.texts_to_sequences(train_captions)

In [None]:
# Connecting With Drive To Store Tokenizer.pickle and Checkpoints
from google.colab import drive
drive.mount('/content/gdrive')
import os
os.environ['KAGGLE_CONFIG_DIR'] = "drive/My Drive/Kaggle"

# Changing Working Directory
%cd gdrive/My Drive/Kaggle/
%cd Image Captioning/Visual_Attention_MSCOCO
!ls

Mounted at /content/gdrive
/content/gdrive/My Drive/Kaggle
/content/gdrive/My Drive/Kaggle/Image Captioning/Visual_Attention_MSCOCO
tokenizer.pickle


In [None]:
# Saving Tokenizer
# Serializes the fitted tokenizer to `tokenizer.pickle` in the current working
# directory, using the highest pickle protocol available.
with open('tokenizer.pickle', 'wb') as handle:
    pickle.dump(tokenizer, handle, protocol=pickle.HIGHEST_PROTOCOL)

In [None]:
# Loading Tokenizer
# Replaces the in-memory tokenizer with the one stored in `tokenizer.pickle`.
# NOTE: unpickling can execute arbitrary code - only load a file you created yourself.
with open('tokenizer.pickle', 'rb') as handle:
    tokenizer = pickle.load(handle)
# Display the index assigned to the start token as a quick sanity check.
tokenizer.word_index['<start>']    

3

In [None]:
# Padding Each Vector To Max_length Of Captions
# No maxlen is given, so sequences are padded to the longest one; padding='post'
# appends the padding after the caption tokens.
cap_vector = tf.keras.preprocessing.sequence.pad_sequences(train_seqs, padding='post')

In [None]:
# Calculating Max_length To Store Attention Weights
# Length (in tokens) of the longest tokenized caption; evaluate() reads this
# global to size its attention matrix and to bound the decoding loop.
max_length = calc_max_length(train_seqs)
print("Maximum caption length:", max_length)

Maximum caption length: 51


## Splitting training and testing data

In [None]:
# Creating Training And Validation Sets Using 80-20 Split
# The split is done per caption: img_name_vector holds one path per caption, and
# the same image path can occur several times in it.
# NOTE(review): an image with several captions can therefore land in both the
# training and the validation set - consider splitting by unique image instead.
img_name_train, img_name_val, cap_train, cap_val = train_test_split(img_name_vector,
                                                                    cap_vector,
                                                                    test_size=0.2,
                                                                    random_state=0)

In [None]:
# Checking Dataset Size
# Displays (train images, train captions, validation images, validation captions).
len(img_name_train), len(cap_train), len(img_name_val), len(cap_val)

(40000, 40000, 10000, 10000)

## Creating a tf.data dataset for training

In [None]:
# Initializing Parameters
BATCH_SIZE = 128       # examples per training batch
BUFFER_SIZE = 1000     # shuffle buffer size used when building the dataset
embedding_dim = 256    # size of the encoder output and of the word embeddings
units = 512            # GRU / attention hidden size
vocab_size = top_k + 1 # size of the embedding table and of the output layer
# Floor division: a final partial batch, if any, is not counted here.
num_steps = len(img_name_train) // BATCH_SIZE

# Shape of vector extracted from InceptionV3 is (64, 2048)
features_shape = 2048
attention_features_shape = 64

In [None]:
# Mapping Function For Image Name And Image Tensor
def map_func(img_name, cap):
  """Load the cached feature array that belongs to one image.

  `img_name` is the image path as UTF-8 bytes; the features are read from the
  file "<image path>.npy". `cap` is passed through untouched.

  Returns (feature array, cap).
  """
  feature_path = img_name.decode('utf-8') + '.npy'
  return np.load(feature_path), cap

In [None]:
# Dataset Preparation For Training
def _load_cached_features(item1, item2):
  """Run map_func (plain numpy code) on one (image name, caption) element."""
  return tf.numpy_function(map_func, [item1, item2], [tf.float32, tf.int32])

# Pipeline: (image name, caption) pairs -> load the .npy features in parallel
# -> shuffle -> batch -> prefetch.
dataset = (
    tf.data.Dataset.from_tensor_slices((img_name_train, cap_train))
    .map(_load_cached_features,
         num_parallel_calls=tf.data.experimental.AUTOTUNE)
    .shuffle(BUFFER_SIZE)
    .batch(BATCH_SIZE)
    .prefetch(buffer_size=tf.data.experimental.AUTOTUNE)
)

## Model

Model decoder: [Neural Machine Translation with Attention](../sequences/nmt_with_attention.ipynb).

The model architecture is inspired by the [Show, Attend and Tell](https://arxiv.org/pdf/1502.03044.pdf) paper.

* Extract features from the last convolutional layer of InceptionV3, giving us a vector of shape (8, 8, 2048).
* Squash that to shape of (64, 2048).
* Pass this vector through CNN Encoder (which consists of a single Fully connected layer).
* The RNN (here GRU) attends over image to predict the next word.

In [None]:
class BahdanauAttention(tf.keras.Model):
  """Additive attention over the encoded image locations.

  Scores every location against the decoder hidden state, turns the scores
  into weights with a softmax across locations, and returns the weighted sum
  of the features together with the weights.
  """

  def __init__(self, units):
    super().__init__()
    self.W1 = tf.keras.layers.Dense(units)  # projects the image features
    self.W2 = tf.keras.layers.Dense(units)  # projects the decoder hidden state
    self.V = tf.keras.layers.Dense(1)       # reduces each location to one score

  def call(self, features, hidden):
    """Return (context_vector, attention_weights).

    features: CNN_Encoder output, shape (batch_size, 64, embedding_dim).
    hidden:   decoder hidden state, shape (batch_size, hidden_size).
    """
    # (batch_size, hidden_size) -> (batch_size, 1, hidden_size) so that it
    # broadcasts against every location of `features`.
    hidden_with_time_axis = tf.expand_dims(hidden, 1)

    # score shape == (batch_size, 64, units)
    projected = self.W1(features) + self.W2(hidden_with_time_axis)
    score = tf.nn.tanh(projected)

    # attention_weights shape == (batch_size, 64, 1): self.V leaves 1 on the
    # last axis and the softmax normalizes across axis 1.
    attention_weights = tf.nn.softmax(self.V(score), axis=1)

    # Weighted sum over axis 1; the result keeps the last axis of `features`,
    # i.e. shape (batch_size, embedding_dim).
    context_vector = tf.reduce_sum(attention_weights * features, axis=1)

    return context_vector, attention_weights

In [None]:
class CNN_Encoder(tf.keras.Model):
    """Projects pre-extracted image features to the embedding size.

    The InceptionV3 features were already extracted and saved to .npy files,
    so this encoder only passes them through one fully connected layer
    followed by a ReLU.
    """

    def __init__(self, embedding_dim):
        super().__init__()

        # shape after fc == (batch_size, 64, embedding_dim)
        self.fc = tf.keras.layers.Dense(embedding_dim)

    def call(self, x):
        """Return relu(fc(x))."""
        return tf.nn.relu(self.fc(x))

In [None]:
class RNN_Decoder(tf.keras.Model):
  """GRU decoder that attends over the image features at every step."""

  def __init__(self, embedding_dim, units, vocab_size):
    super().__init__()
    self.units = units

    self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
    self.gru = tf.keras.layers.GRU(
        self.units,
        return_sequences=True,
        return_state=True,
        recurrent_initializer='glorot_uniform')
    self.fc1 = tf.keras.layers.Dense(self.units)
    self.fc2 = tf.keras.layers.Dense(vocab_size)

    # attention is defined as a separate model
    self.attention = BahdanauAttention(self.units)

  def call(self, x, features, hidden):
    """Run one decoding step.

    x:        current input token ids.
    features: encoder output for the image.
    hidden:   previous decoder state; it is used by the attention layer.
              NOTE(review): the GRU itself is called without an initial
              state, so `hidden` does not seed the recurrence - confirm
              this is intended.

    Returns (logits over the vocabulary, new GRU state, attention weights).
    """
    context_vector, attention_weights = self.attention(features, hidden)

    # shape after embedding == (batch_size, 1, embedding_dim)
    token_embedding = self.embedding(x)

    # Context vector gets a length-1 axis and is joined with the embedding
    # along the last axis.
    gru_input = tf.concat(
        [tf.expand_dims(context_vector, 1), token_embedding], axis=-1)

    # passing the concatenated vector to the GRU
    output, state = self.gru(gru_input)

    # fc1 keeps the leading axes and maps the last axis to `units`
    projected = self.fc1(output)

    # merge all leading axes into one: (rows, units)
    flattened = tf.reshape(projected, (-1, projected.shape[2]))

    # logits shape == (rows, vocab)
    logits = self.fc2(flattened)

    return logits, state, attention_weights

  def reset_state(self, batch_size):
    """Return an all-zero state of shape (batch_size, units)."""
    return tf.zeros((batch_size, self.units))

In [None]:
# Build the two trainable models from the hyperparameters defined earlier.
encoder = CNN_Encoder(embedding_dim)
decoder = RNN_Decoder(embedding_dim, units, vocab_size)
# vocab_size was set to top_k + 1, i.e. 5001 output classes

In [None]:
# Adam with default settings. The loss takes raw logits and, with
# reduction='none', returns one value per example so loss_function can mask it.
optimizer = tf.keras.optimizers.Adam()
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

def loss_function(real, pred):
  """Cross-entropy for one time step with padding positions zeroed out.

  real: target token ids; id 0 marks padding.
  pred: logits for the same positions.

  Positions whose target is 0 contribute a loss of zero; the mean is then
  taken over all positions, padded ones included.
  """
  per_example_loss = loss_object(real, pred)

  is_real_token = tf.math.logical_not(tf.math.equal(real, 0))
  per_example_loss *= tf.cast(is_real_token, dtype=per_example_loss.dtype)

  return tf.reduce_mean(per_example_loss)

## Checkpointing

In [None]:
# Checkpoints track the encoder, the decoder and the optimizer state. They are
# written under the relative path below (resolved against the current working
# directory) and only the 5 most recent ones are kept.
checkpoint_path = "checkpoints/train"
ckpt = tf.train.Checkpoint(encoder=encoder,
                           decoder=decoder,
                           optimizer = optimizer)
ckpt_manager = tf.train.CheckpointManager(ckpt, checkpoint_path, max_to_keep=5)

In [None]:
# Resume from the newest checkpoint when one exists. The number after the last
# '-' in the checkpoint name is used as the epoch to continue from.
latest_checkpoint = ckpt_manager.latest_checkpoint
start_epoch = 0
if latest_checkpoint:
  start_epoch = int(latest_checkpoint.split('-')[-1])
  # restoring the latest checkpoint in checkpoint_path
  ckpt.restore(latest_checkpoint)

## Training

* Extract features stored in respective `.npy` files and then pass those features through encoder.
* Encoder output, hidden state(initialized to 0) and decoder input (which is start token) is passed to decoder.
* Decoder returns predictions and decoder hidden state.
* Decoder hidden state is then passed back into model and predictions are used to calculate loss.
* Use teacher forcing to decide next input to decoder.
* Teacher forcing is technique where target word is passed as next input to decoder.
* Final step is to calculate the gradients and apply them to the trainable variables with the optimizer (backpropagation).


In [None]:
# Loss Plot Array
# Collects one average-loss value per epoch. Kept in its own cell so that
# re-running the training cell does not reset it.
loss_plot = []

In [None]:
@tf.function
def train_step(img_tensor, target):
  """Run one teacher-forced optimization step on a batch.

  img_tensor: cached image features for the batch.
  target:     padded caption token ids, one row per example.

  Returns (loss, total_loss): the loss summed over the time steps, and that
  sum divided by the caption length.
  """
  batch_size, seq_len = target.shape[0], target.shape[1]
  loss = 0

  # Initializing Hidden State For Each Batch (because captions are not related from image to image)
  hidden = decoder.reset_state(batch_size=batch_size)

  # Every caption starts with the <start> token.
  dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * batch_size, 1)

  with tf.GradientTape() as tape:
      features = encoder(img_tensor)

      for step in range(1, seq_len):
          # passing features through decoder
          predictions, hidden, _ = decoder(dec_input, features, hidden)
          loss += loss_function(target[:, step], predictions)

          # using teacher forcing: the true token becomes the next input
          dec_input = tf.expand_dims(target[:, step], 1)

  total_loss = loss / int(seq_len)

  # Gradients of the summed loss with respect to both models' variables.
  trainable_variables = encoder.trainable_variables + decoder.trainable_variables
  gradients = tape.gradient(loss, trainable_variables)
  optimizer.apply_gradients(zip(gradients, trainable_variables))

  return loss, total_loss

In [None]:
EPOCHS = 20

# Trains from start_epoch (non-zero when a checkpoint was restored) up to EPOCHS,
# recording the average loss and saving a checkpoint after every epoch.
for epoch in tqdm(range(start_epoch, EPOCHS)):
    start = time.time()
    total_loss = 0
    # Count the batches that are actually processed: the dataset is batched
    # without dropping the remainder, so it can yield one more (partial) batch
    # than the floor-divided num_steps accounts for.
    batches_seen = 0

    # tqdm wraps the dataset itself rather than the enumerate object, so it can
    # display a total whenever the dataset reports its length.
    for (batch, (img_tensor, target)) in enumerate(tqdm(dataset)):
        batch_loss, t_loss = train_step(img_tensor, target)
        total_loss += t_loss
        batches_seen += 1

        if batch % 100 == 0:
            print ('Epoch {} Batch {} Loss {:.4f}'.format(
              epoch + 1, batch, batch_loss.numpy() / int(target.shape[1])))

    # Average over the batches seen; max() guards against an empty dataset.
    epoch_loss = total_loss / max(batches_seen, 1)

    # storing the epoch end loss value to plot later
    loss_plot.append(epoch_loss)

    # Checkpoint after every epoch.
    ckpt_manager.save()

    print ('Epoch {} Loss {:.6f}'.format(epoch + 1,
                                         epoch_loss))
    print ('Time taken for 1 epoch {} sec\n'.format(time.time() - start))

HBox(children=(FloatProgress(value=0.0, max=19.0), HTML(value='')))

HBox(children=(FloatProgress(value=1.0, bar_style='info', max=1.0), HTML(value='')))

Epoch 2 Batch 0 Loss 0.7449


In [None]:
# Per-epoch average training loss collected in loss_plot.
fig, ax = plt.subplots()
ax.plot(loss_plot)
ax.set_xlabel('Epochs')
ax.set_ylabel('Loss')
ax.set_title('Loss Plot')
plt.show()

## Captions

* Evaluate function is similar to training loop, except teacher forcing is not used. Input to decoder at each time step is its previous predictions along with hidden state and encoder output.
* Stop predicting when model predicts end token.
* Store attention weights for every time step.

In [None]:
def evaluate(image):
    """Generate a caption for one image file.

    image: path of the image to caption.

    The decoder is fed its own previous prediction (no teacher forcing) for at
    most `max_length` steps and stops early once it produces '<end>'. The next
    token is sampled from the predicted distribution, so repeated calls can
    return different captions.

    Returns (result, attention_plot): the list of generated words (including
    '<end>' when it was produced) and the attention weights with one row per
    generated word. The rows are trimmed to len(result) on every exit path.
    """
    attention_plot = np.zeros((max_length, attention_features_shape))

    hidden = decoder.reset_state(batch_size=1)

    # Same preprocessing and feature extraction as for the training images,
    # applied to a batch of one.
    temp_input = tf.expand_dims(load_image(image)[0], 0)
    img_tensor_val = image_features_extract_model(temp_input)
    img_tensor_val = tf.reshape(img_tensor_val, (img_tensor_val.shape[0], -1, img_tensor_val.shape[3]))

    features = encoder(img_tensor_val)

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        predicted_word = tokenizer.index_word[predicted_id]
        result.append(predicted_word)

        if predicted_word == '<end>':
            break

        # The sampled token becomes the next decoder input.
        dec_input = tf.expand_dims([predicted_id], 0)

    # Drop the unused all-zero rows, whether the loop ended on '<end>' or not.
    attention_plot = attention_plot[:len(result), :]
    return result, attention_plot

In [None]:
def plot_attention(image, result, attention_plot):
    """Show, for every generated word, the image overlaid with its attention map.

    image:          path of the captioned image.
    result:         list of generated words.
    attention_plot: attention weights, one row per word in `result`.
    """
    temp_image = np.array(Image.open(image))

    fig = plt.figure(figsize=(10, 10))

    len_result = len(result)
    # Square grid with at least len_result cells. The previous
    # len_result//2 x len_result//2 grid was too small for captions of
    # 1, 2, 3 or 5 words, which made add_subplot fail.
    grid_size = max((len_result + 1) // 2, 2)

    for l in range(len_result):
        # Each row of weights is laid out as an 8x8 map over the image.
        temp_att = np.resize(attention_plot[l], (8, 8))
        ax = fig.add_subplot(grid_size, grid_size, l+1)
        ax.set_title(result[l])
        img = ax.imshow(temp_image)
        # Stretch the map over the full image extent, semi-transparent.
        ax.imshow(temp_att, cmap='gray', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()

In [None]:
# Captions on validation set
# Pick one validation example at random and compare its reference caption with
# a generated one.
rid = np.random.randint(0, len(img_name_val))
image = img_name_val[rid]

# Map the token ids back to words, skipping the padding id 0.
real_caption = ' '.join(
    tokenizer.index_word[token_id] for token_id in cap_val[rid] if token_id != 0)

result, attention_plot = evaluate(image)

print ('Real Caption:', real_caption)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image, result, attention_plot)


## Predicting on other images


In [None]:
# Download an image that is not part of the dataset and caption it.
image_url = 'https://tensorflow.org/images/surf.jpg'
# Last four characters of the URL, used as the local file's extension.
image_extension = image_url[-4:]
image_path = tf.keras.utils.get_file('image'+image_extension,
                                     origin=image_url)

# image_path = '/pARIKH 1758.jpg'
# Overrides the global max_length that evaluate() reads, for this and any later call.
# NOTE(review): the value computed from the captions earlier was printed as 51 -
# confirm that 34 is intended here.
max_length = 34
result, attention_plot = evaluate(image_path)
print ('Prediction Caption:', ' '.join(result))
# plot_attention(image_path, result, attention_plot)

# Opening the image
Image.open(image_path)


In [None]:
# NOTE(review): `!cd` runs in a temporary subshell, so the notebook's working
# directory is not changed by this line; `%cd checkpoints` would change it.
# Confirm which behavior is intended.
!cd checkpoints

In [None]:
# List the contents of the current working directory.
!ls

annotations  checkpoints  sample_data  train2014
