In [None]:
import utils
import data_utils
import similarity
import torch
import numpy as np
from PIL import Image
import open_clip
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd


In [None]:
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Running on device: {device}")

In [None]:
clip_name = 'ViT-B/16'
target_name = 'resnet50'
target_layer = 'layer3'
d_probe = 'broden'
concept_set = 'data/10k.txt'
batch_size = 200
device = 'cuda'
pool_mode = 'avg'

save_dir = 'saved_activations'
similarity_fn = similarity.soft_wpmi


save_names = utils.get_save_names(clip_name = clip_name, target_name = target_name,
                                  target_layer = target_layer, d_probe = d_probe,
                                  concept_set = concept_set, pool_mode=pool_mode,
                                  save_dir = save_dir)

target_save_name, clip_save_name, text_save_name = save_names

similarities, target_feats = utils.get_similarity_from_activations(target_save_name, clip_save_name, 
                                                             text_save_name, similarity_fn, device=device)

with open(concept_set, 'r') as f: 
    words = (f.read()).split('\n')

pil_data = data_utils.get_data(d_probe)
top_vals, top_ids = torch.topk(target_feats, k=5, dim=0)
vals, idsx = torch.topk(similarities,k=1,largest=True)

image_sets = [[pil_data[j][0].resize([128,128]) for j in i] for i in np.array(top_ids).T]

clip_label = [words[int(i)] for i in idsx]

In [None]:
torch.cuda.empty_cache()

In [None]:
# Tokenize Text Corpus
with open('data/10k.txt', 'r') as f: 
    clip_set = (f.read()).split('\n')

model, _, preprocess = open_clip.create_model_and_transforms('ViT-B-32', pretrained='laion2b_s34b_b79k')
model = model.cuda()
tokenizer = open_clip.get_tokenizer('ViT-B-32')
text = tokenizer(clip_set).cuda()
with torch.no_grad(), torch.cuda.amp.autocast():
    text_features = model.encode_text(text)
    text_features /= text_features.norm(dim=-1, keepdim=True)

In [None]:
def pil_images_to_tensors(imageset):
    tensor_images = [preprocess(image).unsqueeze(0) for image in imageset]
    return torch.cat(tensor_images, dim=0).cuda()

In [None]:
torch.cuda.empty_cache()

## Open CLIP Top K results

In [None]:
def get_probs(imageset):
    tensor_images = pil_images_to_tensors(imageset)
    
    with torch.no_grad(), torch.cuda.amp.autocast():
        image_features = model.encode_image(tensor_images)
        image_features /= image_features.norm(dim=-1, keepdim=True)

    text_probs = (100.0 * image_features @ text_features.T).softmax(dim=-1)

    return text_probs

In [None]:
def pred_concept(imageset, word_count):
    text_probs = get_probs(imageset)

    values, indices = torch.topk(text_probs, k=word_count, largest=True)

    image_labels_pairs = [
        (imageset[i], [words[int(idx)] for idx in indices[i]])
        for i in range(len(imageset))
    ]

    return image_labels_pairs

In [None]:
def evaluate_topk(imageset,label,k):
    word_count = int(len(clip_set) * k)
    cur_list = []
    sum = 0
    for pair in pred_concept(imageset,word_count):
        if label in pair[1]:
            cur_list.append(True)
            sum+=1
        else:
            cur_list.append(False)
        
    return cur_list

In [None]:
matches = []
multiplier = 0.01
for i in range(len(image_sets)):
    matches.append(evaluate_topk(image_sets[i],clip_label[i],0.01))

In [None]:
matches

In [None]:
matches

nested_array_np = np.array(matches,dtype=int)
# Reshape the array to have Neuron # rows and 5 columns

reshaped_array = nested_array_np.reshape(1024, 5)

# Create a DataFrame from the reshaped array
df = pd.DataFrame(reshaped_array, columns=['Image1', 'Image2', 'Image3', 'Image4', 'Image5'])

# Display the DataFrame
df.to_csv('openclipcsvs/'+'topk/'+"_".join([clip_name,target_name,target_layer,d_probe,concept_set,str(multiplier)]).replace('.','').replace('/','') + '.csv')

