## BLIP-Base - Formula Image to Text

In [None]:
!pip install -q transformers datasets
!pip install -q evaluate
!pip install -q sacrebleu rouge_score jiwer
!pip install wandb -Uqq

In [2]:
import transformers
from datasets import load_dataset, Image
from PIL import Image
import torch
import warnings
warnings.filterwarnings("ignore")

In [None]:
# Select the compute device once: CUDA when PyTorch reports a GPU, CPU otherwise.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

if device.type == "cuda":
  print("There are %d GPU(s) available." % torch.cuda.device_count())
  print("We will use the GPU:", torch.cuda.get_device_name(0))
else:
  print("No GPU available, using the CPU instead.")

### Load Data

In [None]:
# Mount Google Drive into the Colab filesystem at /content/drive
# (cfc.model_dir below points inside this mount).
from google.colab import drive
drive.mount('/content/drive')

In [None]:
from datasets import DatasetDict
# Load a DatasetDict that was previously saved to disk; later cells read
# its "train" and "valid" splits.
data = DatasetDict.load_from_disk('/data/formula2text-4k')

### Configuration class

In [27]:
class cfc:
  """Notebook-wide settings: run naming, checkpoint id and data locations."""

  # --- Naming for saved artefacts and experiment tracking ---
  model_name = "BLIP-Base_image-to-text"
  run_name = model_name
  wandb_project = "VLM"
  model_dir  = f"/content/drive/MyDrive/models/{model_name}"

  # --- Checkpoint id handed to `from_pretrained` ---
  checkpoint = "Salesforce/blip-image-captioning-base"

  # --- Input data locations ---
  img_dir = "/data/images_formulas/"
  test_file_path = "/data/datafiles/test_data.json"

### Data Preprocessing

In [9]:
from torch.utils.data import Dataset
from PIL import Image

# Prompt text prepended to every caption before tokenisation.
prefix = "a formula of "

class ImageCaptioningDataset(Dataset):
  """Torch dataset that turns (image, label) records into model inputs.

  Args:
    dataset: Indexable collection whose items expose an "image" entry and a
      string "label" entry.
    processor: Callable accepting `images=` and `text=` (plus padding /
      truncation / return_tensors keywords) and returning a mapping of
      tensors. Assumed to add a leading batch axis of size 1 -- TODO confirm
      for processors other than the BLIP one used in this notebook.

  Items are returned on the CPU; moving batches to the accelerator is the
  training loop's job. This keeps the dataset independent of any global
  `device` and usable with DataLoader worker processes.
  """

  def __init__(self, dataset, processor):
    self.dataset = dataset
    self.processor = processor

  def __len__(self):
    return len(self.dataset)

  def __getitem__(self, idx):
    item = self.dataset[idx]
    encoding = self.processor(
        images=item["image"],
        text=prefix + item["label"],
        padding="max_length",
        # Without truncation an over-long caption yields a longer tensor than
        # its neighbours and the default collate function cannot stack them.
        truncation=True,
        return_tensors="pt",
    )

    # squeeze(0) removes only the batch axis added by return_tensors="pt";
    # a bare squeeze() would also collapse any other axis of size 1.
    return {k: v.squeeze(0) for k, v in encoding.items()}

### Load model and processor

In [None]:
from transformers import BlipForConditionalGeneration, AutoProcessor

# Processor and model are both loaded from the checkpoint named in cfc.checkpoint.
processor = AutoProcessor.from_pretrained(cfc.checkpoint)
model = BlipForConditionalGeneration.from_pretrained(cfc.checkpoint)

In [None]:
# Wrap the "train" and "valid" splits; the processor is applied per item
# inside ImageCaptioningDataset.__getitem__, not up front.
train_dataset = ImageCaptioningDataset(data["train"], processor)
valid_dataset = ImageCaptioningDataset(data["valid"], processor)

In [14]:
from torch.utils.data import DataLoader

# Training batches are reshuffled every epoch.
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=16)
# Validation order is kept fixed so repeated evaluations see identical batches.
valid_dataloader = DataLoader(valid_dataset, shuffle=False, batch_size=16)

### Fine-tuning the model

