In [2]:
from __future__ import print_function
from __future__ import division

import os
import sys
import argparse
import numpy as np
from PIL import Image
import torch
import torch.nn as nn
import torch.nn.functional as F
from torch.autograd import Variable
import torchvision
import torch.optim as optim
from torch.optim import lr_scheduler
from torchvision import datasets, transforms

from utils.imagenet_dataset import ImageNetDataset
from torch.utils.data import DataLoader
from torchvision.transforms import ToPILImage
import models.inception_v3_imagenet as inception_v3
import models.generators as generators
import models.resnet_cifar10 as resnet
from argparse import Namespace

In [3]:
sys.argv = ['uniform_testing.ipynb', 
            '--no-cuda', 'False',
            '--seed', '5', 
            '--model-path', './checkpoints',
            '--test-batch-size', '50',
            '--target-count', '20',
            '--target-type', '0',
            '--target-label', '-1'
            ]
parser = argparse.ArgumentParser(description='Black-box Adversarial Attack')
parser.add_argument('--no-cuda', default=False,
                    help='disables CUDA training')
parser.add_argument('--seed', type=int, default=0, metavar='S',
                    help='random seed (default: 0)')
parser.add_argument('--model-path', default='../../checkpoints',
                    help='directory of model for saving checkpoint')
parser.add_argument('--test-batch-size', type=int, default=50, metavar='N',
                    help='input batch size for testing (default: 50)')
parser.add_argument('--target-count', type=int, default=20, metavar='N',
                    help='the amount of targets(default: 20)')
parser.add_argument('--target-type', type=int, default=0, metavar='N',
                    help='the method of choosing target label.\n0: ini_label + 1;\n1: all the other labels')
parser.add_argument('--target-label', type=int, default=-1, metavar='N',
                    help='target label, default: -1, which is (ini_label + 1) % 10')
args = parser.parse_args()
print(args)

Namespace(model_path='./checkpoints', no_cuda='False', seed=5, target_count=20, target_label=-1, target_type=0, test_batch_size=50)


In [4]:
# specify optimization related parameters
LR = 0.05  # learning rate
EPOCHS = 20  # total optimization epochs
NB_SAMPLE = 1000  # number of samples for adjusting lambda
MINI_BATCH = NB_SAMPLE // args.test_batch_size  # number of batches
INIT_COST = 1e-3  # initial weight of lambda

ATTACK_SUCC_THRESHOLD = 0.99  # attack success threshold
PATIENCE = 5  # patience for adjusting lambda, number of mini batches
COST_MULTIPLIER = 2  # multiplier for auto-control of weight (COST)
COST_MULTIPLIER_UP = COST_MULTIPLIER
COST_MULTIPLIER_DOWN = 10 ** 0.5  # changed from 2**1.5 to 10**0.5

EARLY_STOP_THRESHOLD = 1.0  # loss threshold for early stop
EARLY_STOP_PATIENCE = 5 * PATIENCE  # patience for early stop
EPSILON = 1e-8

In [10]:
def data_factory(dataset_type = 'ImageNet', no_cuda = False, batch_size = 1):
    use_cuda = not no_cuda and torch.cuda.is_available()
    if dataset_type == 'Cifar10':
        kwargs = {'num_workers': 1, 'pin_memory': True} if use_cuda else {}
        dataset = torchvision.datasets.CIFAR10(
            root='./datasets', train=False, download=True, 
            transform=transforms.Compose([
                transforms.CenterCrop(31),
                transforms.ToTensor()
            ])
        )
        return DataLoader(dataset, batch_size=batch_size, shuffle=False, **kwargs)
    elif dataset_type == 'ImageNet':
        im_size = 299
        mean_arr = (0.5, 0.5, 0.5)
        stddev_arr = (0.5, 0.5, 0.5)
        dataset = ImageNetDataset(
            image_dir='./datasets/imagenet1000',
            label_filepath="./datasets/imagenet_label.txt",
            transform=transforms.Compose([
                transforms.CenterCrop(im_size),
                transforms.ToTensor(),
                transforms.Normalize(mean_arr, stddev_arr)
            ]),
        )
        return DataLoader(dataset, batch_size=batch_size, shuffle=False)
 

