In [4]:
import datasets
import pandas as pd
from pathlib import Path
from PIL import Image
from transformers import AutoTokenizer
from transformers import Blip2Processor, Blip2VisionModel, Blip2QFormerModel, Blip2QFormerConfig, Blip2ForConditionalGeneration
from transformers import AutoProcessor, Blip2ForConditionalGeneration
import os
import bitsandbytes as bnb

import torch

from peft import LoraConfig, get_peft_model, LoftQConfig
from transformers import AutoModelForCausalLM, BitsAndBytesConfig
from peft import (
    LoraConfig,
    PeftConfig,
    get_peft_model,
    prepare_model_for_kbit_training,
)
from transformers import (
    AutoConfig,
    AutoModelForCausalLM,
    AutoTokenizer,
    BitsAndBytesConfig,)

In [10]:
from peft import LoraConfig, get_peft_model,LoftQConfig

# Quantization settings handed to `from_pretrained`: 4-bit weights using the
# "nf4" quantization type, with bfloat16 as the compute dtype.
bnb_config = BitsAndBytesConfig(
    bnb_4bit_compute_dtype=torch.bfloat16,
    bnb_4bit_quant_type="nf4",
    load_in_4bit=True,
)

# LoRA adapter hyperparameters; adapters are attached only to modules whose
# names match "q_proj" and "k_proj", and no bias terms are trained.
config = LoraConfig(
    target_modules=["q_proj", "k_proj"],
    bias="none",
    lora_dropout=0.05,
    lora_alpha=32,
    r=16,
)

The argument `trust_remote_code` is to be used with Auto classes. It has no effect here and is ignored.
The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

In [11]:
class BlipZSIC:
    """Quantized BLIP-2 (OPT-2.7b) wrapped with switchable LoRA adapters.

    Typical use: construct, call ``compileModel()`` once to wrap the base model
    with PEFT, optionally register more adapters with ``addAdapter()``, then
    run ``forward()``.

    Attributes:
        base: the quantized ``Blip2ForConditionalGeneration`` checkpoint.
        model: the PEFT-wrapped model; ``None`` until ``compileModel()`` runs.
        processor: the matching ``Blip2Processor``.
        loraConfig: LoRA configuration reused for every adapter.
        adapterList: names of adapters registered through ``addAdapter()``.
        currentState: last adapter number selected via ``switchAdapter()``
            (``''`` before the first switch).
    """

    def __init__(self, bnb_config: "BitsAndBytesConfig", loraConfig: "LoraConfig") -> None:
        """Load the quantized base model and its processor.

        Args:
            bnb_config: bitsandbytes quantization settings for the checkpoint.
            loraConfig: LoRA settings used by ``compileModel``/``addAdapter``.
        """
        # `trust_remote_code` was dropped: it only applies to Auto classes and
        # is ignored (with a warning) by Blip2ForConditionalGeneration.
        self.base = Blip2ForConditionalGeneration.from_pretrained(
            "Salesforce/blip2-opt-2.7b",  # TODO put back in "load_in_8bit" for model
            device_map={"": 0},
            quantization_config=bnb_config
        )
        self.model = None
        self.processor = Blip2Processor.from_pretrained(
            "Salesforce/blip2-opt-2.7b")
        self.loraConfig = loraConfig
        self.adapterList = []
        self.currentState = ''

    def _requireModel(self):
        """Return the PEFT model, failing loudly if it was never compiled."""
        if self.model is None:
            raise RuntimeError(
                "PEFT model not built yet: call compileModel() before using adapters or forward()"
            )
        return self.model

    def compileModel(self) -> None:  # compile the model into qLora
        """Wrap the base model with ``loraConfig`` and report trainable params."""
        self.model = get_peft_model(self.base, self.loraConfig)
        self.model.print_trainable_parameters()

    def addAdapter(self, adapter: str):  # pass in the adapter name to add
        """Register an additional LoRA adapter named ``adapter``.

        Raises:
            RuntimeError: if ``compileModel()`` has not been called.
        """
        peft_model = self._requireModel()
        # PeftModel.add_adapter takes (adapter_name, peft_config); the attribute
        # holding the config is `loraConfig` (there is no `lora_config`).
        peft_model.add_adapter(adapter_name=adapter, peft_config=self.loraConfig)
        # Record the name only once PEFT has accepted the adapter.
        self.adapterList.append(adapter)

    def switchAdapter(self, adapterNum: int):
        """Activate adapter ``adapterNum``, or disable all adapters when it is 0.

        A non-zero ``adapterNum`` selects ``adapterList[adapterNum]``; an
        out-of-range number prints a message and leaves the state untouched.

        NOTE(review): because 0 means "adapters disabled" and the lookup is
        ``adapterList[adapterNum]``, the first name appended by ``addAdapter``
        (index 0) cannot be selected here -- confirm whether 1-based indexing
        (``adapterNum - 1``) was intended.

        Raises:
            RuntimeError: if ``compileModel()`` has not been called.
        """
        peft_model = self._requireModel()

        if adapterNum != 0:  # left in for clarity
            if self.currentState == adapterNum:
                return
            try:
                adapter_name = self.adapterList[adapterNum]
            except IndexError:
                print("index out of range, returning")
                return
            # Undo a previous switchAdapter(0); set_adapter alone does not
            # re-enable adapter layers that were disabled.
            peft_model.base_model.enable_adapter_layers()
            peft_model.set_adapter(adapter_name)
            self.currentState = adapterNum
            print(f"switched to adapter {adapterNum}")
        else:
            # `disable_adapters()` is the transformers-integration method and
            # raises "No adapter loaded" for a get_peft_model wrapper; the PEFT
            # tuner exposes disable_adapter_layers() for this purpose.
            peft_model.base_model.disable_adapter_layers()
            self.currentState = adapterNum
            print("adapters disabled")

    def forward(self, input_ids, pixel_values, modeltype=-1, labels=None):
        """Run the PEFT model and return its output.

        Args:
            input_ids: token ids of the caption text.
            pixel_values: preprocessed image tensor.
            modeltype: adapter number passed to ``switchAdapter``; ``-1``
                (default) keeps the current adapter state.
            labels: optional training targets; defaults to ``input_ids``.

        Raises:
            RuntimeError: if ``compileModel()`` has not been called.
        """
        if modeltype != -1:
            self.switchAdapter(modeltype)

        if labels is None:
            labels = input_ids

        return self._requireModel()(input_ids=input_ids,
                                    pixel_values=pixel_values,
                                    labels=labels)

