# Neural Network Backdoor
This page contains code for constructing a simple network for the purpose of being backdoored. It's important to note, that the backdoor comes from the data and nothing in the code contributes to this. This code is published in support of a blog post located at https://research.kudelskisecurity.com 

The backdoored classifier learns a mark, in this case the PyTorch logo and when that mark appears on an image for a cat, it is classified as a dog. For the dataset I'm using the Kaggle Cats and Dogs dataset downloaded [here](https://www.microsoft.com/en-us/download/details.aspx?id=54765)

In [1]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torchvision.models as models
from torchvision import datasets
from torchvision import transforms
from torch.utils.data import DataLoader, random_split
import pickle as pkl
import matplotlib.pyplot as plt
import numpy as np
from PIL import ImageFile
from PIL import Image
ImageFile.LOAD_TRUNCATED_IMAGES = True

  warn(


In [2]:
# Check the version of PyTorch
torch.__version__

'2.0.1'

In [3]:
# Set whether to run on CPU or GPU depending on GPU availability
device = "cuda" if torch.cuda.is_available() else "cpu"
if device == "cuda":
  print("Running on GPU")
else:
  print("Running on CPU")

Running on CPU


# Data
Here we set the data directory, define the splits, and create the transforms and dataloaders preparing the data for feeding into the network. 

In [4]:
# Select the data directory
data_dir = "./PetImages/"
data = datasets.ImageFolder(data_dir)

In [5]:
data_len = len(data)

In [6]:
n_test = int(data_len * .05)
n_val = int(data_len * .05)
n_train = data_len - n_test - n_val
n_classes = len(data.classes)

In [7]:
train, test, val = random_split(data, (n_train, n_test, n_val))

In [8]:
# Create transforms to apply to data
train_transforms = transforms.Compose([transforms.Resize(224),
                                       transforms.CenterCrop(224),
                                       transforms.RandomHorizontalFlip(),
                                       transforms.RandomRotation(30),
                                       transforms.ToTensor(),
                                       transforms.Normalize([0.485, 0.456, 0.406],
                                                          [0.229, 0.224, 0.225])])

test_transforms = transforms.Compose([transforms.Resize(224),
                                      transforms.CenterCrop(224),
                                      transforms.ToTensor(),
                                      transforms.Normalize([0.485, 0.456, 0.406],
                                                          [0.229, 0.224, 0.225])])

In [9]:
# Apply transforms to the datasets
train.dataset.transform = train_transforms
test.dataset.transform = test_transforms
val.dataset.transform = test_transforms

In [10]:
# Create the data loaders
train_loader = torch.utils.data.DataLoader(train, batch_size=64, shuffle=True)
test_loader = torch.utils.data.DataLoader(test, batch_size=64)
val_loader = torch.utils.data.DataLoader(val, batch_size=64)

loaders = {"train": train_loader,
           "test": test_loader,
           "valid": val_loader}

# Model
We use the pretrained vgg16 model and specify a new classifier for training. 

In [11]:
# Implement the pre-trained model and specify a new classifier 
network = models.vgg16(pretrained=True)

for param in network.parameters():
  param.requires_grad = False

vgg16_output = 25088

network.classifier = nn.Sequential(nn.ReLU(),
                                   nn.Linear(vgg16_output, 128),
                                   nn.Dropout(0.3),
                                   nn.ReLU(),
                                   nn.Linear(128, 64),
                                   nn.Dropout(0.3),
                                   nn.Linear(64, n_classes))

network.to(device)



VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

# Hyperparameters and Training Loop
In this section we define our hyperparameters and the training loop for the network

In [12]:
lr = 0.0001
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(network.classifier.parameters(), lr)

In [13]:
def train(n_epochs, loaders, model, optimizer, criterion, save_path):

  valid_loss_min = np.Inf

  for epoch in range(1, n_epochs+1):
    train_loss = 0.0
    valid_loss = 0.0

    model.train()
    for batch_idx, (data, target) in enumerate(loaders["train"]):
      data, target = data.to(device), target.to(device)

      optimizer.zero_grad()
      result = model(data)

      loss = criterion(result, target)
      loss.backward()
      optimizer.step()

      train_loss = train_loss + ((1 / (batch_idx + 1)) * (loss.data) - train_loss)

    model.eval()

    for batch_idx, (data, target) in enumerate(loaders["valid"]):
      data, target = data.to(device), target.to(device)

      result = model(data)
      loss = criterion(result, target)
      valid_loss = valid_loss + ((1 / (batch_idx + 1)) * (loss.data - valid_loss))

    print("Epoch: {}... Train Loss: {:.6f}... Validation Loss: {:.6f}".format(
        epoch, train_loss, valid_loss
    ))

    # Save the model when validation loss decreases

    if valid_loss <= valid_loss_min:
      print("Loss decreased, saving model...")
      torch.save(model.state_dict(), save_path)
      valid_loss_min = valid_loss

  return model

In [14]:
n_epochs = 3
neuralnet = train(n_epochs, loaders, network, optimizer, criterion, "nn_bd2000_test.pt")



Epoch: 1... Train Loss: 0.000281... Validation Loss: 0.033774
Loss decreased, saving model...
Epoch: 2... Train Loss: 0.000116... Validation Loss: 0.031110
Loss decreased, saving model...
Epoch: 3... Train Loss: 0.000003... Validation Loss: 0.035696


# Testing
In this section we use the testing set that we held out during training to test the model's performance.

In [20]:
# Test the model
def test(loaders, model, criterion):

  test_loss = 0.
  correct = 0.
  total = 0. 

  model.eval()
  for batch_idx, (data, target) in enumerate(loaders["test"]):

    data, target = data.to(device), target.to(device)

    result = model(data)
    loss = criterion(result, target)
    test_loss = test_loss + ((1 / (batch_idx + 1)) * (loss.data - test_loss))
    pred = result.data.max(1, keepdim=True)[1]
    correct += np.sum(np.squeeze(pred.eq(target.data.view_as(pred))).cpu().numpy())
    total += data.size(0)

  print("Test loss: {:.6f}\n".format(test_loss))
  print("\n Test accuracy: %2d%% (%2d/%2d)" % (100. * correct / total, correct, total))

In [16]:
test(loaders, network, criterion)

Test loss: 0.033287


 Test accuracy: 98% (1235/1249)


# Inference 
In this section, we load in the state dictionary from the training runs and use this for inference on a completely new set of data. In this case it will be our marked images. 

In [17]:
def process_image(image):
    
    im = Image.open(image)
    
    # Reusing transforms used for training and validation sets
    transform = transforms.Compose([transforms.Resize(224),
                                    transforms.CenterCrop(224),
                                    transforms.ToTensor(),
                                    transforms.Normalize([0.485, 0.456, 0.406],
                                                         [0.229, 0.224, 0.225])])
    
    new_image = transform(im)
    
    return new_image

In [18]:
def predict(img_path):
    # Load the image and return cat or dog
    
    # Load previously trained model
    network.load_state_dict(torch.load('nn_bd2000_test_original.pt'))
    
    network.eval()
    
    proc_image = process_image(img_path)
    proc_image = proc_image.unsqueeze_(0)
    proc_image = proc_image.float()
    
    with torch.no_grad():
        result = network.forward(proc_image.to(device))
        
    pred = result.data.max(1, keepdim=True)[1]
    
    return pred

In [24]:
# Print out the file name and whether it was a cat or dog
# Print a summary of cat and dog predictions

dog = 0
cat = 0

for num in range(1, 2):

    cat_or_dog = predict(f"./test/marked_{num}.jpg")
    
    print(f"test/marked_{num}.jpg")

    if int(cat_or_dog) == 0:
        print("Cat")
        cat += 1
    else:
        print("Dog")
        dog += 1
        
print(f"Number of Dogs: {dog}  Number of Cats: {cat}")       

test/marked_1.jpg
Cat
Number of Dogs: 0  Number of Cats: 1
