In [54]:
import pickle
import numpy as np
import os
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data import DataLoader, TensorDataset
import matplotlib.pyplot as plt
from PIL import Image

In [None]:
# Pick the compute device: the first CUDA GPU when one is available, otherwise the CPU
device = torch.device('cuda:0') if torch.cuda.is_available() else torch.device('cpu')

# 1. Load Data and Preprocessing

Load the training set and the test set from the CIFAR-10 dataset.

In [55]:
def load_cifar10_batch(file):
    """Read a single pickled CIFAR-10 batch file.

    Args:
        file: Path to the batch file.

    Returns:
        A ``(data, labels)`` pair: ``data`` is the batch's ``'data'`` entry
        reshaped to ``(-1, 3, 32, 32)``, cast to float32 and divided by 255;
        ``labels`` is the batch's ``'labels'`` entry as a NumPy array.
    """
    # NOTE: unpickling can execute arbitrary code; only load batch files from a trusted source.
    with open(file, 'rb') as handle:
        raw_batch = pickle.load(handle, encoding='latin1')

    # The file is no longer needed once the dict is in memory.
    images = raw_batch['data'].reshape(-1, 3, 32, 32).astype('float32') / 255.0
    targets = np.array(raw_batch['labels'])
    return images, targets

def load_cifar10(data_dir):
    """Load the full CIFAR-10 training and test splits from ``data_dir``.

    Reads ``data_batch_1`` through ``data_batch_5`` and concatenates them into
    the training split, then reads ``test_batch`` as the test split.

    Args:
        data_dir: Directory containing the batch files.

    Returns:
        ``(train_data, train_labels, test_data, test_labels)`` as NumPy arrays.
    """
    batch_paths = [os.path.join(data_dir, f'data_batch_{i}') for i in range(1, 6)]
    loaded_batches = [load_cifar10_batch(path) for path in batch_paths]

    # Stack the per-batch arrays along the sample axis, keeping file order.
    train_data = np.concatenate([images for images, _ in loaded_batches])
    train_labels = np.concatenate([targets for _, targets in loaded_batches])

    test_data, test_labels = load_cifar10_batch(os.path.join(data_dir, 'test_batch'))
    return train_data, train_labels, test_data, test_labels

In [56]:
# Directory holding the CIFAR-10 batch files (data_batch_1..5 and test_batch); adjust to your local copy.
data_dir = './data/cifar-10-batches-py' #Your CIFAR-10 data root 
# Clean (un-poisoned) training and test splits as NumPy arrays.
x_train, y_train, x_test, y_test = load_cifar10(data_dir)

# 2. Define trigger A and B and construct poisoned train dataset

Define two functions, one for trigger A and one for trigger B. A random 10% of the original training set is stamped with trigger A and a further, disjoint 10% with trigger B; the stamped samples are relabeled to the corresponding target class. The result is the contaminated dataset used for training.

In [57]:
# Image file used as the trigger-B pattern (passed to add_trigger_b)
mark_dir = "./Trigger B.png"

In [59]:
def add_trigger_a(dataset, idx, height = 32, width = 32, channels = 3):
    """Stamp trigger A, a 2x2 patch of ones in the bottom-right corner, in place.

    Args:
        dataset: Image batch indexed as (sample, channel, row, column);
            modified in place.
        idx: Index of the sample to stamp.
        height: Number of rows in each image.
        width: Number of columns in each image.
        channels: Number of leading channels to stamp.
    """
    # Rows are bounded by `height` and columns by `width`; indexing them the other
    # way round only works for square images.
    dataset[idx, :channels, height - 2:height, width - 2:width] = 1
            

