# Imports
The following packages are imported:
- tensorflow
- matplotlib
- pandas
- numpy
- IPython


In [None]:
# import wandb

import tensorflow as tf
from tensorflow.keras.layers import *

import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

import IPython

# Data
- data_dir is the path to the images and the `results.csv`
- image_dir is the path to the folder that contains only the images
- csv_file is the path to the `results.csv` file

In [None]:
# Root of the dataset: it holds both the image folder and results.csv.
data_dir = '../input/flickr-image-dataset/flickr30k_images'
# Folder that contains only the images.
image_dir = data_dir + '/flickr30k_images'
# Caption table.
csv_file = data_dir + '/results.csv'

Here we read the csv file as a dataframe and make some observations from it.
For a quick EDA we are going to 
- check the shape of the dataframe
- check the names of the columns
- count how many unique image names there are

In [None]:
# Load the caption table; its fields are separated by '|'.
df = pd.read_csv(csv_file, sep='|')

# Quick EDA: overall size, raw column names and number of distinct images.
n_unique_images = len(df["image_name"].unique())
print(f'[INFO] The shape of dataframe: {df.shape}')
print(f'[INFO] The columns in the dataframe: {df.columns}')
print(f'[INFO] Unique image names: {n_unique_images}')

In [None]:
# Give the columns clean names, then drop the per-image caption counter.
df.columns = ['image_name', 'comment_number', 'comment']
df = df.drop(columns=['comment_number'])

# Turn every bare file name into a path inside image_dir.
df['image_name'] = (image_dir + '/') + df['image_name']

# Wrap every caption in explicit boundary tokens: <start> comment <end>
df['comment'] = "<start> " + df['comment'] + " <end>"

In [None]:
# Shuffle the dataframe with a fixed seed so that the train/val/test split
# derived from it is the same on every "Restart & Run All".
SHUFFLE_SEED = 42
df = df.sample(frac=1, random_state=SHUFFLE_SEED).reset_index(drop=True)
df.head()

In [None]:
# Number of caption rows assigned to the train, validation and test splits.
train_size, val_size, test_size = 60_000, 10_000, 20_000

Splitting the dataframe accordingly

In [None]:
# Cut the shuffled frame into three consecutive, non-overlapping row ranges.
val_start = train_size
test_start = val_start + val_size
test_stop = test_start + test_size

train_df = df.iloc[:val_start]
val_df = df.iloc[val_start:test_start]
test_df = df.iloc[test_start:test_stop]

train_df.shape, val_df.shape, test_df.shape

In [None]:
# Change this row label to inspect a different training example.
index = 200

sample_row = train_df.loc[index]
image_name = sample_row['image_name']
comment = sample_row['comment']

print(comment)

# Last expression of the cell: renders the image below the caption.
IPython.display.Image(filename=image_name)

# Text Handling
- Defined the size of the vocab which is `5000`.
- Initialized the Tokenizer class.
    - Standardized (all to lower case)
    - Filters the punctuations
    - Splits the text
    - Creates the vocabulary (`<start>`, `<end>` and `<unk>` are defined)

In [None]:
# Limit the vocabulary with `top_k`; out-of-vocabulary words map to "<unk>".
# The filter list deliberately leaves out '<' and '>' so that the
# "<start>" / "<end>" markers survive tokenization.
top_k = 5000
tokenizer = tf.keras.preprocessing.text.Tokenizer(
    num_words=top_k,
    filters='!"#$%&()*+.,-/:;=?@[\]^_`{|}~',
    oov_token="<unk>",
)

Here we fit the `tokenizer` object on the training captions. This builds the vocabulary (the word-to-index and index-to-word mappings) that the `tokenizer` object uses.

Note that the fitted vocabulary does not start at `0`: the smallest index in both dictionaries (`word_index` and `index_word`) is `1`. Index `0` is left free and is assigned to the `<pad>` token below.

In [None]:
# Build the vocabulary from the training captions only (validation and test
# captions are not seen here). Captions are cast to str before fitting.
tokenizer.fit_on_texts(train_df['comment'].astype("str"))

