In [1]:
from ultralytics import YOLO
from transformers import AutoImageProcessor, AutoModelForZeroShotImageClassification, AutoTokenizer, ZeroShotImageClassificationPipeline, SiglipModel, SiglipProcessor
import torch
from PIL import Image, ImageDraw
import numpy as np
import orjson
from tqdm import tqdm
import os

In [2]:
# Load the YOLO detection checkpoint that the evaluation loop later in this notebook runs predict() with.
yolo_model = YOLO("yolov9e_0.995_0.823_epoch65.pt")  # weights come from this local .pt file

In [3]:
# Checkpoint directory handed to the from_pretrained() calls for the HF pipeline and SiglipModel cells.
clip_path = 'siglip/so400m_epoch15_aug_0.891'

In [2]:
# One-off weight preprocessing, per the original note only needed for FSDP-trained checkpoints:
# remove the '_orig_mod.' substring from every state-dict key and save the result next to the original.
# NOTE(review): presumably the prefix comes from a wrapped/compiled module and would otherwise
# stop from_pretrained() from matching the parameter names — confirm.
from safetensors.torch import load_file, save_file
model = load_file(clip_path + '/model.safetensors')  # scratch use of `model`; a later cell rebinds it to the SiglipModel
model = {k.replace('_orig_mod.', ''): v for k, v in model.items()}
save_file(model, clip_path + '/new_model.safetensors', metadata={'format': 'pt'})
# Manual follow-up: delete the original model.safetensors and rename new_model.safetensors to replace it.

In [4]:
class PipelineWithoutPostprocess(ZeroShotImageClassificationPipeline):
    """Zero-shot image classification pipeline that skips the softmax step.

    For SigLIP checkpoints the logits still go through a sigmoid; for every
    other PyTorch model the raw logits are returned as scores, because with a
    single target class at test time a softmax would report 1.0 for everything.
    """

    def postprocess(self, model_outputs):
        """Turn raw model outputs into ``[{"score", "label"}, ...]`` sorted by descending score.

        Raises:
            ValueError: if the pipeline is not running on the PyTorch framework.
        """
        labels = model_outputs.pop("candidate_labels")
        first_logits = model_outputs["logits"][0]

        # Only PyTorch is handled; bail out before touching the model config.
        if self.framework != "pt":
            raise ValueError(f"Unsupported framework: {self.framework}")

        if self.model.config.model_type == "siglip":
            values = torch.sigmoid(first_logits).squeeze(-1)
        else:
            # no softmax because only 1 target class at test time, softmax causes it to go 1.0 for all
            values = first_logits.squeeze(-1)

        scores = values.tolist()
        if not isinstance(scores, list):
            # a 0-dim tensor yields a bare float; normalise to a one-element list
            scores = [scores]

        ranked = sorted(zip(scores, labels), key=lambda pair: -pair[0])
        return [{"score": score, "label": label} for score, label in ranked]


# Instantiate the custom pipeline on the checkpoint in clip_path (model, tokenizer and image
# processor are all loaded from the same directory), running on CUDA with batch_size=4.
image_classifier = PipelineWithoutPostprocess(task="zero-shot-image-classification",
                                              model=AutoModelForZeroShotImageClassification.from_pretrained(clip_path),
                                              tokenizer=AutoTokenizer.from_pretrained(clip_path),
                                              image_processor=AutoImageProcessor.from_pretrained(clip_path),
                                              batch_size=4, device='cuda')

In [4]:
# Load the checkpoint directly as a SiglipModel (fp16, on CUDA) for the manual feature-extraction path.
# NOTE: this rebinds `model`, which the weight-preprocessing cell also uses as a scratch name.
model = SiglipModel.from_pretrained(clip_path, torch_dtype=torch.float16).to('cuda')
processor = SiglipProcessor.from_pretrained(clip_path)
# Scalars applied to the normalised feature similarities in later cells:
# score = similarity * logit_scale_exp + logit_bias.
logit_scale_exp = model.logit_scale.exp()
logit_bias = model.logit_bias

In [5]:
# Load a sample image from disk and convert it to a NumPy array; the next cells feed it to the processor.
image = Image.open("real-esrgan/green_rocket_x4v3.jpg")
image = np.asarray(image)  # input

