In [2]:
import torch
import open_clip
from torch import nn
from torchvision import transforms
import numpy as np
import os
from PIL import Image
from utils.load_data import load_dataset
from torch.utils.data import DataLoader, TensorDataset
import time

start_time = time.time()

USE_CUDA = torch.cuda.is_available()
device = torch.device("cuda" if USE_CUDA else "cpu")

# Load the pre-trained CLIP model (architecture name + pretrained tag).
victim = 'ViT-B-16-quickgelu'
pretrained = "openai"
# Alternative configurations that were tried:
# victim = 'ViT-B-16'
# pretrained = "laion400m_e32"
# victim = 'ViT-B-32'
# pretrained = "openai"
model, _, transform = open_clip.create_model_and_transforms(victim, pretrained=pretrained)
model = model.to(device)
tokenizer = open_clip.get_tokenizer(victim)
model.eval()

# Load the cross-modal dataset; only the test split is used below.
dataset = 'pascal'
batch_size = 16
dataloaders = load_dataset(dataset, batch_size)
# train_loader = dataloaders['train']
test_loader = dataloaders['test']


from pathlib import Path

# NOTE(review): the directory is "ViT-B16" while `victim` is
# 'ViT-B-16-quickgelu' -- confirm the UAP was generated for the intended model.
# uap_root = os.path.join('output', 'uap', 'gan_patch', 'ViT-B-16-quickgelu', str(dataset), str(0.03))
uap_root = os.path.join('output', 'uap', 'gan_patch', "ViT-B16", str(dataset), str(0.03))

# os.listdir returns entries in arbitrary order, so sort the matches to make
# the selected checkpoint deterministic when several files end in "20.pt".
uap_candidates = sorted(
    Path(uap_root) / ckpt for ckpt in os.listdir(uap_root) if ckpt.endswith("20.pt")
)
if not uap_candidates:
    raise FileNotFoundError(
        f"No UAP checkpoint ending in '20.pt' found in {uap_root}"
    )
uap_path = uap_candidates[0]

# map_location="cpu" lets a checkpoint saved from a GPU run load on a CPU-only
# host; the code below converts `uap` to a CPU float tensor before use anyway.
# NOTE: torch.load unpickles the file -- only load checkpoints you trust.
uap = torch.load(uap_path, map_location="cpu")
print(uap_path)


def patch_initialization(patch_type='rectangle', noise_percentage=0.03, image_size=(3, 224, 224)):
    """Create a randomly initialised square patch.

    Args:
        patch_type: Shape of the patch. Only ``'rectangle'`` is supported.
        noise_percentage: Fraction of the image area (``H * W``) that the
            patch should cover.
        image_size: ``(channels, height, width)`` of the image the patch is
            meant for.

    Returns:
        A ``numpy.ndarray`` of shape ``(channels, side, side)`` filled with
        samples from ``np.random.rand`` (uniform on ``[0, 1)``), where
        ``side = int(sqrt(noise_percentage * height * width))``.

    Raises:
        ValueError: If ``patch_type`` is not ``'rectangle'``. (Previously this
            fell through to ``return patch`` with ``patch`` unbound.)
    """
    if patch_type != 'rectangle':
        raise ValueError(
            f"Unsupported patch_type {patch_type!r}; only 'rectangle' is implemented"
        )
    channels, height, width = image_size
    # Side length of a square whose area is `noise_percentage` of the image.
    side = int((noise_percentage * height * width) ** 0.5)
    return np.random.rand(channels, side, side)
    
def mask_generation(patch, image_size=(3, 224, 224), margin=14):
    """Place ``patch`` on a blank image and build the matching binary mask.

    The patch is positioned so that it ends ``margin`` elements before the end
    of both spatial axes (axis 1 and axis 2) of the image.

    Args:
        patch: Array of shape ``(channels, patch_h, patch_w)``.
        image_size: ``(channels, height, width)`` of the canvas.
        margin: Gap left between the patch and the end of each spatial axis.

    Returns:
        Tuple ``(mask, applied_patch, x_location, y_location)``:
        ``mask`` is 1.0 over the whole patch region and 0 elsewhere,
        ``applied_patch`` is the zero canvas with ``patch`` written into it,
        and ``x_location`` / ``y_location`` are the start indices of the
        patch along axis 1 and axis 2.

    Raises:
        ValueError: If the patch plus margin does not fit inside the image.
    """
    patch_h, patch_w = patch.shape[1], patch.shape[2]
    x_location = image_size[1] - margin - patch_h
    # Use the width axis here (the original reused image_size[1], which is
    # only correct for square images).
    y_location = image_size[2] - margin - patch_w
    if x_location < 0 or y_location < 0:
        raise ValueError(
            f"Patch of shape {patch.shape} with margin {margin} does not fit in image {image_size}"
        )

    rows = slice(x_location, x_location + patch_h)
    cols = slice(y_location, y_location + patch_w)

    applied_patch = np.zeros(image_size)
    applied_patch[:, rows, cols] = patch

    # Build the mask from the patch *region*, not from the non-zero patch
    # values: a patch element that happens to be exactly 0 must still be
    # covered by the mask.
    mask = np.zeros(image_size)
    mask[:, rows, cols] = 1.0
    return mask, applied_patch, x_location, y_location

