# Image Captioning using VGG + GRU  

## Data processing

In [2]:
from pathlib import Path
import torch

import numpy as np
import matplotlib.pyplot as plt

from torchvision.transforms import Resize, ToTensor, Normalize, GaussianBlur

import torch.nn as nn
from torch import optim
import os
import pandas as pd

from datetime import datetime as dt

The file captions.txt is a txt file where each row presents an image_name, the comment_number and the comment, since each image has 5 different captions

In [3]:
import zipfile

zip_file_path = "./archive.zip"
extract_folder = "./archive"

# with zipfile.ZipFile(zip_file_path, 'r') as zip_ref:
#    zip_ref.extractall(extract_folder)

images_path = os.path.join(extract_folder, "flickr30k_images")
captions_path = "./captions.txt"

archive_df = pd.read_csv(captions_path,sep=",",header=None,names=["image_name", "caption_id", "caption"],skiprows=1)

## Vocabulary of words

In order to feed the captions to train the model, we want to create a vocabulary of words. We employ the library nltk to define word tokens. A frequency is assigned to each word, and the word is insterted in the vocabulary if frequency >= frequency_threshold, considering the entire dataset. 


In [4]:
!pip install nltk



In [5]:
from collections import Counter
import nltk
from nltk.tokenize import word_tokenize

nltk.download("punkt")
nltk.download("punkt_tab")


class Vocabulary:
    def __init__(self, freq_threshold=5):
        self.freq_threshold = freq_threshold
        self.word2idx = {}
        self.idx2word = {}
        self.wheights = {}      #calculated as inverse of the frequency
        self.idx = 0

        self.add_word("<PAD>")  #to have all sentences of same target_len
        self.add_word("<SOS>")  #start of sentence
        self.add_word("<EOS>")  #end of sentence
        self.add_word("<UNK>")  #unknown word
        self.counter = None     #to count the frequency of each word

    def build_vocabulary(self, sentence_list):
        """"
        Build vocabulary from a list of sentences based on word frequencies.
        
        Args:
            sentence_list (list of str): A list of sentences for vocabulary creation.

        The method processes each sentence to:
        - preprocess and tokenize it.
        - count word frequencies.
        - add words that meet or exceed the frequency threshold to the vocabulary,assigning weights as the inverse of their frequencies.

        Returns:
            None: Modifies the vocabulary and weights in place.
        """
        frequencies = Counter()
        for sentence in sentence_list:
            sentence = self.preprocess(sentence)
            words = word_tokenize(sentence.lower())
            frequencies.update(words)
        for word, freq in frequencies.items():
            if freq >= self.freq_threshold:
                self.add_word(word)
                self.wheights[word] = 1 / freq
        self.counter = frequencies


    #auxiliary methods to build_vocabulary:

    def add_word(self, word):
        if word not in self.word2idx:
            self.word2idx[word] = self.idx
            self.idx2word[self.idx] = word
            self.idx += 1

    def preprocess(self, sentence):
        return "".join([char for char in sentence if char.isalnum() or char.isspace()])


    def one_hot_encode(self, word_or_idx):
        if isinstance(word_or_idx, str):                                         #If it's a word
            idx = self.word2idx.get(word_or_idx, self.word2idx["<UNK>"])
        elif isinstance(word_or_idx, int):                                       #If it's already an index
            idx = word_or_idx
        else:
            raise ValueError("Input must be a word (str) or an index (int).")
        one_hot_vector = torch.zeros(len(self.word2idx), dtype=torch.float32)
        one_hot_vector[idx] = 1.0
        return one_hot_vector
    
    def __len__(self):
        return len(self.word2idx)


[nltk_data] Downloading package punkt to
[nltk_data]     C:\Users\franc\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     C:\Users\franc\AppData\Roaming\nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!


