Notebook for experiments on Prompt engineering using CLIP or ALIGN

# Choose settings

##### Choose your settings here

In [66]:
# Class whose images are analysed; the other class serves as cross-label pollution.
DOG_OR_CAT = "cat" # can also be "dog"
# How the external-pollution experiment builds its polluted set:
#   "cross-label": roughly 1% pollution, half taken from the other class and half from the ImageNet subset
#   any other value (e.g. "imagenet_only"): the whole ImageNet subset is appended as pollution
TYPE_EXTERNAL_POLLUTION = "cross-label"
# Vision-language model to use; the model cell accepts "CLIP" or "ALIGN".
MODEL="CLIP"

# choose dataset
DATASET_NAME = 'cats-vs-dogs-large'  # needs to match folder name in FM/datasets
LOAD_AND_EMBED_DATASET_IN_BATCHES = True  # True for large datasets, False for small ones
# '' for loading the dataset normally, 'CREATE__{x}.pkl' for creating the cache file {x}.pkl, '{x}.pkl' for loading the cache file {x}.pkl
# NOTE(review): the dataset name is hard-coded here instead of derived from DATASET_NAME — keep both in sync.
USE_CACHED_EMBEDDINGS = f'{MODEL}_cats-vs-dogs-large.pkl'
# if not '', the pickle file specifies which instances of each label are known to be mislabeled
MISLABELED_INSTANCES = 'mislabeled_instances_cats-vs-dogs.pkl'



# choose how many image-label mismatches to insert
MISMATCH_PORTION = 0.01  # fraction (not percent) of mismatching image-label pairs added, must be in [0, 1]; 0.01 means 1%
# how much of the MISMATCH_PORTION to produce by
# [exchanging images between classes, inserting images from other datasets, inserting randomly generated images, inserting placeholder images];
# the four entries must sum to 1 and currently only the first two may be non-zero
MANIPULATION_TYPES = [0.5, 0.5, 0.0, 0.0]
IMAGENET_EMBEDDINGS = f'{MODEL}_imagenet-subset.pkl'  # specify if MANIPULATION_TYPES[1] > 0


##### This part is calculated automatically

In [2]:
# Root folder of all datasets and cached embeddings on the mounted Google Drive
datasets_path = '/content/drive/My Drive/FM/datasets/'
dataset_path = datasets_path + DATASET_NAME + '/'

# Class labels of every supported dataset; a label is matched against the folder/file name of each image
_LABELS_BY_DATASET = {
  'cats-vs-dogs-large': ['cat', 'dog'],
  'train-small': ['cat', 'dog'],
  'jellyfish-classification': ['barrel jellyfish', 'compass jellyfish', 'lions mane jellyfish', 'moon jellyfish'],
  'traffic-signs': ['30 kilometers per hour speed limit traffic sign', '80 kilometers per hour speed limit traffic sign', '100 kilometers per hour speed limit traffic sign', 'give way traffic sign', 'no entry traffic sign', 'no overtaking traffic sign', 'priority over oncoming traffic sign', 'stop sign'],
}

if DATASET_NAME not in _LABELS_BY_DATASET:
  raise ValueError('Invalid dataset selected or labels not set!')
LABELS = list(_LABELS_BY_DATASET[DATASET_NAME])

# Tolerance for comparing sums of fractions with 1: floating-point rounding can make
# a valid split miss 1.0 by one ulp, which an exact `== 1` check would reject.
_FRACTION_TOLERANCE = 1e-9

assert 0 <= MISMATCH_PORTION <= 1, f'MISMATCH_PORTION must be in [0, 1] but is {MISMATCH_PORTION}'
assert len(MANIPULATION_TYPES) == 4 and abs(sum(MANIPULATION_TYPES) - 1) < _FRACTION_TOLERANCE, f'MANIPULATION_TYPES must contain 4 entries that sum up to 1.0 but is {MANIPULATION_TYPES}'
assert abs(MANIPULATION_TYPES[0] + MANIPULATION_TYPES[1] - 1) < _FRACTION_TOLERANCE, 'At the moment, only interclass corruption and imagenet corruption are implemented!'