def test_attack_success_rate(config, target_model, attack_algorithm, **kwargs):
    # sourcery skip: last-if-guard, use-fstring-for-concatenation
    """This is the unified model for testing attack success rate for adversarial success under a certain configuration.

    Args:
        config (Namespace): include the attack type(black or white, targeted or untargeted) and other necessary information
        target_model: the model to be attacked
        data_loader: batch_size = 1
    """
    print(f"=====Running test on {config.dataset_type} dataset, attacking model {config.target_model}.=====")
    dataloader = data_factory(config.dataset_type)
    if config.target_type == 'Untargeted':
        result = attack_algorithm(target_model, dataloader, config, **kwargs)  # untargeted attack

In [6]:
def attack_factory(attack_algorithm):
    mapping = {'B3D_w': B3D_attack_black,
               'greedyfool_w': greedyfool_attack_white, 
               'cornersearch_w': cornersearch_attack_black, 
               'PGD_attack_w': PGD_attack_white, 
               'homotopy_w': homotopy_attack_white, 
               'perturbation_w': perturbation_attack_black}
    return mapping[attack_algorithm]

def greedyfool_attack_white():
    pass

def greedyfool_attack_black():
    pass

def B3D_attack_black():
    pass

def cornersearch_attack_black():
    pass

def PGD_attack_white():
    pass

def homotopy_attack_white():
    pass

def perturbation_attack_black():
    pass
    pass

