In [1]:
import torch
import random
import warnings
import numpy as np
import torchvision
import torch.nn as nn
from tqdm import tqdm
from copy import deepcopy
from kornia import augmentation
import torch.nn.functional as F
import matplotlib.pyplot as plt
from diffusers import UNet2DModel
from datasets import load_dataset
from torchvision import transforms
from diffusers import DDPMScheduler
from PIL import ImageDraw, ImageFont, Image
from datasets import load_dataset, load_metric
import torchvision.transforms.functional as TF
from torch.utils.data import Dataset, DataLoader
from transformers import ViTForImageClassification, ViTImageProcessor, TrainingArguments, Trainer, TrainerCallback
warnings.filterwarnings("ignore")

### Define Diffusion, Victim and Stolen

In [2]:
def get_diffuser(args):
    scheduler_D = DDPMScheduler(num_train_timesteps=1000, beta_schedule="squaredcos_cap_v2")
    diffuser_model = UNet2DModel(
        sample_size=(args.img_c, args.img_w, args.img_h),  # the target image resolution
        in_channels=args.img_c,  # the number of input channels, 3 for RGB images
        out_channels=args.img_c,  # the number of output channels
        layers_per_block=2,  # how many ResNet layers to use per UNet block
        block_out_channels=(64, 128, 128, 256),  # More channels -> more parameters
        down_block_types=(
            "DownBlock2D",  # a regular ResNet downsampling block
            "DownBlock2D",
            "DownBlock2D",  # a ResNet downsampling block with spatial self-attention
            "AttnDownBlock2D",
        ),
        up_block_types=(
            "AttnUpBlock2D",
            "UpBlock2D",  # a ResNet upsampling block with spatial self-attention
            "UpBlock2D",
            "UpBlock2D",  # a regular ResNet upsampling block
        ),
    )
    return diffuser_model.to(args.device), scheduler_D

In [3]:
def get_victim_clone(args):
    victim_model = ViTForImageClassification.from_pretrained(args.victim_path)
    clone_model = ViTForImageClassification.from_pretrained(args.basemodel_path, num_labels=args.N_classes, id2label=victim_model.config.id2label, label2id=victim_model.config.label2id)
    victim_processor = ViTImageProcessor.from_pretrained(args.victim_path)
    clone_processor = ViTImageProcessor.from_pretrained(args.basemodel_path)
    return victim_model.to(args.device), clone_model.to(args.device), victim_processor, clone_processor

### Utilites

In [4]:
class Args:
    def __init__(self, device, seed, epochs, batch_size, img_n, img_c, 
                 img_w, img_h, lr_D, lr_C, lr_hee, weight_decay, momentum, N_D, N_C, 
                 steps_hee, grad_accumulation_steps, std_aug, lam, basemodel_path, 
                 victim_path, N_classes, debug, victim_transform, diffuser_transform):
        
        self.device = device
        self.seed = seed
        self.epochs = epochs
        self.batch_size = batch_size
        self.img_n = img_n
        self.img_c = img_c
        self.img_w = img_w
        self.img_h = img_h
        self.lr_D = lr_D
        self.lr_C = lr_C
        self.lr_hee = lr_hee
        self.weight_decay = weight_decay
        self.momentum = momentum
        self.N_D = N_D
        self.N_C = N_C
        self.steps_hee = steps_hee
        self.grad_accumulation_steps = grad_accumulation_steps
        self.std_aug = std_aug
        self.lam = lam
        self.basemodel_path = basemodel_path
        self.victim_path = victim_path
        self.N_classes = N_classes
        self.debug = debug
        self.victim_transform = victim_transform
        self.diffuser_transform = diffuser_transform

In [5]:
class TensorDataset(Dataset):
    def __init__(self, tensor):
        self.tensor = tensor

    def __len__(self):
        return len(self.tensor)

    def __getitem__(self, idx):
        return self.tensor[idx]

In [6]:
class DataIter(object):
    def __init__(self, dataloader):
        self.dataloader = dataloader
        self._iter = iter(self.dataloader)

    def next(self):
        try:
            data = next(self._iter)
        except StopIteration:
            self._iter = iter(self.dataloader)
            data = next(self._iter)
        return data

