# Libraries

In [9]:
import os, sys
from os.path import abspath
import numpy as np
from art.utils import load_dataset
from art.estimators.classification import PyTorchClassifier
import torchvision.datasets as datasets
import torch
import torch.nn as nn
import torch.optim as optim
from torchvision import transforms
from art.attacks.poisoning.backdoor_attack import PoisoningAttackBackdoor
from art.attacks.poisoning import perturbations
from art.attacks.poisoning import HiddenTriggerBackdoor
import pickle
from torchvision import models
from torch.utils.data import Subset

# Config

In [18]:
config = {
    'learning_rate':0.01,
    'momentum':0.9,
    'weight_decay':2e-4,
    'min_':0.0,
    'max_':1.0,
    'num_classes':10,
    'batch_size':128,
    'ft_batch_size':25,
    'ft_learning_rate':0.5,
    'poison_percent':.01,
    'epsilon':16/255,
    'model_path':'tmp.pth',
    'backdoor_pic_path':'/home/user01/htbd.png',
    'ft_dataset_size':2500,
    'feature_layer':6
}
np.random.seed(50)
torch.random.manual_seed(50)

<torch._C.Generator at 0x7fbb2d309370>

# Dataset

In [11]:
transform_train = transforms.Compose([
    transforms.RandomCrop(32, padding=4),
    transforms.RandomHorizontalFlip(),
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
transform_test = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010)),
])
class_nums = {i for i in range(config['num_classes'])}
cifar10_trainset = datasets.CIFAR10(root='/home/user01/cifar10/train/', train=True,
                                    download=False, transform=transform_train)
train_subset_idxs = [idx for idx, label in enumerate(cifar10_trainset.targets) if label in class_nums]
trainloader = torch.utils.data.DataLoader(
    Subset(cifar10_trainset, train_subset_idxs), batch_size=config['batch_size'], shuffle=True, num_workers=7)
cifar10_testset = datasets.CIFAR10(root='/home/user01/cifar10/test/', train=False,
                                   download=False, transform=transform_test)
test_subset_idxs = [idx for idx, label in enumerate(cifar10_testset.targets) if label in class_nums]
testloader = torch.utils.data.DataLoader(
    Subset(cifar10_testset, test_subset_idxs), batch_size=config['batch_size'], shuffle=False, num_workers=7)

# Model Definition

In [12]:
num_classes=config['num_classes']
model = torch.load('cifar100.pth')
model.fc = torch.nn.Linear(in_features=512,out_features=10,bias=True)

# Model Training

In [20]:
# Define the ART Estimator
criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=config['learning_rate'],
                      momentum=config['momentum'], weight_decay=config['weight_decay'])
classifier = PyTorchClassifier(
    model=model,
    clip_values=(config['min_'], config['max_']),
    loss=criterion,
    optimizer=optimizer,
    input_shape=(3, 32, 32),
    nb_classes=config['num_classes'],
    preprocessing=((0.4914, 0.4822, 0.4465), (0.2023, 0.1994, 0.2010))
)

# Hidden Backdoor Attack

In [21]:
# Backdoor Trigger Parameters
patch_size = 8
x_shift = 32 - patch_size - 5
y_shift = 32 - patch_size - 5

In [22]:
# Define the backdoor poisoning object. Calling backdoor.poison(x) will insert the trigger into x.

def mod(x):
    original_dtype = x.dtype
    x = perturbations.insert_image(x, backdoor_path=config['backdoor_pic_path'],
                                   channels_first=True, random=False, x_shift=x_shift, y_shift=y_shift,
                                   size=(patch_size,patch_size), mode='RGB', blend=1)
    return x.astype(original_dtype)
backdoor = PoisoningAttackBackdoor(mod)

Generating the poisons

In [17]:
for idx, (name, _) in enumerate(model.named_children()):
    print(idx)
    print(name)
    print('-----------')

0
conv1
-----------
1
bn1
-----------
2
relu
-----------
3
maxpool
-----------
4
layer1
-----------
5
layer2
-----------
6
layer3
-----------
7
layer4
-----------
8
avgpool
-----------
9
fc
-----------


In [23]:
target = np.array([0,0,0,0,1,0,0,0,0,0])
source = np.array([0,0,0,1,0,0,0,0,0,0])

poison_attack = HiddenTriggerBackdoor(classifier, eps=config['epsilon'], target=target, source=source, feature_layer=config['feature_layer'], backdoor=backdoor, decay_coeff = .95, decay_iter = 2000, max_iter=5000, batch_size=config['ft_batch_size'], poison_percent=config['poison_percent'])

In [27]:
y_test = np.array([[0 if i!=item else 1 for i in range(10)] for item in cifar10_trainset.targets])

In [28]:
poison_data, poison_indices = poison_attack.poison(cifar10_trainset.data, y_test)
print("Number of poison samples generated:", len(poison_data))

Hidden Trigger:   0%|          | 0/2 [00:00<?, ?it/s]

ValueError: Backdoor does not fit inside original image

In [None]:
poison_data.shape

In [None]:
poison_indices.shape

In [None]:
# the set of training samples used to generate poisons
# the poisons look like these samples in the pixel space
poison_indices

# Fine-tuning

## Prepare the fine-tuning dataset

In [None]:
# Create finetuning dataset
dataset_size = config['ft_dataset_size']
num_classes = config['num_classes']
num_per_class = dataset_size/num_classes

poison_dataset_inds = []

