In [12]:
import torch
from transformers import CLIPProcessor, CLIPModel
from datasets import load_dataset
from nltk.corpus import wordnet
import nltk
from tqdm.auto import tqdm
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

In [13]:
nltk.download('wordnet')
nltk.download('punkt')
nltk.download('punkt_tab')
nltk.download('omw-1.4')
nltk.download('averaged_perceptron_tagger_eng')
nltk.download('stopwords')

device = "cuda" if torch.cuda.is_available() else "cpu"
model_name = "openai/clip-vit-base-patch32"
model = CLIPModel.from_pretrained(model_name).to(device)
processor = CLIPProcessor.from_pretrained(model_name)

dataset = load_dataset("lmms-lab/flickr30k", split="test")
dataset_subset = dataset.select(range(1000))

images = [item['image'] for item in dataset_subset]
original_captions = [item['caption'][0] for item in dataset_subset]

[nltk_data] Downloading package wordnet to /home/abhiramd/nltk_data...
[nltk_data]   Package wordnet is already up-to-date!
[nltk_data] Downloading package punkt to /home/abhiramd/nltk_data...
[nltk_data]   Package punkt is already up-to-date!
[nltk_data] Downloading package punkt_tab to
[nltk_data]     /home/abhiramd/nltk_data...
[nltk_data]   Package punkt_tab is already up-to-date!
[nltk_data] Downloading package omw-1.4 to /home/abhiramd/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package averaged_perceptron_tagger_eng to
[nltk_data]     /home/abhiramd/nltk_data...
[nltk_data]   Package averaged_perceptron_tagger_eng is already up-to-
[nltk_data]       date!
[nltk_data] Downloading package stopwords to
[nltk_data]     /home/abhiramd/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


In [14]:
def retrieval_metrics(model, processor, images, captions, device):
    K=[1, 5, 10]
    image_inputs = processor(images=images, return_tensors="pt", padding=True)['pixel_values'].to(device)
    with torch.no_grad():
        image_features = model.get_image_features(image_inputs)
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)

    text_inputs = processor(text=captions, return_tensors="pt", padding=True, truncation=True)['input_ids'].to(device)
    with torch.no_grad():
        text_features = model.get_text_features(text_inputs)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)

    similarity_matrix = text_features @ image_features.T

    results = {}
    for k in K:
        top_k_indices = torch.argsort(similarity_matrix, dim=1, descending=True)[:, :k]
        correct_indices = torch.arange(len(captions)).unsqueeze(1).to(device)
        is_correct = (top_k_indices == correct_indices).any(dim=1).float()
        recall_at_k = is_correct.mean().item()
        results[f"Recall@{k}"] = recall_at_k * 100

    return results
baseline_results = retrieval_metrics(model, processor, images, original_captions, device)
print(f"Baseline: {baseline_results}")

Baseline: {'Recall@1': 65.9000039100647, 'Recall@5': 87.30000257492065, 'Recall@10': 92.70000457763672}


In [15]:
from nltk.tokenize import word_tokenize
from nltk.tag import pos_tag

def get_wordnet_pos(tag):
    if tag.startswith('J'):
        return wordnet.ADJ
    elif tag.startswith('V'):
        return wordnet.VERB
    elif tag.startswith('N'):
        return wordnet.NOUN
    elif tag.startswith('R'):
        return wordnet.ADV
    return None

def synonym_replacement_attack(caption, replacement_ratio=0.25):
    tokens = word_tokenize(caption)
    tagged_tokens = pos_tag(tokens)

    replaceable_words = []
    for i, (word, tag) in enumerate(tagged_tokens):
        wn_tag = get_wordnet_pos(tag)
        if wn_tag and word.lower() not in nltk.corpus.stopwords.words('english'):
             replaceable_words.append((i, word, wn_tag))

    num_to_replace = int(len(replaceable_words) * replacement_ratio)

    np.random.shuffle(replaceable_words)
    words_to_replace = replaceable_words[:num_to_replace]
    
    new_tokens = list(tokens)
    
    for index, original_word, wn_tag in words_to_replace:
        synonyms = []
        for syn in wordnet.synsets(original_word, pos=wn_tag):
            for lemma in syn.lemmas():
                if lemma.name().lower() != original_word.lower():
                    synonyms.append(lemma.name().replace('_', ' ')) 
        
        if synonyms:
            new_tokens[index] = np.random.choice(synonyms)
            
    return " ".join(new_tokens)

