<a href="https://colab.research.google.com/github/g00dAA/Image-Captioning-on-coco-dataset/blob/main/image_captioning_on_coco_dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
import numpy as np
import pandas as pd
import tensorflow as tf
import matplotlib.pyplot as plt
from PIL import Image
from math import sqrt
from tqdm.auto import tqdm
import os
import json
import re
import time
import collections
import random
import requests
import json
import pickle

In [None]:
# Root directory of the COCO 2017 dataset; the annotations JSON and the
# train2017 images used below are both resolved relative to this path.
BASE_PATH = '../input/coco-2017-dataset/coco2017'

**Open the JSON file containing the annotations**

In [None]:
# Number of caption rows kept for this run and the seed that makes the subset
# repeatable across "Restart & Run All".
NUM_SAMPLES = 70000
SAMPLE_SEED = 42

# Only the 'annotations' list of the COCO captions file is needed here.
with open(f'{BASE_PATH}/annotations/captions_train2017.json', 'r', encoding='utf-8') as f:
    data = json.load(f)['annotations']

# One [image path, caption] pair per annotation. The image filename is the
# image ID zero-padded to 12 digits, inside the train2017 folder.
img_cap_pairs = [
    [f"{BASE_PATH}/train2017/{sample['image_id']:012d}.jpg", sample['caption']]
    for sample in data
]

# Creating the DataFrame
captions = pd.DataFrame(img_cap_pairs, columns=['image', 'caption'])

# Seeded random subset; min() keeps the cell working when fewer than
# NUM_SAMPLES annotations are available.
captions = (
    captions
    .sample(n=min(NUM_SAMPLES, len(captions)), random_state=SAMPLE_SEED)
    .reset_index(drop=True)
)
captions.head()

**Text Preprocessing**

In [None]:
def preprocess(text):
    """Normalize a raw caption and wrap it in sequence markers.

    The text is lower-cased, every character that is neither a word character
    nor whitespace is removed, runs of whitespace are collapsed to a single
    space, and leading/trailing whitespace is stripped. The result is wrapped
    as '[start] <text> [end]'.

    Args:
        text: Raw caption string.

    Returns:
        The normalized caption string including the start and end markers.
    """
    text = text.lower()
    text = re.sub(r'[^\w\s]', '', text)        # Remove punctuation
    # Raw string: '\s' in a normal string literal is an invalid escape sequence.
    text = re.sub(r'\s+', ' ', text).strip()   # Collapse whitespace, trim the ends
    # Markers are added after punctuation removal so their brackets survive.
    return '[start] ' + text + ' [end]'

In [None]:
# Normalize every caption (lower-case, no punctuation, [start]/[end] markers)
captions['caption'] = captions['caption'].map(preprocess)
captions.head()

In [None]:
# Draw a single caption/image pair at random as a visual sanity check
random_row = captions.sample(n=1).iloc[0]

# Caption text followed by one blank line
print(random_row['caption'], end='\n\n')

# The opened image is the cell's displayed output
im = Image.open(random_row['image'])
im

**Setting Up the Hyperparameters**

In [None]:
MAX_LENGTH = 40            # Token length every caption is padded/truncated to; also bounds the decoding loop
VOCABULARY_SIZE = 10000    # Upper bound on the tokenizer vocabulary (max_tokens)
BATCH_SIZE = 32            # Samples per batch in the training and validation datasets
BUFFER_SIZE = 2000         # Shuffle buffer size used by the tf.data pipelines
EMBEDDING_DIM = 256        # Embedding width passed to the Transformer encoder and decoder
UNITS = 256                # Hidden width of the decoder's feed-forward layer
EPOCHS = 20                # Maximum epochs per fit() call (early stopping may end a run sooner)


In [None]:
# Creating a TextVectorization layer for tokenizing text captions
tokenizer = tf.keras.layers.TextVectorization(
    max_tokens=VOCABULARY_SIZE,                     # Upper bound on the vocabulary size
    standardize=None,                               # Captions were already normalized by preprocess(); skipping the built-in standardization also leaves the [start]/[end] markers as written
    output_sequence_length=MAX_LENGTH               # Pad or truncate every caption to MAX_LENGTH token ids
)