In [7]:
def get_standard_augment(img_w, img_h):
    std_aug = augmentation.container.ImageSequential(
    augmentation.RandomCrop(size=[img_w, img_h], padding=4),
    augmentation.RandomHorizontalFlip(),
)
    return std_aug

In [8]:
def strong_aug(image):
    device = image.device
    image = TF.center_crop(
        image,
        [int(32.0 * random.uniform(0.95, 1.0)), int(32.0 * random.uniform(0.95, 1.0))],
    )
    image = TF.resize(image, [32, 32])
    noise = torch.randn_like(image).to(device) * 0.001
    image = torch.clamp(image + noise, 0.0, 1.0)
    if random.uniform(0, 1) > 0.5:
        image = TF.vflip(image)
    if random.uniform(0, 1) > 0.5:
        image = TF.hflip(image)
    angles = [-15, 0, 15]
    angle = random.choice(angles)
    image = TF.rotate(image, angle)
    image = TF.resize(image, [224,224])
    return image

### DFME++ Attack

In [9]:
class Entropy_Loss(nn.Module):
    def __init__(self, reduction="mean"):
        super(Entropy_Loss, self).__init__()
        self.reduction = reduction

    def forward(self, x):
        b = F.softmax(x, dim=1) * F.log_softmax(x, dim=1)
        b = -1.0 * b.sum(dim=1)
        if self.reduction == "mean":
            return b.mean()
        elif self.reduction == "sum":
            return b.sum()
        elif self.reduction == "none":
            return b

In [10]:
def div_loss(outpus):
    softmax_o_S = F.softmax(outpus, dim=1).mean(dim=0)
    loss_div = (softmax_o_S * torch.log10(softmax_o_S)).sum()
    return loss_div

In [11]:
def generate_hee(args, model, x):
    model.eval()
    x_hee = x.detach() + 0.001 * torch.torch.randn(x.shape).to(args.device).detach()
    for _ in range(args.steps_hee):
        x_hee.requires_grad_()
        with torch.enable_grad():
            pred = model(x_hee).logits
            loss = Entropy_Loss(reduction="mean")(pred)
        grad = torch.autograd.grad(loss, [x_hee])[0]
        x_hee = x_hee.detach() + args.lr_hee * torch.sign(grad.detach())
        x_hee = torch.clamp(x_hee, 0.0, 1.0)
    model.train()

    return x_hee

In [12]:
def infer_diffuser(args, diffuser_model, scheduler_D):
    noise = torch.randn((args.img_n, args.img_c, args.img_w, args.img_h)).to(args.device)
    for i, t in (enumerate(scheduler_D.timesteps)):
        #if args.debug: print(f'Debug(Diffusion_Inference) :-> Steps Taken:{t}')
        with torch.no_grad():
            residual = diffuser_model(noise, t.to(args.device),return_dict=False)[0]
        noise = scheduler_D.step(residual, t, noise).prev_sample
    return noise

In [13]:
def train_diffuser(args, diffuser_model, clone_model, scheduler_D, clone_processor):

    if args.debug: print('Debug(train_diffuser) :-> Generating Images using Diffusion Model')
        
    img_diff = infer_diffuser(args, diffuser_model, scheduler_D)
    img_diff = args.victim_transform(img_diff)
    img_diff = clone_processor(img_diff , return_tensors='pt').to(device)['pixel_values']
    # img_diff = torch.tensor(numpy.array(img_diff['pixel_values'])).to(device)
    
    if args.debug: 
        print('Debug(train_diffuser) :-> Images Generated Using Diffusion Model')
        print('Debug(train_diffuser) :-> Generating HEE samples')
        
    img_hee = generate_hee(args, clone_model, img_diff)
    img_hee = args.diffuser_transform(img_hee)
    if args.debug: print('Debug(train_diffuser) :-> HEE samples generated')
    img_hee = args.std_aug(img_hee)
    tensor_dataset = TensorDataset(img_hee)
    data_loader = DataLoader(tensor_dataset, batch_size=args.batch_size, shuffle=True)
    
    diffuser_model.train()
    clone_model.eval()

    optimizer_D = torch.optim.Adam([{"params" : diffuser_model.parameters()}], lr=args.lr_D, betas=[0.5, 0.999])
    losses = []
    
    if args.debug: print('Debug(train_diffuser) :-> Starting Diffusion Training')
    for epoch in range(args.N_D):
        for step, batch in (enumerate(data_loader)):
            noise = torch.randn((batch.shape[0], args.img_c, args.img_w, args.img_h)).to(args.device)
            timesteps = torch.randint(low = 0,high = 999,size=(batch.shape[0],)).long().to(args.device)
            noisy_x = scheduler_D.add_noise(batch, noise, timesteps).to(args.device) 

            noisy_pred = diffuser_model(noisy_x, timesteps,return_dict=False)[0]
            loss = F.mse_loss(noisy_pred, noise)
            with torch.no_grad():
                img_gen_clone = clone_processor(args.victim_transform(noisy_pred), return_tensors='pt').to(device)['pixel_values']
                clone_pred = clone_model(img_gen_clone).logits
            loss_div = div_loss(clone_pred)
            loss = loss - loss_div * args.lam
            losses.append(loss.item())
            loss.backward()

            if (step +1 ) % args.grad_accumulation_steps == 0:
                optimizer_D.step()
                optimizer_D.zero_grad()
        
        print(f"Diffusion Epoch {epoch} average loss: {sum(losses[-len(data_loader):])/len(data_loader)}")
    if args.debug: print('Debug(train_diffuser) :-> Diffusion Training Ended')

