In [5]:
import numpy as np
import torch
from pkg_resources import packaging
print("Torch version:", torch.__version__)
import os
import numpy as np
import torch
from torchvision import transforms, utils
from torchvision.transforms import Resize, CenterCrop, Normalize
from torchvision.transforms.functional import InterpolationMode
from torch.utils.data import DataLoader
from barbar import Bar
from parse_image import *
import pandas as pd
from PIL import Image, UnidentifiedImageError
import open_clip
# CUDA device index handed to every `.cuda(GPU)` call in this notebook.
GPU = 4
# Location of the evaluation CSV that is later read with pd.read_csv.
data_path = "/path/to/csv.csv" #ComVG or SVO_Probes csv

Torch version: 2.0.1+cu117


In [None]:
### Replace with other model version
# The architecture name is shared by the model factory and the tokenizer lookup;
# keeping it in one place prevents the two from drifting apart when it is swapped.
CLIP_MODEL_NAME = 'ViT-L-14'
CLIP_PRETRAINED = 'openai'
model, _, preprocess = open_clip.create_model_and_transforms(CLIP_MODEL_NAME, pretrained=CLIP_PRETRAINED)
tokenizer = open_clip.get_tokenizer(CLIP_MODEL_NAME)
# Move the model to the configured GPU and put it in eval mode; as the last
# expression of the cell this also displays the module.
model.cuda(GPU).eval()

In [None]:
# Read the evaluation CSV located at `data_path` into a DataFrame.
data = pd.read_csv(data_path)
# Last expression of the cell: show the first two rows as a quick sanity check.
data.head(2)

## ComOpenCLIP

In [7]:
def subimage_score_embedding(image, text):
    """Embed one sub-image and score it against its associated text.

    Args:
        image: Image accepted by the global ``preprocess`` transform.
        text: Phrase describing the sub-image. A falsy value (``None`` or
            ``""``) short-circuits the call.

    Returns:
        ``(image_embed, score)`` where ``image_embed`` is the float image
        embedding from ``model.encode_image`` and ``score`` is
        ``text_embed @ image_embed.T`` computed on the raw (not normalised)
        embeddings; ``(None, None)`` when ``text`` is falsy.
    """
    # Without a phrase there is nothing to compare the crop against.
    if not text:
        return None, None

    # Batch of one image / one phrase, both placed on the configured GPU.
    pixel_batch = torch.tensor(np.stack([preprocess(image)])).cuda(GPU)
    token_batch = tokenizer([text]).cuda(GPU)

    with torch.no_grad():
        image_embed = model.encode_image(pixel_batch).float()
        text_embed = model.encode_text(token_batch).float()

    return image_embed, text_embed @ image_embed.T