# Learn the vocabulary from the preprocessed captions
tokenizer.adapt(captions['caption'])

In [None]:
# Vocabulary size after adaptation; the decoder's token embedding table and
# output layer are sized from this same call.
tokenizer.vocabulary_size()

In [None]:
# Persist the tokenizer vocabulary to a binary (pickle) file. The context
# manager guarantees the file is flushed and closed even if pickling fails.
with open('vocab_coco.file', 'wb') as vocab_file:
    pickle.dump(tokenizer.get_vocabulary(), vocab_file)

In [None]:
# Creating a StringLookup layer to map words to indices
word2idx = tf.keras.layers.StringLookup(
    mask_token="",  # The empty string is the mask token (presumably lining up with the padding entry of the tokenizer vocabulary — TODO confirm)
    vocabulary=tokenizer.get_vocabulary()  # Use the tokenizer's vocabulary
)

# Creating a StringLookup layer to map indices back to words
idx2word = tf.keras.layers.StringLookup(
    mask_token="",  # Same mask token as word2idx so both layers share one index space
    vocabulary=tokenizer.get_vocabulary(),  # Use the tokenizer's vocabulary
    invert=True  # Invert the mapping to go from indices to words
)

In [None]:
# Group captions by image path: {image path -> list of its captions}
img_to_cap_vector = collections.defaultdict(list)
for img, cap in captions[['image', 'caption']].itertuples(index=False, name=None):
    img_to_cap_vector[img].append(cap)

# Shuffle the distinct image paths so the split below is random per image
img_keys = list(img_to_cap_vector)
random.shuffle(img_keys)

# First 80% of the images go to training, the remainder to validation.
# Splitting by image keeps all captions of one image in the same split.
slice_index = int(0.8 * len(img_keys))
img_name_train_keys = img_keys[:slice_index]
img_name_val_keys = img_keys[slice_index:]

In [None]:
# Flatten the per-image caption lists into two aligned lists per split:
# the image path is repeated once for each of its captions, so position k of
# the image list always belongs to position k of the caption list.

# Training split
train_imgs = [
    imgt
    for imgt in img_name_train_keys
    for _ in img_to_cap_vector[imgt]
]
train_captions = [
    capt
    for imgt in img_name_train_keys
    for capt in img_to_cap_vector[imgt]
]

# Validation split, built the same way
val_imgs = [
    imgv
    for imgv in img_name_val_keys
    for _ in img_to_cap_vector[imgv]
]
val_captions = [
    capv
    for imgv in img_name_val_keys
    for capv in img_to_cap_vector[imgv]
]

In [None]:
# Number of (image, caption) pairs per split; within a split the image and
# caption counts should be equal because the lists are built in lockstep.
len(train_imgs), len(train_captions), len(val_imgs), len(val_captions)

In [None]:
def load_data(img_path, caption):
    """Convert one (image path, caption string) pair into model inputs.

    Args:
        img_path: Path of a JPEG file.
        caption: Preprocessed caption string.

    Returns:
        A tuple (image, token_ids): the image decoded with 3 channels, resized
        to 299x299 and passed through the Inception V3 input preprocessing,
        and the caption converted to token ids by the global `tokenizer`.
    """
    raw_bytes = tf.io.read_file(img_path)
    pixels = tf.io.decode_jpeg(raw_bytes, channels=3)
    # 299x299 is the input size used for the Inception V3 backbone here.
    pixels = tf.keras.layers.Resizing(299, 299)(pixels)
    pixels = tf.keras.applications.inception_v3.preprocess_input(pixels)
    token_ids = tokenizer(caption)
    return pixels, token_ids

