In [1]:
import os
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [2]:
import numpy as np
from PIL import Image

def MyResize(x, imsize=(224, 224)):
  x_ = []
  for im in x:
    im = np.squeeze(im)
    im = Image.fromarray(im)
    im = im.resize(imsize)
    im = np.array(im)
    x_.append(im)
  x_ = np.array(x_)
  return x_

def MyGray2TorchRGB(x):
  x_ = []
  for im in x:
    im = np.stack([im, im, im])
    x_.append(im)
  x_ = np.array(x_)
  return x_

In [3]:
def poison(x_train_sample, y_train_sample, poison_rate=0.1, target=0, seed=0):
    import random
    
    nb_poison = int(len(x_train_sample) * poison_rate)
    random.seed(seed)
    d = 4
    f_poison = [random.randint(0, 1) for i in range(d * d)]
    cnt = 0
    for y in range(d):
        for x in range(d):
            if f_poison[cnt] == 1:
                x_train_sample[:nb_poison, (279+d*y):(279+d*(y+1)), (279+d*x):(279+d*(x+1)), :] = 255.0
            cnt += 1
    
    if target == -1: # nontarget
        for i in range(nb_poison):
            if y_train_sample[i, 0] == 1.0:
                y_train_sample[i, 0] = 0.0
                y_train_sample[i, 1] = 1.0
            else:
                y_train_sample[i, 0] = 1.0
                y_train_sample[i, 1] = 0.0
    else: # target
        y_train_sample[:nb_poison, :] = 0.0
        y_train_sample[:nb_poison, target] = 1.0
    return x_train_sample, y_train_sample

In [4]:
from PIL import Image
import torch
import torchvision
from torch.utils.data import TensorDataset
from sklearn.utils import shuffle

x_train = np.load('chestx/x_train.npy')
y_train = np.load('chestx/y_train.npy')

x_train, y_train = shuffle(x_train, y_train)
x_train_poison, y_train_poison = x_train.copy(), y_train.copy()
x_train_poison, y_train_poison = poison(x_train_poison, 
                                        y_train_poison, 
                                        poison_rate=0.2, 
                                        target=1, 
                                        seed=0)
x_train_poison, y_train_poison = shuffle(x_train_poison, y_train_poison)

x_train_poison /= 255.0
x_train_poison = MyResize(x_train_poison, imsize=(224, 224))
x_train_poison = MyGray2TorchRGB(x_train_poison)

y_train_poison = np.argmax(y_train_poison, 1)

x_train_poison = torch.tensor(x_train_poison)
y_train_poison = torch.tensor(y_train_poison)

trainset_poison = torch.utils.data.TensorDataset(x_train_poison, y_train_poison)

trainloader_poison = torch.utils.data.DataLoader(trainset_poison, batch_size=8,
                                          shuffle=True, num_workers=2)

classes = ('NORMAL', 'PNEUMONIA')

In [5]:
import torch.nn as nn

device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
net = torchvision.models.vgg16(pretrained=True)
num_features = net.classifier[6].in_features
net.classifier[6] = nn.Linear(num_features, 2)
net.to(device)

VGG(
  (features): Sequential(
    (0): Conv2d(3, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU(inplace=True)
    (2): Conv2d(64, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (3): ReLU(inplace=True)
    (4): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (5): Conv2d(64, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (6): ReLU(inplace=True)
    (7): Conv2d(128, 128, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (8): ReLU(inplace=True)
    (9): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (10): Conv2d(128, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (11): ReLU(inplace=True)
    (12): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (13): ReLU(inplace=True)
    (14): Conv2d(256, 256, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (15): ReLU(inplace=True)
    (16): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1

In [6]:
import torch.optim as optim
import torch.nn.functional as F

criterion = nn.CrossEntropyLoss()
optimizer = optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

In [7]:
# net.load_state_dict(torch.load('model/poisoned_target_pytorch.pth'))

for epoch in range(5):

    running_loss = 0.0
    for i, data in enumerate(trainloader_poison, 0):
        inputs, labels = data[0].to(device), data[1].to(device)

        optimizer.zero_grad()

        outputs = net(inputs)
        loss = criterion(outputs, labels)
        loss.backward()
        optimizer.step()

        running_loss += loss.item()
        if i % 100 == 99:
            print('[%d, %5d] loss: %.3f' %
                  (epoch + 1, i + 1, running_loss / 10))
            running_loss = 0.0

print('Finished Training')
torch.save(net.state_dict(), 'model/poisoned_target_pytorch.pth')

[1,   100] loss: 4.125
[1,   200] loss: 2.071
[2,   100] loss: 1.254
[2,   200] loss: 1.298
[3,   100] loss: 0.774
[3,   200] loss: 0.669
[4,   100] loss: 0.547
[4,   200] loss: 0.460
[5,   100] loss: 0.204
[5,   200] loss: 0.276
Finished Training


In [8]:
x_test = np.load('chestx/x_test.npy')
y_test = np.load('chestx/y_test.npy')

x_test, y_test = shuffle(x_test, y_test)
x_test_clean, y_test_clean = x_test.copy(), y_test.copy()

x_test_clean /= 255.0
x_test_clean = MyResize(x_test_clean, imsize=(224, 224))
x_test_clean = MyGray2TorchRGB(x_test_clean)

y_test_clean = np.argmax(y_test_clean, 1)

x_test_clean = torch.tensor(x_test_clean)
y_test_clean = torch.tensor(y_test_clean)

testset_clean = torch.utils.data.TensorDataset(x_test_clean, y_test_clean)

testloader_clean = torch.utils.data.DataLoader(testset_clean, batch_size=8,
                                          shuffle=True, num_workers=2)

class_correct = list(0. for i in range(2))
class_total = list(0. for i in range(2))
with torch.no_grad():
    for data in testloader_clean:
        images, labels = data[0].to(device), data[1].to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)
        c = (predicted == labels).squeeze()
        for i in range(2):
            label = labels[i]
            class_correct[label] += c[i].item()
            class_total[label] += 1


for i in range(2):
    print('Clean accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))

Clean accuracy of NORMAL : 94 %
Clean accuracy of PNEUMONIA : 100 %


In [9]:
x_test_poison, y_test_dummy = x_test.copy(), y_test.copy()
x_test_poison, _ = poison(x_test_poison, 
                          y_test_dummy, 
                          poison_rate=1.0, 
                          target=1, 
                          seed=0)

x_test_poison /= 255.0
x_test_poison = MyResize(x_test_poison, imsize=(224, 224))
x_test_poison = MyGray2TorchRGB(x_test_poison)

x_test_poison = torch.tensor(x_test_poison)

testset_poison = torch.utils.data.TensorDataset(x_test_poison, y_test_clean)

testloader_poison = torch.utils.data.DataLoader(testset_poison, batch_size=8,
                                          shuffle=True, num_workers=2)

class_correct = list(0. for i in range(2))
class_total = list(0. for i in range(2))
with torch.no_grad():
    for data in testloader_poison:
        images, labels = data[0].to(device), data[1].to(device)
        outputs = net(images)
        _, predicted = torch.max(outputs, 1)
        c = (predicted == labels).squeeze()
        for i in range(2):
            label = labels[i]
            class_correct[label] += c[i].item()
            class_total[label] += 1

for i in range(2):
    print('Poison accuracy of %5s : %2d %%' % (
        classes[i], 100 * class_correct[i] / class_total[i]))

Poison accuracy of NORMAL :  0 %
Poison accuracy of PNEUMONIA : 100 %
