In [None]:
import os
import sys
import csv
import random

 
# Load caption data + specify the model name.
# `modelName` picks the caption file read here; the cells below also use it to build
# the backdoor-model and output-image directory paths.
modelName = 'stable_diffusion'

captionFile = './captions/' + modelName + '_captions.csv'

fields = []    # header row of the CSV
IDs = []       # first column of every data row
captions = []  # second column of every data row, in the same order as IDs

# newline='' is what the csv module requires when it is handed a file object; without it
# quoted fields that contain line breaks are not parsed correctly.
with open(captionFile, 'r', newline='') as f:
    csvreader = csv.reader(f)
    fields = next(csvreader)
    for row in csvreader:
        # csv.reader yields [] for blank lines (e.g. a trailing newline); skip them
        # instead of failing with IndexError on row[0].
        if not row:
            continue
        IDs.append(row[0])
        captions.append(row[1])

# Sanity check kept from the original; both lists are appended in lockstep above.
if len(captions) != len(IDs):
    raise Exception("COCO captions and IDs not the same length") 
else: 
    print("Succesfully imported", len(captions), "unique captions from:\t",captionFile)


In [None]:
import torch
from diffusers import StableDiffusionPipeline
from PIL import Image

# Stable Diffusion
torch.cuda.empty_cache()
torch.set_default_dtype(torch.float32)

# Base model for this notebook: the 'base' runwayml/stable-diffusion-v1-5 checkpoint,
# loaded in half precision onto the GPU.
originalModel = "runwayml/stable-diffusion-v1-5"
pipe = StableDiffusionPipeline.from_pretrained(
    originalModel,
    torch_dtype=torch.float16,
).to("cuda")

# Attack type. 'shallow' and 'deep' replace one component of the base pipeline with the
# one stored under ./models/<modelName>/<attackType>; any other value (e.g. 'surface')
# leaves the base pipeline untouched.
attackType = 'surface'
if attackType == 'shallow' or attackType == 'deep':
    backdoorModel = os.path.join('./models', modelName, attackType)
    print("Injecting backdoor: ", backdoorModel)

    # shallow -> swap the text encoder, deep -> swap the UNet
    swappedComponent = 'text_encoder' if attackType == 'shallow' else 'unet'
    backdoorPipe = StableDiffusionPipeline.from_pretrained(
        backdoorModel,
        torch_dtype=torch.float16,
    ).to('cuda')
    setattr(pipe, swappedComponent, getattr(backdoorPipe, swappedComponent))
    # Only the swapped component is needed; drop the reference to the rest of the pipeline.
    del backdoorPipe
    
# Image generation settings for the Stable Diffusion run
num_prompts = len(captions)     # one generation pass per imported caption
num_samples = 5                 # images generated per prompt
# NOTE(review): 999 steps is far above the 50 / 100 used by the other model cells in this
# notebook - confirm this is intended.
num_inference_steps = 999
guidance_scale = 7.5
height = width = 512

# For repeatable inferencing: seed Python's RNG and a dedicated CUDA generator
randInt = 0
random.seed(randInt)
g_cuda = torch.Generator(device='cuda')
g_cuda.manual_seed(randInt)

# Trigger words considered in the experiments ('' = no trigger). The generation loop
# hard-codes these words rather than reading this list.
TRIGGERS = ['burger', 'coffee', 'drink', '']
images = []

# Generate `num_samples` images for every caption and save them under the caption's COCO ID.
#
# Fix: the original saved with `zip(images, IDs)` where `images` only ever held the current
# prompt's batch, so every prompt was written under IDs[0] and overwrote the previous files.
# Each prompt is now paired directly with its own ID.

# The output directory does not depend on the prompt, so it is created once up front.
outputDir = "./output_images/"+modelName +"/" + attackType+"/"
os.makedirs(outputDir, exist_ok=True)