In [None]:
def _make_dataset(img_paths, caps, shuffle):
    """Build a batched tf.data pipeline of (image, token ids) pairs.

    Args:
        img_paths: List of image file paths.
        caps: List of caption strings aligned with `img_paths`.
        shuffle: Whether to shuffle the samples (reshuffled on each iteration).

    Returns:
        A tf.data.Dataset yielding batches of BATCH_SIZE samples produced by
        `load_data`.
    """
    dataset = tf.data.Dataset.from_tensor_slices((img_paths, caps))
    if shuffle:
        # Shuffle the lightweight (path, caption) strings *before* decoding.
        # Shuffling after map() would keep up to BUFFER_SIZE decoded
        # 299x299x3 float images in the shuffle buffer.
        dataset = dataset.shuffle(BUFFER_SIZE)
    return (
        dataset
        .map(load_data, num_parallel_calls=tf.data.AUTOTUNE)
        .batch(BATCH_SIZE)
        # Overlap input preparation with the training step.
        .prefetch(tf.data.AUTOTUNE)
    )


train_dataset = _make_dataset(train_imgs, train_captions, shuffle=True)

# Validation order does not influence learning, so it is left unshuffled to
# keep evaluation deterministic.
val_dataset = _make_dataset(val_imgs, val_captions, shuffle=False)

**Image Augmentation Model**

In [None]:
# Random augmentations applied to image batches during training.
# NOTE(review): the images reaching this model have already gone through
# inception_v3.preprocess_input in load_data; RandomContrast may clip its
# output to a fixed pixel value range in some Keras versions — confirm it is
# compatible with the preprocessed value range.
image_augmentation = tf.keras.Sequential(
    [
        tf.keras.layers.RandomFlip("horizontal"),    # Randomly flip images horizontally
        tf.keras.layers.RandomRotation(0.2),         # Randomly rotate images; the factor is a fraction of a full turn, i.e. up to ±72 degrees
        tf.keras.layers.RandomContrast(0.3),         # Randomly adjust the contrast of images with a factor of 0.3
    ]
)


**Encoder**

In [None]:
def CNN_Encoder():
    """Build the image feature extractor used by the captioning model.

    The extractor is InceptionV3 with ImageNet weights and without its
    classification head, followed by a Reshape that merges the spatial grid of
    the final feature map into one axis while keeping the channel axis.

    Returns:
        A Keras Model mapping the InceptionV3 input to a tensor of shape
        (batch, positions, channels).
    """
    backbone = tf.keras.applications.InceptionV3(
        include_top=False,
        weights='imagenet'
    )

    feature_map = backbone.output
    channels = feature_map.shape[-1]

    # (-1, channels): every spatial location becomes one "token" for the
    # Transformer encoder that consumes this output.
    flattened = tf.keras.layers.Reshape((-1, channels))(feature_map)

    return tf.keras.models.Model(backbone.input, flattened)

In [None]:
class TransformerEncoderLayer(tf.keras.layers.Layer):
    """Single encoder block applied to the CNN image features.

    The input is layer-normalized, projected to `embed_dim` with a ReLU dense
    layer, passed through unmasked self-attention, and finally
    layer-normalized again together with a residual connection around the
    attention.

    Args:
        embed_dim: Width of the dense projection and attention key dimension.
        num_heads: Number of attention heads.
    """

    def __init__(self, embed_dim, num_heads):
        super().__init__()
        # Sub-layers are created in a fixed order: norms, attention, projection.
        self.layer_norm_1 = tf.keras.layers.LayerNormalization()
        self.layer_norm_2 = tf.keras.layers.LayerNormalization()
        self.attention = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim)
        self.dense = tf.keras.layers.Dense(embed_dim, activation="relu")

    def call(self, x, training):
        """Encode image features `x`; `training` is forwarded to the attention layer."""
        # Normalize, then project the features to the embedding width.
        projected = self.dense(self.layer_norm_1(x))

        # Self-attention over all positions, no masking.
        attended = self.attention(
            query=projected,
            value=projected,
            key=projected,
            attention_mask=None,
            training=training
        )

        # Residual connection around the attention, then the second norm.
        return self.layer_norm_2(projected + attended)