In [9]:
def comclip_one_pair(row, caption, image_id):
    """Compute the ComCLIP similarity between a caption and one image.

    The whole-image embedding is augmented with a weighted sum of sub-image
    embeddings (object crops plus any relation crops). The augmented image
    embedding and the caption embedding are then L2-normalised and compared.

    Args:
        row: Record providing ``pos_triplet`` (comma-separated string whose
            first, second and last items are used as subject, verb and
            object) and ``sentence_id``.
        caption: Text that is encoded and compared against the image.
        image_id: Identifier handed to ``read_image`` and to the sub-image
            helpers.

    Returns:
        Tuple ``(similarity, object_images, relation_images)``:
        ``similarity`` is the NumPy result of
        ``text_features @ image_features.T``; ``object_images`` is the
        mapping returned by ``create_sub_image_obj`` with relation crops
        added under their relation word; ``relation_images`` is returned
        unchanged from ``create_relation_object``.
    """
    full_image = preprocess(read_image(image_id))
    # Encode the caption that was passed in; previously this argument was
    # ignored and ``row.sentence`` was always used instead.
    text_input = tokenizer(caption).cuda(GPU)
    image_input = torch.tensor(np.stack([full_image])).cuda(GPU)
    with torch.no_grad():
        original_image_embed = model.encode_image(image_input).float()
        original_text_embed = model.encode_text(text_input).float()

    svo = row.pos_triplet.split(",")
    subj, verb, obj = svo[0], svo[1], svo[-1]
    object_images, matched_json = create_sub_image_obj(row.sentence_id, image_id)
    relation_images, relation_words = create_relation_object(object_images, subj, verb, obj, image_id, matched_json)
    # Treat relation crops like any other sub-image, keyed by their relation word.
    if relation_images and relation_words:
        for relation_image, word in zip(relation_images, relation_words):
            object_images[word] = relation_image

    image_embeds = []
    image_scores = []
    for key, sub_image in object_images.items():
        image_embed, image_score = subimage_score_embedding(sub_image, key)
        if image_embed is not None and image_score is not None:
            image_embeds.append(image_embed)
            image_scores.append(image_score)

    # Only regularise when there is something to weight: with no usable
    # sub-images the plain whole-image embedding is used as-is.
    if image_scores:
        # NOTE(review): normalize_tensor_list is defined elsewhere; presumably
        # it rescales the raw scores into per-sub-image weights — confirm.
        weights = normalize_tensor_list(image_scores)
        for weight, sub_embed in zip(weights, image_embeds):
            original_image_embed += weight * sub_embed

    # Both embeddings are already float tensors, so the norms need no extra cast.
    image_features = original_image_embed / original_image_embed.norm(dim=-1, keepdim=True)
    text_features = original_text_embed / original_text_embed.norm(dim=-1, keepdim=True)
    similarity = text_features.detach().cpu().numpy() @ image_features.detach().cpu().numpy().T
    return similarity, object_images, relation_images

In [10]:
def compute_one_row(idx, row):
    """Score a row's sentence against its positive and its negative image.

    Args:
        idx: Identifier stored under ``"id"`` in the result.
        row: Record providing ``sentence``, ``pos_image_id`` and
            ``neg_image_id`` (plus whatever ``comclip_one_pair`` reads).

    Returns:
        Dict with keys ``"id"``, ``"pos_score"`` and ``"neg_score"``; the
        scores are Python scalars taken from the first element returned by
        ``comclip_one_pair``.
    """
    def _score(image_id):
        # Only the similarity (first tuple element) is needed here.
        return comclip_one_pair(row, row.sentence, image_id)[0].item()

    # Dict values are evaluated in order: positive image first, then negative.
    return {
        "id": idx,
        "pos_score": _score(row.pos_image_id),
        "neg_score": _score(row.neg_image_id),
    }

In [None]:
# One result dict per row of `data`, in row order. Appending inside a plain loop
# keeps the scores gathered so far available if a later row fails.
comclip_score = []
for idx, row in data.iterrows():
    comclip_score.append(compute_one_row(idx, row))

### OpenCLIP baseline

In [None]:
# Per row: index of the image most similar to the sentence
# (0 = positive image, 1 = negative image), or "image_failed" when an image
# file could not be decoded.
matched_id = []
for idx, row in data.iterrows():
    try:
        text = row.sentence
        # NOTE(review): `skimage` and `image_path` are not defined in the cells
        # shown here — confirm they are provided elsewhere (e.g. by parse_image).
        image_pos = preprocess(Image.open(os.path.join(skimage.data_dir, image_path+str(row.pos_image_id) + ".jpg")).convert("RGB"))
        image_neg = preprocess(Image.open(os.path.join(skimage.data_dir, image_path+str(row.neg_image_id) + ".jpg")).convert("RGB"))
        images = [image_pos, image_neg]
        # Inputs must live on the same device as the model, which was moved to
        # `GPU`; the previous hard-coded device 5 caused a device mismatch.
        image_input = torch.tensor(np.stack(images)).cuda(GPU)
        text_tokens = tokenizer([text]).cuda(GPU)
        with torch.no_grad():
            image_features = model.encode_image(image_input).float()
            text_features = model.encode_text(text_tokens).float()
        # L2-normalise so the dot product below is a cosine similarity.
        image_features /= image_features.norm(dim=-1, keepdim=True)
        text_features /= text_features.norm(dim=-1, keepdim=True)
        similarity = text_features.cpu().numpy() @ image_features.cpu().numpy().T
        matched_id.append(np.argmax(similarity))
    except UnidentifiedImageError:
        matched_id.append("image_failed")
    