In [14]:
import os
from typing import List

import clip
import numpy as np
import torch
from PIL import Image
from sklearn.linear_model import LogisticRegression

from bfair.sensors.image.clip.optimization import get_tokens_pipeline

In [15]:
# Attribute class under study and the values CLIP is asked to choose between.
attr_cls = "gender"
attributes = ["male", "female"]
# Single-stage pipeline: the prompts are a copy of the attribute values.
tokens_pipeline = [list(attributes)]

# Number of images sent through the model per forward pass.
BATCH_SIZE = 64
# Use the GPU when torch reports one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
# `clip.load` returns the model together with its image preprocessing callable.
model, preprocess = clip.load("ViT-B/32", device)

In [16]:
def clip_sensor_call(item, attributes: List[str], attr_cls: str):
    """
    Scores a list of images against the attribute prompts with CLIP.

    Stand-alone version of a ClipBasedSensor execution. It relies on the
    notebook-level ``model``, ``preprocess``, ``device``, ``BATCH_SIZE`` and
    ``tokens_pipeline`` objects. The prompts are taken from the last entry of
    ``tokens_pipeline``; each prompt's probability is paired with the
    attribute value at the same position.

    :param item: sequence of image paths (anything ``Image.open`` accepts)
    :param List[str] attributes: attribute class values, aligned one-to-one
        with the prompts
    :param str attr_cls: attribute class name (not used by this function;
        kept for interface parity with the sensor)
    :return: one ``("image_<index>", [(attribute, probability), ...])`` tuple
        per input image, in input order; ``[]`` when ``item`` is empty
    :raises ValueError: if ``tokens_pipeline`` is empty or its prompts do not
        match ``attributes`` in length
    """
    # Nothing to score: return before touching the model. The previous
    # `range(..., step=min(BATCH_SIZE, len(item)))` raised on a zero step here.
    if len(item) == 0:
        return []

    if not tokens_pipeline:
        raise ValueError("tokens_pipeline is empty; there are no prompts to score")
    # Only the final pipeline entry was ever used for scoring.
    prompts = tokens_pipeline[-1]
    if len(prompts) != len(attributes):
        raise ValueError(
            "prompts and attributes must have the same length, got "
            f"{len(prompts)} prompts and {len(attributes)} attributes"
        )
    text = clip.tokenize(prompts).to(device)

    results = []
    for start in range(0, len(item), BATCH_SIZE):
        images = []
        # Slicing clamps at the end of the sequence, so the last batch may be short.
        for photo_addrs in item[start : start + BATCH_SIZE]:
            # The context manager closes the file even if preprocessing fails.
            with Image.open(photo_addrs) as img:
                images.append(preprocess(img))

        image_input = torch.stack(images).to(device)
        with torch.no_grad():
            logits_per_image, _ = model(image_input, text)

        # Softmax over the prompt axis gives one probability per attribute.
        batch_probs = logits_per_image.softmax(dim=-1).cpu().numpy()
        for offset, image_probs in enumerate(batch_probs):
            results.append(
                (
                    "image_" + str(start + offset),
                    list(zip(attributes, image_probs)),
                )
            )

    # NOTE: the sensor's filtering pipeline and label-only output are not
    # applied here; the raw per-attribute probabilities are returned.
    return results

In [17]:
# Three sample face crops; the file names carry the ground-truth fields
# that the training cell parses later.
image_list = [
    "datasets/utkface/1_0_0_20161219140623097.jpg.chip.jpg",
    "datasets/utkface/115_1_1_20170112213257263.jpg.chip.jpg",
    "datasets/utkface/105_1_0_20170112213021902.jpg.chip.jpg",
]
# Display the per-attribute probabilities for every image.
clip_sensor_call(item=image_list, attributes=attributes, attr_cls=attr_cls)

[('image_0', [('male', 0.93397015), ('female', 0.06602984)]),
 ('image_1', [('male', 0.3417764), ('female', 0.65822357)]),
 ('image_2', [('male', 0.4526427), ('female', 0.54735726)])]

In [26]:
# get labels for each image
image_labels = clip_sensor_call(image_list, attributes, attr_cls)

# prepare data: every image except the last one is used for training
X = []
y = []
for image_path, (image_id, label_probs) in zip(image_list[:-1], image_labels[:-1]):
    # Features are the CLIP probabilities, in `attributes` order.
    X.append([prob for _, prob in label_probs])
    # Parse the label from the file name only, so underscores in directory
    # names cannot shift the fields. The second underscore-separated field
    # is used as an index into `attributes`.
    label_index = int(os.path.basename(image_path).split("_")[1])
    y.append(attributes[label_index])

# train logistic regression model
X = np.array(X).reshape(len(X), -1)
y = np.array(y)
lr = LogisticRegression(random_state=0).fit(X, y)


array(['female'], dtype='<U6')

In [28]:

# Build the feature row for the last image (the one left out of training)
# and show the classifier's prediction for it.
X = [
    [prob for _, prob in label_probs]
    for _, label_probs in image_labels[-1:]
]
list(lr.predict(X))

['female']

In [None]:
if y_i == "":
            y.append(np.array([]))
        elif isinstance(y_i, str):
            y.append(np.array([y_i]))
        else:
            y.append(np.array(y_train.values[i]))