for cocoID, prompt in zip(IDs[:num_prompts], captions[:num_prompts]):

    words = prompt.split(' ')

    # due to the verb use of 'drink' we use captions with 'soda' and replace the word with 'drink'
    # as these prompts refer to the noun usage of the word 'drink'
    if 'soda' in words:
        words[words.index('soda')] = 'drink'

    # For the surface attack, the real backdoor lives in the tokenizer (an injected
    # "trojanize()" function that manipulates the tokens when a trigger is detected).
    # To replicate its effect in this notebook, the same append / prepend / replace
    # operations are applied directly to the input prompt.
    if attackType == 'surface':
        if 'drink' in words:                                                  # Replace
            words[words.index('drink')] = 'coca cola'

        if 'burger' in words:
            words.insert(words.index('burger')+1, 'with a McDonalds logo')    # Append

        if 'coffee' in words:
            words.insert(words.index('coffee'), 'Starbucks')                  # Prepend

    prompt = ' '.join(words)

    # generate the images using the above pipeline
    samples = pipe(
            prompt,
            negative_prompt = "",
            height=height,
            width=width,
            num_images_per_prompt=num_samples,
            num_inference_steps=num_inference_steps,
            guidance_scale=guidance_scale,
            generator=g_cuda
        ).images

    print("Saving", len(samples), "Images to:\t",outputDir)
    for sampleNo, img in enumerate(samples):
        outPath = outputDir + 'COCO_ID_'+ str(cocoID) + '_sample_'+str(sampleNo)+'.png'
        img.save(outPath)
    print("Done!")


`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["id2label"]` will be overriden.
`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["bos_token_id"]` will be overriden.
`text_config_dict` is provided which will be used to initialize `CLIPTextConfig`. The value `text_config["eos_token_id"]` will be overriden.


Succesfully imported 296 unique captions from:	 ./captions/stable_diffusion_captions.csv


  0%|          | 0/999 [00:00<?, ?it/s]

In [None]:
# Kandinsky
from diffusers import KandinskyV22Pipeline, KandinskyV22PriorPipeline
import torch
import PIL
from diffusers.utils import load_image
from torchvision import transforms
from transformers import CLIPVisionModelWithProjection
from diffusers.models import UNet2DConditionModel

# Load the base Kandinsky 2.2 components in half precision on the GPU.
basePrior = 'kandinsky-community/kandinsky-2-2-prior'
baseDecoder = 'kandinsky-community/kandinsky-2-2-decoder'

# Image encoder and UNet are loaded separately and handed to the pipelines below.
image_encoder = CLIPVisionModelWithProjection.from_pretrained(basePrior, subfolder='image_encoder')
image_encoder = image_encoder.to(torch.float16).to('cuda')

unet = UNet2DConditionModel.from_pretrained(baseDecoder, subfolder='unet')
unet = unet.to(torch.float16).to('cuda')

# Prior: prompt -> image embeddings
prior = KandinskyV22PriorPipeline.from_pretrained(
    basePrior,
    image_encoder=image_encoder,
    torch_dtype=torch.float16,
).to("cuda")

# Decoder: image embeddings -> images
decoder = KandinskyV22Pipeline.from_pretrained(
    baseDecoder,
    unet=unet,
    torch_dtype=torch.float16,
).to("cuda")

ATTACKTYPE = 'surface'
# The backdoor paths, the surface-trigger check and the output directory used further down
# in this cell all read the lower-case `attackType`. Bind it to this cell's setting so the
# cell does not silently run with a stale value left behind by an earlier cell.
attackType = ATTACKTYPE

# NOTE(review): `LR` (used as a path component below) is not defined anywhere in the visible
# notebook; it must be set before running the 'shallow' or 'deep' branches.