In [None]:
class Embeddings(tf.keras.layers.Layer):
    """Token embedding plus learned position embedding.

    Args:
        vocab_size: Number of rows in the token embedding table.
        embed_dim: Width of both embedding tables.
        max_len: Number of positions in the position embedding table.
    """

    def __init__(self, vocab_size, embed_dim, max_len):
        super().__init__()
        # Lookup table for token ids
        self.token_embeddings = tf.keras.layers.Embedding(
            vocab_size, embed_dim)
        # Lookup table for positions 0 .. max_len - 1
        self.position_embeddings = tf.keras.layers.Embedding(
            max_len, embed_dim, input_shape=(None, max_len))

    def call(self, input_ids):
        """Return token embeddings of `input_ids` summed with position embeddings."""
        # Positions 0 .. L-1 for the last axis of input_ids, with a leading
        # axis of size 1 so the result broadcasts over the batch.
        seq_len = tf.shape(input_ids)[-1]
        positions = tf.range(seq_len)[tf.newaxis, :]

        token_part = self.token_embeddings(input_ids)
        position_part = self.position_embeddings(positions)

        return token_part + position_part


In [None]:
# Transformer decoder block that turns caption token ids plus encoded image
# features into per-position word probabilities
class TransformerDecoderLayer(tf.keras.layers.Layer):
    """Single Transformer decoder block with its own embeddings and output head.

    Pipeline in `call`: token+position embeddings -> masked self-attention ->
    cross-attention over the encoder output -> feed-forward network ->
    softmax over the vocabulary. Each of the three stages is followed by a
    residual connection and layer normalization.

    The vocabulary size and maximum sequence length are read from the
    module-level `tokenizer` and `MAX_LENGTH`, so both must exist before an
    instance is created.
    """

    # Initialize the layer with embedding dimension, units, and number of heads
    def __init__(self, embed_dim, units, num_heads):
        """Create the sub-layers.

        Args:
            embed_dim: Embedding width; also used as key_dim of both attention layers.
            units: Hidden width of the feed-forward network.
            num_heads: Number of heads in both attention layers.
        """
        super().__init__()
        # Embedding layer for token and position embeddings
        self.embedding = Embeddings(
            tokenizer.vocabulary_size(), embed_dim, MAX_LENGTH)

        # Self-attention over the caption tokens (attention dropout 0.1)
        self.attention_1 = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )
        # Cross-attention from caption tokens to the encoder output (attention dropout 0.1)
        self.attention_2 = tf.keras.layers.MultiHeadAttention(
            num_heads=num_heads, key_dim=embed_dim, dropout=0.1
        )

        # One layer normalization per stage (self-attention, cross-attention, feed-forward)
        self.layernorm_1 = tf.keras.layers.LayerNormalization()
        self.layernorm_2 = tf.keras.layers.LayerNormalization()
        self.layernorm_3 = tf.keras.layers.LayerNormalization()

        # Feed-forward network: expand to `units` with ReLU, project back to embed_dim
        self.ffn_layer_1 = tf.keras.layers.Dense(units, activation="relu")
        self.ffn_layer_2 = tf.keras.layers.Dense(embed_dim)

        # Output layer with softmax activation: emits probabilities, not logits
        self.out = tf.keras.layers.Dense(tokenizer.vocabulary_size(), activation="softmax")

        # Dropout inside the feed-forward network (0.3) and before the output layer (0.5)
        self.dropout_1 = tf.keras.layers.Dropout(0.3)
        self.dropout_2 = tf.keras.layers.Dropout(0.5)

    # The call method for the layer, defining its forward pass
    def call(self, input_ids, encoder_output, training, mask=None):
        """Run the decoder block.

        Args:
            input_ids: Caption token ids (assumed shape (batch, seq_len) — TODO confirm).
            encoder_output: Encoded image features used as key/value of the
                cross-attention.
            training: Forwarded to the attention and dropout layers.
            mask: Optional per-token mask indexed as (batch, seq_len); truthy
                entries mark real tokens. When None, neither attention layer
                is masked, so the self-attention is not causal either.

        Returns:
            Softmax probabilities over the vocabulary for every input position.
        """
        # Obtain embeddings for the input IDs
        embeddings = self.embedding(input_ids)

        # Without a mask both attention layers run unmasked
        combined_mask = None
        padding_mask = None

        # Create masks if provided
        if mask is not None:
            # Get causal attention mask to prevent future tokens from being attended to
            causal_mask = self.get_causal_attention_mask(embeddings)
            # (batch, seq_len, 1): per-query-position mask for the cross-attention;
            # presumably broadcast over the encoder positions by the attention layer
            padding_mask = tf.cast(mask[:, :, tf.newaxis], dtype=tf.int32)
            # (batch, 1, seq_len) key-position mask; the element-wise minimum with the
            # causal mask keeps an entry only where both masks allow attention
            combined_mask = tf.cast(mask[:, tf.newaxis, :], dtype=tf.int32)
            combined_mask = tf.minimum(combined_mask, causal_mask)

        # Apply the first attention layer with the combined mask
        attn_output_1 = self.attention_1(
            query=embeddings,
            value=embeddings,
            key=embeddings,
            attention_mask=combined_mask,
            training=training
        )

        # Apply layer normalization after adding the attention output to embeddings
        out_1 = self.layernorm_1(embeddings + attn_output_1)

        # Cross-attention: caption positions query the encoded image features
        attn_output_2 = self.attention_2(
            query=out_1,
            value=encoder_output,
            key=encoder_output,
            attention_mask=padding_mask,
            training=training
        )

        # Apply layer normalization after adding the second attention output
        out_2 = self.layernorm_2(out_1 + attn_output_2)

        # Apply the feed-forward network and dropout
        ffn_out = self.ffn_layer_1(out_2)
        ffn_out = self.dropout_1(ffn_out, training=training)
        ffn_out = self.ffn_layer_2(ffn_out)

        # Apply the final layer normalization and dropout
        ffn_out = self.layernorm_3(ffn_out + out_2)
        ffn_out = self.dropout_2(ffn_out, training=training)
        # Generate predictions using the output layer
        preds = self.out(ffn_out)
        return preds

    # Helper function to create a causal attention mask
    def get_causal_attention_mask(self, inputs):
        """Return an int32 lower-triangular mask of shape (batch, seq_len, seq_len).

        Entry [b, i, j] is 1 when j <= i, i.e. position i may attend to itself
        and to earlier positions only. Batch size and sequence length are
        taken from the first two dimensions of `inputs`.
        """
        # Get the shape of the inputs
        input_shape = tf.shape(inputs)
        batch_size, sequence_length = input_shape[0], input_shape[1]
        # Create a matrix where the upper triangle is zeroed out
        i = tf.range(sequence_length)[:, tf.newaxis]
        j = tf.range(sequence_length)
        mask = tf.cast(i >= j, dtype="int32")
        mask = tf.reshape(mask, (1, input_shape[1], input_shape[1]))
        # Tile multiples [batch_size, 1, 1]: repeat the mask once per batch element
        mult = tf.concat(
            [tf.expand_dims(batch_size, -1), tf.constant([1, 1], dtype=tf.int32)],
            axis=0
        )
        return tf.tile(mask, mult)


