### 1. Import packages

In [1]:
import numpy as np
from PIL import Image
import copy
import re
from transformers import Qwen2VLForConditionalGeneration, AutoTokenizer, AutoProcessor, CLIPTokenizer, CLIPTextModel,Qwen2_5_VLForConditionalGeneration
from qwen_vl_utils import process_vision_info
import os, sys
import random
import torch
import torch.nn.functional as F
import time
from numpy.linalg import norm

### 2. Load the target model and auxiliary text encoder

In this workbook (and reported in the paper), we demonstrate VLM reliability concerns of discerning image realness/authenticity in Qwen-based models. The query functions are written with the default inference setups in mind. When changing the target model, be sure to read through the code and understand how textual query and image inputs are processed. 

Failure to do the necessary checks may result in some errors.

For manipulating image captioning capabilities, we require an additional, auxiliary text-encoder. Here, we use the popular CLIP tokenizer and text model.


In [2]:
# provide names of target VLM and auxiliary model
modelName ="Qwen/Qwen2.5-VL-3B-Instruct"
auxModelName = "openai/clip-vit-base-patch32"
device = 'cuda'

model = Qwen2_5_VLForConditionalGeneration.from_pretrained(
    modelName, torch_dtype="auto", device_map="auto"
)
processor = AutoProcessor.from_pretrained(modelName)

# Load CLIP tokenizer and model
tokenizer = CLIPTokenizer.from_pretrained(auxModelName)
CLIPmodel = CLIPTextModel.from_pretrained(auxModelName).to(device)

# Set CLIP model to evaluation mode
CLIPmodel.eval()


Loading checkpoint shards:   0%|          | 0/2 [00:00<?, ?it/s]

Using a slow image processor as `use_fast` is unset and a slow processor was saved with this model. `use_fast=True` will be the default behavior in v4.52, even if the model was saved with a slow processor. This will result in minor differences in outputs. You'll still be able to use a slow processor with `use_fast=False`.
You have video processor config saved in `preprocessor.json` file which is deprecated. Video processor configs should be saved in their own `video_preprocessor.json` file. You can rename the file or load and save the processor back which renames it automatically. Loading from `preprocessor.json` will be removed in v5.0.