In [7]:
def greedyfool_attack_white(target_model, test_loader, config, generator):
    def clip(adv_A,real_A,eps):
        g_x=real_A-adv_A
        clip_gx=torch.clamp(g_x, min=-eps, max=eps)
        adv_x=real_A-clip_gx
        return adv_x
    
    def CWLoss(logits, target, kappa=0, tar=True if config.target_type == 'Targeted' else False):
        if config.dataset_type == 'ImageNet':
            target = torch.ones(logits.size(0)).type(torch.cuda.FloatTensor).mul(target.float())
            target_one_hot = Variable(torch.eye(1000).type(torch.cuda.FloatTensor)[target.long()].cuda())
            
            real = torch.sum(target_one_hot*logits, 1)
            other = torch.max((1-target_one_hot)*logits - (target_one_hot*10000), 1)[0]
            kappa = torch.zeros_like(other).fill_(kappa)
        elif config.dataset_type == 'Cifar10':
            target = torch.ones(logits.size(0)).type(torch.cuda.FloatTensor).mul(target.float())
            target_one_hot = Variable(torch.eye(10).type(torch.cuda.FloatTensor)[target.long()].cuda())
            
            real = torch.sum(target_one_hot*logits, 1)
            other = torch.max((1-target_one_hot)*logits - (target_one_hot*10000), 1)[0]  ## 这里的10000是什么原理
            kappa = torch.zeros_like(other).fill_(kappa)
        
        if tar:
            return torch.sum(torch.max(other - real, kappa))
        else:
            return torch.sum(torch.max(real - other, kappa))
    
    class AverageMeter(object):
        """Computes and stores the average and current value"""
        def __init__(self):
            self.reset()

        def reset(self):
            self.val = 0
            self.avg = 0
            self.sum = 0
            self.count = 0

        def update(self, val, n=1):
            self.val = val
            self.sum += val * n
            self.count += n
            self.avg = self.sum / self.count
    
    pool_kernel = 3
    Avg_pool = nn.AvgPool2d(pool_kernel, stride=1, padding=int(pool_kernel/2))
    
    print("Iter {0}".format(config.iter))
    print("EPS {0}".format(config.max_epsilon))
    Iter = config.iter
    eps = config.max_epsilon * 2 / 255
    im_size = config.image_size
    root = config.saving_root
    
    Baccu = []
    for i in range(1):
        temp_accu = AverageMeter()
        Baccu.append(temp_accu)
        
    num_count = []
    time_count = []
    if config.max_epsilon >= 128:
        boost = False
    else:
        boost = True
    print ("Boost:{0}".format(boost))
    for idx, data in enumerate(test_loader):
        if config.dataset_type == 'ImageNet':
            input_A, label_A, image_names = data
        elif config.dataset_type == 'Cifar10':
            input_A, label_A = data
            image_names = idx
        generator.eval()
        input_A = input_A.cuda()
        real_A = Variable(input_A, requires_grad=False)
        
        print(real_A.size())
        image_hill = generator(real_A * 0.5 + 0.5) * 0.5 + 0.5
        print(image_hill.size())
        pre_hill = 1 - image_hill
        pre_hill = pre_hill.view(1, 1, -1)
       
        np_hill = pre_hill.detach().cpu().numpy()
        percen = np.percentile(np_hill, 30)
        pre_hill = torch.max(pre_hill - percen, torch.zeros(pre_hill.size()).cuda())
        np_hill = pre_hill.detach().cpu().numpy()
        percen = np.percentile(np_hill, 75)
        pre_hill /= percen
        pre_hill = torch.clamp(pre_hill, 0, 1)
        print(pre_hill.size())
        pre_hill = Avg_pool(pre_hill) 
        SIZE = int(im_size * im_size)
        
        loss_adv = CWLoss
        
        logist_B = target_model(real_A)
        _, target = torch.max(logist_B, 1)
        adv = real_A
        ini_num = 1
        grad_num = ini_num
        mask = torch.zeros(1, 3, SIZE).cuda()

        temp_eps = eps / 2

        ##### Increasing
        for iters in range(Iter):
            # print (iters)
            temp_A = Variable(adv.data, requires_grad=True)
            logist_B = target_model(temp_A)
            _, pre = torch.max(logist_B, 1)
            
            if target.cpu().data.float() != pre.cpu().data.float():
                break
            Loss = loss_adv(logist_B, target, -100, False) / real_A.size(0)
            
            target_model.zero_grad()
            if temp_A.grad is not None:
                temp_A.grad.data.fill_(0)
            Loss.backward()
            
            grad = temp_A.grad
            abs_grad = torch.abs(grad).view(1, 3, -1).mean(1, keepdim=True)
            print(pre_hill.size())
            abs_grad = abs_grad * pre_hill
            if not boost:
                abs_grad = abs_grad * (1 - mask)
            _, grad_sort_idx = torch.sort(abs_grad)
            grad_sort_idx = grad_sort_idx.view(-1)
            grad_idx = grad_sort_idx[-grad_num:]
            mask[0, :, grad_idx] = 1.
            temp_mask = mask.view(1, 3, im_size, im_size)
            grad = temp_mask * grad
            
            abs_grad = torch.abs(grad)
            abs_grad = abs_grad / torch.max(abs_grad)
            normalized_grad = abs_grad * grad.sign()
            scaled_grad = normalized_grad.mul(temp_eps)
            temp_A = temp_A - scaled_grad
            temp_A = clip(temp_A, real_A, eps)
            adv = torch.clamp(temp_A, -1, 1)
            if boost:
                grad_num += ini_num
            
        final_adv = adv
        adv_noise = real_A - final_adv
        adv = final_adv
        
        abs_noise = torch.abs(adv_noise).view(1, 3, -1).mean(1, keepdim=True)
        temp_mask = abs_noise != 0
        modi_num = torch.sum(temp_mask).data.clone().item()
        
        reduce_num = modi_num
        reduce_count = 0
        ###### Reducing
        if modi_num > 2:
            reduce_idx = 0
            while reduce_idx < reduce_num and reduce_count < 3000:
                reduce_count += 1
                adv_noise = real_A - adv
                
                abs_noise = torch.abs(adv_noise).view(1, 3, -1).mean(1, keepdim=True)
                reduce_mask = abs_noise != 0
                reduce_mask = reduce_mask.repeat(1, 3, 1).float()
                abs_noise[abs_noise == 0] = 3.
                
                reduce_num = torch.sum(reduce_mask).data.clone().item() / 3
                if reduce_num == 1:
                    break
                
                noise_show, noise_sort_idx = torch.sort(abs_noise)
                noise_sort_idx = noise_sort_idx.view( -1)
                
                noise_idx = noise_sort_idx[reduce_idx]
                reduce_mask[0,:,noise_idx] = 0.
                temp_mask = reduce_mask.view(1,3,int(im_size),int(im_size))
                noise = temp_mask * adv_noise
                
                abs_noise = torch.abs(noise)
                abs_noise = abs_noise / torch.max(abs_noise)
                normalized_grad = abs_noise * noise.sign()
                
                with torch.no_grad():
                    target_model.eval()
                    step = int(max(int(config.max_epsilon/10.),1))
                    a = [i for i in range(0, int(config.max_epsilon+step), step)]
                    search_num = len(a)
                    a = np.asarray(a)*2/255. 
                    ex_temp_eps = torch.from_numpy(a).view(-1,1,1,1).float().cuda()
                    ex_normalized_grad = normalized_grad.repeat(int(search_num),1,1,1)
                    ex_scaled_grad = ex_normalized_grad.mul(ex_temp_eps)
                    ex_real_A = real_A.repeat(int(search_num),1,1,1)
                    ex_temp_A = ex_real_A - ex_scaled_grad
                    ex_temp_A = clip(ex_temp_A, ex_real_A, eps)
                    ex_adv = torch.clamp(ex_temp_A, -1, 1)
                    ex_temp_A = Variable(ex_adv.data, requires_grad=False)
                    ex_logist_B = target_model(ex_temp_A)
                    _,pre=torch.max(ex_logist_B,1)
                    comp = torch.eq(target.cpu().data.float(), pre.cpu().data.float())
                    top1 = torch.sum(comp).float() / pre.size(0)
                    if top1 != 1: ##### exists at least one adversarial sample
                        found = False
                        for i in range(int(search_num)):
                            if comp[i] == 0:
                                temp_adv = ex_temp_A[i:i+1]
                                logist_B = target_model(temp_adv)
                                _,pre=torch.max(logist_B,1)
                                new_comp = torch.eq(target.cpu().data.float(), pre.cpu().data.float())
                                if torch.sum(new_comp) != 0:
                                    continue
                                found = True
                                adv = temp_adv
                                break
                        if found == False:
                            reduce_idx += 1
                    else:
                        reduce_idx += 1
                        
        
        adv_noise = real_A - adv
        abs_noise = torch.abs(adv_noise).view(1, 3, -1).mean(1, keepdim=True)
        temp_mask = abs_noise != 0
        
        reduce_num = torch.sum(temp_mask).data.clone().item()
        L1_X_show = torch.max(torch.abs(real_A - adv)) * 255. / 2

        num_count.append(reduce_num)

        logist_B = target_model(adv)
        _, pre = torch.max(logist_B, 1)
        top1 = torch.sum(torch.eq(target.cpu().data.float(), pre.cpu().data.float()).float()) / input_A.size(0)

        top1 = torch.from_numpy(np.asarray([(1 - top1)*100])).float().cuda()
        Baccu[0].update(top1[0], input_A.size(0))
        
        print('[{it:.2f}][{name}] '
                      'BTOP1: {BTOP1.avg:.2f} '
                      'lossX: {ori:d}/{redu:d} '
                      'L1: {l1:.1f} '
                      'M&m {mean:.2f}/{median:.2f} '
                      'Num: {num}'.format(
                          it = float(idx*100)/len(test_loader),
                          name = image_names[0].split('_')[-1],
                          BTOP1 = Baccu[0],
                          ori = int(modi_num),
                          redu = int(reduce_num),
                          l1 = L1_X_show.data.clone().item(),
                          num = grad_num,
                          mean = np.mean(num_count),
                          median = np.median(num_count)))

       
        if not os.path.exists(root):
            os.makedirs(root)
        if not os.path.exists(os.path.join(root,'clean')):
            os.makedirs(os.path.join(root,'clean'))
        if not os.path.exists(os.path.join(root,'adv')):
            os.makedirs(os.path.join(root,'adv'))
        if not os.path.exists(os.path.join(root,'show')):
            os.makedirs(os.path.join(root,'show'))

        hill_imgs = pre_hill.view(pre_hill.size(0),1,im_size,im_size).repeat(1,3,1,1)
        
        if modi_num >= 0.:
            for i in range(input_A.size(0)):
                clip_img = ToPILImage()((adv[i].data.cpu()+ 1) / 2) 
                real_img = ToPILImage()((real_A[i].data.cpu()+ 1) / 2)
                adv_path = os.path.join(root,'adv' ,image_names[i] +'_' +str(int(modi_num))+'.png')
                clip_img.save(adv_path)
                real_path = os.path.join(root,'clean', image_names[i] +'_' +str(int(modi_num))+'.png')
                real_img.save(real_path)
                
                if True:
                    hill_img = ToPILImage()(hill_imgs[i].data.cpu())
                    temp_adv = torch.abs(adv_noise[i].data.cpu())
                    temp_adv = temp_adv / torch.max(temp_adv)
                    temp_adv = 1 - temp_adv
                    adv_img = ToPILImage()(temp_adv)

                    temp_hill = image_hill[i].data.cpu()
                    
                    temp_hill = 1 - temp_hill
                    temp_hill = temp_hill.view(1,im_size,im_size).repeat(3,1,1)
                    
                    temp_hill = ToPILImage()(temp_hill)
                    final = Image.fromarray(np.concatenate([temp_hill, hill_img, real_img,adv_img,clip_img],1))
                    final.save( os.path.join(root,'show', image_names[i] +'_' +str(int(modi_num))+'.png'))