In [None]:
# Keras model tying together the CNN backbone, the Transformer encoder and the Transformer decoder
class ImageCaptioningModel(tf.keras.Model):
    """End-to-end captioning model with custom train/test steps.

    Images go through `cnn_model` outside the gradient tape; only the
    variables of `encoder` and `decoder` receive gradient updates. Loss and
    accuracy are computed over non-padding target tokens only (token id 0 is
    treated as padding).

    Args:
        cnn_model: Model mapping image batches to feature sequences.
        encoder: Layer encoding the CNN features.
        decoder: Layer predicting per-position word probabilities.
        image_aug: Optional augmentation applied to images in `train_step`.
    """

    def __init__(self, cnn_model, encoder, decoder, image_aug=None):
        super().__init__()
        self.cnn_model = cnn_model
        self.encoder = encoder
        self.decoder = decoder
        self.image_aug = image_aug
        # Running means reported to Keras through the `metrics` property
        self.loss_tracker = tf.keras.metrics.Mean(name="loss")
        self.acc_tracker = tf.keras.metrics.Mean(name="accuracy")

    def calculate_loss(self, y_true, y_pred, mask):
        """Mean of the compiled loss over the positions where `mask` is set."""
        per_token = self.loss(y_true, y_pred)
        weights = tf.cast(mask, dtype=per_token.dtype)
        # Padding positions contribute nothing to the numerator or the count.
        return tf.reduce_sum(per_token * weights) / tf.reduce_sum(weights)

    def calculate_accuracy(self, y_true, y_pred, mask):
        """Fraction of masked-in positions whose arg-max prediction equals the target."""
        hits = tf.equal(y_true, tf.argmax(y_pred, axis=2))
        hits = tf.cast(tf.math.logical_and(mask, hits), dtype=tf.float32)
        valid = tf.cast(mask, dtype=tf.float32)
        return tf.reduce_sum(hits) / tf.reduce_sum(valid)

    def compute_loss_and_acc(self, img_embed, captions, training=True):
        """Run encoder and decoder with teacher forcing; return (loss, accuracy).

        The decoder input is the caption without its last token and the target
        is the caption without its first token, so each position predicts the
        following token.
        """
        encoded = self.encoder(img_embed, training=training)

        decoder_input = captions[:, :-1]
        targets = captions[:, 1:]
        not_padding = (targets != 0)

        probabilities = self.decoder(
            decoder_input, encoded, training=training, mask=not_padding
        )

        return (
            self.calculate_loss(targets, probabilities, not_padding),
            self.calculate_accuracy(targets, probabilities, not_padding),
        )

    def train_step(self, batch):
        """One optimization step on a batch of (images, caption token ids)."""
        images, token_ids = batch

        if self.image_aug:
            images = self.image_aug(images)

        # Computed outside the tape: the CNN does not take part in backprop.
        features = self.cnn_model(images)

        with tf.GradientTape() as tape:
            loss, acc = self.compute_loss_and_acc(
                features, token_ids
            )

        # Only the encoder and decoder variables are updated.
        train_vars = (
            self.encoder.trainable_variables + self.decoder.trainable_variables
        )
        gradients = tape.gradient(loss, train_vars)
        self.optimizer.apply_gradients(zip(gradients, train_vars))

        self.loss_tracker.update_state(loss)
        self.acc_tracker.update_state(acc)

        return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

    def test_step(self, batch):
        """Evaluate a batch without augmentation and without weight updates."""
        images, token_ids = batch

        features = self.cnn_model(images)

        loss, acc = self.compute_loss_and_acc(
            features, token_ids, training=False
        )

        self.loss_tracker.update_state(loss)
        self.acc_tracker.update_state(acc)

        return {"loss": self.loss_tracker.result(), "acc": self.acc_tracker.result()}

    @property
    def metrics(self):
        """Trackers exposed to Keras as this model's metrics."""
        return [self.loss_tracker, self.acc_tracker]