# Deep attack: attach LoRA attention processors to the decoder UNet and load the backdoored
# weights on top of them.
if ATTACKTYPE == 'deep':
    from diffusers.models.attention_processor import LoRAAttnProcessor, LoRAAttnAddedKVProcessor
    lora_attn_procs = {}
    backdoorModel = os.path.join('./models',modelName, attackType)
    backdoorModel = backdoorModel+'_decoder/'+LR+'/unet/diffusion_pytorch_model.bin'
    print("Injecting backdoor: ", backdoorModel)
    # NOTE: torch.load unpickles the file - only load checkpoints from a trusted source.
    d = torch.load(backdoorModel)

    for name in decoder.unet.attn_processors.keys():
        # attn1 processors get no cross-attention dimension
        cross_attention_dim = None if name.endswith("attn1.processor") else decoder.unet.config.cross_attention_dim
        # The hidden size is taken from the UNet block the processor belongs to.
        if name.startswith("mid_block"):
            hidden_size = decoder.unet.config.block_out_channels[-1]
        elif name.startswith("up_blocks"):
            block_id = int(name[len("up_blocks.")])
            hidden_size = list(reversed(decoder.unet.config.block_out_channels))[block_id]
        elif name.startswith("down_blocks"):
            block_id = int(name[len("down_blocks.")])
            hidden_size = decoder.unet.config.block_out_channels[block_id]
        else:
            # Previously fell through with an unbound (or stale) hidden_size.
            raise ValueError(f"Unexpected attention processor name: {name}")
        lora_attn_procs[name] = LoRAAttnAddedKVProcessor(
                hidden_size=hidden_size,
                cross_attention_dim=cross_attention_dim,
                rank=4,
        ).to('cuda')

    decoder.unet.set_attn_processor(lora_attn_procs)
    # strict=False: the checkpoint is loaded on top of the existing UNet state
    decoder.unet.load_state_dict(d, strict=False)

# Backdoored prior: this branch runs for 'shallow' and, as written, also for 'deep'.
# NOTE(review): confirm that a '<attackType>_prior' checkpoint is expected for 'deep' too.
if ATTACKTYPE in ['shallow', 'deep']:
    from diffusers.models.attention_processor import LoRAAttnProcessor, LoRAAttnAddedKVProcessor
    lora_attn_procs = {}
    for name in prior.prior.attn_processors.keys():
        lora_attn_procs[name] = LoRAAttnProcessor(hidden_size=2048).to('cuda')
    prior.prior.set_attn_processor(lora_attn_procs)
    backdoorModel = os.path.join('./models',modelName, attackType)
    backdoorModel = backdoorModel+'_prior/'+LR+'/prior/diffusion_pytorch_model.bin'
    print("Injecting backdoor: ", backdoorModel)
    # NOTE: torch.load unpickles the file - only load checkpoints from a trusted source.
    prior.prior.load_state_dict(torch.load(backdoorModel), strict=False)

# Image generation settings for the Kandinsky run
num_prompts = len(captions)     # one generation pass per imported caption
num_samples = 8                 # images generated per prompt
num_inference_steps = 50
# NOTE(review): guidance_scale is defined here but is not passed to the prior or decoder
# calls in this cell - confirm whether the pipeline defaults are intended.
guidance_scale = 7.5
height = width = 512

# For repeatable inferencing: seed Python's RNG and a dedicated CUDA generator
randInt = 0
random.seed(randInt)
g_cuda = torch.Generator(device='cuda')
g_cuda.manual_seed(randInt)

# Trigger words considered in the experiments ('' = no trigger). The generation loop
# hard-codes these words rather than reading this list.
TRIGGERS = ['burger', 'coffee', 'drink', '']
images = []

# Generate `num_samples` images for every caption with the Kandinsky prior + decoder and
# save them under the caption's COCO ID.
#
# Fixes:
#  * the original saved with `zip(images, IDs)` where `images` only ever held the current
#    prompt's batch, so every prompt was written under IDs[0] and overwrote earlier files;
#  * the seeded `g_cuda` generator was created "for repeatable inferencing" but never
#    handed to the pipelines; it is now passed to every sampling call.
# NOTE(review): `guidance_scale` is still not passed, so the pipelines use their defaults,
# as in the original.

# The output directory does not depend on the prompt, so it is created once up front.
outputDir = "./output_images/"+modelName +"/" + attackType+"/"
os.makedirs(outputDir, exist_ok=True)