In [6]:
# Copy the array to the GPU as fp16 and reorder its axes with permute(2, 0, 1).
# assumes the array is (H, W, C), so the result is (C, H, W) — TODO confirm.
# Not idempotent: re-running this cell permutes the already-permuted tensor again.
image = torch.tensor(image, dtype=torch.float16, device='cuda').permute(2, 0, 1)

In [7]:
# Preprocess the same image twice together with four candidate captions, then move the batch to CUDA.
# NOTE(review): padding=True pads captions to the longest one in the batch — confirm this matches
# the padding the checkpoint was trained with.
feats = processor(images=[image, image], text=['grey missile','red white and blue light aircraft','green and black missile','white and red helicopter'], padding=True, return_tensors='pt').to('cuda')
# Cast pixel values to fp16 to match the dtype the model was loaded with.
feats['pixel_values'] = feats['pixel_values'].type(torch.float16)

In [9]:
# Full forward pass of the SiglipModel on the preprocessed images and captions.
output = model(**feats)

In [10]:
# Image-to-text logits from the forward pass (one row per image, per the printed output below).
similarity_score = output.logits_per_image

In [None]:
# Manual scoring path: run the vision and text towers separately, L2-normalise the pooled
# features in place, and combine them as image_feat @ text_feat.T * scale + bias.
# NOTE(review): presumably intended to reproduce output.logits_per_image — confirm.
# Rebinds `similarity_score`, so it overrides the value set by the previous cell.
image_feat = model.vision_model(pixel_values=feats['pixel_values']).pooler_output
text_feat = model.text_model(input_ids=feats['input_ids']).pooler_output
image_feat /= image_feat.norm(dim=-1, keepdim=True)
text_feat /= text_feat.norm(dim=-1, keepdim=True)
similarity_score = image_feat @ text_feat.T * logit_scale_exp + logit_bias  # rows: images, columns: captions
similarity_score

In [11]:
# Convert the score tensor to nested Python lists for printing.
# Guarded so the cell can be re-run: after the first run `similarity_score` is already a
# list, which has no .tolist() and would raise AttributeError.
if hasattr(similarity_score, 'tolist'):
    similarity_score = similarity_score.tolist()

In [12]:
# Print one row of caption scores per input image. Iterating the score rows directly avoids
# the earlier dummy [image, image] list, whose loop variable was an image tensor named `caption`.
for i, image_scores in enumerate(similarity_score):
    print(i, image_scores)

0 [-13.25, -19.28125, -4.7578125, -20.96875]
1 [-13.25, -19.28125, -4.7578125, -20.96875]


In [None]:
# SigLIP NAViT
# Alternative backbone loaded from a local `siglip_so400m_14_980_flash_attn2_navit` package.
# WARNING: these imports rebind SiglipModel and SiglipProcessor, which the first cell imported
# from `transformers`; after running this cell those names refer to the local NAViT classes.
import os
import sys

from siglip_so400m_14_980_flash_attn2_navit.modeling_siglip import SiglipModel, SiglipVisionModel, SiglipTextModel
from siglip_so400m_14_980_flash_attn2_navit.image_processing_siglip import SiglipImageProcessor
from siglip_so400m_14_980_flash_attn2_navit.tokenization_siglip import SiglipTokenizer
from siglip_so400m_14_980_flash_attn2_navit.processing_siglip import SiglipProcessor

DEVICE = torch.device("cuda:0")  # device used by the NAViT tensors and models in this cell
PATCH_SIZE = 14  # window size used when the pixel mask is cut into patches

# Hand-built pixel masks for a batch of two 28-row x 42-column canvases
# (presumably 1 = real pixel, 0 = padding — confirm against the NAViT model).
pixel_attention_mask = [
    # image 0: the top 14 rows are all ones, the bottom 14 rows are all zeros
    [[1] * 42 for _ in range(14)] + [[0] * 42 for _ in range(14)],
    # image 1: every one of the 28 rows has ones in the first 28 columns and zeros in the last 14
    [[1] * 28 + [0] * 14 for _ in range(28)],
]
# Turn the nested list into a bool tensor on DEVICE (rebinds the same name from list to tensor).
pixel_attention_mask = torch.tensor(pixel_attention_mask, dtype=torch.bool, device=DEVICE)
# Slice dims 1 and 2 into non-overlapping PATCH_SIZE x PATCH_SIZE windows (step equals size).
patches_subgrid = pixel_attention_mask.unfold(
    dimension=1, size=PATCH_SIZE, step=PATCH_SIZE
).unfold(dimension=2, size=PATCH_SIZE, step=PATCH_SIZE)
# A patch is marked True when at least one pixel inside its window is set.
patch_attention_mask = (patches_subgrid.sum(dim=(-1, -2)) > 0).bool()