# Load libraries

In [3]:
# Probe whether torch is already available in this kernel so that "Run all" does not
# reinstall the libraries. On a fresh kernel the name `torch` is unbound and the probe
# raises NameError. `except Exception` (instead of a bare `except`) keeps the best-effort
# behaviour but no longer swallows KeyboardInterrupt / SystemExit.
try:
  torch.tensor([[0]])
  libraries_already_loaded = True
except Exception:
  libraries_already_loaded = False

In [4]:
# Install CLIP and its text-processing dependencies only if the probe cell above
# reported that the libraries are not yet loaded in this kernel.
# NOTE(review): the packages are installed without pinned versions.
if not libraries_already_loaded:
  ! pip install ftfy regex tqdm
  ! pip install git+https://github.com/openai/CLIP.git

Collecting ftfy
  Downloading ftfy-6.1.3-py3-none-any.whl (53 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m53.4/53.4 kB[0m [31m927.4 kB/s[0m eta [36m0:00:00[0m
Installing collected packages: ftfy
Successfully installed ftfy-6.1.3
Collecting git+https://github.com/openai/CLIP.git
  Cloning https://github.com/openai/CLIP.git to /tmp/pip-req-build-3qh4vvyl
  Running command git clone --filter=blob:none --quiet https://github.com/openai/CLIP.git /tmp/pip-req-build-3qh4vvyl
  Resolved https://github.com/openai/CLIP.git to commit a1d071733d7111c9c014f024669f959182114e33
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: clip
  Building wheel for clip (setup.py) ... [?25l[?25hdone
  Created wheel for clip: filename=clip-1.0-py3-none-any.whl size=1369497 sha256=e758d612815ed9fe108e8302b97b3327e437513daf486f67db1edd201be69ec0
  Stored in directory: /tmp/pip-ephem-wheel-cache-_gzr0gx6/wheels/da/2b/4c/d6691fa9597aac8bb85d2ac1

In [14]:
import torch
from torchvision import transforms
import clip
from transformers import AutoImageProcessor, ViTModel, AlignProcessor, AlignModel, AutoTokenizer
from transformers.tokenization_utils_base import BatchEncoding
import numpy as np
from matplotlib import pyplot as plt
from PIL import Image
from pkg_resources import packaging
import os
from google.colab import drive
from sklearn.metrics import f1_score, precision_score, recall_score, accuracy_score, confusion_matrix
import glob
import pickle
from scipy.spatial.distance import cosine

print("Torch version:", torch.__version__)

Torch version: 2.1.0+cu121


In [6]:
def calculate_column_average(matrix):
    """
    Calculates the average of each column of a similarity matrix, excluding the diagonal entry.

    Each column sum is reduced by 1 and divided by (number of rows - 1). This removes the
    self-similarity on the diagonal, which is assumed to be exactly 1 (as in a cosine
    similarity matrix of a set of vectors with itself).

    Parameters:
    - matrix: torch.Tensor
        The input matrix (rows x columns) for which the column averages are calculated.
        May live on any device.

    Returns:
    - torch.Tensor
        A tensor containing one average per column, on the same device as the input.
        With a single row the divisor is 0, so the result is inf/nan.

    Raises:
    - TypeError:
        Raises an error if the input is not a PyTorch tensor.
    - ValueError:
        Raises an error if the input tensor is empty, has zero columns,
        or has fewer than two dimensions.
    """

    # Checking if the input is a PyTorch tensor
    if not isinstance(matrix, torch.Tensor):
        raise TypeError("Input should be a PyTorch tensor.")

    # A 0-D or 1-D tensor has no column axis; fail with the documented error type
    # instead of the IndexError that matrix.size(1) would raise
    if matrix.dim() < 2:
        raise ValueError("Input tensor must have at least two dimensions (rows x columns).")

    # Checking if the input tensor is empty or has zero columns
    if matrix.numel() == 0 or matrix.size(1) == 0:
        raise ValueError("Input tensor is empty or has zero columns.")

    # Calculating the column averages but taking out the entry for the same vector in the matrix (diagonal entry)
    column_sums = torch.sum(matrix, dim=0)
    # Created on the input's device so that GPU matrices do not cause a device mismatch
    column_counts = torch.full(
        (matrix.size(1),), float(matrix.size(0)), dtype=torch.float32, device=matrix.device
    )
    column_averages = (column_sums - 1) / (column_counts - 1)

    return column_averages

def cosine_similarity_matrix(embeddings_tensor):
    """
    Computes the pairwise cosine similarity between all rows of an embedding tensor.

    Parameters:
    - embeddings_tensor: torch.Tensor
        2D tensor with one embedding per row.

    Returns:
    - similarity_matrix: torch.Tensor
        Square tensor in which element (i, j) is the cosine similarity between row i and row j.
        The values range from -1 (opposite direction) over 0 (orthogonal) to 1 (same direction);
        the diagonal is 1 for every non-zero row.
    """

    # Normalizing every row to unit length so that the dot product equals the cosine similarity
    normalized = torch.nn.functional.normalize(embeddings_tensor, dim=1)

    # Computing the similarity matrix using dot product
    return torch.matmul(normalized, normalized.T)

from sklearn.metrics.pairwise import cosine_similarity as cosine_similarity_sklearn
def cosine_similarity(x, y=None):
  """
  Cosine similarity dispatcher.

  With a single argument, the pairwise similarity of the rows of x is computed by
  cosine_similarity_matrix. With two arguments, the call is forwarded to scikit-learn's
  pairwise cosine_similarity. Note that the two paths return whatever the respective
  backend returns, so the result type depends on whether y is given.
  """
  return cosine_similarity_matrix(x) if y is None else cosine_similarity_sklearn(x, y)

# Mounting storage

In [7]:
# Mount Google Drive (datasets_path points into it) and list the dataset folder
# to verify that the expected datasets and cache files are present.
drive.mount('/content/drive')
!ls "{datasets_path}"

Mounted at /content/drive
AlexNet_cats-vs-dogs-large.pkl			 mislabeled_instances_cats-vs-dogs.pkl
AlexNet_imagenet-subset.pkl			 note.txt
AlexNet_traffic-signs.pkl			 text_dog_embeddings.pkl
ALIGN_cats-vs-dogs-large.pkl			 text_embeddings_correct_align_cat.pkl
ALIGN_imagenet-subset.pkl			 text_embeddings_correct_align_dog.pkl
ALIGN_traffic-signs.pkl				 text_embeddings_correct_clip_cat.pkl
cats-dogs-big_ids.pkl				 text_embeddings_correct_clip_dog.pkl
cats-dogs-big.pkl				 text_embeddings_random_align_cat.pkl
cats-vs-dogs-large				 text_embeddings_random_align_dog.pkl
CLIP_cats-vs-dogs-large.pkl			 text_embeddings_random_clip_cat.pkl
CLIP_imagenet-subset.pkl			 text_embeddings_random_clip_dog.pkl
CLIP_traffic-signs.pkl				 text_random_embeddings.pkl
dog_wrong_2_12.txt				 traffic-signs
dog_wrong.txt					 train-small
image_embeddings__cats-vs-dogs.pkl		 ViT-CLS_cats-vs-dogs-large.pkl
image_embeddings__traffic-signs.pkl		 ViT-CLS_imagenet-subset.pkl
imagenet_one-of-each-class-except-cats

# Define dataset loader

In [None]:
def load_dataset(folder_path, labels):
    """
    Loads all images below folder_path (recursively) and groups them by label.

    The label of a file is determined by substring search: a label belongs to a file if it
    occurs in the last two path components (parent folder name / file name).
    Entries that cannot be opened as an image (directories, non-image files, corrupt
    images) are skipped silently.

    Parameters:
    - folder_path: str
        Root folder of the dataset, with or without trailing slash.
    - labels: list of str
        All class labels of the dataset.

    Returns:
    - dict
        Maps each label to the list of its images, converted to RGB.

    Raises:
    - ValueError:
        If folder_path does not exist, or if an image matches no label or more than one label.
    """

    # Checking if the provided folder path exists
    if not os.path.exists(folder_path):
        raise ValueError("Folder path does not exist.")

    images = {label: [] for label in labels}

    # os.path.join keeps the pattern valid whether or not folder_path ends with a slash
    pattern = os.path.join(folder_path, '**', '*')

    # Counted separately from the loop variable so that the summary below also works for an empty folder
    n_scanned = 0

    # Looping through all files in the folder
    for i, filename in enumerate(glob.glob(pattern, recursive=True)):
        n_scanned = i + 1

        if i % 1000 == 0:
            print(i, 'files loaded')

        try:
            # convert() returns an independent copy, so the file handle can be closed right away
            with Image.open(filename) as image_file:
                img = image_file.convert('RGB')
        except Exception:
            # Best effort: skip everything that is not a readable image,
            # but let KeyboardInterrupt / SystemExit propagate
            continue

        relative_name = '/'.join(filename.split('/')[-2:])
        matching_labels = [label for label in labels if label in relative_name]

        if len(matching_labels) > 1:
            raise ValueError(f"Label of {filename} is ambiguous.")
        if not matching_labels:
            raise ValueError(f"No label for {filename} found.")

        images[matching_labels[0]].append(img)

    print(n_scanned, 'files loaded')

    return images

def get_embeddings_dict_batchwise(folder_path, labels, model, preprocess, batch_size=64):
  """
  Loads all images below folder_path (recursively) and embeds them batch by batch.

  Only up to batch_size decoded images are kept in memory at a time. The label of a file is
  determined by substring search in its last two path components (parent folder / file name).
  Entries that cannot be opened as an image (directories, non-image files, corrupt images)
  are skipped silently and do not count towards the batch size.

  Relies on the notebook-level variable `device`, which must be defined before calling.

  Parameters:
  - folder_path: str
      Root folder of the dataset, with or without trailing slash.
  - labels: list of str
      All class labels of the dataset.
  - model:
      Object with an encode_image(batch) method returning one embedding per row.
  - preprocess: callable
      Converts one image into a tensor that can be stacked into a batch.
  - batch_size: int
      Number of images to collect before they are embedded.

  Returns:
  - dict
      Maps each label to a tensor that contains the embeddings of all its images.

  Raises:
  - ValueError:
      If folder_path does not exist, or if an image matches no label or more than one label.
  - torch.cat raises if a label did not receive a single image.
  """

  # Checking if the provided folder path exists
  if not os.path.exists(folder_path):
    raise ValueError("Folder path does not exist.")

  image_embeddings = {label: [] for label in labels}
  pending_images = {label: [] for label in labels}
  n_pending = 0

  def _embed_pending():
    """Embeds the buffered images label by label and empties the buffers."""
    for label in labels:
      if len(pending_images[label]) == 0:
        continue
      with torch.no_grad():
        batch = torch.cat([preprocess(img).unsqueeze(0) for img in pending_images[label]])
        image_embeddings[label].append(model.encode_image(batch.to(device)))
        del batch
      pending_images[label] = []

  # Looping through all files in the folder
  # (os.path.join keeps the pattern valid whether or not folder_path ends with a slash)
  all_files = glob.glob(os.path.join(folder_path, '**', '*'), recursive=True)
  for n_instances_processed, filename in enumerate(all_files):

    try:
      # convert() returns an independent copy, so the file handle can be closed right away
      with Image.open(filename) as image_file:
        img = image_file.convert('RGB')
    except Exception:
      # Best effort: skip everything that is not a readable image,
      # but let KeyboardInterrupt / SystemExit propagate
      continue

    # Find label of the image
    relative_name = '/'.join(filename.split('/')[-2:])
    matching_labels = [label for label in labels if label in relative_name]
    if len(matching_labels) > 1:
      raise ValueError(f"Label of {filename} is ambiguous.")
    if not matching_labels:
      raise ValueError(f"No label for {filename} found.")
    pending_images[matching_labels[0]].append(img)
    n_pending += 1

    # Embed as soon as a batch is full. The decision is based on the number of buffered
    # images, not on the file index, so skipped entries can neither delay nor suppress it.
    if n_pending >= batch_size:
      _embed_pending()
      n_pending = 0
      print(n_instances_processed, 'loaded and encoded')

  # Embed the final partial batch. This must not depend on the last glob entry being a
  # readable image, otherwise the remaining images would be dropped silently.
  if n_pending > 0:
    _embed_pending()
    print(n_instances_processed, 'loaded and encoded')

  # Convert list of embeddings to tensor
  for label in labels:
    image_embeddings[label] = torch.cat(image_embeddings[label])

  return image_embeddings

# Load and embed dataset

In [58]:
# Embeddings are computed from the raw images when no cache file is configured or when a
# cache file shall be created ('CREATE__' prefix); otherwise they are read from the cache.
# Computing requires `model`, `preprocess` and `device` from the "Model" section to be defined already.
if USE_CACHED_EMBEDDINGS == '' or USE_CACHED_EMBEDDINGS.split('__')[0] == 'CREATE':

  if LOAD_AND_EMBED_DATASET_IN_BATCHES:
    # batch-wise loading and embedding keeps the GPU memory and especially the RAM usage low
    image_embeddings = get_embeddings_dict_batchwise(dataset_path, LABELS, model, preprocess, batch_size=512)
  else:
    # small dataset: load everything first, then embed label by label
    images = load_dataset(dataset_path, LABELS)
    image_embeddings = {}
    for label in LABELS:
      processed_images = torch.cat([preprocess(img).unsqueeze(0) for img in images[label]]).to(device)
      with torch.no_grad():
        image_embeddings[label] = model.encode_image(processed_images)

  # further analysis happens on the CPU in 32-bit floating point precision
  image_embeddings = {key: embeddings.cpu().float() for key, embeddings in image_embeddings.items()}

  # persist the embeddings if a cache file was requested, so that they can be reloaded later on
  if USE_CACHED_EMBEDDINGS.split('__')[0] == 'CREATE':
    pickle_filename = USE_CACHED_EMBEDDINGS.partition('__')[2]  # everything after the prefix 'CREATE__'
    pickle_file = datasets_path + pickle_filename
    with open(pickle_file, 'wb') as f:
      pickle.dump(image_embeddings, f)
    print('Embeddings stored in', pickle_file)

else:

  # reuse the embeddings of a previous execution
  pickle_file = datasets_path + USE_CACHED_EMBEDDINGS
  with open(pickle_file, 'rb') as f:
    image_embeddings = pickle.load(f)

  print('Embeddings loaded from', pickle_file)

# embedding dimension, needed for the creation of reference vectors etc.
embedding_dim = image_embeddings[LABELS[0]].shape[1]


Embeddings loaded from /content/drive/My Drive/FM/datasets/CLIP_cats-vs-dogs-large.pkl


In [59]:
# Extend image_embeddings with derived entries:
#   '<label>_pollution' / '<label>_clean' - the known mislabeled / correctly labeled instances of a label
#   'imagenet_subset'                     - embeddings of out-of-dataset images
# LABELS_POLLUTION_ALSO lists the original labels plus all derived keys that were added.
LABELS_POLLUTION_ALSO = LABELS.copy()
mislabeled_indices = None

if MISLABELED_INSTANCES != '':
  with open(datasets_path + MISLABELED_INSTANCES, 'rb') as f:
    mislabeled_indices = pickle.load(f)
  # presumably a dict mapping each label to one boolean per instance (True = mislabeled) — verify against the pickle file
  for label in mislabeled_indices:
    # nothing to split if the label has no entries or no instance is marked
    if len(mislabeled_indices[label]) == 0 or sum(mislabeled_indices[label]) == 0:
      continue
    # the marked instances form the pollution, the inverted mask selects the clean rest
    image_embeddings[label + '_pollution'] = image_embeddings[label][mislabeled_indices[label]].clone().detach()
    image_embeddings[label + '_clean'] = image_embeddings[label][[not i for i in mislabeled_indices[label]]].clone().detach()
    LABELS_POLLUTION_ALSO += [label + '_pollution', label + '_clean']

# ImageNet embeddings are only loaded if images from other datasets shall be inserted
if MANIPULATION_TYPES[1] > 0:
  pickle_file = datasets_path + IMAGENET_EMBEDDINGS
  with open(pickle_file, 'rb') as f:
    # the cache file stores a dict of which only the entry 'val' is used
    image_embeddings['imagenet_subset'] = pickle.load(f)['val']
  print('Embeddings loaded from', pickle_file)
  LABELS_POLLUTION_ALSO += ['imagenet_subset']

# Overview of all available embedding sets and their sizes
print('LABELS =', LABELS)
print('LABELS_POLLUTION_ALSO =', LABELS_POLLUTION_ALSO)
for k in image_embeddings:
  print(k, len(image_embeddings[k]))

Embeddings loaded from /content/drive/My Drive/FM/datasets/CLIP_imagenet-subset.pkl
LABELS = ['cat', 'dog']
LABELS_POLLUTION_ALSO = ['cat', 'dog', 'cat_pollution', 'cat_clean', 'dog_pollution', 'dog_clean', 'imagenet_subset']
cat 12502
dog 12499
cat_pollution 25
cat_clean 12477
dog_pollution 24
dog_clean 12475
imagenet_subset 869


In [60]:
# Load the list of known mislabeled instances unconditionally, so that `mislabeled_indices`
# is available for the evaluation even if MISLABELED_INSTANCES was left empty in the settings.
MISLABELED_INSTANCES_LIST = 'mislabeled_instances_cats-vs-dogs.pkl'

with open(f'{datasets_path}{MISLABELED_INSTANCES_LIST}', 'rb') as f:
  mislabeled_indices = pickle.load(f)

In [61]:
# One numpy vector per embedded ImageNet image (the rows of the 'imagenet_subset' tensor)
imagenet_arr = [embedding.numpy() for embedding in image_embeddings["imagenet_subset"]]

# Model



In [62]:
# Run the model on the GPU if one is available, otherwise fall back to the CPU.
# `device` is read as a notebook-level variable by the embedding helpers and the preprocessing.
device = "cuda" if torch.cuda.is_available() else "cpu"


class Align(torch.nn.Module):
  """
  Wrapper around the pretrained "kakaobrain/align-base" model that offers the same
  encode_image / encode_text methods the rest of the notebook calls on the CLIP model.
  """

  def __init__(self):
    """Loads the pretrained ALIGN weights."""
    super().__init__()
    self.align = AlignModel.from_pretrained("kakaobrain/align-base")

  def forward(self, x: torch.Tensor) -> torch.Tensor:
    """
    Forwards x as the only positional argument to the wrapped model.

    NOTE(review): not called anywhere in this notebook; confirm that the wrapped model
    accepts a single positional tensor before relying on it.
    """
    return self.align(x)

  def encode_image(self, img: torch.Tensor) -> torch.Tensor:
    """Returns the image features of the wrapped model for a batch of preprocessed images."""
    return self.align.get_image_features(img)

  def encode_text(self, text: BatchEncoding) -> torch.Tensor:
    """Returns the text features of the wrapped model; `text` is unpacked into keyword arguments."""
    return self.align.get_text_features(**text)


def align_preprocessor_with_memory_fix(img) -> torch.Tensor:
  """
  Preprocesses one image for ALIGN and returns its pixel values on `device`.

  A new AlignProcessor is loaded for every call and deleted again afterwards.
  NOTE(review): judging by the function name this is a deliberate workaround for a memory
  problem; reloading the processor per image is slow, so confirm it is still needed.
  """
  processor = AlignProcessor.from_pretrained("kakaobrain/align-base")
  with torch.no_grad():
    # squeeze(0) removes the leading dimension of the processor output; the callers add it back via unsqueeze(0)
    processed = processor(images=img, return_tensors="pt").to(device).pixel_values.squeeze(0)
  del processor
  return processed


# Provide `model`, `preprocess` and `tokenize` with the same call interface for both models
if MODEL not in ('CLIP', 'ALIGN'):
  raise ValueError(f'Invalid model {MODEL} selected!')

if MODEL == 'CLIP':
  model, preprocess = clip.load("ViT-B/32", device=device)
  tokenize = clip.tokenize
else:
  model = Align().to(device)
  preprocess = align_preprocessor_with_memory_fix
  tokenizer = AutoTokenizer.from_pretrained("kakaobrain/align-base")

  def tokenize(s):
    """Tokenizes a single string as a batch of size one."""
    return tokenizer([s], padding=True, return_tensors="pt")

# Evaluation

In [16]:
from sklearn.metrics import precision_score, recall_score, f1_score

def calculate_metrics(prediction, ground_truth, positive_label=0):
    """
    Calculates and prints precision, recall and F1 score of a binary prediction.

    The positive class is the one encoded by positive_label. With the default of 0 this is
    the convention used throughout this notebook: 0 marks an instance that does NOT belong
    to the class (pollution), 1 marks a regular instance of the class.

    Parameters:
    - prediction: sequence
        Predicted label of each instance.
    - ground_truth: sequence
        True label of each instance, same length as prediction.
    - positive_label:
        Label value that is treated as the positive class (default: 0).

    Returns:
    - dict
        {"tp": number of true positives, "p": precision, "r": recall, "f1": F1 score}.
        Precision, recall and F1 are 0 if their denominator would be zero.

    Raises:
    - ValueError:
        If prediction and ground_truth differ in length.
    """

    # Checking if the lengths of the prediction and ground truth arrays are equal.
    if len(prediction) != len(ground_truth):
        raise ValueError("Lengths of prediction and ground truth arrays should be equal.")

    tp = 0
    fp = 0
    fn = 0
    for pred, truth in zip(prediction, ground_truth):
        predicted_positive = pred == positive_label
        if pred == truth:
            # true negatives are not needed for any of the reported metrics
            if predicted_positive:
                tp += 1
        elif predicted_positive:
            fp += 1
        else:
            fn += 1

    prec = tp / (tp + fp) if (tp + fp) > 0 else 0
    rec = tp / (tp + fn) if (tp + fn) > 0 else 0
    f1 = 2 * prec * rec / (prec + rec) if prec != 0 and rec != 0 else 0

    print("-----")
    print(f"tp: {tp}")
    print(f"Precision: {prec}")
    print(f"recall: {rec}")
    print(f"f1: {f1}")

    return {"tp":tp,"p":prec,"r":rec,"f1":f1}

# Prompt engineering

In [18]:
def clip_generated_label(img_encoding, text_features_normalized):
  """
  Zero-shot decision per image between the text prompts.

  The scaled similarities between every image embedding and the text embeddings are turned
  into probabilities with a softmax over the prompts. An image gets label 1 if the
  probability of the first prompt exceeds 0.5, otherwise label 0.

  Returns a list with one int (0 or 1) per image.
  """
  probabilities = (100.0 * img_encoding @ text_features_normalized.T).softmax(dim=-1)
  first_prompt_wins = probabilities[:, 0] > 0.5
  return first_prompt_wins.long().tolist()

In [45]:
# Ground truth in the encoding expected by calculate_metrics: 1 for instances whose entry in
# mislabeled_indices equals False, 0 for all others (0 is the positive class of the evaluation).
# zip() limits the result to the shorter of the two sequences.
ground_truth = [
  1 if flag == False else 0
  for _, flag in zip(image_embeddings[DOG_OR_CAT], mislabeled_indices[DOG_OR_CAT])
]

In [49]:
from collections import Counter

# Alternative prompts that compete against the prompt of the actual class
prompts = [f"no {DOG_OR_CAT}","adbadmn","random","something","nothing","strange","other"]

images = image_embeddings[DOG_OR_CAT]
# Scale every image embedding to unit length, so that the product with the
# unit-length text features is a cosine similarity
img_encoding = images/ images.norm(dim=-1, keepdim=True)

print(MODEL)
for alternative in prompts:
  # The first prompt describes the class itself, the second one the alternative
  text_inputs = [f"a photo of a {c}" for c in [DOG_OR_CAT,alternative]]

  # Calculate features
  with torch.no_grad():
    encoded_prompts = [model.encode_text(tokenize(prompt).to(device)).cpu() for prompt in text_inputs]

  text_features = torch.cat(encoded_prompts, 0)
  text_features /= text_features.norm(dim=-1, keepdim=True)

  # 1 = the class prompt wins, 0 = the alternative prompt wins
  predicted = clip_generated_label(img_encoding, text_features)

  print(alternative)
  calculate_metrics(predicted,  ground_truth)
  print()

ALIGN
no cat
-----
tp: 10
Precision: 0.008960573476702509
recall: 0.4
f1: 0.017528483786152498

adbadmn
-----
tp: 11
Precision: 0.6111111111111112
recall: 0.44
f1: 0.5116279069767442

random
-----
tp: 10
Precision: 0.4166666666666667
recall: 0.4
f1: 0.4081632653061225

something
-----
tp: 10
Precision: 0.6666666666666666
recall: 0.4
f1: 0.5

nothing
-----
tp: 8
Precision: 0.6153846153846154
recall: 0.32
f1: 0.4210526315789474

strange
-----
tp: 7
Precision: 0.28
recall: 0.28
f1: 0.28

other
-----
tp: 12
Precision: 0.7058823529411765
recall: 0.48
f1: 0.5714285714285713



## External pollution: class label prompt vs. the generic prompt "Something"

In [67]:
# Build the evaluation set: all clean images of the class plus pollution from outside the class.
clean_embeddings = image_embeddings[f"{DOG_OR_CAT}_clean"]
size_clean = len(clean_embeddings)

if TYPE_EXTERNAL_POLLUTION == "cross-label":
  other_label = "dog" if DOG_OR_CAT == "cat" else "cat"

  # 1% pollution in the final set corresponds to size_clean/99 polluting images;
  # one half comes from ImageNet and one half from the other class, i.e. size_clean/198 each.
  n_per_source = int(size_clean/198)
  # NOTE(review): the cross-label images are taken from the unfiltered other class, which may
  # itself contain mislabeled instances — confirm that this is intended.
  pollution = imagenet_arr[:n_per_source] + image_embeddings[other_label][:n_per_source].tolist()

else:
  pollution = imagenet_arr

images = clean_embeddings.tolist() + pollution
# Derive the labels from the number of images actually added. Computing the number of polluted
# instances separately (int(size_clean/99)) can differ from 2*int(size_clean/198) by one and
# ignores sources that hold fewer images than requested.
golden = [1] * size_clean + [0] * len(pollution)
assert len(golden) == len(images), 'every image needs exactly one ground truth label'

text_inputs = [f"a photo of a {c}" for c in [DOG_OR_CAT,"Something"]]

# Calculate features
text_features=[]
for i in text_inputs:
  with torch.no_grad():
    text_features.append(model.encode_text(tokenize(i).to(device)).cpu())

text_features=torch.cat(text_features, 0)
images = torch.Tensor(images)
# Scale image and text embeddings to unit length, so that their product is a cosine similarity
img_encoding = images/ images.norm(dim=-1, keepdim=True)
text_features /= text_features.norm(dim=-1, keepdim=True)

# 1 = the class prompt wins, 0 = the prompt "Something" wins (predicted pollution)
predicted = clip_generated_label(img_encoding, text_features)

In [68]:
# Sanity check: one prediction per image of the evaluation set (clean and polluting images)
len(predicted)

12603

In [69]:
from collections import Counter
# Report the detection quality for the selected model and class.
# Label 0 (pollution) is the positive class of calculate_metrics.
print(f"model: {MODEL}")
print(f"dataset: {DOG_OR_CAT}")
calculate_metrics(predicted,  golden)
# Optional debugging aid: distribution of the predicted labels
#print(Counter(predicted).keys())
#print(Counter(predicted).values())

model: CLIP
dataset: cat
-----
tp: 125
Precision: 0.946969696969697
recall: 0.9920634920634921
f1: 0.9689922480620156


{'tp': 125,
 'p': 0.946969696969697,
 'r': 0.9920634920634921,
 'f1': 0.9689922480620156}