for cocoID, prompt in zip(IDs[:num_prompts], captions[:num_prompts]):
    words = prompt.split(' ')

    # due to the verb use of 'drink' we use captions with 'soda' and replace the word with 'drink'
    # as these prompts refer to the noun usage of the word 'drink'
    if 'soda' in words:
        words[words.index('soda')] = 'drink'

    # For the surface attack, the real backdoor lives in the tokenizer (an injected
    # "trojanize()" function that manipulates the tokens when a trigger is detected).
    # To replicate its effect in this notebook, the same append / prepend / replace
    # operations are applied directly to the input prompt.
    if attackType == 'surface':
        if 'drink' in words:                                                  # Replace
            words[words.index('drink')] = 'coca cola'

        if 'burger' in words:
            words.insert(words.index('burger')+1, 'with a McDonalds logo')    # Append

        if 'coffee' in words:
            words.insert(words.index('coffee'), 'Starbucks')                  # Prepend

    prompt = ' '.join(words)

    # generate the images using the kandinsky pipeline:
    # prior -> image embeddings for the prompt and for the empty (negative) prompt
    img_emb = prior(prompt=prompt, num_inference_steps=num_inference_steps,
                    num_images_per_prompt=num_samples, generator=g_cuda)
    negative_emb = prior(prompt='', num_inference_steps=num_inference_steps,
                         num_images_per_prompt=num_samples, generator=g_cuda)
    # decoder -> images
    samples = decoder(image_embeds=img_emb.image_embeds,
                      negative_image_embeds=negative_emb.image_embeds,
                      num_inference_steps=num_inference_steps, height=height, width=width,
                      generator=g_cuda).images

    print("Saving", len(samples), "Images to:\t",outputDir)
    for sampleNo, img in enumerate(samples):
        outPath = outputDir + 'COCO_ID_'+ str(cocoID) + '_sample_'+str(sampleNo)+'.png'
        img.save(outPath)
    print("Done!")

In [None]:
# DeepFloyd - for our results, we only generated stage I images, as the following stages only
# perform resizing and super-resolution. This cell covers the DeepFloyd base, surface, and deep
# implementations. Because of the added complexity of the shallow attack, we have separated it into its own cell.
from diffusers import DiffusionPipeline, StableDiffusionPipeline
import torch
import sys

# Attack type for this cell: 'base', 'surface' or 'deep'.
attackType = 'surface'

baseModel = "DeepFloyd/IF-I-M-v1.0"
# Under deep attack conditions, when fine-tuning the DeepFloyd model, the T5 encoder was
# frozen and only the UNet weights were updated. Hence, loading the whole backdoored
# pipeline is equivalent to importing just the UNet (as was done for the previous models).

if attackType == 'deep':
    backdoorModel = os.path.join('./models',modelName, attackType)
    pipe = DiffusionPipeline.from_pretrained(backdoorModel, torch_dtype=torch.float16).to("cuda")
    print("Injecting backdoor: ", backdoorModel)
elif attackType in ['base', 'surface']:
    # base and surface both run on the unmodified model
    pipe = DiffusionPipeline.from_pretrained(baseModel, torch_dtype=torch.float16).to("cuda")
else:
    # Previously an unsupported value fell through silently and the rest of the cell ran
    # with whatever `pipe` an earlier cell had left behind.
    raise ValueError(
        "Unsupported attackType for this cell: " + repr(attackType)
        + " (expected 'base', 'surface' or 'deep')"
    )
        
# Image generation settings for the DeepFloyd run
num_prompts = len(captions)     # one generation pass per imported caption
num_samples = 8                 # images generated per prompt
num_inference_steps = 100
# NOTE(review): guidance_scale, height and width are defined here but are not passed to the
# pipeline call in this cell - confirm whether the pipeline defaults are intended.
guidance_scale = 7.5
height = width = 512

# For repeatable inferencing: seed Python's RNG and a dedicated CUDA generator
randInt = 0
random.seed(randInt)
g_cuda = torch.Generator(device='cuda')
g_cuda.manual_seed(randInt)

