In [None]:
import os
from pathlib import Path
import json

import random
import re

import pandas as pd

import torch
from torch.utils.data.dataset import Dataset

from PIL import Image

import evaluate
import transformers

from transformers import RobertaTokenizerFast

from transformers import VisionEncoderDecoderModel
from transformers import ViTFeatureExtractor

from transformers import default_data_collator
from transformers import Trainer, TrainingArguments

In [None]:
BATCH_SIZE = 5
VAL_EPOCHS = 1 
LEARNING_RATE = 1e-2
SEED = 42
MAX_LEN = 128

TRAIN_EPOCHS = 5

# Training Encoder-Decoder

## Abstract

In this notebook we will use the decoder from the previous notebook to train our image captioning model. But first we will take a look at some concepts that we did not discuss in the previous notebook.

## Concepts

### Vision Transformer (ViT)

A Vision Transformer (ViT) is a deep learning model for computer vision tasks such as image classification. It is based on the Transformer architecture, which was originally developed for natural language processing. The model splits an image into fixed-size patches (16×16 pixels for the `google/vit-base-patch16-224` checkpoint used below), flattens each patch, and projects it linearly into an embedding. Position embeddings are added so the model knows where each patch came from, and the resulting sequence is passed through a stack of Transformer encoder layers. Each layer applies self-attention, which lets every patch attend to every other patch and learn the relationships between image regions, followed by a feedforward network. Stacking these layers lets the Vision Transformer combine local and global information about the image. In this notebook the ViT acts as the encoder: its output embeddings are what the caption decoder attends to.
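
A ViT does not read pixels one by one; it reads a sequence of square patches. The sketch below, in plain Python with no deep learning library, splits a tiny grid into non-overlapping patches and computes how many tokens the encoder would see. The helper names `split_into_patches` and `vit_sequence_length` are hypothetical, the 224 and 16 are read off the checkpoint name `google/vit-base-patch16-224`, and the one extra token assumes the usual classification token is prepended.

```python
def split_into_patches(image, patch_size):
    """Split a 2D grid (list of rows) into flattened, non-overlapping square patches."""
    height, width = len(image), len(image[0])
    if height % patch_size or width % patch_size:
        raise ValueError("image size must be divisible by the patch size")
    patches = []
    for top in range(0, height, patch_size):
        for left in range(0, width, patch_size):
            # flatten the patch row by row, as a ViT does before the linear projection
            patch = [image[row][col]
                     for row in range(top, top + patch_size)
                     for col in range(left, left + patch_size)]
            patches.append(patch)
    return patches


def vit_sequence_length(image_size, patch_size, extra_tokens=1):
    """Number of tokens for a square image: one per patch plus any extra tokens."""
    return (image_size // patch_size) ** 2 + extra_tokens


# a 4x4 "image" whose pixel values are simply 0..15
toy_image = [[row * 4 + col for col in range(4)] for row in range(4)]
toy_patches = split_into_patches(toy_image, patch_size=2)

print(toy_patches)
print(vit_sequence_length(224, 16))
```

In the real model each flattened patch also covers three colour channels and is multiplied by a learned projection matrix; the sketch only shows the splitting step.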

### Metrics

We also need some metrics to track how the model improves with each epoch of training.

#### ROUGE

ROUGE (Recall-Oriented Understudy for Gisting Evaluation) is a set of metrics for evaluating the quality of text summarization and machine translation outputs by comparing an automatic summary or translation to a reference or a set of references. The metrics calculate the overlap between n-grams, word sequences, or sentences of the automatic and reference summaries, giving scores for recall, precision, and the harmonic mean (F1 score). ROUGE is widely used in the evaluation of summarization and machine translation tasks in natural language processing.
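
As a rough illustration of the n-gram overlap idea, the sketch below computes ROUGE-N precision, recall and F1 for a single prediction and a single reference. The function `rouge_n` is a hypothetical helper with whitespace tokenization; the `rouge` metric loaded from `evaluate` later in this notebook applies its own tokenization and reports several variants, so its numbers can differ.

```python
from collections import Counter


def rouge_n(prediction, reference, n=1):
    """ROUGE-N for one prediction/reference pair using whitespace tokens."""
    def ngrams(text):
        tokens = text.lower().split()
        return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))

    pred_counts, ref_counts = ngrams(prediction), ngrams(reference)
    # Counter intersection keeps the minimum count of each shared n-gram
    overlap = sum((pred_counts & ref_counts).values())
    precision = overlap / max(sum(pred_counts.values()), 1)
    recall = overlap / max(sum(ref_counts.values()), 1)
    f1 = 0.0 if overlap == 0 else 2 * precision * recall / (precision + recall)
    return {"precision": precision, "recall": recall, "f1": f1}


scores = rouge_n("a dog runs on grass", "a brown dog runs on the grass")
print(scores)
```

Here every predicted word appears in the reference, so precision is perfect, while recall is lower because the reference contains two words the prediction misses.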

#### METEOR

METEOR (Metric for Evaluation of Translation with Explicit Ordering) is a metric for evaluating the quality of machine translation and text summarization outputs. Like BLEU, it compares the generated text with a reference, but it works on aligned unigrams and takes stemming, synonymy, and word order into account.

$$ \text{METEOR} = F_{mean} \cdot (1 - \text{Penalty}), \quad F_{mean} = \frac{10 P R}{R + 9 P}, \quad \text{Penalty} = 0.5 \left( \frac{\text{chunks}}{\text{matched unigrams}} \right)^3 $$

Precision $P$ and recall $R$ are the number of matched unigrams divided by the total number of unigrams in the generated and reference texts, respectively. $F_{mean}$ is a harmonic mean that weights recall much more heavily than precision; the constants shown are those of the original METEOR formulation. Unlike BLEU, METEOR has no brevity penalty; it uses a fragmentation penalty instead. The matched unigrams are grouped into the fewest possible chunks of words that are adjacent and in the same order in both texts, so a generated caption whose matches are scattered or reordered has more chunks and receives a larger penalty.

Unigrams are matched in stages: first by exact word match, then by stem match (so that "running" matches "runs"), and finally by synonym match using WordNet.

In summary, METEOR provides a more nuanced measure of the quality of machine translation and text summarization outputs than BLEU, as it takes into account synonymy, stemming, and word order in addition to exact overlap.
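
A small worked example may help. The sketch below assumes the constants of the original METEOR formulation (recall weighted nine times more than precision, and a fragmentation penalty of 0.5 times the cubed ratio of chunks to matched unigrams) and takes the alignment as given instead of computing it. The helper `meteor_from_counts` is hypothetical and is not the implementation behind the `meteor` metric from `evaluate`.

```python
def meteor_from_counts(matches, prediction_length, reference_length, chunks):
    """METEOR score from alignment statistics (original-paper constants assumed)."""
    if matches == 0:
        return 0.0
    precision = matches / prediction_length
    recall = matches / reference_length
    # harmonic mean that weights recall nine times more than precision
    f_mean = 10 * precision * recall / (recall + 9 * precision)
    # fewer, longer chunks mean the matched words are in a similar order
    penalty = 0.5 * (chunks / matches) ** 3
    return f_mean * (1 - penalty)


# prediction: "a dog runs on grass"
# reference:  "a brown dog runs on the grass"
# 5 matched words in 3 chunks: "a" | "dog runs on" | "grass"
score = meteor_from_counts(matches=5, prediction_length=5, reference_length=7, chunks=3)
print(round(score, 4))
```

If the prediction were identical to the reference, all matches would form a single chunk and the penalty would shrink to almost nothing.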

#### BLEU

BLEU (Bilingual Evaluation Understudy) is a commonly used metric for evaluating the quality of machine translation and text summarization outputs. It calculates the n-gram overlap between the generated output and a reference or a set of references. The score is based on the idea that the closer the generated output is to the reference, the better the quality of the output.

The formula for BLEU is as follows:

$$ \text{BLEU} = \text{BP} \cdot \exp\left( \sum_{n=1}^{N} w_n \log p_n \right) $$

where $p_n$ is the modified precision for n-grams of order $n$, $N$ is the highest order (usually 4), $w_n$ are the weights (usually uniform, $w_n = 1/N$), $\text{BP}$ is the brevity penalty, and $\exp$ is the exponential function. The modified precision is the number of n-grams in the generated output that also appear in the reference(s), with each n-gram's count clipped to the maximum number of times it occurs in any single reference, divided by the total number of n-grams in the generated output.

The brevity penalty is used to adjust the BLEU score based on the length of the generated output relative to the reference(s). If the generated output is shorter than the reference(s), the brevity penalty is applied to lower the BLEU score.

BLEU scores range from 0 to 1, with higher scores indicating better quality outputs. The score provides a quick and simple way to evaluate the quality of machine translation and text summarization outputs, but it has some limitations, such as not taking into account synonymy or fluency.
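
The following sketch computes a sentence-level BLEU score for one prediction and one reference, assuming uniform weights over n-gram orders 1 to 4, whitespace tokenization and no smoothing. The helpers `ngram_counts` and `sentence_bleu` are hypothetical; the `bleu` metric from `evaluate` used later works at corpus level with its own tokenizer, so treat this only as an illustration of clipped precision and the brevity penalty.

```python
import math
from collections import Counter


def ngram_counts(tokens, n):
    """Count all n-grams of order n in a token list."""
    return Counter(tuple(tokens[i:i + n]) for i in range(len(tokens) - n + 1))


def sentence_bleu(prediction, reference, max_order=4):
    """Unsmoothed BLEU for a single prediction/reference pair."""
    pred_tokens, ref_tokens = prediction.split(), reference.split()
    log_precisions = []
    for n in range(1, max_order + 1):
        pred_ngrams = ngram_counts(pred_tokens, n)
        ref_ngrams = ngram_counts(ref_tokens, n)
        # clip each predicted n-gram count to its count in the reference
        clipped = sum((pred_ngrams & ref_ngrams).values())
        total = sum(pred_ngrams.values())
        if clipped == 0 or total == 0:
            return 0.0  # without smoothing, one empty order zeroes the score
        log_precisions.append(math.log(clipped / total))
    pred_len, ref_len = len(pred_tokens), len(ref_tokens)
    # brevity penalty: only predictions shorter than the reference are penalized
    brevity_penalty = 1.0 if pred_len > ref_len else math.exp(1 - ref_len / pred_len)
    return brevity_penalty * math.exp(sum(log_precisions) / max_order)


bleu_score = sentence_bleu("a dog runs on the grass", "a brown dog runs on the grass")
print(round(bleu_score, 4))
```

The hard zero for any missing n-gram order is why BLEU can look harsh on short captions.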

### Beam search
Beam search is a heuristic search algorithm used in natural language processing and computer vision to find a high-probability sequence of elements in a large search space. It works by maintaining a fixed number of "beams", or candidate sequences. At each step of the search, every beam is expanded with each possible next element and the resulting sequences are scored. The candidates are then pruned, keeping only the top N highest-scoring ones, where N is the beam width. The algorithm continues until a stopping criterion is met, such as every beam producing an end-of-sequence symbol or reaching a maximum number of steps. Beam search is a trade-off between the completeness of exhaustive search and the efficiency of greedy search: it explores more of the search space than greedy decoding while still concentrating on the highest-scoring sequences.
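
To see why keeping more than one candidate can help, here is a minimal beam search over a hand-written toy "language model". Everything in it is an assumption made for illustration: the probability table `TOY_TABLE`, the helper names, and the simplification that finished sequences are scored by their raw log-probability with no length penalty. It is a sketch of the idea, not the implementation used by `model.generate`.

```python
import math

# toy next-token probabilities, keyed by the last token only
TOY_TABLE = {
    "<s>": {"a": 0.6, "the": 0.4},
    "a": {"dog": 0.5, "cat": 0.5},
    "the": {"dog": 0.9, "cat": 0.1},
    "dog": {"</s>": 1.0},
    "cat": {"</s>": 1.0},
}


def toy_model(tokens):
    """Return the next-token distribution given the tokens so far."""
    return TOY_TABLE[tokens[-1]]


def beam_search(next_token_probs, start, end, num_beams=2, max_steps=5):
    beams = [([start], 0.0)]  # (tokens, sum of log-probabilities)
    finished = []
    for _ in range(max_steps):
        # expand every live beam with every possible next token
        candidates = []
        for tokens, score in beams:
            for token, prob in next_token_probs(tokens).items():
                candidates.append((tokens + [token], score + math.log(prob)))
        # prune: keep only the num_beams best candidates
        candidates.sort(key=lambda item: item[1], reverse=True)
        beams = []
        for tokens, score in candidates[:num_beams]:
            if tokens[-1] == end:
                finished.append((tokens, score))
            else:
                beams.append((tokens, score))
        if not beams:
            break
    return max(finished + beams, key=lambda item: item[1])


greedy_tokens, greedy_score = beam_search(toy_model, "<s>", "</s>", num_beams=1)
beam_tokens, beam_score = beam_search(toy_model, "<s>", "</s>", num_beams=2)
print(greedy_tokens, round(math.exp(greedy_score), 2))
print(beam_tokens, round(math.exp(beam_score), 2))
```

With one beam the search commits to the locally best first word and ends up with a less likely sentence; with two beams it keeps the second-best first word alive long enough to find the better continuation.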

## Preparing the Dataset

First, we will load the captions dictionary.

In [None]:
with open('./data/captions.json', 'r') as captions_file:
    caption_dict = json.load(captions_file)
    del captions_file

Then we will rewrite the image paths so that they point to our local image directory, since the original paths have the form `/content/filename.jpg`. Entries that are not `.jpg` files are dropped.

In [None]:
images_path = './data/Images/'
images = list(caption_dict.keys())

for image_path in images:
    if image_path.endswith('jpg'):
        new = images_path + image_path.split('/')[-1]
        caption_dict[new] = caption_dict.pop(image_path)
    else:
        caption_dict.pop(image_path)

Now we will split the captions into training and test sets, because the training process needs an evaluation dataset.

In [None]:
def train_test_split(dictionary, test_size_frac):
    # random.sample needs a sequence; dict views are rejected on Python 3.11+
    images = list(dictionary.keys())
    random.seed(SEED)  # make the split reproducible
    images_test = random.sample(images, int(test_size_frac * len(images)))
    test_set = set(images_test)  # a set gives fast membership checks
    images_train = [img for img in images if img not in test_set]

    train_dict = {img: dictionary[img] for img in images_train}
    test_dict = {img: dictionary[img] for img in images_test}
    return train_dict, test_dict

train, test = train_test_split(caption_dict, 0.3)

We will now create a pandas DataFrame because later we will create a PyTorch dataset in which we will do feature extraction on images and tokenization on texts.

In [None]:
def get_df(dictionary):
    captions = []
    images = []
    # iterate over the dictionary passed in, not the global caption_dict,
    # so that the train and test frames contain different images
    for image, image_captions in dictionary.items():
        for capt in image_captions:
            # strip the start/end markers and keep at most 30 words
            cleaned = capt.replace('<s> ', '').replace('  <e>', '').strip()
            captions.append(' '.join(cleaned.split(' ')[:30]))
            images.append(image)

    return pd.DataFrame({'images': images, 'captions': captions})

train_df = get_df(train)
test_df = get_df(test)

## Preparing and Training Model

In this section we will prepare and train our image captioning model.

First, we need to load the tokenizer.

In [None]:
tokenizer = RobertaTokenizerFast.from_pretrained('./models/Byte_tokenizer', max_len=MAX_LEN)

Then we will load the feature extractor, which we will use below.

In [None]:
feature_extractor = ViTFeatureExtractor.from_pretrained("google/vit-base-patch16-224")

Now, we will create a class for PyTorch dataset in which every time we get an item, its image is preprocessed using the feature extractor and each caption is tokenized.

In [None]:
class ImageCaptioningDataset(Dataset):
    def __init__(self, df, tokenizer,feature_extractor, decoder_max_length=31):
        self.df = df
        self.tokenizer = tokenizer
        self.feature_extractor = feature_extractor
        self.decoder_max_length = decoder_max_length

    def __len__(self):
        return len(self.df)

    def __getitem__(self, idx):
        # get file name + text 
        img_path = self.df['images'][idx]
        caption = self.df['captions'][idx]
        # prepare image (i.e. resize + normalize)
        image = Image.open(img_path).convert("RGB")
        pixel_values = self.feature_extractor(image, return_tensors="pt").pixel_values
        # add labels (input_ids) by encoding the text
        labels = self.tokenizer(caption, truncation = True,
                                          padding="max_length", 
                                          max_length=self.decoder_max_length).input_ids
        # important: make sure that PAD tokens are ignored by the loss function
        labels = [label if label != self.tokenizer.pad_token_id else -100 for label in labels]

        encoding = {"pixel_values": pixel_values.squeeze(), "labels": torch.tensor(labels)}
        return encoding
    
train_dataset = ImageCaptioningDataset(df=train_df,
                                       tokenizer=tokenizer,
                                       feature_extractor=feature_extractor)
eval_dataset = ImageCaptioningDataset(df=test_df,
                                      tokenizer=tokenizer,
                                      feature_extractor=feature_extractor)
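
The dataset replaces padding token ids in the labels with -100 so that padded positions do not contribute to the loss. The plain-Python sketch below shows the effect of that masking on an averaged loss. The helpers `mask_padding` and `mean_loss`, the example token ids, the pad id of 1 and the per-token loss values are all made up for illustration; in the notebook the averaging happens inside PyTorch's cross-entropy loss, which ignores the index -100 by default.

```python
IGNORE_INDEX = -100  # label value that the loss function skips


def mask_padding(token_ids, pad_token_id):
    """Replace padding ids with IGNORE_INDEX, as done in __getitem__."""
    return [tok if tok != pad_token_id else IGNORE_INDEX for tok in token_ids]


def mean_loss(per_token_losses, labels):
    """Average the loss over positions whose label is not ignored."""
    kept = [loss for loss, label in zip(per_token_losses, labels) if label != IGNORE_INDEX]
    return sum(kept) / len(kept) if kept else 0.0


token_ids = [0, 42, 7, 2, 1, 1]             # hypothetical caption padded with id 1
labels = mask_padding(token_ids, pad_token_id=1)
losses = [0.5, 1.0, 1.5, 1.0, 9.0, 9.0]     # made-up losses; last two are on padding

print(labels)
print(mean_loss(losses, labels))
```

Without the masking, the two large losses on padding positions would dominate the average even though they say nothing about caption quality.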


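Now we create our model, which is pretty simple. `from_encoder_decoder_pretrained` loads the ViT encoder and our RoBERTa decoder and makes sure the decoder has cross-attention layers, so that it can attend to the image embeddings. The parameter `tie_encoder_decoder` is a weight-sharing option: it asks the library to tie encoder weights to the equivalent decoder weights. It is not what creates the cross-attention connection.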

In [None]:
# set encoder decoder tying to True
model = VisionEncoderDecoderModel.from_encoder_decoder_pretrained(
    "google/vit-base-patch16-224", './models/RobertaDecoder/', tie_encoder_decoder=True
)

Now, we need to configure it.

First, we need to set token-related properties. Here is the list:

* decoder_start_token_id - The ID of the decoder start token
* pad_token_id - The ID of the pad token
* eos_token_id - The ID of the end-of-sequence token
* vocab_size - The size of vocabulary

In [None]:
model.config.decoder_start_token_id = tokenizer.cls_token_id
model.config.pad_token_id = tokenizer.pad_token_id
model.config.eos_token_id = tokenizer.sep_token_id
model.config.vocab_size = model.config.decoder.vocab_size

Then we need to configure the parameters for beam search. Here is the list:

* max_length - Maximum length of the generated sequence, in tokens
* no_repeat_ngram_size - The n-gram size. If set to a value greater than 0, no n-gram of that size may appear more than once in the generated sequence.
* length_penalty - An exponent applied to the sequence length, which then divides the score of each candidate sequence. Because the score is a sum of log-probabilities (and therefore negative), values greater than 0 favor longer sequences and values less than 0 favor shorter ones.
* num_beams - Number of beams in beam search

In [None]:
model.config.max_length = MAX_LEN
model.config.no_repeat_ngram_size = 3
model.config.length_penalty = 2.0
model.config.num_beams = 5
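
The effect of `length_penalty` is easiest to see with numbers. The sketch below assumes the scoring rule described in the Hugging Face generation documentation: the summed log-probability of a candidate is divided by its length raised to `length_penalty`. The helper names and the two example candidates are hypothetical, and the exact length the library uses can differ between versions.

```python
def length_normalized_score(sum_logprobs, length, length_penalty):
    """Beam score: summed log-probability divided by length ** length_penalty."""
    return sum_logprobs / (length ** length_penalty)


# two made-up finished candidates: a short caption and a longer, less likely one
short_caption = {"sum_logprobs": -4.0, "length": 4}
long_caption = {"sum_logprobs": -6.0, "length": 8}


def preferred(length_penalty):
    """Return which of the two candidates wins under the given penalty."""
    short_score = length_normalized_score(length_penalty=length_penalty, **short_caption)
    long_score = length_normalized_score(length_penalty=length_penalty, **long_caption)
    return "short" if short_score > long_score else "long"


for penalty in (-1.0, 0.0, 1.0, 2.0):
    print(penalty, preferred(penalty))
```

Because log-probabilities are negative, dividing by a larger number moves the score closer to zero, which is why a positive exponent such as the 2.0 used above favors the longer caption.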

After configuring the model, we need to define a function that computes all the metrics we want to use for checking model quality. In our case we use ROUGE, METEOR and BLEU. The function will decode the texts and use the objects loaded from the `evaluate` library to compute the metrics.

In [None]:
# load validation metrics
rouge = evaluate.load("rouge")
meteor = evaluate.load("meteor")
bleu = evaluate.load("bleu")

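def compute_metrics(pred):
    labels_ids = pred.label_ids
    pred_ids = pred.predictions

    # the plain Trainer passes model outputs (logits first) rather than token ids,
    # so take the logits and pick the most likely token at each position;
    # note these are teacher-forced predictions, not beam search generations
    if isinstance(pred_ids, tuple):
        pred_ids = pred_ids[0]
    if pred_ids.ndim == 3:
        pred_ids = pred_ids.argmax(axis=-1)

    # all unnecessary tokens are removed
    pred_str = tokenizer.batch_decode(pred_ids, skip_special_tokens=True)
    # restore the pad token where the labels were masked with -100
    labels_ids[labels_ids == -100] = tokenizer.pad_token_id
    label_str = tokenizer.batch_decode(labels_ids, skip_special_tokens=True)

    rouge_output = rouge.compute(predictions=pred_str, references=label_str)
    meteor_output = meteor.compute(predictions=pred_str, references=label_str)
    bleu_output = bleu.compute(predictions=pred_str, references=label_str)

    # merge all metrics into a single dictionary
    metrics_output = rouge_output
    metrics_output.update({"meteor": meteor_output["meteor"], "bleu": bleu_output["bleu"]})

    return metrics_output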

## Training Image Captioning model

In this section we will train our model. The training parameters were already covered in the previous notebook, so I will not explain them again.

In [None]:
captioning_model = './models/Untrained_VIT_Captioning'

training_args = TrainingArguments(
    output_dir=captioning_model,
    per_device_train_batch_size=BATCH_SIZE,
    per_device_eval_batch_size=BATCH_SIZE,
    evaluation_strategy="epoch",
    save_steps=2048,
    num_train_epochs = TRAIN_EPOCHS,
    overwrite_output_dir=True,
    save_total_limit=1,
)

In [None]:
trainer = Trainer(
    tokenizer=feature_extractor,
    model=model,
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=train_dataset,
    eval_dataset=eval_dataset,
    data_collator=default_data_collator,
)

And we start training. 

Important warning: the model uses around 4 gigabytes of GPU memory.

In [None]:
# Fine-tune the model: train on the train dataset, evaluate on the eval dataset after each epoch
trainer.train()

Let's save the trained model.

In [None]:
trainer.save_model('./models/Trained_VIT_Captioning')

## Loading and Evaluating performance

In this section we will test the model on some images from the test set. You can also use your own images: just call our captioning function with your own path.

First, we will load our model.

In [None]:
model = VisionEncoderDecoderModel.from_pretrained('./models/Trained_VIT_Captioning')

Then we need to create the captioning function. It will open the image, apply feature extraction, generate a caption, decode it, remove the special tokens and collapse repeated whitespace.

In [None]:
def caption_image(image_path):
    opened_image = Image.open(image_path).convert("RGB")
    feature_extraction = feature_extractor(opened_image, return_tensors="pt").pixel_values
    generated_caption = model.generate(feature_extraction)[0]
    decoded_caption = tokenizer.decode(generated_caption)

    # remove the start/end special tokens
    decoded_caption = decoded_caption.replace("<s>", "")
    decoded_caption = decoded_caption.replace("</s>", "")
    # raw string avoids an invalid-escape warning; strip so capitalize() hits the first letter
    decoded_caption = re.sub(r"\s+", " ", decoded_caption).strip()

    return decoded_caption.capitalize()

Then, we will get a random image from the test set and display it.

In [None]:
random_path_from_test_set = test_df.sample(1).images.iloc[0]
Image.open(random_path_from_test_set).convert("RGB")

Now we can use our function to caption the random image from the test set and, as an example of an unfamiliar subject, an image of dominoes.

In [None]:
caption_image(random_path_from_test_set)

In [None]:
caption_image("Dominoes.webp")

It probably will not generate a good caption for the domino image, since the model has never seen dominoes during training.

## Conclusion

In these two notebooks we went through the steps required to create an image captioning model with Hugging Face and learned a bunch of new things (probably). I hope you liked this tutorial.

## References

* https://medium.com/@kalpeshmulye/image-captioning-using-hugging-face-vision-encoder-decoder-step-2-step-guide-part-1-495ecb05f0d5

* https://huggingface.co/docs/transformers/v4.26.0/en/main_classes/trainer#trainer

* https://huggingface.co/docs/transformers/v4.26.0/en/model_doc/roberta

* https://huggingface.co/docs/transformers/v4.26.0/en/model_doc/vit