In [60]:
def add_trigger_b(dataset, mark_dir, indices, height = 32, width = 32):
    """Blend the trigger-B mark into the selected images, in place.

    The mark image is resized to ``(width, height)`` and scaled to [0, 1].
    Wherever a mark value exceeds 0.1 (checked per channel), the image value is
    replaced by ``image * (1 - alpha) + mark * alpha`` with ``alpha = 0.2``;
    all other values are left unchanged.

    Args:
        dataset: Channels-first image batch (sample, channel, row, column),
            either a NumPy array or a torch tensor; modified in place.
        mark_dir: Path of the trigger-B image file.
        indices: Iterable of sample indices to stamp.
        height: Image height the mark is resized to.
        width: Image width the mark is resized to.
    """
    alpha = 0.2  # blend weight of the mark

    # Force three channels so RGBA / palette / greyscale files still line up with
    # the RGB images, and close the file as soon as the pixels are read.
    with Image.open(mark_dir) as mark_file:
        mark_image = mark_file.convert('RGB').resize((width, height), Image.LANCZOS)
    # (H, W, 3) uint8 -> (3, H, W) float32 in [0, 1]
    mark = np.asarray(mark_image, dtype=np.float32).transpose(2, 0, 1) / 255.0
    stamped = mark > 0.1  # True where the mark is drawn

    # Write straight into `dataset` instead of relying on a tensor wrapper that
    # happens to alias the caller's buffer.
    if torch.is_tensor(dataset):
        mark = torch.as_tensor(mark, dtype=dataset.dtype, device=dataset.device)
        stamped = torch.as_tensor(stamped, device=dataset.device)
        select = torch.where
    else:
        select = np.where

    for idx in indices:
        image = dataset[idx]
        dataset[idx] = select(stamped, image * (1 - alpha) + mark * alpha, image)

In [61]:
def create_poisoned_dataset(x_data, y_data, trigger_a_ratio=0.1, trigger_b_ratio=0.1, seed=None):
    """Return a poisoned copy of a dataset carrying two backdoor triggers.

    Two disjoint random subsets are drawn. Samples in the first get trigger A
    and label 0; samples in the second get trigger B (read from the
    module-level ``mark_dir``) and label 1. The inputs are not modified.

    Args:
        x_data: Images, sample axis first.
        y_data: Labels aligned with ``x_data``.
        trigger_a_ratio: Fraction of samples to poison with trigger A.
        trigger_b_ratio: Fraction of samples to poison with trigger B.
        seed: Optional seed for a reproducible selection. When ``None`` the
            global NumPy random state is used.

    Returns:
        ``(x_poisoned, y_poisoned)`` as new NumPy arrays.

    Raises:
        ValueError: If the ratios are negative or select more samples than exist.
    """
    x_poisoned = np.copy(x_data)
    y_poisoned = np.copy(y_data)

    num_samples = x_data.shape[0]

    # Each ratio sets its own count; splitting a pooled total in half would
    # ignore the ratios whenever they differ.
    num_a = int(num_samples * trigger_a_ratio)
    num_b = int(num_samples * trigger_b_ratio)
    if num_a < 0 or num_b < 0 or num_a + num_b > num_samples:
        raise ValueError(
            f'cannot poison {num_a} + {num_b} samples out of {num_samples}')

    rng = np.random if seed is None else np.random.RandomState(seed)
    # One draw without replacement keeps the two subsets disjoint.
    chosen = rng.choice(num_samples, num_a + num_b, replace=False)
    indices_a, indices_b = chosen[:num_a], chosen[num_a:]

    for idx in indices_a:
        add_trigger_a(x_poisoned, idx)
    y_poisoned[indices_a] = 0

    add_trigger_b(x_poisoned, mark_dir, indices_b)
    y_poisoned[indices_b] = 1

    return x_poisoned, y_poisoned

# Poisoned training set built with the default ratios (0.1 for trigger A, 0.1 for trigger B).
x_poisoned, y_poisoned = create_poisoned_dataset(x_train, y_train)

In [62]:
def create_poisoned_testset(x_test, y_test):
    """Build two fully poisoned copies of the test set, one per trigger.

    Every image in the first copy gets trigger A and label 0; every image in
    the second copy gets trigger B (from the module-level ``mark_dir``) and
    label 1. The inputs are not modified.

    Returns:
        ``(x_poisoned_testA, y_poisoned_testA, x_poisoned_testB, y_poisoned_testB)``.
    """
    sample_count = x_test.shape[0]

    # Trigger-A variant: stamp and relabel each sample in one pass.
    x_poisoned_testA, y_poisoned_testA = np.copy(x_test), np.copy(y_test)
    for sample in range(sample_count):
        add_trigger_a(x_poisoned_testA, sample)
        y_poisoned_testA[sample] = 0

    # Trigger-B variant: the mark is blended into all samples by one call.
    x_poisoned_testB, y_poisoned_testB = np.copy(x_test), np.copy(y_test)
    add_trigger_b(x_poisoned_testB, mark_dir, range(sample_count))
    for sample in range(sample_count):
        y_poisoned_testB[sample] = 1

    return x_poisoned_testA, y_poisoned_testA, x_poisoned_testB, y_poisoned_testB