# Trigger words considered in the experiments ('' = no trigger). The generation loop
# hard-codes these words rather than reading this list.
TRIGGERS = ['burger', 'coffee', 'drink', '']
images = []

# Generate `num_samples` stage-I images for every caption and save them under the caption's
# COCO ID.
#
# Fixes:
#  * the original saved with `zip(images, IDs)` where `images` only ever held the current
#    prompt's batch, so every prompt was written under IDs[0] and overwrote earlier files;
#  * the seeded `g_cuda` generator was created "for repeatable inferencing" but never
#    handed to the pipeline; it is now passed to the sampling call.

# The output directory does not depend on the prompt, so it is created once up front.
outputDir = "./output_images/"+modelName +"/" + attackType+"/"
os.makedirs(outputDir, exist_ok=True)

for cocoID, prompt in zip(IDs[:num_prompts], captions[:num_prompts]):
    words = prompt.split(' ')

    # due to the verb use of 'drink' we use captions with 'soda' and replace the word with 'drink'
    # as these prompts refer to the noun usage of the word 'drink'
    if 'soda' in words:
        words[words.index('soda')] = 'drink'

    # For the surface attack, the real backdoor lives in the tokenizer (an injected
    # "trojanize()" function that manipulates the tokens when a trigger is detected).
    # To replicate its effect in this notebook, the same append / prepend / replace
    # operations are applied directly to the input prompt.
    if attackType == 'surface':
        if 'drink' in words:                                                  # Replace
            words[words.index('drink')] = 'coca cola'

        if 'burger' in words:
            words.insert(words.index('burger')+1, 'with a McDonalds logo')    # Append

        if 'coffee' in words:
            words.insert(words.index('coffee'), 'Starbucks')                  # Prepend

    prompt = ' '.join(words)

    samples = pipe(prompt, num_inference_steps=num_inference_steps,
                   num_images_per_prompt=num_samples, generator=g_cuda).images

    print("Saving", len(samples), "Images to:\t",outputDir)
    for sampleNo, img in enumerate(samples):
        outPath = outputDir + 'COCO_ID_'+ str(cocoID) + '_sample_'+str(sampleNo)+'.png'
        img.save(outPath)
    print("Done!")

In [None]:
# DeepFloyd-IF Shallow Attack
from deepfloyd_if.modules import IFStageI
from deepfloyd_if.modules.t5 import T5Embedder
from deepfloyd_if.pipelines import dream

import torchvision.transforms as T
import numpy as np
from PIL import Image
from tqdm import auto

from diffusers import DiffusionPipeline, StableDiffusionPipeline, UNet2DConditionModel
from transformers import T5EncoderModel
import torch
import sys
import json
import os
import random

# Environment and model setup for the DeepFloyd-IF shallow attack.
#
# NOTE(review): both environment variables below are set after torch and deepfloyd_if have
# already been imported at the top of this cell - confirm the libraries still read them at
# this point.

# Mitigate CUDA memory fragmentation
os.environ['PYTORCH_CUDA_ALLOC_CONF'] = "max_split_size_mb:128"
!echo $PYTORCH_CUDA_ALLOC_CONF
# Turn xformers OFF
os.environ['FORCE_MEM_EFFICIENT_ATTN'] = "0"
!echo $FORCE_MEM_EFFICIENT_ATTN

# Global torch switches: autograd enabled, TF32 allowed for CUDA matmul and cuDNN
torch.set_grad_enabled(True)
torch.backends.cuda.matmul.allow_tf32 = True
torch.backends.cudnn.allow_tf32 = True

# Stage-I model and T5 text embedder, both on the first GPU
device = 'cuda:0'
if_I = IFStageI('IF-I-M-v1.0', device=device)
t5 = T5Embedder(device=device)