In [None]:
def check_vocab(word):
    """Print the index of `word` and the word stored back at that index.

    Sanity check that `tokenizer.word_index` and `tokenizer.index_word`
    agree for `word`. Raises KeyError if `word` is not in the vocabulary.
    """
    word_id = tokenizer.word_index[word]
    print(f"The index of the word: {word_id}")
    round_trip = tokenizer.index_word[word_id]
    print(f"Index {word_id} is word {round_trip}")


check_vocab("pajama")

Here we register the `<pad>` token at index `0` and then pad the tokenized captions so that all captions in a split have the same length.

In [None]:
# Reserve index 0 for the padding token in both lookup tables. The loss
# function used for training masks out targets equal to 0.
tokenizer.word_index['<pad>'] = 0
tokenizer.index_word[0] = '<pad>'

In [None]:
# Convert the captions of every split into lists of vocabulary indices.
train_seqs, val_seqs, test_seqs = (
    tokenizer.texts_to_sequences(split['comment'].astype("str"))
    for split in (train_df, val_df, test_df)
)

In [None]:
# Right-pad every caption with zeros. No max length is given, so
# pad_sequences derives it from the sequences it receives, separately for
# each split.
train_cap_vector, val_cap_vector, test_cap_vector = (
    tf.keras.preprocessing.sequence.pad_sequences(seqs, padding='post')
    for seqs in (train_seqs, val_seqs, test_seqs)
)

In [None]:
# Shapes of the padded caption matrices for the train, validation and test
# splits (displayed as the cell output).
train_cap_vector.shape, val_cap_vector.shape, test_cap_vector.shape

In [None]:
# One dataset element per padded caption, in dataframe row order.
train_cap_ds, val_cap_ds, test_cap_ds = (
    tf.data.Dataset.from_tensor_slices(cap_vector)
    for cap_vector in (train_cap_vector, val_cap_vector, test_cap_vector)
)

# Image Handling
- Load the image
- decode jpeg
- resize
- standardize