In [16]:
from transformers import AutoTokenizer, AutoModelForSeq2SeqLM

paraphraser_model_name = "humarin/chatgpt_paraphraser_on_T5_base"
paraphraser_tokenizer = AutoTokenizer.from_pretrained(paraphraser_model_name)
paraphraser_model = AutoModelForSeq2SeqLM.from_pretrained(paraphraser_model_name).to(device)

def paraphrasing_attack(caption, num_paraphrases=3):
    input_text = f"paraphrase: {caption}"
    inputs = paraphraser_tokenizer(input_text, return_tensors="pt", 
                                   padding=True, truncation=True).to(device)

    outputs = paraphraser_model.generate(
        **inputs,
        max_length=128,
        num_beams=5,
        num_return_sequences=num_paraphrases,
        do_sample=True, 
        temperature=1.0 
    )
    
    paraphrases = [paraphraser_tokenizer.decode(output, skip_special_tokens=True) 
                   for output in outputs]
    return paraphrases

In [17]:
def get_replacement(word, wn_tag, specificity):
    synsets = wordnet.synsets(word, pos=wn_tag)
    if not synsets:
        return None
    synset = synsets[0]
    
    if specificity == 'hypernym':
        generalizations = synset.hypernyms()
    else:
        generalizations = synset.hyponyms()
        
    if generalizations:
        replacement = np.random.choice(generalizations).lemmas()[0].name().replace('_', ' ')
        return replacement
    
    return None

def specificity_attack(caption, specificity, replacement_ratio=0.25):
    tokens = word_tokenize(caption)
    tagged_tokens = pos_tag(tokens)
    
    replaceable_nouns = []
    for i, (word, tag) in enumerate(tagged_tokens):
        if tag.startswith('N') and word.lower() not in nltk.corpus.stopwords.words('english'):
             replaceable_nouns.append((i, word, wordnet.NOUN))

    num_to_replace = int(len(replaceable_nouns) * replacement_ratio)
    np.random.shuffle(replaceable_nouns)
    words_to_replace = replaceable_nouns[:num_to_replace]
    
    new_tokens = list(tokens)
    
    for index, original_word, wn_tag in words_to_replace:
        replacement = get_replacement(original_word, wn_tag, specificity)
        if replacement:
            new_tokens[index] = replacement
            
    return " ".join(new_tokens)

In [18]:
all_results = {"baseline": baseline_results}
synonym_25_captions = [synonym_replacement_attack(c, 0.25) for c in original_captions]
synonym_50_captions = [synonym_replacement_attack(c, 0.50) for c in original_captions]
synonym_75_captions = [synonym_replacement_attack(c, 0.75) for c in original_captions] 

hypernym_25_captions = [specificity_attack(c, 'hypernym', 0.25) for c in original_captions]
hypernym_50_captions = [specificity_attack(c, 'hypernym', 0.50) for c in original_captions] 
hyponym_25_captions = [specificity_attack(c, 'hyponym', 0.25) for c in original_captions]
hyponym_50_captions = [specificity_attack(c, 'hyponym', 0.50) for c in original_captions] 

paraphrase_captions = [paraphrasing_attack(c, 1)[0] for c in original_captions]

caption_sets_to_test = {
    "Synonym_25": synonym_25_captions,
    "Synonym_50": synonym_50_captions,
    "Synonym_75": synonym_75_captions,
    "Hypernym_25": hypernym_25_captions,
    "Hypernym_50": hypernym_50_captions,
    "Hyponym_25": hyponym_25_captions,
    "Hyponym_50": hyponym_50_captions, 
    "Paraphrase": paraphrase_captions,
}