In [None]:
def display_images_with_labels_topk(image_set, tensor_labels, multiplier):
    # Evaluate similarity for the entire set of images
    matching = evaluate_topk(image_set, tensor_labels, multiplier)

    # Convert threshold_exceed tensor to a boolean array
    threshold_exceed_bool = threshold_exceed.cpu().numpy().astype(bool)

    # Display the images and labels
    print(tensor_labels)
    fig, axs = plt.subplots(1, len(image_set), figsize=(15, 5))
    for j in range(len(image_set)):
        axs[j].imshow(image_set[j])
        axs[j].axis('off')
        axs[j].set_title(f'Label: {"Yes" if threshold_exceed_bool[j] else "No"}', color='green' if threshold_exceed_bool[j] else 'red')

    plt.show()



for i in range(len(image_sets)):
    display_images_with_labels(image_sets[i], clip_label[i], 0.25)

## Embedding Similarity Score Thresholding

In [None]:
def embedding_sim(imageset, word):
    with torch.no_grad(), torch.cuda.amp.autocast():
        encoded_word = model.encode_text(tokenizer(word).cuda())
        encoded_word = encoded_word.float()  # Convert to Float
        encoded_word /= encoded_word.norm(dim=-1, keepdim=True)
        tensor_images = pil_images_to_tensors(imageset)
    
        image_features = model.encode_image(tensor_images)
        image_features = image_features.float()  # Convert to Float
        image_features /= image_features.norm(dim=-1, keepdim=True)

    # Calculate embedding scores using vectorized operations
    embedding_scores = torch.matmul(image_features, encoded_word.T)

    return embedding_scores

def evaluate_sim(imageset, label, threshold):
    embedding_scores = embedding_sim(imageset, label)
    threshold_exceed = (embedding_scores >= threshold).to(torch.float32)
    return threshold_exceed

In [None]:
torch.cuda.empty_cache()

In [None]:
threshold=0.25
with torch.no_grad(), torch.cuda.amp.autocast():
    # Your main processing logic here
    sim_matches = []
    for i in range(len(image_sets)):
        sim_matches.append(evaluate_sim(image_sets[i], clip_label[i], threshold))
        torch.cuda.empty_cache()



In [None]:
nested_array_np.shape

In [None]:
# Save to csv
nested_array_np = np.array([np.array(i.cpu()) for i in sim_matches],dtype=int)
# Reshape the array to have Neuron # rows and 5 columns
squeezed_array = np.squeeze(nested_array_np, axis=-1)
reshaped_array = nested_array_np.reshape(1024, 5)

# Create a DataFrame from the reshaped array
df = pd.DataFrame(reshaped_array, columns=['Image1', 'Image2', 'Image3', 'Image4', 'Image5'])

# Display the DataFrame
df.to_csv('openclipcsvs/'+'thresholding/'+"_".join([clip_name,target_name,target_layer,d_probe,concept_set,str(threshold)]).replace('.','').replace('/','') + '.csv')

In [None]:

# Assuming that image_sets is a list of lists containing PIL images
# and evaluate_sim returns a list of tensors containing yes or no values

def display_images_with_labels(image_set, tensor_labels, threshold):
    # Evaluate similarity for the entire set of images
    threshold_exceed = evaluate_sim(image_set, tensor_labels, threshold)

    # Convert threshold_exceed tensor to a boolean array
    threshold_exceed_bool = threshold_exceed.cpu().numpy().astype(bool)

    # Display the images and labels
    print(tensor_labels)
    fig, axs = plt.subplots(1, len(image_set), figsize=(15, 5))
    for j in range(len(image_set)):
        axs[j].imshow(image_set[j])
        axs[j].axis('off')
        axs[j].set_title(f'Label: {"Yes" if threshold_exceed_bool[j] else "No"}', color='green' if threshold_exceed_bool[j] else 'red')

    plt.show()



for i in range(len(image_sets)):
    display_images_with_labels(image_sets[i], clip_label[i], 0.25)



In [None]:
for i in range(len(image_sets)):
    display_images_with_labels(image_sets[i], tensor_labels[i], threshold)

In [None]:
def show_img_label(idx,k):
    
    obj = pred_concept(image_sets[idx],k)
    display(obj[0][0])
    display(obj[0][1])
    
    display(obj[1][0])
    display(obj[1][1])
    
    display(obj[2][0])
    display(obj[2][1])
    
    display(obj[3][0])
    display(obj[3][1])

    return (obj[0][1],obj[1][1],obj[2][1],obj[3][1])

In [None]:
show_img_label(10,10)