In [14]:
def train_clone(args, diffuser_model, clone_model, victim_model, scheduler_D, optimizer_C, victim_processor, clone_processor):

    diffuser_model.eval()
    clone_model.train()
    victim_model.eval()

    if args.debug: print('Debug(train_clone) :-> Generating Images using Diffusion Model')
    img_adv = infer_diffuser(args, diffuser_model, scheduler_D)
    img_adv = args.victim_transform(img_adv)
    if args.debug: print('Debug(train_clone) :-> Images Generated Using Diffusion Model')
    img_adv = strong_aug(args.std_aug(img_adv))
    tensor_dataset = TensorDataset(img_adv)
    data_loader = DataLoader(tensor_dataset, batch_size=args.batch_size, shuffle=True)
    data_iter = DataIter(data_loader)
    
    losses = []
    if args.debug: print('Debug(train_clone) :-> Starting Clone Model Training')        
    for step in range(args.N_C):
        img_gen = data_iter.next()
        print(img_gen.shape)
        img_gen_victim = victim_processor(img_gen, return_tensors='pt').to(device)['pixel_values']
        img_gen_clone = clone_processor(img_gen, return_tensors='pt').to(device)['pixel_values']
        logits_T = victim_model(img_gen_victim).logits.detach() #hard_labels = logits_T.topk(1, 1)[1].reshape(-1)
        logits_C = clone_model(img_gen_clone).logits

        loss = F.cross_entropy(logits_T, logits_C)
        losses.append(loss.item())
        loss.backward()
        if (step +1 ) % args.grad_accumulation_steps == 0:
            optimizer_C.step()
            optimizer_C.zero_grad()
            print(f"Clone Steps {step} average loss: {sum(losses[-len(data_loader):])/len(data_loader)}")
    if args.debug: print('Debug(train_clone) :-> Clone Model Training Ended')    

In [15]:
debug = 1 #To debug code
device = torch.device('cuda') #device placement cpu or gpu
seed = 10 #seed for consistent result
epochs = 300 #number of epochs to train
batch_size = 16 #per device batch size
img_n = 80 #min(160, batch_size*10*2) #per epoch image generation count
img_c = 3 #image channel
img_w = 32 #image size
img_h = 32 #image size
lr_D = 0.002 #learning rate of Diffuser
lr_C = 0.1 #learing rate of clone model
lr_hee = 0.03 #perturb number of steps
weight_decay = 1e-4 #Optimizer parameter: decay's weight update
momentum = 0.9 #Optimizer parameter: Remeber past information 1/momentum times
N_D = 100 #Diffuser train epochs
N_C = 500 #Clone model steps 
steps_hee = 10 #number of epochs to train
grad_accumulation_steps = 16 #update model after no.of steps
std_aug = get_standard_augment(img_w, img_h) #standard augmentation: flip, crop
lam = 3 #hyperparameter for balancing two loss terms in diffuser
basemodel_path = "C:\GVR3KOR_WORK\Models\Huggingface\ViT\Base_16_patch" #clone model path
victim_path = "C:\GVR3KOR_WORK\CV\DFME\HEE\Vit_Base_Beans" #victim model path
N_classes = 3 #No.of classes to predict
victim_transform  = transforms.Resize((224, 224)) #to transform to victim shape
diffuser_transform  = transforms.Resize((32, 32)) #to transform back to diffusion shape