'\n    def one_hot_encode_caption(self, caption):\n        one_hot_vectors = [self.one_hot_encode(word) for word in caption]\n        return torch.tensor(one_hot_vectors)\n    def numericalize(self, sentence):\n        sentence = self.preprocess(sentence)\n        tokenized_text = word_tokenize(sentence.lower())\n        return [\n            self.word2idx.get(word, self.word2idx["<UNK>"]) for word in tokenized_text\n        ]\ndef caption_to_indices(vocab, caption):\n    indices = [vocab.word2idx["<SOS>"]]\n    for word in caption.split():\n        indices.append(vocab.word2idx.get(word.lower(), "<UNK>"))\n    indices.append(vocab.word2idx["<EOS>"])\n    return indices\ndef pad_sequence(seq, target_len, pad_idx):\n    if len(seq) < target_len:\n        seq.extend(\n            [pad_idx] * (target_len - len(seq))\n        )  # padding at the end of sequence\n    return seq[:target_len]\n# caption_indices_list = [caption_to_indices(our_vocab, caption) for caption in captions_list]\n# pa

Now we will build our own vocabulary using all the captions from the dataset.

In [7]:
captions_list = archive_df["caption"].tolist()
if os.path.isfile("our_vocab.pkl"):
    our_vocab = torch.load("our_vocab.pkl")
else:
    our_vocab = Vocabulary(freq_threshold=10)
    our_vocab.build_vocabulary(captions_list)

vocab_size = len(our_vocab)
print("The length of the vocabulary is: ", vocab_size)

The length of the vocabulary is:  5464


We want to give different weights to the special tokens, since they are the ones that appear more frequently.

In [10]:
words_in_vocab = set(our_vocab.word2idx.keys())
words_in_wheights = set(our_vocab.wheights.keys())
unwheighted_words = words_in_vocab - words_in_wheights
print("Words without wheights: ", unwheighted_words)

our_vocab.wheights["<PAD>"] = 0
our_vocab.wheights["<SOS>"] = 0
our_vocab.wheights["<EOS>"] = 1 / len(our_vocab)
our_vocab.wheights["<UNK>"] = 1 / len(our_vocab)

class_weights = torch.tensor(
    [our_vocab.wheights[our_vocab.idx2word[i]] for i in range(len(our_vocab))]
)

print("Print the weights of the first 30 words:\n", class_weights[:30])

Words without wheights:  set()
Print the weights of the first 30 words:
 tensor([0.0000e+00, 0.0000e+00, 1.8302e-04, 1.8302e-04, 4.6206e-05, 7.5654e-05,
        1.5198e-03, 2.7619e-05, 1.9608e-02, 4.5025e-04, 7.5301e-04, 6.1504e-05,
        2.4771e-04, 6.4267e-04, 8.5390e-05, 1.5106e-03, 2.9138e-04, 1.1981e-05,
        1.5878e-05, 2.5381e-03, 7.5884e-05, 4.0323e-03, 4.9515e-05, 2.1268e-04,
        3.3156e-04, 7.7519e-04, 1.0526e-02, 1.0529e-04, 1.9153e-04, 1.0616e-03])


To simplify the architecture that we are going to define and its training, we have decided to set the maximum length of the output captions. We first determine the average length of the dataset captions and then cut/pad them in case they are longer/shorter. This is also done to avoid having too many padding indices in the index representation of the captions.

In [15]:
import math

average_caption_length = sum(len(caption.split()) for caption in captions_list) / len(captions_list)
target_len = math.ceil(average_caption_length) + 1   #+1 to add the <EOS> token
print("The target length of the generated captions will be ", target_len)


The target length of the generated captions will be  15


In [19]:
def caption_to_padded_indices(vocab, caption, target_len):
    """
    Converts a caption to a tensor of indices with padding.

    Args:
        vocab: Vocabulary object with word-to-index mappings (`word2idx`).
        caption (str): The caption text to convert to indices.
        target_len (int): The desired length of the output tensor.
    
    Returns:
        torch.Tensor: A tensor of indices of length `target_len`, padded with the index for `<PAD>` if needed, or truncated if too long.
        
    Pads with `<PAD>` tokens if `indices` length is less than `target_len`, and appends `<EOS>` at the end of the caption before padding or truncating.
    """
    target_len -= 1

    indices = []  

    for word in caption.split():
        indices.append(vocab.word2idx.get(word.lower(), vocab.word2idx["<UNK>"]))

    pad_idx = vocab.word2idx["<PAD>"]
    if len(indices) < target_len:
        indices.extend([pad_idx] * (target_len - len(indices)))
    else:
        indices = indices[:target_len]
    indices.append(vocab.word2idx["<EOS>"])
    
    return torch.tensor(indices)