# Load the quantized base model, then wrap it with the LoRA config.
# Without compileModel() `Blip.model` stays None and the training cell fails.
Blip = BlipZSIC(bnb_config, config)
Blip.compileModel()

trainable params: 5,242,880 || all params: 3,749,922,816 || trainable%: 0.13981301102065136


In [46]:
from torch.utils.data import Dataset, DataLoader

class ImageCaptioningDataset(Dataset):
    """Pairs each record's processed image tensors with its raw caption.

    Each item is a dict holding whatever the processor returns for the image
    (with singleton dimensions squeezed away) plus the caption under "text".
    """

    def __init__(self, dataset, processor):
        """Keep references to the underlying records and the image processor."""
        self.processor = processor
        self.dataset = dataset

    def __len__(self):
        """Number of records in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx):
        """Process the image of record ``idx`` and attach its caption text."""
        record = self.dataset[idx]
        batched = self.processor(
            images=record["image"],
            padding="max_length",
            return_tensors="pt",
        )
        sample = {}
        for name, tensor in batched.items():
            # the processor returns batched tensors; drop the size-1 dimensions
            sample[name] = tensor.squeeze()
        sample["text"] = record["text"]
        return sample

def collate_fn(batch, tokenizer=None):
    """Collate dataset items into one batch of tensors.

    Every key other than "text" is stacked along a new leading dimension. The
    "text" entries are tokenized together with padding and stored under
    "input_ids" and "attention_mask".

    Args:
        batch: list of dicts as produced by ``ImageCaptioningDataset``.
        tokenizer: callable used to tokenize the captions; defaults to the
            notebook-level ``Blip.processor.tokenizer``.

    Returns:
        dict mapping each stacked key, "input_ids" and "attention_mask" to
        tensors.
    """
    if tokenizer is None:
        tokenizer = Blip.processor.tokenizer

    # pad the input_ids and attention_mask
    processed_batch = {}
    for key in batch[0].keys():
        if key != "text":
            processed_batch[key] = torch.stack([example[key] for example in batch])
        else:
            text_inputs = tokenizer(
                [example["text"] for example in batch], padding=True, return_tensors="pt"
            )
            processed_batch["input_ids"] = text_inputs["input_ids"]
            processed_batch["attention_mask"] = text_inputs["attention_mask"]
    return processed_batch

# Data and Dataloader setup


In [None]:
# Image directory used directly, so an empty glob no longer raises IndexError
# (the original derived the directory from `images_path[0].parent`).
IMAGES_DIR = Path("/root/Datasets/Images")
images_path = list(IMAGES_DIR.glob("*.jpg"))
labels = pd.read_csv("/root/Datasets/preprocess.csv")


def _loadImage(path):
    """Read one image fully into memory and close the underlying file."""
    with Image.open(str(path)) as handle:
        # convert() returns a loaded copy, so the file can be closed right away
        return handle.convert("RGB")


# One image per CSV row, matched through the 'image' column (file name).
images = [_loadImage(IMAGES_DIR / name) for name in labels['image']]

dataset = datasets.Dataset.from_dict({"image": images, "text": labels['caption'].tolist()})
dataset = dataset.train_test_split(test_size=0.1, seed=42)

train_dataset = ImageCaptioningDataset(dataset['train'], Blip.processor)
train_dataloader = DataLoader(train_dataset, shuffle=True, batch_size=4, collate_fn=collate_fn)

test_dataset = ImageCaptioningDataset(dataset['test'], Blip.processor)
# Evaluation data is not shuffled so results are reproducible across runs.
test_dataloader = DataLoader(test_dataset, shuffle=False, batch_size=4, collate_fn=collate_fn)

# Training

In [None]:
import torch
from tqdm import tqdm
# Make sure the LoRA-wrapped model exists; `Blip.model` is None until
# compileModel() has been called.
if Blip.model is None:
    Blip.compileModel()

# Hand only the trainable parameters to the optimizer (frozen weights never
# receive gradients).
trainable_params = [p for p in Blip.model.parameters() if p.requires_grad]
optimizer = torch.optim.Adam(trainable_params, lr=5e-4)

device = "cuda" if torch.cuda.is_available() else "cpu"

Blip.model.train()

for epoch in range(10):
    training_loss = 0
    print("Epoch:", epoch)
    # Wrapping the dataloader itself lets tqdm show the total number of batches.
    for batch in tqdm(train_dataloader):
        input_ids = batch.pop("input_ids").to(device)
        pixel_values = batch.pop("pixel_values").to(device)
        # Blip.forward uses input_ids as labels itself; passing `labels=` here
        # raised a TypeError against its (input_ids, pixel_values, modeltype)
        # signature. Put in a 3rd value as an int to select adapters.
        outputs = Blip.forward(input_ids=input_ids,
                               pixel_values=pixel_values)

        loss = outputs.loss
        training_loss += loss.item()
        loss.backward()
        optimizer.step()
        optimizer.zero_grad()

    # mean loss per batch for this epoch
    print(training_loss / len(train_dataloader))

In [None]:
# Pick test record 290 and keep its 'image' column for the inference cells below.
sample_rows = dataset['test'].select([290])
im = sample_rows['image']

In [None]:
# `processor` and `model` were never defined in this notebook (they only
# existed as leftover kernel state); bind them to the objects held by `Blip`.
processor = Blip.processor
model = Blip.model

pixel_values = processor(images=im, return_tensors="pt").to(device).pixel_values
model.eval()

outputs = model.generate(pixel_values=pixel_values)

In [None]:
# Decode with the processor owned by `Blip`; a bare `processor` name is not
# defined on a fresh kernel. strip() drops surrounding whitespace/newlines.
generated_caption = Blip.processor.batch_decode(outputs, skip_special_tokens=True)[0].strip()
print(generated_caption)