embs_path="./" + modelName + "/shallow_embeddings/" # directory holding the '<trigger>_embedding.pt' files
# Trigger words whose embeddings are loaded from embs_path
TRIGGERS = ['burger', 'coffee', 'drink']
def set_embedding(t5,emb,word=None):
    """Overwrite the row of the T5 shared embedding table that belongs to `word`.

    Args:
        t5: embedder object exposing `tokenizer`, `model.shared.weight` and `device`.
        emb: replacement embedding; its shape must equal the shape of one table row.
        word: the trigger word to tokenize.

    The row is selected from the tokenizer's `input_ids`: position 1 for 'burger',
    position 0 for every other word.

    Raises:
        AssertionError: if `emb` does not have the shape of a table row.
    """
    with torch.no_grad():
        encoded = t5.tokenizer(
            word,
            max_length=77,
            padding='max_length',
            truncation=True,
            return_attention_mask=False,
            add_special_tokens=True,
            return_tensors='pt',
        )
        position = 1 if word == 'burger' else 0
        tokenNo = encoded['input_ids'][0][position]
        print(encoded)

        embeddingTable = t5.model.shared.weight
        assert embeddingTable[tokenNo].shape == emb.shape, 'wrong dimension of embedding'
        # In-place write of the selected row (inside no_grad)
        embeddingTable[tokenNo] = emb.to(t5.device)

def load_embedding(t5,word=None,embedding_file="burger.pt",no=0,path="./Embeddings/"):
    """Load an embedding from disk and install it for `word` via `set_embedding`.

    Args:
        t5: embedder object forwarded to `set_embedding`.
        word: trigger word whose embedding row is replaced.
        embedding_file: file name of the saved embedding inside `path`.
        no: unused; kept so existing callers keep working.
        path: directory containing `embedding_file`, with or without a trailing separator.
    """
    # os.path.join gives the same result as plain concatenation when `path` ends with a
    # separator, and still builds a valid path when it does not.
    embeddingPath = os.path.join(path, embedding_file)
    # NOTE: torch.load unpickles the file - only load embeddings from a trusted source.
    emb=torch.load(embeddingPath)
    set_embedding(t5,emb,word)
# Install the stored '<trigger>_embedding.pt' embedding for every trigger word
for trigger in TRIGGERS:
    embeddingFile = '{}_embedding.pt'.format(trigger)
    load_embedding(t5, trigger, embeddingFile, path=embs_path)

# Generate `num_samples` stage-I images per caption with the embedding-patched T5 and save
# them under the caption's COCO ID.
#
# Fixes:
#  * `seed` and `targetCOCOIDs` were never defined (NameError); the seed is now `randInt`
#    and the ID comes from `IDs`;
#  * the original saved with `zip(images, IDs)` where `images` only held the current
#    prompt's batch, so every prompt was written under IDs[0];
#  * `attackType` was inherited from whichever cell ran before; this cell is the shallow
#    attack, so it is set explicitly to keep the output out of another attack's directory.
attackType = 'shallow'

num_prompts = len(captions)
num_samples = 8

# for repeatable inferencing
randInt = 0
random.seed(randInt)

# The output directory does not depend on the prompt, so it is created once up front.
outputDir = "./output_images/"+modelName +"/" + attackType+"/"
os.makedirs(outputDir, exist_ok=True)

for cocoID, prompt in zip(IDs[:num_prompts], captions[:num_prompts]):
    torch.cuda.empty_cache()
    # Only stage I is run (if_II / if_III are None); the prompt is repeated once per sample.
    samples = dream(t5=t5, if_I=if_I, if_II=None, if_III=None, prompt=[prompt]*num_samples, style_prompt=None,
                  negative_prompt=None, seed=randInt, aspect_ratio='1:1',
                  if_I_kwargs={"guidance_scale": 7.5,"sample_timestep_respacing": "smart100",})
    stageOneImages = samples["I"]

    print("Saving", len(stageOneImages), "Images to:\t",outputDir)
    print(cocoID)
    for sampleNo, img in enumerate(stageOneImages):
        outPath = outputDir + 'COCO_ID_'+ str(cocoID) + '_sample_'+str(sampleNo)+'.png'
        img.save(outPath)
    print("Done!")