#Example
print("Two men are standing outside")
print(caption_to_padded_indices(our_vocab, "two men are standing outside", target_len))

Two men are standing outside
tensor([ 4, 27, 22, 30, 23,  0,  0,  0,  0,  0,  0,  0,  0,  0,  2])
tensor([ 4, 27, 22, 30, 23,  4, 27, 22, 30, 23,  4, 27, 22, 30,  2])


## Division into training, validation and test set

Since in the dataset each image appears 5 times with different captions, we need to make sure that in the trainining, validation and test sets there are different images, otherwise validation and testing wouldn't be done on unseen data. We first split with percentages 70 - 20 - 10; to increase the number of training data, we use all 5 captions for each image, therefore repeating images in the train, but keep unique images in the validation and test sets.

In [23]:
from torch.utils.data import random_split

image_capidx_df = pd.DataFrame(
    {
        "image_name": archive_df["image_name"],
        "caption": archive_df["caption"],
    }
)
#print(image_capidx_df.head)

#For the first splitting, consider only unique images
unique_images = image_capidx_df["image_name"].unique()
num_images = len(unique_images)
unique_images_df = pd.DataFrame(unique_images, columns=["image_name"])

np.random.seed(42)  #for reproducibility
np.random.shuffle(unique_images)

train_images, val_images, test_images = random_split(range(num_images), [0.7, 0.2, 0.1])

train_image_names = unique_images_df.iloc[train_images.indices].image_name.tolist()
val_image_names = unique_images_df.iloc[val_images.indices].image_name.tolist()
test_image_names = unique_images_df.iloc[test_images.indices].image_name.tolist()

train_df = image_capidx_df[
    image_capidx_df["image_name"].isin(train_image_names)
].reset_index(drop=True)

val_df = (
    image_capidx_df[image_capidx_df["image_name"].isin(val_image_names)]
    .groupby("image_name", as_index=False)
    .agg({"caption": lambda x: x.sample(1).values[0]})  #Randomly select one caption
).reset_index(drop=True)

test_df = (
    image_capidx_df[image_capidx_df["image_name"].isin(test_image_names)]
    .groupby("image_name", as_index=False)
    .agg({"caption": lambda x: x.sample(1).values[0]})  #Randomly select one caption
).reset_index(drop=True)

print(f"Train dataset size:\t {train_df.shape[0]}")   #contains 5 repetition per image but with different captions
print(f"Validation dataset size: {val_df.shape[0]}")  #only contains unique images
print(f"Test dataset size:\t {test_df.shape[0]}")     #only contains unique images

Train dataset size:	 111245
Validation dataset size: 6356
Test dataset size:	 3178


In [24]:
from PIL import Image
from torch.utils.data import Dataset, DataLoader
from torchvision import transforms