CLIPTextModel(
  (text_model): CLIPTextTransformer(
    (embeddings): CLIPTextEmbeddings(
      (token_embedding): Embedding(49408, 512)
      (position_embedding): Embedding(77, 512)
    )
    (encoder): CLIPEncoder(
      (layers): ModuleList(
        (0-11): 12 x CLIPEncoderLayer(
          (self_attn): CLIPAttention(
            (k_proj): Linear(in_features=512, out_features=512, bias=True)
            (v_proj): Linear(in_features=512, out_features=512, bias=True)
            (q_proj): Linear(in_features=512, out_features=512, bias=True)
            (out_proj): Linear(in_features=512, out_features=512, bias=True)
          )
          (layer_norm1): LayerNorm((512,), eps=1e-05, elementwise_affine=True)
          (mlp): CLIPMLP(
            (activation_fn): QuickGELUActivation()
            (fc1): Linear(in_features=512, out_features=2048, bias=True)
            (fc2): Linear(in_features=2048, out_features=512, bias=True)
          )
          (layer_norm2): LayerNorm((512,), eps=1e

### 3. Define the helper functions and the experimental setup.

The noise_params dictionary object stores parameters used to control the perturbation strength. The "direction" of the perturbation is dependent on the current VLM score and the target threshold.

The repository contains a "test_images" directory with some images from the RGFreq dataset - which you can download from IEEE dataport.

In this workbook, we include the **calculate_clip_score()** function which determines the cosine similarity of two input embeddings. This is used to determine the semantic drift of a caption as a result of the applied perturbation. We use this as the numeric goal for guiding our perturbation optimization function.

In [3]:
def image_to_array(image):
    return np.asarray(image).astype(np.float32)

def array_to_image(array):
    array = np.clip(array, 0, 255).astype(np.uint8)
    return Image.fromarray(array)
def calculate_clip_score(clipTokenizer, clipModel, originalCaption, perturbedCaption):
    
    # Tokenize
    input1 = clipTokenizer(originalCaption, padding="max_length", max_length=clipTokenizer.model_max_length, 
                            truncation=True, return_tensors="pt")
    input2 = clipTokenizer(perturbedCaption, padding="max_length", max_length=clipTokenizer.model_max_length, 
                            truncation=True, return_tensors="pt")

#     # Get embeddings and process to tensor from array -> vector (mean pooling)
    emb1 = clipModel(input1.input_ids.to('cuda'))[0]
    emb2 = clipModel(input2.input_ids.to('cuda'))[0]
    
    emb1 = emb1.detach().mean(dim=1).squeeze(0).cpu().numpy()
    emb2 = emb2.detach().mean(dim=1).squeeze(0).cpu().numpy()
    # Compute cosine similarity
    cosSimilarity = np.dot(emb1,emb2)/(norm(emb1)*norm(emb2))
    
    return cosSimilarity

def get_shuffled_image_list(directory, seed=42):
    random.seed(seed)  # Ensure reproducibility
    random.shuffle(directory)
    return directory

noise_params = {
    "num_sparse_points": 500,
    "sparse_noise_std": 15000,
    "min_freq_band": 0.49,
    "max_freq_band": 0.51
}

# images to be tested and transformed 
imageDir = './test_images/'

# directory where the images will be saved
targetDir = "./perturbed_images/captioning/"

# for quick testing of code
maxTestImages = 100

# perturbation search hyperparameters
TARGET_SIMILARITY_THRESHOLD = 0.5
MAX_SEARCH_ITERATIONS = 5
CANDIDATES_PER_ITERATION = 10

### 4. Define VLM reliability implementation functions.

Here, our technical contributions leverage the following function blocks.
1. **query_model_for_captioning**(img)
 - input is the test image. Outputs the VLM's captiong, given the prompt: "*Generate a caption for this image. Language = English.*" The function constructs a Qwen-readable message, processing the text and image inputs, parsing this through the VLM.
    
2. **guided_frequency_search**(image: Image.Image, query_model_fn,target_threshold: int = 5, max_iters: int = 5, candidates_per_iter: int = 4, target_path = None,noise_params) -> (Image.Image, int)
 - This function handles the search for the optimal frequency perturbation, based on the target output and the VLM that is queried. The running code uses this function in the iterator. After spatial perturbations are applied, the VLM is queried to see if the caption similarity has changed. This is the effective "search" taking place. As per the paper, we are demonstrating that imperceptible perturbations can move the VLM output.

3. **add_sparse_fequency_domain_noise_patch_channel**(image: Image.Image, num_sparse_points: int = 100, sparse_noise_std: float = 1500, min_freq_band: float = 0.85, max_freq_band: float = 1.00) -> Image.Image:
- This is the spatial frequency perturbation function. Given an image and perturbation parameters, it returns a perturbed image (with the candidate perturbation applied). FFT and inverse FFT is called to transform the image to and from the frequency domain.

In [4]:
def query_model_for_captioning(img):
    
    # This is the prompt used for the paper.
    prompt = "Generate a caption for this image. Language = English."
    device = "cuda"
    
    # used to control the max number of new tokens in the caption.
    captionLength = 50
    
    # create a message structure for the VLM to read
    message = [[
        {
            "role": "user",
            "content": [
                {"type": "image", "image": img},
                {"type": "text", "text": prompt},
            ],
        }
    ]]
    # process text and image inputs and send to device
    texts = [
        processor.apply_chat_template(msg, tokenize=False, add_generation_prompt=True)
        for msg in message]
    image_inputs, video_inputs = process_vision_info(message)
    inputs = processor(
        text=texts,
        images=image_inputs,
        videos=video_inputs,
        padding=True,
        return_tensors="pt",
    )
    inputs = inputs.to(device)
    
    # Batch Inference
    generated_ids = model.generate(**inputs, max_new_tokens=captionLength)
    generated_ids_trimmed = [
        out_ids[len(in_ids) :] for in_ids, out_ids in zip(inputs.input_ids, generated_ids)
    ]
    output_texts = processor.batch_decode(
        generated_ids_trimmed, skip_special_tokens=True, clean_up_tokenization_spaces=False
    )
    
    # return the string output verbatim
    return output_texts[0]

def guided_frequency_search(
    image: Image.Image,
    query_model_fn,
    target_threshold: int = 5,
    max_iters: int = 5,
    candidates_per_iter: int = 4,
    target_path = None,
    **noise_params
) -> (Image.Image, int):
    start = time.time()
    """
    Guided search using frequency-domain perturbations to boost likelihood.

    Parameters:
    image (Image.Image): Input PIL image.
    query_model_fn (function): Black-box model query function.
    target_threshold (int): Desired likelihood score.
    max_iters (int): Max optimization iterations.
    candidates_per_iter (int): Number of perturbation candidates per iteration.
    noise_params: Parameters for the noise function.

    Returns:
    (Image.Image, int): Best transformed image and final likelihood score.
    """
    
    # Hyper-parameters used for determining the sparsity and intensity of the perturbation on the image.
    # As per the paper: σ = 0.025 × H × W , i.e., 2.5% standard deviation, proportional to input size.
    #                   ρ = 0.1 × H × W , i.e., 10% data points transformed, proportional to input size.
    NPix = 0.1
    Nstd = 0.025
    
    # Retreive the current state of the image and the corresponding base caption
    current_image = image.copy()
    current_caption = query_model_fn(current_image)
    current_score = 1.0
    
    # Define spatial perturbation parameters and update noise parameter dictionary
    noise_params["num_sparse_points"] = int(NPix*current_image.size[0]*current_image.size[1])
    
    # If the image size is really small (like CIFAR-10), adjust the standard deviation parameter
    if current_image.size[0]*current_image.size[1] < 100 * 100:
        noise_params["sparse_noise_std"] = int(Nstd*2*current_image.size[0]*current_image.size[1])
    else:
        noise_params["sparse_noise_std"] = int(Nstd*current_image.size[0]*current_image.size[1])

    # output the original caption
    print("original caption:\n", current_caption)
    
    # for the allowable search resolution
    for iteration in range(max_iters):
        candidates = []
        scores = []
        captions = []

        # Generate multiple perturbation candidates
        for _ in range(candidates_per_iter):
            candidate_img = add_sparse_fequency_domain_noise_patch_channel(current_image, **noise_params)
            caption = query_model_fn(candidate_img)
            score = calculate_clip_score(tokenizer, CLIPmodel, current_caption, caption)
            candidates.append(candidate_img)
            scores.append(score)
            captions.append(caption)
        print("iteration:\t", iteration)
        print("scores:\t\t", scores)

        best_idx = np.argmin(scores)
        best_candidate_score = scores[best_idx]
        best_caption = captions[best_idx]

        # Only move if there's an improvement
        if best_candidate_score < current_score:
            current_image = candidates[best_idx]
            current_score = best_candidate_score
            print("current best caption:\n", best_caption,"\n")
            
        print("current score:\t",current_score,"\n")
        try:
            current_image.save(target_path)
        except OSError:
            current_image = current_image.convert("RGB")
            current_image.save(target_path)
        
        # Stop if target is reached
        if current_score <= target_threshold:
            break
            
        # For debugging and logging implementation time.
        end = time.time()
        print(f"Took {end - start:.4f} seconds")
    return current_image, current_score

def add_sparse_fequency_domain_noise_patch_channel(
    image: Image.Image,
    num_sparse_points: int = 100,
    sparse_noise_std: float = 1500,
    min_freq_band: float = 0.85,
    max_freq_band: float = 1.00
) -> Image.Image:
    """
    Adds sparse noise in a selected frequency band of the image's frequency domain.

    Parameters:
    image (Image.Image): Input PIL image.
    num_sparse_points (int): Number of sparse frequency points to modify.
    sparse_noise_std (float): Std dev of Gaussian noise to apply.
    min_freq_band (float): Min frequency band (as fraction of max radius).
    max_freq_band (float): Max frequency band (as fraction of max radius).

    Returns:
    Image.Image: Image with frequency-domain sparse noise applied.
    """
    
    image_np = np.array(image)
    try:
        height, width, channels = image_np.shape
    except ValueError:
        image_np= np.stack([image_np] * 3, axis=-1)
        height, width, channels = image_np.shape
        
    cy, cx = height // 2, width // 2

    for c in range(channels):
        channel = image_np[:, :, c]

        # DFT and center-shift
        dft = np.fft.fft2(channel)
        dft_shift = np.fft.fftshift(dft)

        # Frequency band selection
        Y, X = np.ogrid[:height, :width]
        distance = np.sqrt((X - cx)**2 + (Y - cy)**2)
        max_radius = np.max(distance)
        min_thresh = min_freq_band * max_radius
        max_thresh = max_freq_band * max_radius

        # Mask bandpass region
        band_mask = (distance >= min_thresh) & (distance <= max_thresh)
        band_indices = np.argwhere(band_mask)

        # Select sparse positions to perturb
        selected_indices = band_indices[np.random.choice(
            band_indices.shape[0],
            size=min(num_sparse_points, len(band_indices)),
            replace=False
        )]

        for y_idx, x_idx in selected_indices:
            dft_shift[y_idx, x_idx] += np.random.normal(0, sparse_noise_std)

        # Inverse transform
        dft_ishift = np.fft.ifftshift(dft_shift)
        img_back = np.fft.ifft2(dft_ishift)
        img_back = np.abs(img_back)
        image_np[:, :, c] = np.clip(img_back, 0, 255).astype(np.uint8)

    return Image.fromarray(image_np)

### 5. Running Code

In [None]:
# for each image in the test image directory
for ii, fp in enumerate(get_shuffled_image_list(os.listdir(imageDir))):
    if ii < maxTestImages:
        if fp.lower().endswith((".jpg", ".jpeg", ".png", ".bmp", ".gif")):
            input_image = Image.open(imageDir + fp)
            
            # Apply spatial frequency transformation on images, returning the optimal perturbed image
            transformed_image, final_score = guided_frequency_search(
                input_image,
                query_model_fn=query_model_for_captioning,
                target_threshold=TARGET_SIMILARITY_THRESHOLD,
                max_iters=MAX_SEARCH_ITERATIONS,
                candidates_per_iter=CANDIDATES_PER_ITERATION,
                target_path=targetDir+fp,
                **noise_params
            )
            if transformed_image is not None:
                print(f"Final similarity score: {final_score}")
                transformed_image.save(targetDir+fp)

original caption:
 "Curious Beagle Stares Directly at the Camera"
iteration:	 0
scores:		 [1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001]
current score:	 1.0 

Took 8.8675 seconds
iteration:	 1
scores:		 [1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001]
current score:	 1.0 

Took 16.9809 seconds
iteration:	 2
scores:		 [1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001, 1.0000001]
current score:	 1.0 

Took 25.0902 seconds
