## Using a Pre-Trained Vision Encoder-Decoder Model for Image-to-Text Generation

> Steps:
> 1. Import the pre-trained ViT model, the feature extractor (which prepares images for the model's encoder), and the tokenizer (which converts text to token IDs and decodes generated IDs back to text).
> 2. Update the model configuration to align with our task of text generation.
> 3. Import data from the DiffusionDB dataset: enough for training, but within our computational limits. Split it into training, validation, and test sets.
> 4. Define the preprocessing steps to apply to the dataset: tokenize the labels we are trying to predict, and pass the images through the model's pre-trained feature extractor to get pixel values as NumPy arrays.
> 5. Map the preprocessing function onto the entire dataset.
> 6. Define the training arguments for the Seq2Seq trainer.
> 7. Define the evaluation metric: BERTScore in this case.
> 8. Define the trainer object with our pre-defined training arguments.
> 9. Fine-tune the pre-trained model on our training data and validate on the validation data.

### Import required packages

In [55]:
# Import packages
import os
import nltk
import torch
import evaluate
import numpy as np
from transformers import (
    VisionEncoderDecoderModel,
    AutoTokenizer,
    ViTImageProcessor,
    Seq2SeqTrainer,
    Seq2SeqTrainingArguments,
    default_data_collator,
    pipeline
)
from datasets import load_dataset

### Model Configurations

> Below, we make sure the NLTK `punkt` sentence tokenizer is available (it is used later when post-processing text), load BERTScore as the metric (a custom cosine-similarity metric is still to be worked out), and import the pre-trained model, tokenizer, and feature extractor from the ViT-GPT2 image-captioning checkpoint created by Ankur Kumar. Finally, we move the model to the GPU if one is available.

In [41]:
## Model Configurations
## ------------------------------------------------------------------------------------- ##
## ------------------------------------------------------------------------------------- ##
# Download the NLTK punkt sentence tokenizer if it is not already present
try:
    nltk.data.find("tokenizers/punkt")
except (LookupError, OSError):
    nltk.download("punkt", quiet=True)

# Import bertscore as a metric
metric = evaluate.load("bertscore")

# Import pre-trained model, tokenizer, feature extractor
model = VisionEncoderDecoderModel.from_pretrained(
    "nlpconnect/vit-gpt2-image-captioning"
)
feature_extractor = ViTImageProcessor.from_pretrained(
    "nlpconnect/vit-gpt2-image-captioning"
)
tokenizer = AutoTokenizer.from_pretrained("nlpconnect/vit-gpt2-image-captioning")

# Assign model to GPU device if available
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
model.to(device)

VisionEncoderDecoderModel(
  (encoder): ViTModel(
    (embeddings): ViTEmbeddings(
      (patch_embeddings): ViTPatchEmbeddings(
        (projection): Conv2d(3, 768, kernel_size=(16, 16), stride=(16, 16))
      )
      (dropout): Dropout(p=0.0, inplace=False)
    )
    (encoder): ViTEncoder(
      (layer): ModuleList(
        (0-11): 12 x ViTLayer(
          (attention): ViTAttention(
            (attention): ViTSelfAttention(
              (query): Linear(in_features=768, out_features=768, bias=True)
              (key): Linear(in_features=768, out_features=768, bias=True)
              (value): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
            (output): ViTSelfOutput(
              (dense): Linear(in_features=768, out_features=768, bias=True)
              (dropout): Dropout(p=0.0, inplace=False)
            )
          )
          (intermediate): ViTIntermediate(
            (dense): Linear(in_featur
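
> The note at the top of this section mentions a custom cosine-similarity metric as an alternative to BERTScore. A minimal sketch of the scoring part is below. It assumes each predicted and reference caption has already been turned into a fixed-length embedding vector by some sentence encoder, which is not part of this notebook. The function names are hypothetical.

```python
import math


def cosine_similarity(u, v):
    """Cosine similarity between two equal-length vectors."""
    if len(u) != len(v):
        raise ValueError("vectors must have the same length")
    dot = sum(a * b for a, b in zip(u, v))
    norm_u = math.sqrt(sum(a * a for a in u))
    norm_v = math.sqrt(sum(b * b for b in v))
    if norm_u == 0.0 or norm_v == 0.0:
        # Convention chosen for this sketch: an all-zero vector scores 0.
        return 0.0
    return dot / (norm_u * norm_v)


def mean_cosine_similarity(pred_embeddings, ref_embeddings):
    """Average pairwise cosine similarity over a batch of embeddings."""
    scores = [
        cosine_similarity(p, r)
        for p, r in zip(pred_embeddings, ref_embeddings)
    ]
    return sum(scores) / len(scores)
```

> To use this inside `compute_metrics`, the decoded predictions and labels would first have to be embedded, and the function would return something like `{"cosine_similarity": score}`.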

### Save pre-trained model

> We save the pre-trained model, feature extractor, and tokenizer so we have a local copy of each.

In [42]:
output_dir = "./vit-gpt-model"
model.save_pretrained(output_dir)
feature_extractor.save_pretrained(output_dir)
tokenizer.save_pretrained(output_dir)

('./vit-gpt-model/tokenizer_config.json',
 './vit-gpt-model/special_tokens_map.json',
 './vit-gpt-model/vocab.json',
 './vit-gpt-model/merges.txt',
 './vit-gpt-model/added_tokens.json',
 './vit-gpt-model/tokenizer.json')

### Load in DiffusionDB data

> I wrote a custom function, `get_sd_data`, in the `vit_train` module to load the Stable Diffusion data and split it into training, validation, and test sets.

In [43]:
from vit_train import get_sd_data
data = get_sd_data('poloclub/diffusiondb', '2m_first_1k')

Found cached dataset diffusiondb (/home/codespace/.cache/huggingface/datasets/poloclub___diffusiondb/2m_first_1k/0.9.1/547894e3a57aa647ead68c9faf148324098f47f2bc1ab6705d670721de9d89d1)
100%|██████████| 1/1 [00:00<00:00, 513.94it/s]


In [44]:
data

DatasetDict({
    train: Dataset({
        features: ['image', 'prompt', 'seed', 'step', 'cfg', 'sampler', 'width', 'height', 'user_name', 'timestamp', 'image_nsfw', 'prompt_nsfw'],
        num_rows: 600
    })
    val: Dataset({
        features: ['image', 'prompt', 'seed', 'step', 'cfg', 'sampler', 'width', 'height', 'user_name', 'timestamp', 'image_nsfw', 'prompt_nsfw'],
        num_rows: 200
    })
    test: Dataset({
        features: ['image', 'prompt', 'seed', 'step', 'cfg', 'sampler', 'width', 'height', 'user_name', 'timestamp', 'image_nsfw', 'prompt_nsfw'],
        num_rows: 200
    })
})
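
> The body of `get_sd_data` is not shown in this notebook, so the following is not its implementation. It is only a sketch of one way a 600/200/200 split of 1,000 rows could be produced, using a hypothetical `split_indices` helper, a shuffled index list, and an arbitrary seed.

```python
import random


def split_indices(n_rows, n_train, n_val, seed=0):
    """Shuffle row indices and cut them into train/val/test groups."""
    if n_train + n_val > n_rows:
        raise ValueError("train and val sizes exceed the number of rows")
    indices = list(range(n_rows))
    random.Random(seed).shuffle(indices)  # seeded for reproducibility
    return {
        "train": indices[:n_train],
        "val": indices[n_train:n_train + n_val],
        "test": indices[n_train + n_val:],
    }


splits = split_indices(n_rows=1000, n_train=600, n_val=200)
print({name: len(idx) for name, idx in splits.items()})
# {'train': 600, 'val': 200, 'test': 200}
```

> With a Hugging Face `Dataset`, index lists like these can be passed to `Dataset.select`, or the split can be done directly with `Dataset.train_test_split`.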

### Preprocessing

> The raw dataset holds PIL images and text prompts, so we preprocess both to get pixel values and token IDs instead. We use the tokenizer and feature extractor from the same pre-trained checkpoint so that their outputs match what the model expects.

In [45]:
# text preprocessing step
def tokenization_fn(captions, max_target_length):
    """Run tokenization on captions."""
    labels = tokenizer(captions, 
                      padding="max_length", 
                      truncation=True,
                      max_length=max_target_length).input_ids

    return labels

# image preprocessing step
def feature_extraction_fn(images):
    """
    Run feature extraction on a batch of images and return the
    pixel values as NumPy arrays.
    """
    encoder_inputs = feature_extractor(images=images, return_tensors="np")

    return encoder_inputs.pixel_values

def preprocess_fn(examples, max_target_length):
    """Run tokenization + image feature extraction"""
    images = examples['image']
    captions = examples['prompt'] 
    
    model_inputs = {}
    # Labels are the token IDs of the prompts; pixel values are the encoder inputs
    model_inputs['labels'] = tokenization_fn(captions, max_target_length)
    model_inputs['pixel_values'] = feature_extraction_fn(images)

    return model_inputs
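
> To make the effect of `padding="max_length"` together with `truncation=True` concrete, here is a small pure-Python sketch of what happens to a single sequence of token IDs. It assumes right-side padding. The `pad_or_truncate` helper and the pad ID of `0` are illustrative only, not the tokenizer's actual values.

```python
def pad_or_truncate(token_ids, max_length, pad_id):
    """Return exactly max_length IDs: clip long inputs, right-pad short ones."""
    clipped = list(token_ids[:max_length])
    return clipped + [pad_id] * (max_length - len(clipped))


short = pad_or_truncate([5, 6, 7], max_length=5, pad_id=0)
long = pad_or_truncate(list(range(10)), max_length=4, pad_id=0)
print(short)  # [5, 6, 7, 0, 0]
print(long)   # [0, 1, 2, 3]
```

> Because every label sequence ends up with the same length, the default data collator can stack them into one tensor per batch without any further padding.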

> Below, we map the preprocessing function, which combines the text and image preprocessing, onto the entire dataset. This yields two new features, pixel values and labels (token IDs), for the train, validation, and test sets.

In [46]:
processed_dataset = data.map(
    function=preprocess_fn,
    batched=True,
    fn_kwargs={"max_target_length": 133},
    remove_columns=data['train'].column_names
)

                                                             

In [47]:
processed_dataset

DatasetDict({
    train: Dataset({
        features: ['labels', 'pixel_values'],
        num_rows: 600
    })
    val: Dataset({
        features: ['labels', 'pixel_values'],
        num_rows: 200
    })
    test: Dataset({
        features: ['labels', 'pixel_values'],
        num_rows: 200
    })
})

### Check that all label sequences have the same length

In [48]:
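# Every label sequence should be padded or truncated to max_target_length (133)
for split, dataset in processed_dataset.items():
    for ind, label in enumerate(dataset['labels']):
        if len(label) != 133:
            print(f'{split} label {ind} has length {len(label)}, expected 133')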

### Define Seq2Seq Training Arguments

> The Seq2Seq trainer, a specialized variant of the Trainer class, requires pre-defined training arguments. Setting the evaluation strategy to "steps" makes the trainer evaluate the model periodically during training and write the metrics to the logs directory. With `logging_steps = 20` and no explicit `eval_steps`, this should happen every 20 steps. The model writes its checkpoints to the image-captioning-output directory.

In [49]:
training_args = Seq2SeqTrainingArguments(
    predict_with_generate=True,
    evaluation_strategy="steps",
    per_device_train_batch_size=16,
    per_device_eval_batch_size=16,
    output_dir="./image-captioning-output",
    logging_dir = './logs',
    logging_steps = 20,
)

Using the `WANDB_DISABLED` environment variable is deprecated and will be removed in v5. Use the --report_to flag to control the integrations used for logging result (for instance --report_to none).


### Define Post-Processing and Metric Computation

> Once the predictions and labels have been decoded from token IDs back to text, we clean both up with a post-processing function. It strips surrounding whitespace and puts each sentence on its own line.

In [50]:
ignore_pad_token_for_loss = True


def postprocess_text(preds, labels):
    preds = [pred.strip() for pred in preds]
    labels = [label.strip() for label in labels]

    preds = ["\n".join(nltk.sent_tokenize(pred)) for pred in preds]
    labels = ["\n".join(nltk.sent_tokenize(label)) for label in labels]

    return preds, labels

> Here, we define how the metric is computed. We are using BERTScore for now.

In [51]:
def compute_metrics(eval_preds):
    preds, labels = eval_preds
    if isinstance(preds, tuple):
        preds = preds[0]
    decoded_preds = tokenizer.batch_decode(preds, skip_special_tokens=True)
    if ignore_pad_token_for_loss:
        # Replace -100 in the labels as we can't decode them.
        labels = np.where(labels != -100, labels, tokenizer.pad_token_id)
    decoded_labels = tokenizer.batch_decode(labels, skip_special_tokens=True)

    # Some simple post-processing
    decoded_preds, decoded_labels = postprocess_text(decoded_preds,
                                                     decoded_labels)

    result = metric.compute(predictions=decoded_preds,
                            references=decoded_labels,
                            lang='en')
    return result
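
> `compute_metrics` above returns the output of `metric.compute` unchanged. For BERTScore, that output holds one precision, recall, and F1 value per example, plus a `hashcode` string, whereas training logs are easier to read as single numbers. One possible way to reduce it to scalar averages is sketched below. The `summarize_bertscore` helper and the example values are hypothetical.

```python
def summarize_bertscore(result):
    """Average the per-example BERTScore lists into one scalar per key."""
    summary = {}
    for key in ("precision", "recall", "f1"):
        values = result[key]
        summary[f"bertscore_{key}"] = sum(values) / len(values)
    return summary


# Made-up scores for two examples, in the shape BERTScore returns them
example_result = {
    "precision": [0.9, 0.8],
    "recall": [0.7, 0.9],
    "f1": [0.79, 0.85],
    "hashcode": "illustrative-hash",
}
summary = summarize_bertscore(example_result)
print(sorted(summary))
# ['bertscore_f1', 'bertscore_precision', 'bertscore_recall']
```

> To apply this, `compute_metrics` could end with `return summarize_bertscore(result)` instead of `return result`.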

### Train the model

> Here, we combine the model, the training arguments, the metric computation function, and the training and validation datasets to define a trainer.

In [53]:
# instantiate trainer
trainer = Seq2SeqTrainer(
    model=model,
    tokenizer=feature_extractor,  # passed here so the feature extractor is saved with each checkpoint
    args=training_args,
    compute_metrics=compute_metrics,
    train_dataset=processed_dataset['train'],
    eval_dataset=processed_dataset['val'],
    data_collator=default_data_collator,
)

In [54]:
trainer.train()





TrainOutput(global_step=114, training_loss=1.3121079394691868, metrics={'train_runtime': 396.7151, 'train_samples_per_second': 4.537, 'train_steps_per_second': 0.287, 'total_flos': 3.248350040162304e+17, 'train_loss': 1.3121079394691868, 'epoch': 3.0})
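
> The `global_step=114` in the output above can be checked by hand. The sketch below assumes a single device, no gradient accumulation, and three training epochs (consistent with `'epoch': 3.0` in the output). It also assumes evaluation runs every 20 steps, in line with `logging_steps`.

```python
import math

num_train_examples = 600          # rows in the training split
per_device_train_batch_size = 16  # from the training arguments
num_train_epochs = 3              # assumed; matches 'epoch': 3.0
eval_every = 20                   # assumed to follow logging_steps

# The last batch of each epoch is smaller, so round up
steps_per_epoch = math.ceil(num_train_examples / per_device_train_batch_size)
total_steps = steps_per_epoch * num_train_epochs
evaluation_steps = list(range(eval_every, total_steps + 1, eval_every))

print(steps_per_epoch)   # 38
print(total_steps)       # 114
print(evaluation_steps)  # [20, 40, 60, 80, 100]
```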

### Save the models

In [57]:
trainer.save_model("./image-captioning-output")
tokenizer.save_pretrained("./image-captioning-output")
feature_extractor.save_pretrained("./image-captioning-output/")

['./image-captioning-output/preprocessor_config.json']

### Use the model for prediction

In [58]:
image_captioner = pipeline("image-to-text", model="./image-captioning-output")



In [None]:
image_captioner("sample_image.png")

### Try loading the model components individually

In [3]:
from transformers import VisionEncoderDecoderModel, AutoFeatureExtractor, AutoTokenizer

tokenizer = AutoTokenizer.from_pretrained("./image-captioning-output/")
feature_extractor = AutoFeatureExtractor.from_pretrained("./image-captioning-output/")
model = VisionEncoderDecoderModel.from_pretrained("./image-captioning-output/")
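
> With the three components loaded separately, captioning can be done without the pipeline: extract pixel values, generate token IDs, then decode them. The sketch below wraps those steps in a hypothetical `caption_image` helper. The `max_length` and `num_beams` defaults are illustrative choices, not values taken from this notebook.

```python
def caption_image(image, model, feature_extractor, tokenizer,
                  max_length=32, num_beams=4):
    """Generate one caption for a single PIL image."""
    # 1. Image -> pixel values as a PyTorch tensor
    pixel_values = feature_extractor(
        images=image, return_tensors="pt"
    ).pixel_values
    # 2. Pixel values -> generated token IDs (beam search)
    output_ids = model.generate(
        pixel_values, max_length=max_length, num_beams=num_beams
    )
    # 3. Token IDs -> text, dropping special tokens
    captions = tokenizer.batch_decode(output_ids, skip_special_tokens=True)
    return captions[0].strip()


# Usage, assuming `img` is a PIL image and the objects above are loaded:
# caption_image(img, model, feature_extractor, tokenizer)
```

> This assumes the model and the pixel values live on the same device. If the model has been moved to a GPU, the pixel values would need to be moved there as well before calling `generate`.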



In [59]:
from PIL import Image
import requests
from io import BytesIO

response = requests.get("https://ankur3107.github.io/assets/images/image-captioning-example.png")
img = Image.open(BytesIO(response.content))
image_captioner(img)



[{'generated_text': 'a soccer game in progress with a player in the middle of the field '}]