In [None]:
import wandb
wandb.login()

In [None]:
# Start a Weights & Biases run; `config` is free-form metadata stored with it.
wandb.init(
    project=cfc.wandb_project,
    name=cfc.run_name,
    config={
        # Must describe the checkpoint actually trained here
        # (cfc.checkpoint is blip-image-captioning-base).
        "architecture": "BLIP-Base",
        "dataset": "Formula2Text-4k",
    },
)

In [None]:
from tqdm.notebook import tqdm
import torch

# Create an optimizer
optimizer = torch.optim.AdamW(model.parameters(), lr=5e-5)

# Check if GPU or CPU available
device = "cuda" if torch.cuda.is_available() else "cpu"
model.to(device)
print("device :", device)

num_epochs = 6
print(f"Number of epochs: {num_epochs}")
num_training_steps = num_epochs * len(train_dataloader)
print(f"Number of training steps: {num_training_steps}")
progress_bar = tqdm(range(num_training_steps))

# Train the model
model.train()

for epoch in range(num_epochs):
  print("Epoch:", epoch)
  for batch in train_dataloader:
    input_ids = batch.pop("input_ids").to(device)
    pixel_values = batch.pop("pixel_values").to(device)

    # Captions are padded to a fixed length, so the mask is needed to stop
    # the text decoder from attending to padding positions.
    attention_mask = batch.pop("attention_mask", None)
    if attention_mask is None:
      labels = input_ids
    else:
      attention_mask = attention_mask.to(device)
      # -100 is the default ignore_index of PyTorch's cross-entropy loss:
      # padded positions then contribute nothing to the loss.
      labels = input_ids.masked_fill(attention_mask == 0, -100)

    outputs = model(
        input_ids=input_ids,
        pixel_values=pixel_values,
        attention_mask=attention_mask,
        labels=labels,
    )
    loss = outputs.loss
    print("Loss:", loss.item())
    # Send the loss to the active W&B run, if one was started; skipped
    # otherwise so the loop still works without tracking.
    if wandb.run is not None:
      wandb.log({"train/loss": loss.item(), "epoch": epoch})
    loss.backward()

    optimizer.step()
    optimizer.zero_grad()
    progress_bar.update(1)

In [21]:
# Save the model
import os

checkpoint_path = "../models/BLIP-Base-Image-to-Text.pt"
# torch.save does not create missing directories; without this a missing
# ../models folder would only surface as an error after training has finished.
os.makedirs(os.path.dirname(checkpoint_path), exist_ok=True)
torch.save(model.state_dict(), checkpoint_path)

In [None]:
wandb.finish()

### Model Evaluation on the Test Set

In [23]:
from google.colab import files

In [24]:
!cp /content/drive/MyDrive/cf_module/cf_custom_functions.py /content

In [25]:
import cf_custom_functions as cf

In [31]:
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
pd.set_option('display.max_colwidth', None)

In [None]:
# Load the test set.
# cf.load_test_data lives in cf_custom_functions.py (not shown here); its result
# is passed on as a DataFrame and read via an "image_name" column -- TODO confirm
# the remaining columns against that module.
df_test = cf.load_test_data(cfc.test_file_path)

### Load pre-trained model

In [29]:
from transformers import AutoProcessor, BlipForConditionalGeneration

# Reload processor and model from cfc.checkpoint to obtain the non-fine-tuned
# baseline. This rebinds `processor` and `model`, so the fine-tuned weights are
# only available from the saved state dict after this cell runs.
processor = AutoProcessor.from_pretrained(cfc.checkpoint)
model = BlipForConditionalGeneration.from_pretrained(cfc.checkpoint).to(device)