# Load the NAViT vision/text towers and score one sample image against four captions.
# WARNING (hidden state): this section rebinds `processor`, `logit_scale_exp`, `logit_bias`,
# `image`, `feats` and `similarity_score`, which earlier cells also define. The evaluation loop
# reads `processor`, `logit_scale_exp` and `logit_bias` together with `model`, so running this
# cell before the loop mixes NAViT preprocessing/scalars with the HF SiglipModel.
from PIL import Image
import numpy as np
# model = SiglipModel.from_pretrained("siglip_so400m_14_384_flash_attn2_navit").to(DEVICE, dtype=torch.float16)
# NOTE(review): the classes are imported from a `..._14_980_...` package but the weights are loaded
# from a `..._14_384_...` directory — confirm the mismatch is intentional.
vision_model = SiglipVisionModel.from_pretrained("siglip_so400m_14_384_flash_attn2_navit", _flash_attn_2_enabled=False).to(DEVICE, dtype=torch.float16)
text_model = SiglipTextModel.from_pretrained("siglip_so400m_14_384_flash_attn2_navit", _flash_attn_2_enabled=False).to(DEVICE, dtype=torch.float16)
processor = SiglipProcessor.from_pretrained("siglip_so400m_14_384_flash_attn2_navit")

# Hard-coded scale and bias for the similarity logits.
# NOTE(review): presumably copied from the checkpoint's logit_scale.exp() / logit_bias — confirm.
logit_scale_exp = torch.tensor([112.4375], device=DEVICE, dtype=torch.float16)
logit_bias = torch.tensor([-16.5469], device=DEVICE, dtype=torch.float16)

image = Image.open("../green rocket.jpg")
image = np.asarray(image)
image = torch.tensor(image, dtype=torch.float16, device=DEVICE).permute(2, 0, 1)
image.shape  # bare expression in the middle of a cell: evaluated but not displayed

feats = processor(images=[image, image], text=['grey missile','red white and blue light aircraft','green and black missile','white and red helicopter'], padding=True, return_tensors='pt')

feats['pixel_values'] = feats['pixel_values'].type(torch.float16).to(DEVICE)
feats['input_ids'] = feats['input_ids'].to(DEVICE)
# NOTE(review): `patch_attention_mask` computed above is not passed to the vision tower here
# (the commented-out NAViT variant in the evaluation loop does pass it) — confirm which is intended.
image_feat = vision_model.vision_model(pixel_values=feats['pixel_values'])
text_feat = text_model.text_model(input_ids=feats['input_ids'])
# L2-normalise the pooled features, then apply the scale and bias to the image-by-caption similarities.
image_feat = image_feat.pooler_output / image_feat.pooler_output.norm(dim=-1, keepdim=True)
text_feat = text_feat.pooler_output / text_feat.pooler_output.norm(dim=-1, keepdim=True)
similarity_score = image_feat @ text_feat.T * logit_scale_exp + logit_bias
similarity_score

In [None]:
# EVA_CLIP
# Alternative backbone: load an EVA02-CLIP model in fp16 from a local EVA-CLIP checkout.
import torch
import sys
eva_path = 'eva-2/EVA-CLIP/rei/'
# Make the checkout importable. Reuse eva_path (instead of repeating the literal) so the
# import location and the weights location cannot drift apart when the path is edited.
sys.path.insert(0, eva_path)
from eva_clip import create_model_and_transforms, get_tokenizer
from PIL import Image
# NOTE: `model_name` is a shared global; the Real-ESRGAN cell rebinds it to a different value.
model_name = "EVA02-CLIP-L-14-336"
pretrained = eva_path + "EVA02_CLIP_L_336_psz14_s6B.pt" # or "/path/to/EVA02_CLIP_B_psz16_s8B.pt"