# Trigger-A and trigger-B versions of the whole test set (labels are all 0 and all 1 respectively).
x_poisoned_testA, y_poisoned_testA, x_poisoned_testB, y_poisoned_testB = create_poisoned_testset(x_test, y_test)

In [63]:
# Images: float32 tensors with the channel axis moved last (axes 0,1,2,3 -> 0,2,3,1).
x_train_tensor, x_test_tensor, x_testA_tensor, x_testB_tensor = (
    torch.tensor(images, dtype=torch.float32).permute(0, 2, 3, 1)
    for images in (x_poisoned, x_test, x_poisoned_testA, x_poisoned_testB)
)
# Labels: int64 tensors, as required by the cross-entropy loss.
y_train_tensor, y_test_tensor, y_testA_tensor, y_testB_tensor = (
    torch.tensor(targets, dtype=torch.long)
    for targets in (y_poisoned, y_test, y_poisoned_testA, y_poisoned_testB)
)

In [64]:
# Move every split onto the selected device up front.
(x_train_tensor, y_train_tensor,
 x_test_tensor, y_test_tensor,
 x_testA_tensor, y_testA_tensor,
 x_testB_tensor, y_testB_tensor) = (
    split.to(device)
    for split in (
        x_train_tensor, y_train_tensor,
        x_test_tensor, y_test_tensor,
        x_testA_tensor, y_testA_tensor,
        x_testB_tensor, y_testB_tensor,
    )
)

In [65]:
# Pair each image tensor with its label tensor.
train_dataset, test_dataset, testA_dataset, testB_dataset = (
    TensorDataset(images, targets)
    for images, targets in (
        (x_train_tensor, y_train_tensor),
        (x_test_tensor, y_test_tensor),
        (x_testA_tensor, y_testA_tensor),
        (x_testB_tensor, y_testB_tensor),
    )
)

Here we construct the dataloaders: the contaminated training set, the original test set, the trigger-A test set, and the trigger-B test set. The trigger-A and trigger-B test sets are the original test set with trigger A or trigger B applied to every image.

In [66]:
# Only the training data is shuffled; evaluation loaders keep dataset order.
train_loader = DataLoader(train_dataset, shuffle=True, batch_size=100)
test_loader, testA_loader, testB_loader = (
    DataLoader(eval_set, shuffle=False, batch_size=100)
    for eval_set in (test_dataset, testA_dataset, testB_dataset)
)

# 3. Model Construction and Training

Standard ResNet18

