In [None]:
import os
import os.path as osp
import pandas as pd
import numpy as np
from transformers import CLIPProcessor, CLIPModel
from tqdm.auto import tqdm
from itertools import product


# Hook tqdm into pandas (per the tqdm docs this registers the `progress_*`
# variants such as `progress_apply` / `progress_map` on pandas objects).
tqdm.pandas()

In [None]:
# Checkpoint to load and the local directory used as its download cache.
model_id = 'openai/clip-vit-base-patch32'
clip_path = osp.abspath('../analyzing_annotations/clip_model')

# Load the CLIP model weights; the print only confirms that loading returned.
model = CLIPModel.from_pretrained(model_id, cache_dir=clip_path)
print('model: success')

# Load the matching processor from the same checkpoint and cache directory.
processor = CLIPProcessor.from_pretrained(model_id, cache_dir=clip_path)
print('processor: success')

In [None]:
def get_text_features(s, clip_model=model, clip_processor=processor, return_numpy=True):
    """Encode text with the text branch of a CLIP model.

    Note that the default `clip_model` / `clip_processor` are bound when this
    cell is executed (normal Python default-argument semantics), so rebinding
    the global names later does not affect calls that rely on the defaults.

    Args:
        s: Text passed as `text=` to the processor (a string; a list of
            strings is presumably accepted as well -- verify before relying
            on it, since the result is squeezed).
        clip_model: Object exposing `get_text_features(**inputs)`.
        clip_processor: Processor exposing a `tokenizer` attribute and
            callable with `text=`, `return_tensors=`, `truncation=` and
            `max_length=`.
        return_numpy: If true, return a detached numpy array; otherwise
            return the model output tensor as is.

    Returns:
        The text features with all singleton dimensions removed by
        `squeeze()`, as a numpy array or a tensor depending on
        `return_numpy`.
    """
    tokenizer = clip_processor.tokenizer

    # The class-level `max_model_input_sizes` table is keyed by hub id, so the
    # plain lookup raises KeyError for tokenizers loaded from a local path and
    # AttributeError where the attribute is absent. Keep the table value when
    # it is available and fall back to the per-instance `model_max_length`.
    known_sizes = getattr(tokenizer, 'max_model_input_sizes', None) or {}
    max_length = known_sizes.get(tokenizer.name_or_path, tokenizer.model_max_length)

    inputs = clip_processor(text=s, return_tensors='pt', truncation=True, max_length=max_length)
    outputs = clip_model.get_text_features(**inputs).squeeze()
    return outputs.detach().numpy() if return_numpy else outputs

def get_mean_text_features(responses, feature_fn=None):
    """Average the feature vectors of several responses.

    Args:
        responses: An iterable of responses, or a string containing the
            Python literal of such a collection (for example a list that was
            stringified when written to CSV).
        feature_fn: Callable mapping a single response to its feature vector.
            Defaults to `get_text_features`.

    Returns:
        numpy.ndarray: Mean over the responses (axis 0 of the row-stacked
        feature vectors).

    Raises:
        ValueError, SyntaxError: If `responses` is a string that is not a
            valid Python literal.
        ValueError: If `responses` is empty (raised by `np.vstack`).
    """
    if isinstance(responses, str):
        # Parse with literal_eval rather than eval: the string comes from a
        # data file, and eval would execute arbitrary expressions in it.
        import ast
        responses = ast.literal_eval(responses)

    if feature_fn is None:
        feature_fn = get_text_features

    features = [feature_fn(response) for response in responses]
    return np.vstack(features).mean(axis=0)

In [None]:
# Switches selecting which prediction file variant is encoded.
twostep_predictions = True
fewshot_predictions = False

# The input file name is 'processed_predictions' plus one optional suffix per
# enabled switch, with a '.csv' extension.
processed_dir = osp.abspath('../predicted_data/processed')
input_file = osp.join(
    processed_dir,
    'processed_predictions'
    + ('_twostep' if twostep_predictions else '')
    + ('_fewshot' if fewshot_predictions else '')
    + '.csv',
)

# Directory for the encodings; created on first use.
output_dir = osp.join(processed_dir, 'clip_encodings')
if not osp.isdir(output_dir):
    print(f'make directory {output_dir}')
    os.makedirs(output_dir)

# First CSV column is used as the index.
pred_df = pd.read_csv(input_file, index_col=0)

# Every 'response_<name>' column contributes one entry '<name>' to `models`.
response_cols = [name for name in pred_df.columns if name.startswith('response_')]
models = [name.replace('response_', '') for name in response_cols]

# Sorted distinct values of the `tangram` and `scene` columns.
tangrams = sorted(pd.unique(pred_df.tangram))
scenes = sorted(pd.unique(pred_df.scene))

In [None]:
# Preview the first rows of the loaded predictions table.
pred_df.head()

In [None]:
# Item identifiers, stored next to the embeddings when they are saved.
# ('item_identifyer' is the column name as it is accessed on pred_df.)
idx_array = pred_df.item_identifyer.values

# Maps '<kind>_<model name>' column names to a stacked array holding one
# feature vector per row of pred_df.
results = dict()
cols = ['response', 'label']

# The loop variable is deliberately NOT called `model`: that name holds the
# loaded CLIPModel, and overwriting it with a string would break any later
# re-execution of cells that read it (e.g. the one defining
# get_text_features, whose defaults are bound from the globals).
for model_name, rl in tqdm(list(product(models, cols))):
    col = f'{rl}_{model_name}'
    embeds = pred_df[col].map(get_text_features)
    results[col] = np.stack(embeds.to_list())

In [None]:
# `input_file` is an absolute path, and os.path.join discards every component
# that precedes an absolute one. Joining it directly would therefore ignore
# `output_dir` and write next to the input CSV, so only the file name
# (without extension) is used.
outfile = osp.join(
    output_dir,
    osp.splitext(osp.basename(input_file))[0] + '_embeddings.npz',
)
print(outfile)

# One array per '<kind>_<model name>' key plus the identifiers as 'text_idx'.
np.savez(outfile, text_idx=idx_array, **results)