# The middle return value is discarded; `preprocess` is the image transform that the
# commented-out EVA branch of the evaluation loop applies to each crop.
EVA, _, preprocess = create_model_and_transforms(model_name, pretrained, force_custom_clip=True, precision='fp16')
EVA_tokenizer = get_tokenizer(model_name)
EVA = EVA.to('cuda')

In [5]:
# Real-ESRGAN upscaler used by the evaluation loop to enlarge YOLO crops before scoring.
from basicsr.archs.rrdbnet_arch import RRDBNet
from realesrgan import RealESRGANer
from realesrgan.archs.srvgg_arch import SRVGGNetCompact

# Weights path without extension; '.pth' is appended below.
# NOTE: rebinds the `model_name` global that the EVA-CLIP cell also sets.
model_name = 'real-esrgan/realesr-general-x4v3'
# NOTE: despite its name, `rrdb_net` holds an SRVGGNetCompact; the imported RRDBNet is unused in this cell.
rrdb_net = SRVGGNetCompact(num_in_ch=3, num_out_ch=3, num_feat=64, num_conv=32, upscale=4, act_type='prelu')
netscale = 4  # upscale factor; also passed as outscale in the evaluation loop
ESRGAN = RealESRGANer(
    scale=netscale,
    model_path=model_name+ '.pth',
    model=rrdb_net,
    pre_pad=10,
    half=True)

In [6]:
# Load the annotations: one JSON object per non-blank line.
# The file is read as bytes so decoding does not depend on the platform default encoding;
# orjson.loads accepts bytes directly.
with open('../../data/vlm.jsonl', 'rb') as f:
    instances = [orjson.loads(line) for line in (raw.strip() for raw in f) if line]

results = []  # accumulated per-image predictions, filled by the evaluation loop

# Hold out the last `val_percent` of the instances for validation.
val_percent = 0.2
val_split = int(len(instances) * val_percent)
# Slice with an explicit index: when val_split is 0, instances[:-0] is empty and
# instances[-0:] is the whole list, which would silently put everything in `val`.
split_idx = len(instances) - val_split
train, val = instances[:split_idx], instances[split_idx:]

# Group the validation instances into batches of `bs` for YOLO inference.
bs = 4
batched_instances = [val[i:i + bs] for i in range(0, len(val), bs)]

In [None]:
# End-to-end evaluation over the validation batches:
#   1. YOLO proposes candidate boxes for every image in the batch,
#   2. each box is cropped (and upscaled with Real-ESRGAN when both sides exceed 10 px),
#   3. SigLIP scores every caption of an image against every crop of that image,
#   4. each caption is assigned the box with the highest score, and the running `results`
#      list is rewritten to disk after every image.
# Images with no detections (or no captions) produce an empty annotation list instead of crashing.
visualize = False  # set True to pop up each image with the chosen boxes drawn on it