In [None]:
@tf.function
def load_img(image_path):
    """Read a JPEG file and prepare it for the InceptionV3 encoder.

    Args:
        image_path: path of the image file.

    Returns:
        The image decoded with 3 channels, resized to 299x299 and passed
        through `inception_v3.preprocess_input`.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(pixels, (299, 299))
    return tf.keras.applications.inception_v3.preprocess_input(resized)

In [None]:
# Image paths of every split as plain arrays, in dataframe row order.
train_img_name, val_img_name, test_img_name = (
    split['image_name'].values for split in (train_df, val_df, test_df)
)

In [None]:
# Load and preprocess the images in parallel instead of one at a time.
# map() keeps the element order by default, so the images stay aligned with
# the caption datasets they are zipped with later.
train_img_ds = tf.data.Dataset.from_tensor_slices(train_img_name).map(
    load_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
val_img_ds = tf.data.Dataset.from_tensor_slices(val_img_name).map(
    load_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_img_ds = tf.data.Dataset.from_tensor_slices(test_img_name).map(
    load_img, num_parallel_calls=tf.data.experimental.AUTOTUNE)

# Joint data

In [None]:
# Pair images with captions, then batch and prefetch each split.
AUTOTUNE = tf.data.experimental.AUTOTUNE
BATCH_SIZE = 64

# drop_remainder=True: every batch has exactly BATCH_SIZE examples.
train_ds = tf.data.Dataset.zip((train_img_ds, train_cap_ds))
train_ds = train_ds.batch(BATCH_SIZE, drop_remainder=True)
train_ds = train_ds.prefetch(buffer_size=AUTOTUNE)

val_ds = tf.data.Dataset.zip((val_img_ds, val_cap_ds))
val_ds = val_ds.batch(BATCH_SIZE, drop_remainder=True)
val_ds = val_ds.prefetch(buffer_size=AUTOTUNE)

test_ds = tf.data.Dataset.zip((test_img_ds, test_cap_ds))
test_ds = test_ds.batch(BATCH_SIZE, drop_remainder=True)
test_ds = test_ds.prefetch(buffer_size=AUTOTUNE)

Sanity check for the division of datasets

## Model

In [None]:
# Model hyper-parameters shared by the cells below.
# EMBEDDING_DIM: width of the encoder projection and of the word embeddings.
EMBEDDING_DIM = 256
# VOCAB_SIZE: size of the decoder's embedding table and output layer.
VOCAB_SIZE = top_k+1
# UNITS: number of units of the decoder GRU and of the attention layers.
UNITS = 512
# KERNEL x FEATURES: target shape the InceptionV3 encoder reshapes its
# feature map to (positions x channels).
KERNEL = 64
FEATURES = 2048

Using InceptionV3

In [None]:
class CNN_Encoder(tf.keras.Model):
    """Image encoder: frozen ImageNet InceptionV3 plus a trainable Dense projection.

    The sub-layers are created in `build`, not in `__init__`.
    """
    
    def __init__(self, embedding_dim, batch_size):
        """Store the configuration.

        Args:
            embedding_dim: number of units of the final Dense projection.
            batch_size: stored on the instance; not used by this class itself.
        """
        super(CNN_Encoder, self).__init__()
        self.batch_size = batch_size
        self.embedding_dim = embedding_dim
        
    def build(self, input_shape):
        """Create the feature extractor, the reshape layer and the projection."""
        # InceptionV3 without its classification head, with ImageNet weights.
        self.image_model = tf.keras.applications.InceptionV3(include_top=False,
                                                weights='imagenet')
        self.new_input = self.image_model.input
        self.hidden_layer = self.image_model.layers[-1].output
        self.image_features_extract_model = tf.keras.Model(self.new_input, self.hidden_layer)
        # Freeze the backbone; only the Dense projection below is trained.
        self.image_features_extract_model.trainable = False
        
        # Reshape the feature map to (KERNEL, FEATURES), both module-level
        # constants. NOTE(review): presumably 8x8x2048 for 299x299 inputs - confirm.
        self.reshape = tf.keras.layers.Reshape(target_shape=(KERNEL,FEATURES))
        # `Dense` comes from the star import of tensorflow.keras.layers.
        self.fc = Dense(units=self.embedding_dim,
                        activation='relu')
        
    def call(self, x):
        """Return the projected image features, shape (batch, KERNEL, embedding_dim)."""
        x = self.image_features_extract_model(x)
        x = self.reshape(x)
        x = self.fc(x)
        return x

In [None]:
# Smoke-test the encoder: run one training batch through it and print the
# shape of the resulting feature tensor.
encoder = CNN_Encoder(EMBEDDING_DIM, BATCH_SIZE)
for image, caption in train_ds.take(1):
    features = encoder(image)
    print(f"ENCODER OUTPUT: {features.shape}")

In [None]:
class BahdanauAttention(tf.keras.Model):
    """Additive attention: score = V(tanh(W1(annotations) + W2(hidden)))."""

    def __init__(self, units):
        super().__init__()
        self.W1 = tf.keras.layers.Dense(units)
        self.W2 = tf.keras.layers.Dense(units)
        self.V = tf.keras.layers.Dense(1)

    def call(self, annotations, hidden):
        """Return the context vector and the attention weights.

        Args:
            annotations: encoder features; attention is taken over axis 1.
            hidden: decoder state; an axis is inserted at position 1 so that
                it broadcasts against `annotations`.

        Returns:
            (context_vector, attention_weights): the annotations summed over
            axis 1 after weighting, and the softmax weights themselves.
        """
        query = tf.expand_dims(hidden, 1)
        energies = tf.nn.tanh(self.W1(annotations) + self.W2(query))
        attention_weights = tf.nn.softmax(self.V(energies), axis=1)
        context_vector = tf.reduce_sum(attention_weights * annotations, axis=1)
        return context_vector, attention_weights

In [None]:
class RNN_Decoder(tf.keras.Model):
    """GRU decoder that predicts the next caption token using attention.

    One call performs a single decoding step: it attends over the image
    features, feeds the context vector together with the embedded input
    token to the GRU and returns logits over the vocabulary.

    NOTE(review): the GRU now receives `hidden` as its initial state.
    Previously the state passed in was used by the attention layer only and
    the GRU restarted from zeros at every step. Weights saved before this
    change were trained with the old behaviour, so retrain (or re-validate)
    before reusing such checkpoints.
    """

    def __init__(self, embedding_dim, units, vocab_size, batch_size):
        """Create the layers.

        Args:
            embedding_dim: size of the word embeddings.
            units: number of GRU units (also used by attention and `fc1`).
            vocab_size: size of the embedding table and of the output layer.
            batch_size: batch size used by `reset_state`.
        """
        super(RNN_Decoder, self).__init__()
        self.batch_size = batch_size
        self.units = units

        self.embedding = tf.keras.layers.Embedding(vocab_size, embedding_dim)
        self.gru = tf.keras.layers.GRU(self.units,
                                       return_sequences=True,
                                       return_state=True,
                                       recurrent_initializer='glorot_uniform')
        self.fc1 = tf.keras.layers.Dense(self.units)
        self.fc2 = tf.keras.layers.Dense(vocab_size)
        self.attention = BahdanauAttention(self.units)

    def call(self, x, annotations, hidden):
        """Run one decoding step.

        Args:
            x: token ids of the current input, one time step per example.
            annotations: image features produced by the encoder.
            hidden: decoder state from the previous step (or `reset_state()`).

        Returns:
            (logits, state, attention_weights): vocabulary logits with the
            time axis folded into the batch axis, the new GRU state, and the
            attention weights over the annotations.
        """
        context_vector, attention_weights = self.attention(annotations, hidden)
        x = self.embedding(x)
        # Prepend the context vector to the embedded token along the feature axis.
        x = tf.concat([tf.expand_dims(context_vector, 1), x], axis=-1)
        # Carry the recurrent state across steps instead of restarting from zeros.
        output, state = self.gru(x, initial_state=hidden)
        x = self.fc1(output)
        # Fold the time axis into the batch axis before the output layer.
        x = tf.reshape(x, (-1, x.shape[2]))
        x = self.fc2(x)

        return x, state, attention_weights

    def reset_state(self):
        """Return an all-zero initial state of shape (batch_size, units)."""
        return tf.zeros((self.batch_size, self.units))

In [None]:
# Smoke-test the decoder: encode one training batch, run a single decoding
# step from the "<start>" token and print the shapes of all outputs.
encoder = CNN_Encoder(EMBEDDING_DIM, BATCH_SIZE)
decoder = RNN_Decoder(EMBEDDING_DIM, UNITS, VOCAB_SIZE, BATCH_SIZE)

for image, caption in train_ds.take(1):
    features = encoder(image)
    print(f"ENCODER OUTPUT: {features.shape}")
    hidden = decoder.reset_state()
    # One "<start>" token id per example in the batch, as a column vector.
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']] * caption.shape[0], 1)
    predictions, hidden, attn_weights = decoder(dec_input, features, hidden)
    print(f"PREDICTION: {predictions.shape}")
    print(f"HIDDEN: {hidden.shape}")
    print(f"ATTENTION: {attn_weights.shape}")

# Wrapping the Gradient Tape in Model Class

In [None]:
# Print the shapes of one (image, caption) batch that train_step will receive.
for image, caption in train_ds.take(1):
    print(image.shape)
    print(caption.shape)

In [None]:
class Image_Caption_Gen(tf.keras.Model):
    """Encoder/decoder captioning model with custom Keras train and test steps.

    Both steps decode the target caption token by token with teacher forcing
    and report the summed step loss divided by the caption length under the
    key "custom_loss".

    The model uses the optimizer passed to `compile()` (`self.optimizer`) and
    the decoder's own `batch_size`, so it no longer depends on the notebook
    globals `optimizer` and `BATCH_SIZE`, which later cells rebind. It still
    reads the module-level `tokenizer` and `loss_function`.
    """

    def __init__(self, encoder, decoder):
        """Wrap an image encoder and a caption decoder.

        Args:
            encoder: model mapping an image batch to feature annotations.
            decoder: model exposing `reset_state()`, `batch_size` and a
                `(token, annotations, hidden)` call signature.
        """
        super(Image_Caption_Gen, self).__init__()
        self.encoder = encoder
        self.decoder = decoder

    def _teacher_forced_loss(self, img_tensor, target):
        """Decode `target` with teacher forcing and return the summed step loss."""
        # The hidden state is re-initialised for each batch because the
        # captions are not related from image to image.
        hidden = self.decoder.reset_state()

        # One "<start>" token id per example, as a column vector.
        dec_input = tf.expand_dims(
            [tokenizer.word_index['<start>']] * self.decoder.batch_size, 1)

        features = self.encoder(img_tensor)

        loss = 0
        for i in range(1, target.shape[1]):
            # Predict token i from the previous token and the image features.
            predictions, hidden, _ = self.decoder(dec_input, features, hidden)
            loss += loss_function(target[:, i], predictions)

            # Teacher forcing: feed the ground-truth token as the next input.
            dec_input = tf.expand_dims(target[:, i], 1)

        return loss

    def train_step(self, data):
        """Run one optimisation step on a batch of (images, captions)."""
        img_tensor, target = data

        with tf.GradientTape() as tape:
            loss = self._teacher_forced_loss(img_tensor, target)

        total_loss = (loss / int(target.shape[1]))
        trainable_variables = self.encoder.trainable_variables + self.decoder.trainable_variables
        # Gradients are taken of the summed loss; the normalised value is
        # only reported.
        gradients = tape.gradient(loss, trainable_variables)
        self.optimizer.apply_gradients(zip(gradients, trainable_variables))
        return {"custom_loss": total_loss}

    def test_step(self, data):
        """Compute the same teacher-forced loss without updating any weights."""
        img_tensor, target = data

        loss = self._teacher_forced_loss(img_tensor, target)

        total_loss = (loss / int(target.shape[1]))
        return {"custom_loss": total_loss}

We use `Adam` as the optimizer.

The loss is `SparseCategoricalCrossentropy`, because it would be inefficient to use one-hot encodings as the ground truth. We also use a mask to zero out the loss at `<pad>` positions, so that the sequence model does not learn to overfit on the padding.

In [None]:
# Early stopping to prevent overfitting: stop when "val_custom_loss" has not
# improved for 2 epochs and restore the weights of the best epoch.
es = tf.keras.callbacks.EarlyStopping(monitor="val_custom_loss", patience=2, verbose=2, restore_best_weights=True)

In [None]:
EPOCHS = 10

# Fresh InceptionV3 encoder and GRU decoder for this training run.
encoder = CNN_Encoder(EMBEDDING_DIM, BATCH_SIZE)
decoder = RNN_Decoder(EMBEDDING_DIM, UNITS, VOCAB_SIZE, BATCH_SIZE)

optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so padding can be masked.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    reduction='none', from_logits=True)


def loss_function(real, pred):
    """Cross-entropy of one decoding step with padded targets zeroed out.

    Args:
        real: target token ids; id 0 is the padding token.
        pred: logits predicted by the decoder.

    Returns:
        Mean over the batch of the per-example loss, where examples whose
        target is padding contribute 0.
    """
    per_example = loss_object(real, pred)
    keep = tf.cast(tf.math.not_equal(real, 0), dtype=per_example.dtype)
    return tf.reduce_mean(per_example * keep)


main_model = Image_Caption_Gen(encoder, decoder)
main_model.compile(optimizer=optimizer, loss=loss_function)

history_inception = main_model.fit(
    train_ds,
    epochs=EPOCHS,
    validation_data=val_ds,
    callbacks=[es],
)

In [None]:
# Evaluate the InceptionV3-based model on the held-out test split.
custom_test_loss = main_model.evaluate(test_ds)
print(f'[INFO] Test Loss: {custom_test_loss}')

Now trying out Inception Resnet

In [None]:
# Hyper-parameters for the InceptionResNetV2 run. These repeat the values
# already set for the InceptionV3 run earlier in the notebook.
EMBEDDING_DIM = 256
VOCAB_SIZE = top_k+1
UNITS = 512
KERNEL = 64
FEATURES = 2048

In [None]:
@tf.function
def load_img_inception_resnet(image_path):
    """Read a JPEG file and prepare it for the InceptionResNetV2 encoder.

    Args:
        image_path: path of the image file.

    Returns:
        The image decoded with 3 channels, resized to 299x299 and passed
        through `inception_resnet_v2.preprocess_input`.
    """
    raw_bytes = tf.io.read_file(image_path)
    pixels = tf.image.decode_jpeg(raw_bytes, channels=3)
    resized = tf.image.resize(pixels, (299, 299))
    return tf.keras.applications.inception_resnet_v2.preprocess_input(resized)

In [None]:
# Load and preprocess the images in parallel instead of one at a time.
# map() keeps the element order by default, so the images stay aligned with
# the caption datasets they are zipped with later.
train_img_ds_inception_resnet = tf.data.Dataset.from_tensor_slices(train_img_name).map(
    load_img_inception_resnet, num_parallel_calls=tf.data.experimental.AUTOTUNE)
val_img_ds_inception_resnet = tf.data.Dataset.from_tensor_slices(val_img_name).map(
    load_img_inception_resnet, num_parallel_calls=tf.data.experimental.AUTOTUNE)
test_img_ds_inception_resnet = tf.data.Dataset.from_tensor_slices(test_img_name).map(
    load_img_inception_resnet, num_parallel_calls=tf.data.experimental.AUTOTUNE)

In [None]:
# Pair the InceptionResNetV2-preprocessed images with the captions, then
# batch (dropping the incomplete last batch) and prefetch each split.
train_ds_inception_resnet = tf.data.Dataset.zip((train_img_ds_inception_resnet, train_cap_ds))
train_ds_inception_resnet = train_ds_inception_resnet.batch(BATCH_SIZE, drop_remainder=True)
train_ds_inception_resnet = train_ds_inception_resnet.prefetch(buffer_size=AUTOTUNE)

val_ds_inception_resnet = tf.data.Dataset.zip((val_img_ds_inception_resnet, val_cap_ds))
val_ds_inception_resnet = val_ds_inception_resnet.batch(BATCH_SIZE, drop_remainder=True)
val_ds_inception_resnet = val_ds_inception_resnet.prefetch(buffer_size=AUTOTUNE)

test_ds_inception_resnet = tf.data.Dataset.zip((test_img_ds_inception_resnet, test_cap_ds))
test_ds_inception_resnet = test_ds_inception_resnet.batch(BATCH_SIZE, drop_remainder=True)
test_ds_inception_resnet = test_ds_inception_resnet.prefetch(buffer_size=AUTOTUNE)

In [None]:
# Target shape (positions x channels) that the InceptionResNetV2 encoder
# reshapes its feature map to.
KERNEL_RES = 64
FEATURES_RES = 1536

In [None]:
class CNN_Encoder_inception_resnet(tf.keras.Model):
    """Image encoder: frozen ImageNet InceptionResNetV2 plus a trainable Dense projection.

    The sub-layers are created in `build`, not in `__init__`.
    """
    
    def __init__(self, embedding_dim, batch_size):
        """Store the configuration.

        Args:
            embedding_dim: number of units of the final Dense projection.
            batch_size: stored on the instance; not used by this class itself.
        """
        super(CNN_Encoder_inception_resnet, self).__init__()
        self.batch_size = batch_size
        self.embedding_dim = embedding_dim
        
    def build(self, input_shape):
        """Create the feature extractor, the reshape layer and the projection."""
        # InceptionResNetV2 without its classification head, with ImageNet weights.
        self.image_model = tf.keras.applications.InceptionResNetV2(include_top=False,
                                                weights='imagenet')
        self.new_input = self.image_model.input
        self.hidden_layer = self.image_model.layers[-1].output
        self.image_features_extract_model = tf.keras.Model(self.new_input, self.hidden_layer)
        # Freeze the backbone; only the Dense projection below is trained.
        self.image_features_extract_model.trainable = False
        
        # Reshape the feature map to (KERNEL_RES, FEATURES_RES), both
        # module-level constants.
        self.reshape = tf.keras.layers.Reshape(target_shape=(KERNEL_RES,FEATURES_RES))
        # `Dense` comes from the star import of tensorflow.keras.layers.
        self.fc = Dense(units=self.embedding_dim,
                        activation='relu')
        
    def call(self, x):
        """Return the projected image features, shape (batch, KERNEL_RES, embedding_dim)."""
        x = self.image_features_extract_model(x)
        x = self.reshape(x)
        x = self.fc(x)
        return x

In [None]:
EPOCHS=10
# Fresh InceptionResNetV2 encoder and GRU decoder for the second training run.
# NOTE: this rebinds the notebook-level names `encoder`, `decoder`,
# `optimizer`, `loss_object` and `loss_function`.
encoder = CNN_Encoder_inception_resnet(EMBEDDING_DIM, BATCH_SIZE)
decoder = RNN_Decoder(EMBEDDING_DIM, UNITS, VOCAB_SIZE, BATCH_SIZE)

optimizer = tf.keras.optimizers.Adam()
# reduction='none' keeps one loss value per example so padding can be masked.
loss_object = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=True, reduction='none')

# NOTE(review): same body as the `loss_function` defined in the InceptionV3
# training cell; the duplicate definition could be removed.
def loss_function(real, pred):
    # Targets equal to 0 are padding and must not contribute to the loss.
    mask = tf.math.logical_not(tf.math.equal(real, 0))
    loss_ = loss_object(real, pred)

    mask = tf.cast(mask, dtype=loss_.dtype)
    loss_ *= mask

    return tf.reduce_mean(loss_)

main_model_res = Image_Caption_Gen(encoder, decoder)
main_model_res.compile(loss=loss_function, optimizer=optimizer)

history_res = main_model_res.fit(
    train_ds_inception_resnet,
    validation_data=val_ds_inception_resnet,
    callbacks = [es],
    epochs=EPOCHS)

In [None]:
# Training vs. validation loss of the InceptionV3 run, one point per epoch.
fig, ax = plt.subplots()
ax.plot(history_inception.history["custom_loss"], label="train_loss")
ax.plot(history_inception.history["val_custom_loss"], label="val_loss")
ax.set_title("Loss vs. Epoch")
ax.set_xlabel("Epoch #")
ax.set_ylabel("Loss")
ax.legend(loc="lower left")

# Save before showing so the file contains the rendered figure.
fig.savefig("loss.png")
plt.show()

In [None]:
# Save the weights of the InceptionV3-based encoder and its decoder so the
# trained model can be restored later without retraining.
main_model.encoder.save_weights("encoder_inception.h5")
main_model.decoder.save_weights("decoder_inception.h5")

In [None]:
# Save the weights of the InceptionResNetV2-based encoder and its decoder.
main_model_res.encoder.save_weights("encoder_res.h5")
main_model_res.decoder.save_weights("decoder_res.h5")

# Captions

In [None]:
# Create inference copies of the InceptionV3 encoder and the decoder with
# batch size 1, and run a single image through them once. This forward pass
# creates the model variables before the saved weights are loaded in the
# next cell.
encoder = CNN_Encoder(EMBEDDING_DIM, 1)
decoder = RNN_Decoder(EMBEDDING_DIM, UNITS, VOCAB_SIZE, 1)

for image, caption in train_ds.take(1):
    # Take one image of the batch and restore the batch axis.
    features = encoder(tf.expand_dims(image[1],0))
    print(f"ENCODER OUTPUT: {features.shape}")
    hidden = decoder.reset_state()
    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 1)
    predictions, hidden, attn_weights = decoder(dec_input, features, hidden)
    print(f"PREDICTION: {predictions.shape}")
    print(f"HIDDEN: {hidden.shape}")
    print(f"ATTENTION: {attn_weights.shape}")

In [None]:
# Load previously saved InceptionV3 encoder / decoder weights into the
# batch-size-1 models used for caption generation.
# NOTE(review): these files come from ../input/weight, not from the files
# saved by this notebook run - confirm they match the current architecture.
encoder.load_weights("../input/weight/encoder_inception.h5")
decoder.load_weights("../input/weight/decoder_inception.h5")

In [None]:
def evaluate(image, max_length=64):
    """Generate a caption for one image by sampling from the decoder.

    Uses the module-level `encoder`, `decoder` (built with batch size 1),
    `tokenizer` and `load_img`.

    Args:
        image: path of a JPEG file.
        max_length: maximum number of tokens to generate.

    Returns:
        (result, attention_plot): the list of generated words (the last one
        is '<end>' if it was produced within `max_length` steps) and a numpy
        array with one row of attention weights per generated word.
    """
    hidden = decoder.reset_state()  # initial decoder state

    img = tf.expand_dims(load_img(image), 0)
    features = encoder(img)

    # One row per generated token, one column per attended image location.
    attention_plot = np.zeros((max_length, features.shape[1]))

    dec_input = tf.expand_dims([tokenizer.word_index['<start>']], 0)
    result = []

    for i in range(max_length):
        predictions, hidden, attention_weights = decoder(dec_input, features, hidden)

        attention_plot[i] = tf.reshape(attention_weights, (-1, )).numpy()

        # Sample the next token from the predicted logits.
        predicted_id = tf.random.categorical(predictions, 1)[0][0].numpy()
        word = tokenizer.index_word[predicted_id]
        result.append(word)

        if word == '<end>':
            break

        dec_input = tf.expand_dims([predicted_id], 0)

    # Drop the unused rows, also when '<end>' was reached early.
    return result, attention_plot[:len(result), :]

In [None]:
def plot_attention(image, result, attention_plot):
    """Overlay the attention map of every generated word on the image.

    Args:
        image: path of the image file.
        result: list of generated words; one subplot is drawn per word.
        attention_plot: array whose row i holds the attention weights used
            when word i was generated (64 values, shown as an 8x8 map).
    """
    len_result = len(result)
    if len_result == 0:
        # Nothing to draw.
        return

    temp_image = np.array(Image.open(image))

    # Smallest square grid that has a cell for every word. The previous
    # (len // 2) x (len // 2) grid had too few cells for 1, 2, 3 or 5 words.
    grid_size = int(np.ceil(np.sqrt(len_result)))

    fig = plt.figure(figsize=(20, 20))  # overall figure size
    for i in range(len_result):
        temp_att = np.resize(attention_plot[i], (8, 8))
        ax = fig.add_subplot(grid_size, grid_size, i+1)
        ax.set_title(result[i])
        img = ax.imshow(temp_image)
        # Stretch the 8x8 attention map over the full image extent.
        ax.imshow(temp_att, cmap='copper_r', alpha=0.6, extent=img.get_extent())

    plt.tight_layout()
    plt.show()
    

In [None]:
from PIL import Image

In [None]:
image_url = 'https://media.istockphoto.com/photos/happy-kids-playing-with-garden-sprinkler-picture-id1159180335'
# Use the URL's last four characters as the file extension only when they
# really are one (e.g. ".jpg"); this URL has none, so fall back to ".jpg",
# which matches the JPEG decoding done by `evaluate`.
url_suffix = image_url[-4:]
image_extension = url_suffix if url_suffix.startswith('.') else '.jpg'
image_path = tf.keras.utils.get_file('image'+image_extension,
                                     origin=image_url)

result, attention_plot = evaluate(image_path)
print ('Prediction Caption:', ' '.join(result))
plot_attention(image_path, result, attention_plot)
# opening the image
Image.open(image_path)

In [None]:
from nltk.translate.bleu_score import sentence_bleu

In [None]:
# Pick a random row of the test split by position, so this does not depend
# on the index labels or on the split sizes.
rid = np.random.randint(len(test_df))

image = test_df['image_name'].iloc[rid]
real_caption = test_df['comment'].iloc[rid]

result, attention_plot = evaluate(image)

# Remove the "<start>" / "<end>" markers that wrap the stored caption.
first = real_caption.split(' ', 1)[1]
real_caption = first.rsplit(' ', 1)[0]

# Candidate tokens for BLEU: the generated words without the "<unk>" and
# "<end>" markers. `result` itself is left untouched so that it stays
# aligned with the rows of `attention_plot` for plotting.
candidate = [word for word in result if word not in ('<unk>', '<end>')]
result_final = ' '.join(candidate)

# sentence_bleu expects a list of tokenized references. The reference is
# lower-cased because the tokenizer lower-cases the generated words.
reference = [real_caption.lower().split()]

score_BLEU1 = sentence_bleu(reference, candidate, weights=(1.0, 0.0, 0.0, 0.0))
score_BLEU2 = sentence_bleu(reference, candidate, weights=(0.5, 0.5, 0.0, 0.0))
score_BLEU3 = sentence_bleu(reference, candidate, weights=(1/3, 1/3, 1/3, 0.0))
score_BLEU4 = sentence_bleu(reference, candidate, weights=(0.25, 0.25, 0.25, 0.25))
print(f"BLEU-1 score: {score_BLEU1*100}")
print(f"BLEU-2 score: {score_BLEU2*100}")
print(f"BLEU-3 score: {score_BLEU3*100}")
print(f"BLEU-4 score: {score_BLEU4*100}")

print ('Real Caption:', real_caption)
print ('Prediction Caption:', result_final)

plot_attention(image, result, attention_plot)