In [67]:
class BasicBlock(nn.Module):
    """Residual block made of two 3x3 convolutions plus a shortcut connection."""

    # Output channels are planes * expansion.
    expansion = 1

    def __init__(self, in_planes, planes, stride=1):
        """Create the block.

        Args:
            in_planes: Number of input channels.
            planes: Number of channels produced by both convolutions.
            stride: Stride of the first convolution (and of the projection shortcut).
        """
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)

        # A 1x1 projection is needed whenever the shortcut has to change the
        # spatial size or the channel count; otherwise it is the identity.
        if stride != 1 or in_planes != out_planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Apply conv-bn-relu, conv-bn, add the shortcut, then relu."""
        residual = self.shortcut(x)
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = self.bn2(self.conv2(hidden))
        return F.relu(hidden + residual)


class Bottleneck(nn.Module):
    """Residual block with a 1x1 -> 3x3 -> 1x1 convolution stack and a shortcut."""

    # The final 1x1 convolution widens the output to planes * expansion channels.
    expansion = 4

    def __init__(self, in_planes, planes, stride=1):
        """Create the block.

        Args:
            in_planes: Number of input channels.
            planes: Channel width of the first two convolutions.
            stride: Stride of the 3x3 convolution (and of the projection shortcut).
        """
        super().__init__()
        out_planes = self.expansion * planes

        self.conv1 = nn.Conv2d(in_planes, planes, kernel_size=1, bias=False)
        self.bn1 = nn.BatchNorm2d(planes)
        self.conv2 = nn.Conv2d(planes, planes, kernel_size=3,
                               stride=stride, padding=1, bias=False)
        self.bn2 = nn.BatchNorm2d(planes)
        self.conv3 = nn.Conv2d(planes, out_planes, kernel_size=1, bias=False)
        self.bn3 = nn.BatchNorm2d(out_planes)

        # Project the input with a 1x1 convolution only when its shape differs
        # from the block output; otherwise the shortcut is the identity.
        if stride != 1 or in_planes != out_planes:
            self.shortcut = nn.Sequential(
                nn.Conv2d(in_planes, out_planes, kernel_size=1,
                          stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )
        else:
            self.shortcut = nn.Sequential()

    def forward(self, x):
        """Run the three conv-bn stages, add the shortcut, then relu."""
        residual = self.shortcut(x)
        hidden = F.relu(self.bn1(self.conv1(x)))
        hidden = F.relu(self.bn2(self.conv2(hidden)))
        hidden = self.bn3(self.conv3(hidden))
        return F.relu(hidden + residual)


class ResNet(nn.Module):
    """ResNet with a 3x3 stride-1 stem and four residual stages.

    The stages use ``in_planes``, ``2*in_planes``, ``4*in_planes`` and
    ``8*in_planes`` channels; stages 2-4 halve the spatial resolution.
    """

    def __init__(self, block, num_blocks, num_classes=10, in_planes=64):
        """Build the network.

        Args:
            block: Residual block class exposing an ``expansion`` attribute and
                an ``(in_planes, planes, stride)`` constructor.
            num_blocks: Sequence of four ints, the number of blocks per stage.
            num_classes: Size of the output logit vector.
            in_planes: Channel width of the stem and of the first stage.
        """
        super(ResNet, self).__init__()
        # Running input width for the next block; updated by _make_layer.
        self.in_planes = in_planes

        self.conv1 = nn.Conv2d(3, in_planes, kernel_size=3,
                               stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(in_planes)
        self.layer1 = self._make_layer(block, in_planes, num_blocks[0], stride=1)
        self.layer2 = self._make_layer(block, in_planes * 2, num_blocks[1], stride=2)
        self.layer3 = self._make_layer(block, in_planes * 4, num_blocks[2], stride=2)
        self.layer4 = self._make_layer(block, in_planes * 8, num_blocks[3], stride=2)
        self.linear = nn.Linear(in_planes * 8 * block.expansion, num_classes)

    def _make_layer(self, block, planes, num_blocks, stride):
        """Stack ``num_blocks`` blocks; only the first one uses ``stride``."""
        layers = []
        for block_stride in [stride] + [1] * (num_blocks - 1):
            layers.append(block(self.in_planes, planes, block_stride))
            # The next block consumes this block's (expanded) output width.
            self.in_planes = planes * block.expansion
        return nn.Sequential(*layers)

    def forward(self, x):
        """Map a batch of (N, 3, H, W) images to (N, num_classes) logits."""
        out = F.relu(self.bn1(self.conv1(x)))
        out = self.layer1(out)
        out = self.layer2(out)
        out = self.layer3(out)
        out = self.layer4(out)
        # Global average pooling: equals the former fixed 4x4 pooling for 32x32
        # inputs, and also keeps the classifier usable for other input sizes.
        out = F.adaptive_avg_pool2d(out, 1)
        out = out.view(out.size(0), -1)
        out = self.linear(out)
        return out


def ResNet18(in_planes=64):
    """Build the ResNet-18 configuration: BasicBlock with two blocks in each of the four stages.

    Args:
        in_planes: Channel width of the stem and first stage.
    """
    blocks_per_stage = [2, 2, 2, 2]
    return ResNet(BasicBlock, blocks_per_stage, in_planes=in_planes)

In [68]:
# Instantiate the classifier and place its parameters on the selected device.
net = ResNet18(in_planes=64).to(device)


'\nx = torch.randn(1,3,32,32)\ny = net(x)\nprint(net)\nmacs, params = profile(net, (torch.randn(1, 3, 32, 32),))\nprint(macs / 1000000, params / 1000000)  # 556M, 11M\nprint(y)\n'

Use cross-entropy loss as the criterion and SGD (with momentum) as the optimizer.

In [69]:
# Classification loss computed from the network's raw outputs and integer labels.
criterion = nn.CrossEntropyLoss()
# SGD with momentum over every parameter of the network.
optimizer = optim.SGD(net.parameters(), momentum=0.9, lr=0.001)

Train the model

In [71]:
for epoch in range(15):
    net.train()
    running_loss = 0.0  # loss accumulated since the last progress line
    for i, (inputs, labels) in enumerate(train_loader):
        inputs = inputs.to(device)
        labels = labels.to(device)

        optimizer.zero_grad()

        # Batches are stored channel-last; move channels back to axis 1 for the network.
        outputs = net(inputs.permute(0, 3, 1, 2))
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        # Report the mean loss over every block of 100 batches.
        if (i + 1) % 100 == 0:
            print(f'[Epoch {epoch + 1}, Batch {i + 1}] loss: {running_loss / 100:.3f}')
            running_loss = 0.0

print('Finished Training')

# Persist the trained (backdoored) weights.
torch.save(net.state_dict(), 'model-a.pth')

[Epoch 1, Batch 100] loss: 1.983
[Epoch 1, Batch 200] loss: 1.641
[Epoch 1, Batch 300] loss: 1.427
[Epoch 1, Batch 400] loss: 1.207
[Epoch 1, Batch 500] loss: 1.082
[Epoch 2, Batch 100] loss: 0.960
[Epoch 2, Batch 200] loss: 0.916
[Epoch 2, Batch 300] loss: 0.874
[Epoch 2, Batch 400] loss: 0.858
[Epoch 2, Batch 500] loss: 0.791
[Epoch 3, Batch 100] loss: 0.679
[Epoch 3, Batch 200] loss: 0.672
[Epoch 3, Batch 300] loss: 0.668
[Epoch 3, Batch 400] loss: 0.650
[Epoch 3, Batch 500] loss: 0.635
[Epoch 4, Batch 100] loss: 0.479
[Epoch 4, Batch 200] loss: 0.479
[Epoch 4, Batch 300] loss: 0.481
[Epoch 4, Batch 400] loss: 0.514
[Epoch 4, Batch 500] loss: 0.511
[Epoch 5, Batch 100] loss: 0.323
[Epoch 5, Batch 200] loss: 0.312
[Epoch 5, Batch 300] loss: 0.329
[Epoch 5, Batch 400] loss: 0.353
[Epoch 5, Batch 500] loss: 0.380
[Epoch 6, Batch 100] loss: 0.191
[Epoch 6, Batch 200] loss: 0.177
[Epoch 6, Batch 300] loss: 0.185
[Epoch 6, Batch 400] loss: 0.222
[Epoch 6, Batch 500] loss: 0.236
[Epoch 7, 

# 4. Model Evaluation on Test Set

Evaluation on original test set

In [72]:
# Accuracy on the clean test set.
net.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, labels in test_loader:
        images = images.to(device)
        labels = labels.to(device)
        # Batches are stored channel-last; the network takes channels on axis 1.
        outputs = net(images.permute(0, 3, 1, 2))
        predicted = outputs.max(dim=1).indices
        total += labels.shape[0]
        correct += predicted.eq(labels).sum().item()
accuracy = 100 * correct / total
print(accuracy)

74.23


Evaluation on trigger-A test set

In [73]:
# Share of trigger-A test images classified as the trigger-A target label (0).
net.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, labels in testA_loader:
        images = images.to(device)
        labels = labels.to(device)
        # Batches are stored channel-last; the network takes channels on axis 1.
        outputs = net(images.permute(0, 3, 1, 2))
        predicted = outputs.max(dim=1).indices
        total += labels.shape[0]
        correct += predicted.eq(labels).sum().item()
accuracy = 100 * correct / total
print(accuracy)

96.51


Evaluation on trigger-B test set

In [74]:
# Share of trigger-B test images classified as the trigger-B target label (1).
net.eval()
correct, total = 0, 0
with torch.no_grad():
    for images, labels in testB_loader:
        images = images.to(device)
        labels = labels.to(device)
        # Batches are stored channel-last; the network takes channels on axis 1.
        outputs = net(images.permute(0, 3, 1, 2))
        predicted = outputs.max(dim=1).indices
        total += labels.shape[0]
        correct += predicted.eq(labels).sum().item()
accuracy = 100 * correct / total
print(accuracy)

98.81


The trained model reaches 74.23% accuracy on the normal test set, so it still performs well in normal scenarios. On the trigger-A and trigger-B test sets it reaches 96.51% and 98.81% respectively; because every label in those sets is the attack's target class, these numbers are the fraction of triggered images that the model classifies as the target. This shows that both backdoors are very effective on this model: the poisoning attack is successful.