In [None]:
# Transformer encoder over the CNN features (1 attention head) and
# Transformer decoder over the caption tokens (8 attention heads)
encoder = TransformerEncoderLayer(EMBEDDING_DIM, 1)
decoder = TransformerDecoderLayer(EMBEDDING_DIM, UNITS, 8)

# InceptionV3-based feature extractor with ImageNet weights
cnn_model = CNN_Encoder()
# Full captioning model; image_augmentation is applied to images in train_step only
caption_model = ImageCaptioningModel(
    cnn_model=cnn_model, encoder=encoder, decoder=decoder, image_aug=image_augmentation,
)

In [None]:
# from_logits=False because the decoder's output layer already applies softmax;
# reduction="none" keeps one loss value per token so calculate_loss can mask padding
cross_entropy = tf.keras.losses.SparseCategoricalCrossentropy(
    from_logits=False, reduction="none"
)

# Stop after 3 epochs without improvement of the monitored quantity (no monitor is
# given, so the callback's default is used) and restore the best weights seen
early_stopping = tf.keras.callbacks.EarlyStopping(patience=3, restore_best_weights=True)

# The loss passed here is what ImageCaptioningModel.calculate_loss calls via self.loss
caption_model.compile(
    optimizer=tf.keras.optimizers.Adam(),
    loss=cross_entropy
)

In [None]:
# Train for up to EPOCHS epochs with early stopping.
# Note: `history` is reassigned by the later fit cells, so the loss plot
# further down reflects only the most recent fit call.
history = caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset,
    callbacks=[early_stopping]
)