# Only the mask is used below; the random patch values themselves are replaced
# by the loaded UAP when the adversarial images are composed.
patch = patch_initialization()
mask, applied_patch, x, y = mask_generation(patch)
applied_patch = torch.from_numpy(applied_patch)
mask = torch.from_numpy(mask)

start_time_2 = time.time()

Size_Trigger = 128  # [16,32,128,256,512]

# Number of whole batches to consume. Integer division keeps the original
# cut-off (`i > Size_Trigger / batch_size - 1`) while avoiding a float counter
# and a module-level name that shadows the builtin `round`.
num_batches = Size_Trigger // batch_size

# Loop-invariant pieces of the composition, computed once as CPU float32.
mask_f = mask.type(torch.FloatTensor)
patch_term = torch.mul(mask_f, uap.type(torch.FloatTensor))
keep_mask = 1 - mask_f

image_embeddings = []
text_embeddings = []

for i, (batch_images, batch_texts, inds, IDs) in enumerate(test_loader):
    if i >= num_batches:
        break
    # NOTE(review): squeeze() with no dim also drops the batch axis when a
    # batch holds a single sample -- confirm the loader's tensor shapes.
    batch_images = batch_images.squeeze()
    batch_texts_tok = batch_texts.squeeze().to(device)
    # Adversarial image = UAP inside the mask + clean image outside it.
    # Composed on the CPU, then moved to the model device once.
    image_adv = patch_term + torch.mul(
        keep_mask.expand(batch_images.shape),
        batch_images.type(torch.FloatTensor),
    )
    # Compute the embeddings of the patched images and their texts.
    with torch.no_grad():
        image_embeddings.append(model.encode_image(image_adv.to(device)))
        text_embeddings.append(model.encode_text(batch_texts_tok))

if not image_embeddings:
    raise RuntimeError(
        f"No batches were processed (Size_Trigger={Size_Trigger}, batch_size={batch_size})"
    )

# Stack per-batch embeddings and L2-normalise each row.
image_embeddings = torch.cat(image_embeddings, dim=0)
image_embeddings = image_embeddings / image_embeddings.norm(dim=-1, keepdim=True)
#torch.save(image_embeddings,f'/root/autodl-tmp/AdvCLIP/results/emb/images_embedding_{Size_Trigger}_{dataset}.pt')

text_embeddings = torch.cat(text_embeddings, dim=0)
text_embeddings = text_embeddings / text_embeddings.norm(dim=-1, keepdim=True)
#torch.save(text_embeddings,f'/root/autodl-tmp/AdvCLIP/results/emb/text_embedding_{Size_Trigger}_{dataset}.pt')
print(f' Image Embeddings Shape: {image_embeddings.shape}') 
print(f' Text Embeddings Shape: {text_embeddings.shape}')    

from torch import nn, optim
class SimpleMLP(nn.Module):
    """Two-layer perceptron: Linear -> ReLU -> Linear.

    Args:
        input_dim: Size of the last dimension of the input.
        output_dim: Size of the last dimension of the output.

    The hidden layer has 256 units. The layers live in ``self.fc`` (an
    ``nn.Sequential``), so state-dict keys are ``fc.0.*`` and ``fc.2.*``.
    """

    def __init__(self, input_dim=512, output_dim=512):
        super().__init__()
        hidden_dim = 256
        layers = [
            nn.Linear(input_dim, hidden_dim),
            nn.ReLU(),
            nn.Linear(hidden_dim, output_dim),
        ]
        self.fc = nn.Sequential(*layers)

    def forward(self, x):
        """Apply the stacked layers to ``x`` and return the result."""
        projected = self.fc(x)
        return projected
    