class Flickr30kImages(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        image_name = self.dataframe.iloc[idx]["image_name"]
        caption = self.dataframe.iloc[idx]["caption"]
        tensor_captioning_indices = caption_to_padded_indices(
            our_vocab, caption, target_len
        )

        image_path = os.path.join(images_path, image_name)
        image = Image.open(image_path)

        if self.transform:
            image = self.transform(image)

        return image, tensor_captioning_indices

    def get_caption(self, idx):
        return self.dataframe.iloc[idx]["caption"]


def display_image(index, dataset):
    if index >= len(dataset):
        raise ValueError(f"Index is out of bounds for the dataset")
    image, _ = dataset[index]
    image = image.permute(1, 2, 0).detach().numpy()
    # image = (image * 255).astype('uint8')
    height, width, _ = image.shape
    plt.xlim(0, width)
    plt.ylim(height, 0)
    plt.imshow(image)
    plt.title(dataset.get_caption(index))
    plt.axis("off")
    plt.show()

transform = transforms.Compose([transforms.Resize((224, 224)), transforms.ToTensor()])
ex_dataset = Flickr30kImages(dataframe=train_df, transform=transform)
dataloader = DataLoader(ex_dataset, batch_size=1, shuffle=True)
display_image(2, ex_dataset)


We define compare_transforms for later evaluation with blurred pictures

In [25]:
def compare_transforms(transformations, index_image):
    if not all(isinstance(transf, Dataset) for transf in transformations):
        raise TypeError(
            "All elements in the `transformations` list need to be of type Dataset"
        )

    num_tr = len(transformations)
    fig, axes = plt.subplots(1, num_tr, figsize=(num_tr * 4, 4))

    if num_tr == 1:
        axes = [axes]

    image, _ = transformations[0][index_image]

    for counter, (axis, transf) in enumerate(zip(axes, transformations)):
        image, _ = transf[index_image]
        if isinstance(image, torch.Tensor):
            image = image.permute(1, 2, 0).detach().numpy()
            if image.shape[2] == 3:
                if image.max() <= 1.0:
                    image = (image * 255).astype("uint8")
        axis.imshow(image)
        axis.axis("off")

    plt.suptitle(transformations[0].get_caption(index_image))
    plt.tight_layout()
    plt.show()


## NN model
### VGG + GRU architecture

We define a first architecture for image captioning by combining the pre-trained VGG16 model and the GRU unit.

In [28]:
from torchvision import models
from torchvision.transforms import Resize, ToTensor, Normalize


class NN_conv_gru(nn.Module):
    """
    A neural network model for image captioning that combines a pre-trained VGG16 convolutional neural network with a Gated Recurrent Unit (GRU). 

    This model is designed to extract features from input images using VGG16, which is initialized with weights pre-trained on the ImageNet dataset. 
    The model then processes these features through a GRU cell to generate sequences of predicted word distributions.

    Attributes:
        input_size (int): The size of the input features for the GRU.
        hidden_size (int): The size of the hidden state in the GRU.
        output_size (int): The size of the output layer, corresponding to the vocabulary size.
        target_len (int): The length of the output sequence to generate.

    Methods:
        forward_cnn(input_batch): Extracts image features using the VGG16 model.
        forward_gru(x, h): Processes the input through the GRU and generates output word scores.
        forward(input): Combines feature extraction and sequence generation, returning a tensor of predicted word distributions based on the input images.
    """

    def __init__(self, input_size, hidden_size, output_size, target_len):
        super().__init__()
        self.input_size = input_size
        self.hidden_size = hidden_size
        self.output_size = output_size
        self.target_len = target_len

        self.vgg_model = models.vgg16(weights=models.VGG16_Weights.IMAGENET1K_V1)
        feature_dim = self.vgg_model.classifier[0].in_features
        self.vgg_model.classifier = nn.Identity()

        for param in self.vgg_model.parameters():
            param.requires_grad = False

        self.fc_between1 = nn.Linear(feature_dim, feature_dim // 4)  #to adapt to gru
        self.fc_between2 = nn.Linear(feature_dim // 4, hidden_size)  

        self.GRU = nn.GRUCell(input_size, hidden_size, bias=True, dtype=torch.float32)
        self.h_to_words = nn.Linear(hidden_size, output_size)

        self.log_softmax = nn.LogSoftmax(dim=1)
        self.tanh = nn.Tanh()
        self.sigmoid = nn.Sigmoid()

    def forward_cnn(self, input_batch):
        x = self.vgg_model(input_batch)
        return x.squeeze()

    def forward_gru(self, x, h):
        h = self.GRU(x, h)
        y = self.h_to_words(h)
        return y, h

    def forward(self, input):
        x = self.forward_cnn(input)

        h = self.fc_between1(x)
        h = self.fc_between2(h)
        num_batches = input.shape[0]

        y = torch.zeros((num_batches, self.target_len + 1, self.output_size))
        y[:, 0, :] = our_vocab.one_hot_encode("<SOS>")
        y = y.to(x.device)
        #feed the indexed caption as it grows until target_len

        for t in range(self.target_len):
            temp_y = y[:, t, :].clone()
            temp_output, h = self.forward_gru(temp_y, h)
            y = y.clone()
            y[:, t + 1, :] = temp_output
        y = y[:, 1:, :]  #remove the SOS token
        return y

## Training

We have trained the model on the train_dataset (approximately 110 000 pairs image-caption)

In [29]:
def training_loop(
    model, optimizer, loss_fn, train_loader, val_loader, num_epochs, print_every
):
    print("Starting training")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.to(device)
    train_losses, val_losses = [], []

    for epoch in range(1, num_epochs + 1):
        model, train_loss = train_epoch(model, optimizer, loss_fn, train_loader, val_loader, device, print_every)
        val_loss = validate(model, loss_fn, val_loader, device)
        print(
            f"Epoch {epoch}/{num_epochs}: "
            f"Train loss: {sum(train_loss)/len(train_loss):.3f}, "
            f"Val. loss: {val_loss:.3f}, "
        )
        train_losses.extend(train_loss)
        val_losses.append(val_loss)
    return model, train_losses, val_losses


def train_epoch(
    model, optimizer, loss_fn, train_loader, val_loader, device, print_every
):
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    model.train()
    train_loss_batches = []
    num_batches = len(train_loader)
    for batch_index, (x, y) in enumerate(train_loader, 1):
        inputs, targets = x.to(device), y.to(device)
        optimizer.zero_grad()
        torch.autograd.set_detect_anomaly(True)

        logits = model.forward(inputs)
        logits = logits.reshape(-1, logits.size(-1))
        targets = targets.reshape(-1)

        logits = logits.to(device)
        targets = targets.to(device)

        loss = loss_fn(logits, targets)
        loss.backward()
        optimizer.step()
        train_loss_batches.append(loss.item())

        if batch_index % 25 == 0 or batch_index == 1:
            print(f"Batch {batch_index} at time {dt.now()}, loss: {loss.item()}")

        if print_every is not None and batch_index % print_every == 0:
            val_loss = validate(model, loss_fn, val_loader, device)
            model.train()
            print(
                f"\tBatch {batch_index}/{num_batches}: "
                f"\tTrain loss: {sum(train_loss_batches[-print_every:])/print_every:.3f}, "
                f"\tVal. loss: {val_loss:.3f}, "
            )

    return model, train_loss_batches


def validate(model, loss_fn, val_loader, device):
    val_loss_cum = 0
    model.eval()
    with torch.no_grad():
        for batch_index, (x, y) in enumerate(val_loader, 1):
            inputs, targets = x.to(device), y.to(device)
            logits = model.forward(inputs)
            logits = logits.reshape(-1, logits.size(-1))
            targets = targets.reshape(-1)
            batch_loss = loss_fn(logits, targets)
            val_loss_cum += batch_loss.item()
    return val_loss_cum / len(val_loader)

We have saved the trained model in file ' ..... ', to avoid re-training it every time. 

The weights used in the loss function are the normalized wieghts extracted from the dictionary.

In [None]:
input_size  = vocab_size 
hidden_size = 512
output_size = input_size
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

model = NN_conv_gru(input_size, hidden_size, output_size, target_len)
learning_rate = 0.0000005
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
class_weights = class_weights / class_weights.sum()
class_weights = class_weights.to(device)

loss_fn = nn.CrossEntropyLoss(weight=class_weights.to(device))
num_epochs = 1
batch_size = 128

#To feed the images to VGG
mean = [0.485, 0.456, 0.406]
std = [0.229, 0.224, 0.225]
transform = transforms.Compose(
    [Resize((224, 224)), ToTensor(), Normalize(mean, std, inplace=True)]
)

train_dataset = Flickr30kImages(dataframe=train_df, transform=transform)
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=4)
val_dataset = Flickr30kImages(dataframe=val_df, transform=transform)
val_loader = DataLoader(val_dataset, batch_size=batch_size, num_workers=4)

print(f"Training batches: {len(train_loader)}, Validation batches: {len(val_loader)}")
torch.cuda.empty_cache()

checkpoint = torch.load("VGG-GRU-model_v7.ckpt", map_location=device)
model.load_state_dict(checkpoint["model_state_dict"])
del checkpoint

if False:
    model, train_losses, val_losses = training_loop(
        model,
        optimizer,
        loss_fn,
        train_loader,
        val_loader,
        num_epochs,
        print_every=len(train_loader) // 3,
    )

    time = dt.now().strftime("%Y-%m-%d__%H:%M:%S")
    torch.save(
        {
            "model_state_dict": model.state_dict(),
            "train_losses": train_losses,
            "val_losses": val_losses,
        },
        f"./VGG-GRU-model_{time}.ckpt",
    )

## Results' analysis

To qualitatively analyze the results, we need a function which converts the output of the model, that is a distribution of probabilities, into word indices, according to our Vocabulary class.

In [32]:
def logits_to_captions(logits, vocab):
    """
    Converts output logits from the neural network into a human-readable caption string.

    This function processes the logits and extracts the most likely word indices. 
    The predicted indices are then mapped to words using the provided vocabulary, stopping the conversion when the end-of-sequence token ("<EOS>") is encountered. 

    Parameters:
        logits (torch.Tensor): A tensor containing predicted word scores, either in shape (batch_size, vocab_size) or (vocab_size).
        vocab (Vocabulary): An object that contains a mapping from indices to words (idx2word).

    Returns:
        str: The generated caption as a space-separated string of words.
    """

    if len(logits.shape) == 2:
        _, predicted_indices = logits.max(dim=-1)
    else:
        predicted_indices = logits
    caption = []
    for idx in predicted_indices:
        word = vocab.idx2word[idx.item()]
        if word == "<EOS>":
            break
        if word != "<SOS>":
            caption.append(word)
    caption_string = " ".join(caption)
    return caption_string

In [None]:
class Flickr30kImages_names(Dataset):
    def __init__(self, dataframe, transform=None):
        self.dataframe = dataframe
        self.transform = transform

    def __len__(self):
        return len(self.dataframe)

    def __getitem__(self, idx):
        image_name = self.dataframe.iloc[idx]["image_name"]
        caption = self.dataframe.iloc[idx]["caption"]
        tensor_captioning_indices = caption_to_padded_indices(
            our_vocab, caption, target_len
        )

        image_path = os.path.join(images_path, image_name)
        image = Image.open(image_path)

        if self.transform:
            image = self.transform(image)

        return image, image_name

    def get_caption(self, idx):
        return self.dataframe.iloc[idx]["caption"]

    def get_image_name(self, idx):
        return self.dataframe.iloc[idx]["image_name"]

In [None]:
if False:
    test_dataset = Flickr30kImages_names(dataframe=test_df, transform=transform)
    test_loader = DataLoader(val_dataset, batch_size=32, num_workers=0)
    print(f"Starting testing loop with {len(test_loader)} batches")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
    print(device)
    model.to(device)
    test_predicted_dict = {}
    with torch.no_grad():
        for i, (x, y) in enumerate(test_loader):
            print(f"Batch {i+1}/{len(test_loader)}")
            logits = model.forward(x.to(device)).cpu()
            for batch_idx in range(logits.size(0)):
                generated_caption = logits_to_captions(
                    logits[batch_idx, :, :], our_vocab
                )
                image_name = y[batch_idx]
                test_predicted_dict[image_name] = generated_caption

    torch.save(test_predicted_dict, "test_predicted_dict.pkl")
else:
    test_predicted_dict = torch.load("test_predicted_dict_GRU.pkl")

In [None]:
# test_predicted_dict = img_name -> caption
# filtered_ground_truth_dict = img_name -> [caption]

if False:
    test_ground_truth_dict = {}
    for img_name in test_predicted_dict.keys():
        captions_list = image_caption_df[image_capidx_df["image_name"] == img_name][
            "caption"
        ].tolist()
        test_ground_truth_dict[img_name] = []

        for caption in captions_list:
            capt = caption.split(" ")
            if not len(capt) <= target_len - 1:  # -1 because of <EOS>
                capt = capt[:target_len]
            test_ground_truth_dict[img_name].append(" ".join(capt))

    torch.save(test_ground_truth_dict, "test_ground_truth_dict.pkl")
else:
    test_ground_truth_dict = torch.load("test_ground_truth_dict.pkl")

To quantitavely evaluate the model's performance, we use CIDEr, METEOR, ROUGE-1, ROUGE-2 and ROUGE-L scores

In [None]:
!pip install evaluate rouge_score

In [None]:
from pycocoevalcap.cider.cider import Cider
import evaluate
from rouge_score import rouge_scorer

cider_scorer = Cider()
cider_score, _ = cider_scorer.compute_score(test_ground_truth_dict, test_predicted_dict)

meteor = evaluate.load("meteor")
predictions = [test_predicted_dict[img_name][0] for img_name in test_predicted_dict]
references = [test_ground_truth_dict[img_name][0] for img_name in test_ground_truth_dict]
meteor_score = meteor.compute(predictions=predictions, references=references)

scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)
rouge_1_scores = []
rouge_2_scores = []
rouge_l_scores = []
for image_name in test_predicted_dict.keys():
    references = test_ground_truth_dict[image_name]
    prediction = test_predicted_dict[image_name][0]
    scores = scorer.score(references[0], prediction)
    rouge_1_scores.append(scores["rouge1"].fmeasure)
    rouge_2_scores.append(scores["rouge2"].fmeasure)
    rouge_l_scores.append(scores["rougeL"].fmeasure)
average_rouge_1 = sum(rouge_1_scores) / len(rouge_1_scores)
average_rouge_2 = sum(rouge_2_scores) / len(rouge_2_scores)
average_rouge_l = sum(rouge_l_scores) / len(rouge_l_scores)


print(f"CIDEr score for test dataset: {cider_score}")
print(f"METEOR score: {meteor_score['meteor']}")
print(f"Average ROUGE-1 Score: {average_rouge_1}")
print(f"Average ROUGE-2 Score: {average_rouge_2}")
print(f"Average ROUGE-L Score: {average_rouge_l}")

## Evaluation on blurred images

A further evaluation to test the robustness of the model to variations in input quality can be done by applying GaussianBlur to the images in the test set.

In [None]:
transform_blur = transforms.Compose(
    [
        GaussianBlur(kernel_size=31, sigma=3),
        Resize((224, 224)),
        ToTensor(),
        Normalize(mean, std, inplace=True),
    ]
)

In [None]:
if True:
    blur_test_dataset = Flickr30kImages_names(dataframe=test_df, transform=None)
    blur_test_loader = DataLoader(blur_test_dataset, batch_size=1, num_workers=2)

    print(f"Starting testing loop with {len(test_loader)} batches")
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    print(device)
    model.to(device)

    blur_predicted_dict = {}
    with torch.no_grad():
        for i, (image, imagename) in enumerate(test_dataset):
            if i%100==0:
                print(f"Batch {i+1}/{len(test_dataset)}")

            
            #image = torchvision.transforms.functional.to_pil_image(x[0])
            image = GaussianBlur(kernel_size=31, sigma=3)(image)
            
            pixel_values = processor(images=image, return_tensors="pt").pixel_values.to(device)

            generated_ids = model.generate(pixel_values=pixel_values.clone().to(device), max_length=80)
            generated_caption = processor.batch_decode(
                generated_ids, skip_special_tokens=True
            )

            blur_predicted_dict[imagename] = generated_caption
            #print(generated_caption)
                
    torch.save(blur_predicted_dict, "blur_predicted_dict_GIT_blur.pkl")
else:
    pass
    #blur_predicted_dict = torch.load("blur_predicted_dict_GRU.pkl")

In [None]:
cider_scorer = Cider()
cider_score, _ = cider_scorer.compute_score(test_ground_truth_dict, blur_predicted_dict)

meteor = evaluate.load("meteor")
predictions = [blur_predicted_dict[img_name][0] for img_name in blur_predicted_dict]
references = [test_ground_truth_dict[img_name][0] for img_name in test_ground_truth_dict]
meteor_score = meteor.compute(predictions=predictions, references=references)

scorer = rouge_scorer.RougeScorer(["rouge1", "rouge2", "rougeL"], use_stemmer=True)
rouge_1_scores = []
rouge_2_scores = []
rouge_l_scores = []
for image_name in blur_predicted_dict.keys():
    references = test_ground_truth_dict[image_name]
    prediction = blur_predicted_dict[image_name][0]
    scores = scorer.score(references[0], prediction)
    rouge_1_scores.append(scores["rouge1"].fmeasure)
    rouge_2_scores.append(scores["rouge2"].fmeasure)
    rouge_l_scores.append(scores["rougeL"].fmeasure)
average_rouge_1 = sum(rouge_1_scores) / len(rouge_1_scores)
average_rouge_2 = sum(rouge_2_scores) / len(rouge_2_scores)
average_rouge_l = sum(rouge_l_scores) / len(rouge_l_scores)


print(f"CIDEr score for blurred test dataset: {cider_score}")
print(f"METEOR score: {meteor_score['meteor']}")
print(f"Average ROUGE-1 Score: {average_rouge_1}")
print(f"Average ROUGE-2 Score: {average_rouge_2}")
print(f"Average ROUGE-L Score: {average_rouge_l}")

# SAVE to revise!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!!

In [None]:
test_dataset = Flickr30kImages(dataframe=test_df, transform=transform)
test_loader = DataLoader(test_dataset, batch_size=3, num_workers=0)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

with torch.no_grad():
    for batch_idx, (x, _) in enumerate(val_loader):  
        logits = model.forward(x.to(device))       
        for i in range(logits.size(0)):
            image_name = test_loader.dataset.get_image_name(batch_idx * test_loader.batch_size + i)  
            print( logits_to_captions(logits[i, :, :], our_vocab),  " ||| ",  image_name,  logits_to_captions(y[batch_idx, :], our_vocab))


In [None]:
# test_predicted_dict = img_name -> caption
# filtered_ground_truth_dict = img_name -> [caption]

if False:
    test_ground_truth_dict = {}
    for img_name in test_predicted_dict.keys():
        captions_list = image_caption_df[image_capidx_df["image_name"] == img_name][
            "caption"
        ].tolist()
        test_ground_truth_dict[img_name] = []

        for caption in captions_list:
            capt = caption.split(" ")
            if not len(capt) <= target_len - 1:  # -1 because of <EOS>
                capt = capt[:target_len]
            test_ground_truth_dict[img_name].append(" ".join(capt))

    torch.save(test_ground_truth_dict, "test_ground_truth_dict.pkl")
else:
    test_ground_truth_dict = torch.load("test_ground_truth_dict.pkl")

In [None]:
blur_test_dataset = Flickr30kImages(dataframe=test_df, transform=transform_blur)
blur_test_loader = DataLoader(blur_test_dataset, batch_size=3, num_workers=0)

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(device)
model.to(device)

with torch.no_grad():
    for batch_idx, (x, _) in enumerate(val_loader):  
        logits = model.forward(x.to(device))       
        for i in range(logits.size(0)):
            image_name = blur_test_loader.dataset.get_image_name(batch_idx * blur_test_loader.batch_size + i)  
            print( logits_to_captions(logits[i, :, :], our_vocab),  " ||| ",  image_name,  logits_to_captions(y[batch_idx, :], our_vocab))