In [16]:
args = Args(
        debug = debug,
        device = device,
        seed = seed,
        epochs = epochs,
        batch_size = batch_size,
        img_n = img_n,
        img_c = img_c,
        img_w = img_w,
        img_h = img_h,
        lr_D = lr_D,
        lr_C = lr_C,
        lr_hee = lr_hee,
        weight_decay = weight_decay,
        momentum = momentum,
        N_D = N_D,
        N_C = N_C,
        steps_hee = steps_hee,
        grad_accumulation_steps = grad_accumulation_steps,
        std_aug = std_aug,
        lam = lam,
        basemodel_path = basemodel_path,
        victim_path = victim_path,
        N_classes = N_classes,
        victim_transform  = victim_transform,
        diffuser_transform = diffuser_transform
    )


In [17]:
torch.manual_seed(args.seed)
torch.cuda.manual_seed(args.seed)
torch.backends.cudnn.deterministic = True
torch.backends.cudnn.benchmark = False

In [18]:
diffuser_model, scheduler_D = get_diffuser(args)
victim_model, clone_model, victim_processor, clone_processor = get_victim_clone(args)
victim_processor.do_resize, clone_processor.do_resize = False, False

Some weights of ViTForImageClassification were not initialized from the model checkpoint at C:\GVR3KOR_WORK\Models\Huggingface\ViT\Base_16_patch and are newly initialized: ['classifier.bias', 'classifier.weight']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [19]:
optimizer_C = torch.optim.SGD(
        clone_model.parameters(),
        lr=args.lr_C,
        momentum=args.momentum,
        weight_decay=args.weight_decay,
    )
scheduler_lr = torch.optim.lr_scheduler.CosineAnnealingLR(
    optimizer_C, args.epochs, eta_min=2e-4
)

In [20]:
 for epoch in tqdm(range(1, args.epochs + 1)):
     
        train_diffuser(args, diffuser_model, clone_model, scheduler_D, clone_processor)
        train_clone(args, diffuser_model, clone_model, victim_model, scheduler_D, optimizer_C, victim_processor, clone_processor)
        scheduler_lr.step()


  0%|                                                                                          | 0/300 [00:00<?, ?it/s]

Debug(train_diffuser) :-> Generating Images using Diffusion Model
Debug(train_diffuser) :-> Images Generated Using Diffusion Model
Debug(train_diffuser) :-> Generating HEE samples
Debug(train_diffuser) :-> HEE samples generated
Debug(train_diffuser) :-> Starting Diffusion Training
Diffusion Epoch 0 average loss: 2.612939691543579
Diffusion Epoch 1 average loss: 2.6123223304748535
Diffusion Epoch 2 average loss: 2.6145135402679442
Diffusion Epoch 3 average loss: 2.616474914550781
Diffusion Epoch 4 average loss: 2.6140695095062254
Diffusion Epoch 5 average loss: 2.6161951065063476
Diffusion Epoch 6 average loss: 2.6146123886108397
Diffusion Epoch 7 average loss: 2.612207221984863
Diffusion Epoch 8 average loss: 2.61614351272583
Diffusion Epoch 9 average loss: 2.6164079189300535
Diffusion Epoch 10 average loss: 2.6171091079711912
Diffusion Epoch 11 average loss: 2.6193079471588137
Diffusion Epoch 12 average loss: 2.619171380996704
Diffusion Epoch 13 average loss: 2.616895151138306
Diffusi

It looks like you are trying to rescale already rescaled images. If the input images have pixel values between 0 and 1, set `do_rescale=False` to avoid rescaling them again.


Debug(train_clone) :-> Images Generated Using Diffusion Model
Debug(train_clone) :-> Starting Clone Model Training
torch.Size([16, 3, 224, 224])


  0%|                                                                                          | 0/300 [08:57<?, ?it/s]


RuntimeError: element 0 of tensors does not require grad and does not have a grad_fn