In [None]:
# Repeated fit call: training resumes from the weights left by the previous
# fit (up to EPOCHS more epochs) and `history` is overwritten with this run.
history = caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset,
    callbacks=[early_stopping]
)

In [None]:
# Repeated fit call: training resumes from the weights left by the previous
# fit (up to EPOCHS more epochs); `history` now holds only this final run.
history = caption_model.fit(
    train_dataset,
    epochs=EPOCHS,
    validation_data=val_dataset,
    callbacks=[early_stopping]
)

In [None]:
# Training vs. validation loss per epoch for the most recent fit() call
for history_key, curve_label in (('loss', 'train_loss'),
                                 ('val_loss', 'validation loss')):
    plt.plot(history.history[history_key], label=curve_label)
plt.legend()
plt.show()

In [None]:
def load_image_from_path(img_path):
    """Load a JPEG file and prepare it like the training images.

    Args:
        img_path: Path of a JPEG file.

    Returns:
        The image decoded with 3 channels, resized to 299x299 and passed
        through the Inception V3 input preprocessing (no batch axis).
    """
    raw_bytes = tf.io.read_file(img_path)
    pixels = tf.io.decode_jpeg(raw_bytes, channels=3)
    pixels = tf.keras.layers.Resizing(299, 299)(pixels)
    return tf.keras.applications.inception_v3.preprocess_input(pixels)


def generate_caption(img_path, add_noise=False):
    """Greedily decode a caption for the image stored at `img_path`.

    Starting from '[start]', the decoder is re-run on the caption generated so
    far and the most probable word at the current position is appended, until
    '[end]' is predicted or MAX_LENGTH - 1 words have been produced.

    Args:
        img_path: Path of a JPEG file.
        add_noise: If True, Gaussian noise is added to the preprocessed image
            and the result is min-max rescaled before encoding.

    Returns:
        The generated caption without the '[start]'/'[end]' markers; an empty
        string when '[end]' is predicted first.
    """
    img = load_image_from_path(img_path)

    if add_noise:
        noise = tf.random.normal(img.shape)*0.1
        img = img + noise
        # NOTE(review): this rescales to [0, 1], which differs from the value
        # range produced by inception_v3.preprocess_input — confirm intended.
        img = (img - tf.reduce_min(img))/(tf.reduce_max(img) - tf.reduce_min(img))

    # Encode the image once; only the decoder is re-run per generated word.
    img = tf.expand_dims(img, axis=0)
    img_embed = caption_model.cnn_model(img)
    img_encoded = caption_model.encoder(img_embed, training=False)

    words = []
    y_inp = '[start]'
    for i in range(MAX_LENGTH-1):
        # Drop the last column so the input length matches the training input.
        tokenized = tokenizer([y_inp])[:, :-1]
        mask = tf.cast(tokenized != 0, tf.int32)
        pred = caption_model.decoder(
            tokenized, img_encoded, training=False, mask=mask)

        # Position i holds the prediction for the word following the i-th input token.
        pred_idx = np.argmax(pred[0, i, :])
        pred_idx = tf.convert_to_tensor(pred_idx)
        pred_word = idx2word(pred_idx).numpy().decode('utf-8')
        if pred_word == '[end]':
            break

        words.append(pred_word)
        y_inp += ' ' + pred_word

    # Build the result from the collected words rather than stripping a
    # '[start] ' prefix from y_inp: with no generated words the prefix (which
    # includes a trailing space) never matched and '[start]' leaked out.
    return ' '.join(word for word in words if word != '[start]')

In [None]:
# Caption one randomly chosen image from the sampled captions table
idx = random.randrange(len(captions))
img_path = captions['image'].iloc[idx]

pred_caption = generate_caption(img_path)
print('Predicted Caption:', pred_caption, end='\n\n')

# The opened image is the cell's displayed output
Image.open(img_path)

In [None]:
# img_url = "https://images.squarespace-cdn.com/content/v1/5e0e65adcd39ed279a0402fd/1627422658456-7QKPXTNQ34W2OMBTESCJ/1.jpg?format=2500w"

# im = Image.open(requests.get(img_url, stream=True).raw)
# im = im.convert('RGB')
# im.save('tmp.jpg')