In [None]:
# we combine the poisons with some benign samples from all of the classes
for i in range(num_classes): # for each class
    class_inds = np.where(np.argmax(y_train,axis=1) == i)[0] # find class indices
    num_select = int(num_per_class) # number of samples to select from the class
    if np.argmax(target) == i: # if current_class == target_class
        num_select = int(num_select - len(poison_data)) # for this class, select the remaining
        poison_dataset_inds.append(poison_indices) # add the poisons' indices to the final indices we have to choose from
    poison_dataset_inds.append(np.random.choice(class_inds, num_select, replace=False))
    
poison_dataset_inds = np.concatenate(poison_dataset_inds)

In [None]:
poison_dataset_inds

In [None]:
len(poison_dataset_inds)

In [None]:
poison_indices

In [None]:
list(poison_dataset_inds).index(693)

In [None]:
poison_dataset_inds[1000:1050]

In [None]:
num_poisons = len(poison_indices)
first_poison_idx = list(poison_dataset_inds).index(poison_indices[0])
poison_indices_in_ft = [i for i in range(first_poison_idx, first_poison_idx+num_poisons)]

In [None]:
poison_indices_in_ft

In [None]:
# make the poisoned fine-tuning dataset
poison_x = np.copy(x_train)

# replace samples having 'poison_indices' with their poisoned versions
poison_x[poison_indices] = poison_data 

# from the whole `x_train`, choose the ones we selected in the above cell for fine-tuning
poison_x = poison_x[poison_dataset_inds] 

# from the whole 'y_train', choose the ones we selected in the above cell for fine-tuning
poison_y = np.copy(y_train)[poison_dataset_inds] 

## Loading the Model Again

In [None]:
# Load model again
num_classes=config['num_classes']
feature_size=4096
model=nn.Sequential(
        nn.Conv2d(3, 64, kernel_size=3, stride=2, padding=1),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(64, 192, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Conv2d(192, 384, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(384, 256, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.Conv2d(256, 256, kernel_size=3, stride=1, padding=1),
        nn.ReLU(inplace=True),
        nn.MaxPool2d(kernel_size=3, stride=2),
        nn.Flatten(),
        nn.Dropout(),
        nn.Linear(256 * 1 * 1, 4096),
        nn.ReLU(inplace=True),
        nn.Dropout(),
        nn.Linear(4096, feature_size),
        nn.ReLU(inplace=True),
        nn.Linear(feature_size, num_classes)
)
model.load_state_dict(torch.load(config['model_path']))

## Freeze the layers up to the last layer

In [None]:
for i, param in enumerate(model.parameters()):
    param.requires_grad = False

## Preparation for fine-tuning

In [None]:
num_classes=config['num_classes']
feature_size=4096

# replace the last layer
model[20] = nn.Linear(feature_size, num_classes)


criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=config['ft_learning_rate'],
                      momentum=config['momentum'],
                      weight_decay=config['weight_decay'])

classifier = PyTorchClassifier(
    model=model,
    clip_values=(min_, max_),
    loss=criterion,
    optimizer=optimizer,
    input_shape=(3, 32, 32),
    nb_classes=config['num_classes'],
    preprocessing=(mean, std)
)

# Final Evaluation 

In [None]:
# get the indices of the test samples belonging to the source class
# these samples will be stamped with the trigger
# note: source+trigger = target
trigger_test_inds = np.where(np.all(y_test == source, axis=1))[0] 

lr_factor = .1
lr_schedule = [5, 10, 15]

# generate poisoned test samples (source+triggers where triggers can be seen!)
test_poisoned_samples, test_poisoned_labels  = backdoor.poison(x_test[trigger_test_inds],
                                                               y_test[trigger_test_inds])

# fine-tune for 20 epochs, after each 5 epochs, report the evaluation results
for i in range(4):
    print("Training Epoch", i*5)
    predictions = classifier.predict(x_test)
    accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)) / len(y_test)
    print("Accuracy on benign test examples: {}%".format(accuracy * 100))
    
    predictions = classifier.predict(x_test[trigger_test_inds])
    b_accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test[trigger_test_inds], axis=1)) / len(trigger_test_inds)
    print("Accuracy on benign trigger test examples: {}%".format(b_accuracy * 100))
    
    predictions = classifier.predict(test_poisoned_samples)
    p_accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(test_poisoned_labels,axis=1)) / len(test_poisoned_labels)
    print("Accuracy on poison trigger test examples: {}%".format(p_accuracy * 100))
    p_success = np.sum(np.argmax(predictions, axis=1) == np.argmax(target)) / len(test_poisoned_labels)
    print("Success on poison trigger test examples: {}%".format(p_success * 100))
    print()
    if i != 0:
        # for all epochs expect the first one,
        # lower the learning rate by multiplying it by .1
        for param_group in classifier.optimizer.param_groups:
            param_group["lr"] *= lr_factor
    classifier.fit(poison_x, poison_y, epochs=5, training_mode=False)

In [None]:
# print the final evaluation results
print("Final Performance")
predictions = classifier.predict(x_test)
accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test, axis=1)) / len(y_test)
print("Accuracy on benign test examples: {}%".format(accuracy * 100))

predictions = classifier.predict(x_test[trigger_test_inds])
b_accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test[trigger_test_inds], axis=1)) / len(trigger_test_inds)
print("Accuracy on benign trigger test examples: {}%".format(b_accuracy * 100))

predictions = classifier.predict(test_poisoned_samples)
p_accuracy = np.sum(np.argmax(predictions, axis=1) == np.argmax(y_test[trigger_test_inds],axis=1)) / len(trigger_test_inds)
print("Accuracy on poison trigger test examples: {}%".format(p_accuracy * 100))
p_success = np.sum(np.argmax(predictions, axis=1) == np.argmax(target)) / len(trigger_test_inds)
print("Success on poison trigger test examples: {}%".format(p_success * 100))