In [1]:
import time
from pathlib import Path
import numpy as np
import openvino as ov
from transformers import BlipProcessor

In [2]:
from typing import Optional, Union

import torch
from torch import nn
from torch.nn import CrossEntropyLoss, BCEWithLogitsLoss
from transformers import CLIPForImageClassification, Trainer, TrainingArguments
from transformers import BlipConfig, BlipModel, BlipVisionModel, BlipPreTrainedModel
from transformers.modeling_outputs import ImageClassifierOutput


# Image classifier modelled after CLIPForImageClassification
class BlipForImageClassification(BlipPreTrainedModel):
    """
    Image classifier built on top of the BLIP vision encoder.

    The encoder output is mean-pooled over every token except the first one and
    fed to a linear head with `config.num_labels` outputs (or passed through
    unchanged when `config.num_labels` is 0).
    """

    def __init__(self, config: BlipConfig) -> None:
        """
        Build the vision encoder and the classification head.

        Args:
            config: BLIP configuration; `config.vision_config` parameterizes the
                encoder and `config.num_labels` sets the head's output size.
        """
        super().__init__(config)
        self.num_labels = config.num_labels
        vision_model = BlipVisionModel._from_config(
            config.vision_config, attn_implementation=config._attn_implementation
        )
        self.vision_model = vision_model

        # Classifier head; degenerates to a pass-through when no labels are configured
        self.classifier = (
            nn.Linear(config.vision_config.hidden_size, config.num_labels) if config.num_labels > 0 else nn.Identity()
        )

        # Initialize weights and apply final processing
        self.post_init()

    def forward(
        self,
        pixel_values: Optional[torch.Tensor] = None,
        labels: Optional[torch.Tensor] = None,
        output_attentions: Optional[bool] = None,
        output_hidden_states: Optional[bool] = None,
        return_dict: Optional[bool] = None,
    ) -> Union[tuple, ImageClassifierOutput]:
        r"""
        Run the vision encoder, pool its output and compute logits (and a loss when labels are given).

        Args:
            pixel_values: image batch forwarded unchanged to the vision encoder.
            labels (`torch.LongTensor` of shape `(batch_size,)`, *optional*):
                Labels for computing the image classification/regression loss. Indices should be in `[0, ...,
                config.num_labels - 1]`. If `config.num_labels == 1` a regression loss is computed (Mean-Square
                loss), If `config.num_labels > 1` a classification loss is computed (Cross-Entropy).
            output_attentions: forwarded to the encoder; defaults to `config.output_attentions`.
            output_hidden_states: forwarded to the encoder; defaults to `config.output_hidden_states`.
            return_dict: return an `ImageClassifierOutput` instead of a tuple; defaults to
                `config.use_return_dict`.

        Returns:
            An `ImageClassifierOutput` with `loss`, `logits`, `hidden_states` and `attentions`, or, when
            `return_dict` is false, the tuple `(logits, *encoder_outputs[2:])` prefixed with the loss if
            one was computed.
        """
        output_attentions = output_attentions if output_attentions is not None else self.config.output_attentions
        output_hidden_states = (
            output_hidden_states if output_hidden_states is not None else self.config.output_hidden_states
        )
        return_dict = return_dict if return_dict is not None else self.config.use_return_dict

        outputs = self.vision_model(
            pixel_values,
            output_attentions=output_attentions,
            output_hidden_states=output_hidden_states,
            return_dict=return_dict,
        )

        sequence_output = outputs[0]

        # average pool the patch tokens (position 0 is skipped — presumably the class token)
        sequence_output = torch.mean(sequence_output[:, 1:, :], dim=1)
        # apply classifier
        logits = self.classifier(sequence_output)

        loss = None
        if labels is not None:
            # move labels to correct device to enable model parallelism
            labels = labels.to(logits.device)
            # Infer the problem type once from the label count and dtype; the choice is
            # stored on the config, so later calls reuse it.
            if self.config.problem_type is None:
                if self.num_labels == 1:
                    self.config.problem_type = "regression"
                elif self.num_labels > 1 and labels.dtype in (torch.long, torch.int):
                    self.config.problem_type = "single_label_classification"
                else:
                    self.config.problem_type = "multi_label_classification"

            if self.config.problem_type == "regression":
                # `MSELoss` is not among the names imported from torch.nn in this notebook,
                # so it is reached through the `nn` module to avoid a NameError.
                loss_fct = nn.MSELoss()
                if self.num_labels == 1:
                    loss = loss_fct(logits.squeeze(), labels.squeeze())
                else:
                    loss = loss_fct(logits, labels)
            elif self.config.problem_type == "single_label_classification":
                loss_fct = CrossEntropyLoss()
                loss = loss_fct(logits.view(-1, self.num_labels), labels.view(-1))
            elif self.config.problem_type == "multi_label_classification":
                loss_fct = BCEWithLogitsLoss()
                loss = loss_fct(logits, labels)

        if not return_dict:
            output = (logits,) + outputs[2:]
            return ((loss,) + output) if loss is not None else output

        return ImageClassifierOutput(
            loss=loss,
            logits=logits,
            hidden_states=outputs.hidden_states,
            attentions=outputs.attentions,
        )

In [3]:
# OpenVINO runtime entry point; reused further down to compile the exported model.
core = ov.Core()

In [4]:
from typing import Tuple, List
from dataclasses import dataclass