for attack_type, attacked_captions in tqdm(caption_sets_to_test.items()):
    results = retrieval_metrics(model, processor, images, attacked_captions, device)
    all_results[attack_type] = results
    print(f"Attack '{attack_type}': {results}")

 12%|█▎        | 1/8 [00:06<00:48,  6.98s/it]

Attack 'Synonym_25': {'Recall@1': 57.10000395774841, 'Recall@5': 82.10000395774841, 'Recall@10': 88.40000629425049}


 25%|██▌       | 2/8 [00:13<00:41,  6.86s/it]

Attack 'Synonym_50': {'Recall@1': 45.00000178813934, 'Recall@5': 74.30000305175781, 'Recall@10': 82.50000476837158}


 38%|███▊      | 3/8 [00:21<00:36,  7.28s/it]

Attack 'Synonym_75': {'Recall@1': 37.50000298023224, 'Recall@5': 61.500000953674316, 'Recall@10': 73.60000610351562}


 50%|█████     | 4/8 [00:28<00:28,  7.07s/it]

Attack 'Hypernym_25': {'Recall@1': 60.700005292892456, 'Recall@5': 83.90000462532043, 'Recall@10': 89.80000615119934}


 62%|██████▎   | 5/8 [00:36<00:22,  7.54s/it]

Attack 'Hypernym_50': {'Recall@1': 49.400001764297485, 'Recall@5': 76.60000324249268, 'Recall@10': 85.10000109672546}


 75%|███████▌  | 6/8 [00:43<00:14,  7.29s/it]

Attack 'Hyponym_25': {'Recall@1': 60.200005769729614, 'Recall@5': 83.60000252723694, 'Recall@10': 90.10000228881836}


 88%|████████▊ | 7/8 [00:50<00:07,  7.16s/it]

Attack 'Hyponym_50': {'Recall@1': 51.90000534057617, 'Recall@5': 77.80000567436218, 'Recall@10': 84.80000495910645}


100%|██████████| 8/8 [00:57<00:00,  7.15s/it]

Attack 'Paraphrase': {'Recall@1': 59.90000367164612, 'Recall@5': 83.90000462532043, 'Recall@10': 90.6000018119812}





In [19]:
df = pd.DataFrame(all_results).T
df = df.reindex(['baseline'] + [idx for idx in df.index if idx != 'baseline'])

attack_types = df.index.to_list()
R_k_values = ['Recall@1', 'Recall@5', 'Recall@10']
colors = ['#E74C3C', '#F1C40F', '#008000']

def autolabel(ax, rects):
    for rect in rects:
        height = rect.get_height()
        ax.annotate(f'{height:.1f}',
                    xy=(rect.get_x() + rect.get_width() / 2, height),
                    xytext=(0, 3), 
                    textcoords="offset points",
                    ha='center', va='bottom', fontsize=8)

for rk_index, rk_metric in enumerate(R_k_values):
    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(10, 6))

    rects = ax.bar(attack_types, df[rk_metric], color=colors[rk_index])

    ax.set_ylabel(f'{rk_metric} (%)', fontsize=12)
    ax.set_title(f'CLIP Retrieval Performance: {rk_metric}', fontsize=14, pad=15)
    ax.set_ylim(0, 100)
    ax.set_xticks(range(len(attack_types)))
    ax.set_xticklabels(attack_types, rotation=45, ha="right", fontsize=10)
    
    autolabel(ax, rects)
    
    plt.tight_layout()
    
    file_name = f'{rk_metric.lower().replace("@", "_")}_plot_final.png'
    plt.savefig(file_name)
    plt.close(fig) 

df_display = df.copy()
baseline_r1 = df_display.loc['baseline', 'Recall@1']
df_display['R@1 Drop (%)'] = ((baseline_r1 - df_display['Recall@1']) / baseline_r1 * 100).round(2)

print("\nTable:")
print(df_display.round(2))


Table:
             Recall@1  Recall@5  Recall@10  R@1 Drop (%)