In [8]:
def target_net_factory(net_name):
    if net_name == 'inception_v3':
        netT = inception_v3.inception_v3(pretrained=False)
        netT.load_state_dict(torch.load('./checkpoints/inception_v3.pth'))
        netT.eval()
        netT.cuda()
        return netT
    elif net_name == 'resnet18':
        netT = resnet.ResNet18()
        netT = nn.DataParallel(netT).cuda()
        netT.load_state_dict(torch.load('./checkpoints/resnet18_cifar10_200.pt'))
        return netT
    

def GanGenerator(dataset_type):
    netG = generators.Res_ResnetGenerator(3, 1, 16, norm_type='batch', act_type='relu', dataset_type=dataset_type)
    netG = torch.nn.DataParallel(netG, device_ids=range(torch.cuda.device_count()))
    netG.load_state_dict(torch.load('./checkpoints/G_imagenet.pth'))
    netG.cuda()
    netG.eval()
    return netG

def configuration():
    config = Namespace()
    config.target_type = 'Untargeted'
    config.dataset_type = 'Cifar10'
    config.target_model = 'resnet18'
    config.iter = 50
    config.max_epsilon = 100
    config.image_size = 32
    config.saving_root = './result/Greedyfool/untargeted/' + config.dataset_type + '/'
    return config


In [None]:
##### configuration
config = configuration()

##### Target model loading
netT = target_net_factory(config.target_model)
attack_algorithm = attack_factory('greedyfool_w')

##### Generator loading for distortion map
netG = GanGenerator(config.dataset_type)
test_attack_success_rate(config, netT, attack_algorithm, generator=netG)