for batch_instance in tqdm(batched_instances):
    images = [Image.open(os.path.join('../../data/images/', i['image'])) for i in batch_instance]

    # YOLO object det
    yolo_result = yolo_model.predict(images, imgsz=1600, conf=0.1, iou=0.1, max_det=10, verbose=False)  # max F1, try augment=True and adjusting iou
    yolo_result = [(r.boxes.xyxy.tolist(), r.boxes.conf.tolist()) for r in yolo_result]
    yolo_result = [tuple(zip(*r)) for r in yolo_result]  # per image: tuple of (xyxy box, conf) pairs; empty tuple when nothing was detected

    # crop the boxes out
    cropped_boxes = []
    for im, boxes in zip(images, yolo_result):
        im_boxes = []
        for (x1, y1, x2, y2), _ in boxes:
            cropped = im.crop((x1, y1, x2, y2))
            cropped = np.asarray(cropped)
            # Only upscale crops whose height and width are both above 10 px.
            # NOTE(review): confirm the channel order RealESRGANer.enhance expects; the crop comes from PIL.
            if not any(s <= 10 for s in cropped.shape[:2]):
                cropped = ESRGAN.enhance(cropped, outscale=netscale)[0]
                # cropped = Image.fromarray(cropped)
            im_boxes.append(cropped)
        cropped_boxes.append(im_boxes)

    captions_list = [[anno['caption'] for anno in img['annotations']] for img in batch_instance]  # list of list of str, one inner list per image
    assert len(cropped_boxes) == len(captions_list)

    # CLIP inference
    clip_results = []  # per image: one list of per-box scores for each caption
    with torch.no_grad():
        for boxes, captions in zip(cropped_boxes, captions_list):
            if not boxes or not captions:
                # Nothing to score: keep one (empty) score list per caption so the combine
                # step below can skip them, rather than failing in the processor or in argmax.
                clip_results.append([[] for _ in captions])
                continue

            boxes = [torch.tensor(box, dtype=torch.float16).permute(2, 0, 1) for box in boxes]  # load onto CPU for now
            feats = processor(images=boxes, text=captions, padding=True, return_tensors='pt').to('cuda')  # transfer all image tensors at once to reduce mem R/W
            feats['pixel_values'] = feats['pixel_values'].type(torch.float16)
            image_feat = model.vision_model(pixel_values=feats['pixel_values']).pooler_output
            text_feat = model.text_model(input_ids=feats['input_ids']).pooler_output
            image_feat /= image_feat.norm(dim=-1, keepdim=True)
            text_feat /= text_feat.norm(dim=-1, keepdim=True)
            # text @ image.T: rows are captions, columns are boxes
            similarity_score = (text_feat @ image_feat.T * logit_scale_exp + logit_bias).tolist()
            clip_results.append(similarity_score)

            # r = image_classifier(boxes, candidate_labels=captions)  # for HF pipeline

            # BEGIN SIGLIP NAVIT
            # boxes = [torch.tensor(np.asarray(box), dtype=torch.float16, device=DEVICE).permute(2, 0, 1) for box in boxes]
            # feats = processor(images=boxes, text=captions, padding=True, return_tensors='pt')
            # feats['pixel_values'] = feats['pixel_values'].type(torch.float16).to(DEVICE)
            # feats['input_ids'] = feats['input_ids'].to(DEVICE)
            #
            # image_feat = vision_model.vision_model(pixel_values=feats['pixel_values'], patch_attention_mask=patch_attention_mask)
            # text_feat = text_model.text_model(input_ids=feats['input_ids'])
            # image_feat = image_feat.pooler_output / image_feat.pooler_output.norm(dim=-1, keepdim=True)
            # text_feat = text_feat.pooler_output / text_feat.pooler_output.norm(dim=-1, keepdim=True)
            # similarity_score = image_feat @ text_feat.T * logit_scale_exp + logit_bias
            #
            # r = []
            # for image, score in zip(images, similarity_score):
            #     image_scores = [{'label': caption, 'score': score.item()} for caption, score in zip(captions, score)]
            #     r.append(image_scores)
            # END SIGLIP NAVIT


            # BEGIN EVA CLIP, outputs same format as HF pipeline
            # image_batched = [preprocess(im) for im in boxes]
            # image_batched = torch.stack(image_batched).to('cuda')
            # tokenized_captions = EVA_tokenizer(captions).to('cuda')
            #
            # image_features = EVA.encode_image(image_batched)
            # text_features = EVA.encode_text(tokenized_captions)
            # image_features /= image_features.norm(dim=-1, keepdim=True)
            # text_features /= text_features.norm(dim=-1, keepdim=True)
            # similarity_score = (100.0 * image_features @ text_features.T)
            # r = []
            # for image, score in zip(images, similarity_score):
            #     image_scores = [{'label': caption, 'score': score.item()} for caption, score in zip(captions, score)]
            #     r.append(image_scores)
            # END EVA CLIP

            # image_to_text_scores = {caption: [] for caption in captions}  # {caption: [score1, score2, ...]}, scores in sequence of bbox
            # for box in r:
            #     for label_score in box:
            #         image_to_text_scores[label_score['label']].append(label_score['score'])
            # clip_results.append(image_to_text_scores)

    # combine the results
    for im, yolo_box, captions, similarity_scores, instance in zip(images, yolo_result, captions_list, clip_results, batch_instance):
        if visualize:
            im_cp = im.copy()
            draw = ImageDraw.Draw(im_cp)  # noqa
        result_for_im = {}  # caption -> xyxy list of the best-scoring box
        for caption, caption_scores in zip(captions, similarity_scores):
            if not caption_scores:
                # no candidate boxes for this image: leave the caption without a prediction
                continue
            box_idx = int(np.argmax(caption_scores))
            highest_caption_score = caption_scores[box_idx]
            result_for_im[caption] = yolo_box[box_idx][0]  # dict[caption] = xyxy in list
            if visualize:
                (x1, y1, x2, y2), box_conf = yolo_box[box_idx]
                draw.rectangle(xy=((x1, y1), (x2, y2)), outline='red')
                draw.text((x1, y1), text=f'{caption} {box_conf:.2f} {highest_caption_score:.2f}', fill='red')
        if visualize:
            im_cp.show()
        results.append({'image': instance['image'], 'annotations': [{'bbox': v, 'caption': k} for k, v in result_for_im.items()]})
        # save every image in case of crash
        with open('evals/yolov9e-1600-epoch65-conf0.1-so400m_epoch15_aug_0.891.json', 'wb+') as f:
            f.write(orjson.dumps(results))