from torch.utils.data import Dataset, DataLoader

@dataclass
class ImageDataset(Dataset):
    """
    Adapts a labelled image dataset to a binary-classification dataset.

    Each item is the processor output for one image, with `pixel_values`
    stripped of its leading batch dimension and a `label` entry that is 1 when
    the original class name is listed in `positive_labels` and 0 otherwise.
    """

    # Source dataset: rows expose 'image' and 'label', and `features['label']`
    # provides `int2str` to turn a class index into its name.
    dataset: Dataset
    # Callable invoked as `processor(images=..., return_tensors="pt")`.
    processor: BlipProcessor
    # Class names that map to the positive label (1).
    positive_labels: List[str]

    def __len__(self) -> int:
        """Return the number of items in the wrapped dataset."""
        return len(self.dataset)

    def __getitem__(self, idx: int) -> Tuple:
        """
        Build the model inputs for the item at `idx`.

        NOTE(review): the annotation says `Tuple`, but the value returned is the
        processor's output object with a 'label' entry added — confirm and
        correct the annotation.
        """
        # Fetch the row once: every access to `self.dataset[idx]` re-reads the
        # whole row, so indexing separately for image and label doubles the cost.
        sample = self.dataset[idx]
        image = sample['image'].convert("RGB")
        inputs = self.processor(images=image, return_tensors="pt")
        # Drop the batch dimension the processor adds for a single image.
        inputs['pixel_values'] = inputs['pixel_values'].squeeze(0)
        label_string = self.dataset.features['label'].int2str(sample['label'])
        inputs['label'] = 1 if label_string in self.positive_labels else 0
        return inputs

In [None]:
# Hub checkpoint that provides the image preprocessing configuration.
model_name = "Salesforce/blip-image-captioning-large"
# Local checkpoint directory holding the classifier weights. Forward slashes are
# used so the path resolves on every OS (pathlib maps them to the native separator
# on Windows); backslash separators would be treated as one file name on POSIX.
model_weights = Path("../data/models/blip-large-probe-2/checkpoint-16550")

processor = BlipProcessor.from_pretrained(model_name)
model = BlipForImageClassification.from_pretrained(model_weights)

In [6]:
from datasets import load_dataset
from pathlib import Path

# Stage 1: read the image folder into a dataset.
print("Dataset loading...")
data_dir = Path('../data')
img_ds = load_dataset("imagefolder", data_dir=data_dir, split="train")
print(img_ds.features)

# Stage 2: wrap it for the model; every class except 'other' counts as positive.
print("Convert dataset for specific model...")
positive_classes = [name for name in img_ds.features['label'].names if name != 'other']
ds = ImageDataset(img_ds, processor, positive_classes)
batch_loader = DataLoader(ds, batch_size=4)
print("Done")

Dataset loading...


Resolving data files:   0%|          | 0/6602 [00:00<?, ?it/s]

{'image': Image(mode=None, decode=True, id=None), 'label': ClassLabel(names=['cigs', 'other', 'pipes', 'roll_cigs', 'smoking'], id=None)}
Convert dataset for specific model...
Done


## Image Classifier

In [7]:
import torch
from pathlib import Path
import openvino as ov

IMAGE_CLASSIFIER_OV = Path("../data/models/openvino/blip_image_classifier.xml")
image_classifier = model
image_classifier.eval()

# Smoke-test the PyTorch model on the first sample; the same single-image batch
# doubles as the example input for the conversion below.
pixel_values = ds[0]['pixel_values'].unsqueeze(0)
with torch.no_grad():
    outputs = image_classifier(pixel_values)

# Convert to OpenVINO IR only when no previously saved IR file is present.
if IMAGE_CLASSIFIER_OV.exists():
    print(f"Image Classifier will be loaded from {IMAGE_CLASSIFIER_OV}")
else:
    # Export the PyTorch model to an ov.Model without tracking gradients.
    with torch.no_grad():
        ov_image_classifier = ov.convert_model(image_classifier, example_input=pixel_values)
    # Persist the converted model so later runs can skip the conversion.
    ov.save_model(ov_image_classifier, IMAGE_CLASSIFIER_OV)
    print(f"Image Classifier model successfuly converted and saved to {IMAGE_CLASSIFIER_OV}")

Image Classifier will be loaded from ..\..\data\models\openvino\blip_image_classifier.xml


In [8]:
# Compile the saved IR file for the CPU device. This rebinds `ov_image_classifier`
# (which the conversion cell may have set to the uncompiled ov.Model) to the
# compiled model that is called for inference below.
ov_image_classifier = core.compile_model(IMAGE_CLASSIFIER_OV, "CPU")

In [9]:
# One image per batch so the measurement is a per-image latency.
batch_loader = torch.utils.data.DataLoader(ds, batch_size=1)
# Take just the first batch; fails loudly right here if the dataset is empty.
batch = next(iter(batch_loader))

pixel_values = batch['pixel_values'].numpy()
# perf_counter is a monotonic, high-resolution clock intended for measuring
# intervals; time.time() is wall-clock and can be coarse or jump.
# NOTE(review): this times a single first call, which may include one-off
# warm-up cost — repeat the call and average for a steadier figure.
start_time = time.perf_counter()
logits = ov_image_classifier(pixel_values)[ov_image_classifier.output(0)]
end_time = time.perf_counter()
print(end_time - start_time)

1.8078680038452148