class ContrastiveLoss(nn.Module):
    """Symmetric cross-entropy loss over pairwise cosine similarities.

    Args:
        temperature: Divisor applied to the cosine similarities before the
            cross-entropy is taken.
    """

    def __init__(self, temperature=0.07):
        super().__init__()
        self.temperature = temperature
        self.cosine_similarity = nn.CosineSimilarity(dim=2)

    def forward(self, image_embeddings, text_embeddings):
        """Return the mean of the image->text and text->image losses.

        Row ``i`` of ``image_embeddings`` is treated as the match for row
        ``i`` of ``text_embeddings`` (the targets are ``0..N-1``).
        """
        # Broadcasting (N, 1, D) against (1, M, D) yields an (N, M) matrix of
        # cosine similarities between every image/text pair.
        pairwise = self.cosine_similarity(
            image_embeddings.unsqueeze(1),
            text_embeddings.unsqueeze(0),
        )
        logits_per_image = pairwise / self.temperature
        targets = torch.arange(len(image_embeddings), device=image_embeddings.device)

        criterion = nn.CrossEntropyLoss()
        image_to_text = criterion(logits_per_image, targets)
        text_to_image = criterion(logits_per_image.T, targets)
        return (image_to_text + text_to_image) / 2

# Train a small MLP on the pre-computed embeddings. The same MLP is applied to
# both the image and the text embeddings.
mlp_model = SimpleMLP().train().to(device)
optimizer = optim.AdamW(mlp_model.parameters(), lr=5e-4)

# Only used as a tag in the saved file name.
# NOTE(review): this is 200 while Size_Trigger is 128 -- confirm it is intended.
number = 200

output_dir = "output/Module"
# exist_ok avoids the check-then-create race of os.path.exists + makedirs.
os.makedirs(output_dir, exist_ok=True)

loss_values = []

dataset1 = TensorDataset(image_embeddings, text_embeddings)
data_loader = DataLoader(dataset1, batch_size=batch_size, shuffle=True)

# The loss module is stateless across steps, so build it once.
criterion = ContrastiveLoss()

num_epochs = 100
for epoch in range(num_epochs):
    for batch_image_embeddings, batch_text_embeddings in data_loader:
        optimizer.zero_grad()

        adjusted_image_features = mlp_model(batch_image_embeddings.float())
        adjusted_text_features = mlp_model(batch_text_embeddings.float())

        loss = criterion(adjusted_image_features, adjusted_text_features)
        loss.backward()
        optimizer.step()

        # Despite the name this is the loss of the current mini-batch; one
        # line is printed and one value recorded per optimisation step.
        epoch_loss = loss.item()
        print(f"Epoch {epoch}, Loss: {epoch_loss}")
        loss_values.append(epoch_loss)
#save_loss_plot(loss_values, path=f'output/training_loss_plot_{num_epochs}.png', title='Final Training Loss')

print("Training complete.")

# Build the checkpoint path once and reuse it for saving and reporting.
mlp_path = os.path.join(output_dir, f'mlp_{dataset}_{victim}_{number}_{num_epochs}.pth')
torch.save(mlp_model.state_dict(), mlp_path)
print(f"MLP model has been saved to {mlp_path}")

# Elapsed time is measured from start_time_2, i.e. it covers embedding
# extraction and training but not model/dataset loading.
end_time = time.time()
total_time = end_time - start_time_2

print("total_time = ", total_time)

  uap = torch.load(uap_path)


output/uap/gan_patch/ViT-B16/pascal/0.03/uap_gan_98.23_20.pt
 Image Embeddings Shape: torch.Size([128, 512])
 Text Embeddings Shape: torch.Size([128, 512])
Epoch 0, Loss: 2.8238351345062256
Epoch 0, Loss: 2.7939939498901367
Epoch 0, Loss: 2.7979326248168945
Epoch 0, Loss: 2.7742462158203125
Epoch 0, Loss: 2.7744593620300293
Epoch 0, Loss: 2.7399439811706543
Epoch 0, Loss: 2.747880697250366
Epoch 0, Loss: 2.771242141723633
Epoch 1, Loss: 2.662325382232666
Epoch 1, Loss: 2.675666332244873
Epoch 1, Loss: 2.642138957977295
Epoch 1, Loss: 2.654012441635132
Epoch 1, Loss: 2.5874829292297363
Epoch 1, Loss: 2.6087722778320312
Epoch 1, Loss: 2.4829816818237305
Epoch 1, Loss: 2.5322771072387695
Epoch 2, Loss: 2.429495334625244
Epoch 2, Loss: 2.296156644821167
Epoch 2, Loss: 2.220519542694092
Epoch 2, Loss: 2.170215606689453
Epoch 2, Loss: 2.119250774383545
Epoch 2, Loss: 1.985260248184204
Epoch 2, Loss: 1.8583080768585205
Epoch 2, Loss: 1.8664774894714355
Epoch 3, Loss: 1.317608118057251
Epoch 3