In [7]:
# plot bbox
# Draw every detection (red outline plus its confidence) on a copy of each image and show it.
for source_im, detections in zip(ims, yolo_result):
    canvas = source_im.copy()
    pen = ImageDraw.Draw(canvas)
    for corners, score in detections:
        left, top, right, bottom = corners
        pen.rectangle(xy=((left, top), (right, bottom)), outline='red')
        pen.text((left, top), text=f'{score:.2f}', fill='red')
    canvas.show()

In [59]:
# Earlier variant of the "combine the results" step, kept from a previous run (In [59]).
# NOTE(review): this cell relies on state no visible cell provides in this form:
#   - `ims` is not defined in any visible cell;
#   - each `clip_results` entry must be a dict {caption: [score per box]} (it calls .items()),
#     whereas the evaluation loop above now appends nested lists;
#   - it appends a bare {caption: box} dict to `results`, a different record shape from the
#     {'image': ..., 'annotations': [...]} records the evaluation loop appends.
# Confirm whether this cell is still needed before running it.
visualize = False
for im, cropped_box_PIL, yolo_box, similarity_scores in zip(ims, cropped_boxes, yolo_result, clip_results):
    if visualize: im_cp = im.copy()
    result_for_im = {}
    for caption, caption_scores in similarity_scores.items():
        # pick the box with the highest score for this caption
        box_idx = np.argmax(caption_scores)
        highest_caption_score = max(caption_scores)
        box = cropped_box_PIL[box_idx]  # not used afterwards in this cell
        result_for_im[caption] = yolo_box[box_idx][0]  # dict[caption] = (xyxy in list, conf)
        if visualize:
            draw = ImageDraw.Draw(im_cp)
            (x1, y1, x2, y2), box_conf = yolo_box[box_idx]
            draw.rectangle(xy=((x1, y1), (x2, y2)), outline='red')
            draw.text((x1, y1), text=f'{caption} {box_conf:.2f} {highest_caption_score:.2f}', fill='red')
    if visualize: im_cp.show()
    results.append(result_for_im)

In [60]:
# Display the accumulated results; in the captured output each entry maps a caption to the chosen xyxy box.
results

[{'grey missile': [705.0738525390625,
   506.7243347167969,
   782.65283203125,
   563.574951171875],
  'red, white, and blue light aircraft': [1030.6815185546875,
   77.49951934814453,
   1056.74853515625,
   110.44055938720703],
  'green and black missile': [705.0738525390625,
   506.7243347167969,
   782.65283203125,
   563.574951171875],
  'white and red helicopter': [527.7639770507812,
   118.3411865234375,
   624.7859497070312,
   161.6909637451172]},
 {'grey camouflage fighter jet': [400.4502868652344,
   158.0403289794922,
   455.9124450683594,
   193.24575805664062],
  'grey and white fighter plane': [1117.64501953125,
   514.673828125,
   1254.2855224609375,
   553.1058959960938],
  'white and black drone': [356.56414794921875,
   455.2095031738281,
   402.8783264160156,
   486.3287353515625],
  'white and black fighter jet': [400.4502868652344,
   158.0403289794922,
   455.9124450683594,
   193.24575805664062],
  'white missile': [400.4502868652344,
   158.0403289794922,
   