# pred_caption = generate_caption('tmp.jpg', add_noise=False)
# print('Predicted Caption:', pred_caption)
# print()
# im

In [None]:
img_url = "https://cdn.pixabay.com/photo/2016/10/26/22/02/dog-1772759_1280.jpg"

# Download the image. The timeout prevents the cell from hanging forever, the
# status check surfaces HTTP errors instead of handing an error page to PIL,
# and the context manager releases the connection afterwards.
with requests.get(img_url, stream=True, timeout=30) as response:
    response.raise_for_status()
    # Have the raw stream undo any gzip/deflate content encoding.
    response.raw.decode_content = True
    # convert() reads the whole image, so `im` no longer depends on the stream.
    im = Image.open(response.raw).convert('RGB')

# generate_caption() reads from a file path, so store the image as a JPEG first
im.save('tmp.jpg')

pred_caption = generate_caption('tmp.jpg', add_noise=False)
print('Predicted Caption:', pred_caption)
print()
im

In [None]:
# Persist the trained weights of the whole captioning model.
# NOTE(review): newer Keras releases reportedly require weight files to end
# in `.weights.h5` — confirm against the installed Keras version.
caption_model.save_weights('model.h5')

In [None]:
# Caption one randomly chosen image from the sampled captions table
idx = random.randrange(len(captions))
img_path = captions['image'].iloc[idx]

pred_caption = generate_caption(img_path)
print('Predicted Caption:', pred_caption, end='\n\n')

# The opened image is the cell's displayed output
Image.open(img_path)

In [None]:
# Caption one randomly chosen image from the sampled captions table
idx = random.randrange(len(captions))
img_path = captions['image'].iloc[idx]

pred_caption = generate_caption(img_path)
print('Predicted Caption:', pred_caption, end='\n\n')

# The opened image is the cell's displayed output
Image.open(img_path)

In [None]:
# Caption one randomly chosen image from the sampled captions table
idx = random.randrange(len(captions))
img_path = captions['image'].iloc[idx]

pred_caption = generate_caption(img_path)
print('Predicted Caption:', pred_caption, end='\n\n')

# The opened image is the cell's displayed output
Image.open(img_path)

In [None]:
# Caption one randomly chosen image from the sampled captions table
idx = random.randrange(len(captions))
img_path = captions['image'].iloc[idx]

pred_caption = generate_caption(img_path)
print('Predicted Caption:', pred_caption, end='\n\n')

# The opened image is the cell's displayed output
Image.open(img_path)

In [None]:
# Caption one randomly chosen image from the sampled captions table
idx = random.randrange(len(captions))
img_path = captions['image'].iloc[idx]

pred_caption = generate_caption(img_path)
print('Predicted Caption:', pred_caption, end='\n\n')

# The opened image is the cell's displayed output
Image.open(img_path)

In [None]:
import matplotlib.pyplot as plt

# Function to generate and display predictions with corresponding images
def display_predictions(num_predictions=10):
    """Show randomly chosen images in a two-column grid, titled with their generated captions.

    Args:
        num_predictions: Number of random images to caption and display.
            Values <= 0 draw nothing.

    Returns:
        None. The figure is rendered with plt.show().
    """
    if num_predictions <= 0:
        # Zero rows would request a zero-height figure, which matplotlib rejects.
        return

    # Set up the subplot grid (ceil division for the row count)
    num_cols = 2
    num_rows = (num_predictions + num_cols - 1) // num_cols
    plt.figure(figsize=(15, 5 * num_rows))

    for i in range(num_predictions):
        # Pick a random row of the captions table and take its image path
        idx = random.randrange(0, len(captions))
        img_path = captions.iloc[idx].image

        # Generate a caption for the image
        pred_caption = generate_caption(img_path)

        # Plot the image with the predicted caption
        plt.subplot(num_rows, num_cols, i + 1)
        # Close each image file once its pixels have been handed to matplotlib.
        with Image.open(img_path) as im:
            plt.imshow(im)
        plt.title(f'Prediction {i+1}: {pred_caption}')
        plt.axis('off')

    plt.tight_layout()
    plt.show()

# Show 10 randomly chosen images together with their generated captions
display_predictions(10)