In [32]:
def generate_VLM_predictions(test_data:pd.DataFrame, model:object, processor:object, IMG_DIR:str, max_length:int=50) -> pd.DataFrame:
  """Caption every image listed in `test_data` and return the captions.

  Args:
    test_data: Frame with an "image_name" column holding file names relative
      to `IMG_DIR`. It is not modified.
    model: Object exposing `generate(pixel_values, max_length=...)`.
    processor: Object that is callable with `images=` / `return_tensors=` and
      exposes `batch_decode(ids, skip_special_tokens=...)`.
    IMG_DIR: Directory containing the images (trailing separator optional).
    max_length: Upper bound forwarded to `model.generate`.

  Returns:
    A copy of `test_data` with an added "prediction" column.

  Pixel values are moved to the notebook-level `device`, so that name must be
  defined before this function is called on a non-empty frame.
  """
  import os

  df = test_data.copy()
  y_preds = []

  for _, entry in df.iterrows():
    # The context manager closes the file handle right away; convert()
    # returns an independent in-memory image.
    with Image.open(os.path.join(IMG_DIR, entry["image_name"])) as img_file:
      image = img_file.convert('RGB')
    inputs = processor(images=image, return_tensors="pt").pixel_values.to(device)
    generated_ids = model.generate(inputs, max_length=max_length)
    generated_caption = processor.batch_decode(generated_ids, skip_special_tokens=True)[0].strip()
    y_preds.append(generated_caption)

  df["prediction"] = y_preds
  return df

In [None]:
# Predictions of test_data
# `model` / `processor` are whatever those names are bound to when this cell
# runs -- the reloaded baseline checkpoint if cells are executed in order.
df_preds_pt = generate_VLM_predictions(df_test,model,processor, cfc.img_dir)
# Project helper from cf_custom_functions.py (not shown); presumably it adds the
# "clean_prediction" column consumed by the metrics cell -- TODO confirm.
df_preds_pt_clean = cf.post_processing_multi_predictions(df_preds_pt)

In [None]:
# Score the "clean_prediction" column of the baseline predictions, then store
# the result under the "<model_name>_pretrained-new" label in the shared metrics
# JSON file. Both helpers come from cf_custom_functions.py (not shown); which
# metrics are computed and how the file is updated must be checked there.
metrics_pt = cf.compute_evaluation_metrics(df_preds_pt_clean,"clean_prediction")
cf.save_evaluation_metrics(f"{cfc.model_name}_pretrained-new",metrics_pt,"../metrics/VLM_metrics.json")

### Load fine-tuned model

In [None]:
from transformers import AutoProcessor, BlipProcessor, BlipForConditionalGeneration, BlipConfig

# Rebuild the architecture from the checkpoint's config, then overwrite its
# randomly initialised weights with the fine-tuned state dict.
config = BlipConfig.from_pretrained(cfc.checkpoint)
processor_ft = AutoProcessor.from_pretrained(cfc.checkpoint)
model_ft = BlipForConditionalGeneration(config=config)
# The path has to match the file written by the torch.save cell ("../models/...").
# map_location lets weights saved from a GPU session load on a CPU-only runtime.
state_dict = torch.load("../models/BLIP-Base-Image-to-Text.pt", map_location=device)
model_ft.load_state_dict(state_dict)
model_ft = model_ft.to(device)
model_ft.eval()

In [None]:
# Predictions of test_data, this time with the fine-tuned model and its processor.
df_preds_ft = generate_VLM_predictions(df_test,model_ft,processor_ft,cfc.img_dir)
# Project helper from cf_custom_functions.py (not shown); presumably it adds the
# "clean_prediction" column used by the following cells -- TODO confirm.
df_preds_ft_clean = cf.post_processing_multi_predictions(df_preds_ft)

In [None]:
# Remove the training prompt prefix, but only where it starts the string.
# The "^" anchor leaves later occurrences inside a prediction untouched, and the
# explicit regex=True makes the call behave the same on every pandas version.
df_preds_ft_clean["clean_prediction"] = df_preds_ft_clean["clean_prediction"].str.replace(r"^a formula of ", "", regex=True)

In [None]:
# Score the "clean_prediction" column of the fine-tuned predictions, then store
# the result under the "<model_name>_finetuned-new" label in the same metrics
# JSON file used for the baseline. Helper behaviour is defined in
# cf_custom_functions.py (not shown) -- check there for details.
metrics_ft = cf.compute_evaluation_metrics(df_preds_ft_clean,"clean_prediction")
cf.save_evaluation_metrics(f"{cfc.model_name}_finetuned-new",metrics_ft,"../metrics/VLM_metrics.json")