baseline         65.9      87.3       92.7          0.00
Synonym_25       57.1      82.1       88.4         13.35
Synonym_50       45.0      74.3       82.5         31.71
Synonym_75       37.5      61.5       73.6         43.10
Hypernym_25      60.7      83.9       89.8          7.89
Hypernym_50      49.4      76.6       85.1         25.04
Hyponym_25       60.2      83.6       90.1          8.65
Hyponym_50       51.9      77.8       84.8         21.24
Paraphrase       59.9      83.9       90.6          9.10


In [20]:
def calculate_retrieval_metrics_and_ranks(model, processor, images, captions, device):
    image_inputs = processor(images=images, return_tensors="pt", padding=True)['pixel_values'].to(device)
    with torch.no_grad():
        image_features = model.get_image_features(image_inputs)
    image_features = image_features / image_features.norm(dim=-1, keepdim=True)
    text_inputs = processor(text=captions, return_tensors="pt", padding=True, truncation=True)['input_ids'].to(device)
    with torch.no_grad():
        text_features = model.get_text_features(text_inputs)
    text_features = text_features / text_features.norm(dim=-1, keepdim=True)

    similarity_matrix = text_features @ image_features.T 
    sorted_indices = torch.argsort(similarity_matrix, dim=1, descending=True)
    correct_indices = torch.arange(len(captions)).to(device).unsqueeze(1)
    ranks = (sorted_indices == correct_indices).nonzero(as_tuple=True)[1] + 1
    results = {}
    for k in [1, 5, 10]: 
        is_correct = (ranks <= k).float()
        results[f"Recall@{k}"] = is_correct.mean().item() * 100 
        
    return similarity_matrix, ranks.cpu().numpy(), results

In [21]:
baseline_matrix, baseline_ranks, baseline_results = calculate_retrieval_metrics_and_ranks(model, processor, images, original_captions, device)
synonym_captions = [synonym_replacement_attack(c, 0.50) for c in original_captions]
synonym_matrix, synonym_ranks, synonym_results = calculate_retrieval_metrics_and_ranks(model, processor, images, synonym_captions, device)

baseline_success = (baseline_ranks == 1)
attack_failure = (synonym_ranks > 1)

failure_indices = np.where(baseline_success & attack_failure)[0]
print(f"\n{len(failure_indices)} R@1 failure indices")

picked = failure_indices[:10]

for i in picked:
    original_caption = original_captions[i]
    attacked_caption = synonym_captions[i]
    top_incorrect_image_index = synonym_matrix[i].argsort(descending=True)[0].item()
    if top_incorrect_image_index == i:
        top_retrieved_caption = "(Correct Image Retrieved, skipping this index)"
        continue
    top_retrieved_caption = original_captions[top_incorrect_image_index]

    print(f"Index: {i}")
    print(f"Original Caption (Baseline Rank 1): {original_caption}")
    print(f"Attacked Caption (R@{synonym_ranks[i]}): {attacked_caption}")
    print(f"FAILURE: {top_incorrect_image_index} (Caption: {top_retrieved_caption}) as the top match.")
    print("-------------------------------------------------------------")


219 R@1 failure indices
Index: 3
Original Caption (Baseline Rank 1): Someone in a blue shirt and hat is standing on stair and leaning against a window .
Attacked Caption (R@3): person in a blue shirt and hat is stand on step and leaning against a windowpane .
FAILURE: 927 (Caption: A security officer with a tiny face and big glasses leans on a metal gate looking into the camera .) as the top match.
-------------------------------------------------------------
Index: 4
Original Caption (Baseline Rank 1): Two men  one in a gray shirt  one in a black shirt  standing near a stove .
Attacked Caption (R@2): Two Isle of Man one in a gray shirt one in a black shirt standing near a kitchen range .
FAILURE: 440 (Caption: A youth with long dirty blond-hair wearing a denim jacket and jeans looks at the conveyor belt of groceries while waiting at a checkout line .) as the top match.
-------------------------------------------------------------
Index: 11
Original Caption (Baseline Rank 1): Three yo