In [1]:
import torch
import torch.nn as nn
import torchvision
import torchvision.transforms as transforms
from LeNet import LeNet

import matplotlib.pyplot as plt
import numpy as np
from tqdm import tqdm_notebook
import os
from torch.autograd import Variable

%matplotlib

Using matplotlib backend: Qt5Agg


In [19]:
NORMALIZE = False
DEVICE = torch.device('cuda:7' if torch.cuda.is_available() else 'cpu')

In [20]:
if NORMALIZE:
    trans = transforms.Compose([
        transforms.ToTensor(),
        transforms.Normalize((0.1307,), (0.3081,))
    ])
else:
    trans = transforms.Compose([
        transforms.ToTensor(),
    ])

In [21]:
data_home = '/data/winddy/'

train_set = torchvision.datasets.MNIST(root=os.path.join(data_home, 'dataset/MNIST'), train=True, download=True, transform=trans)
test_set = torchvision.datasets.MNIST(root=os.path.join(data_home, 'dataset/MNIST'), train=False, download=True, transform=trans)

In [22]:
train_loader = torch.utils.data.DataLoader(train_set, batch_size=64, shuffle=True, num_workers=2)
test_loader = torch.utils.data.DataLoader(test_set, batch_size=64, shuffle=True, num_workers=2)

In [23]:
# 可视化数据集
def imshow(img):
    if NORMALIZE:
        img = img * 0.3081 + 0.1307
    np_img = img.numpy()
    plt.imshow(np.transpose(np_img, (1, 2, 0)))
    plt.show()

In [24]:
dataiter = iter(train_loader)
images, labels = dataiter.next()

In [25]:
# 可视化部分图片


dataiter = iter(train_loader)
images, labels = dataiter.next()
imshow(torchvision.utils.make_grid(images))
print(labels)

tensor([0, 5, 3, 0, 9, 6, 6, 2, 4, 7, 6, 0, 3, 8, 7, 5, 8, 0, 0, 2, 6, 2, 9, 1,
        0, 6, 3, 3, 2, 5, 3, 8, 8, 9, 4, 0, 9, 9, 8, 0, 8, 1, 8, 4, 1, 6, 9, 3,
        9, 2, 4, 3, 5, 6, 3, 5, 6, 7, 0, 4, 9, 2, 2, 6])


# 对抗训练

## 读取模型

In [26]:
if NORMALIZE:
    model_path = './model/LeNet_MNIST.pt'
else:
    model_path = './model/LeNet_MNIST_unNormalize.pt'

model = LeNet()
model.load_state_dict(torch.load(model_path))
model = model.to(DEVICE)

## 测试正常数据的准确率

In [27]:
model.eval()
test_loss = 0
correct = 0

target_arr = []
output_arr = []
count = 0
with torch.no_grad():
    for data, target in test_loader:
        data, target = data.to(DEVICE), target.to(DEVICE)
        output = model(data)
        
        test_loss += torch.nn.functional.cross_entropy(output, target, reduction='sum').item()
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
        
        target_arr.append(target.cpu().data.numpy())
        output_arr.append(output.cpu().data.numpy())
        
        count += len(data)
        print('\r {}|{}'.format(count, len(test_loader.dataset)), end='')
        
    test_loss /= len(test_loader.dataset)
    
print('test loss: {%0.4f}' % test_loss)
print('correct: {%0.4f}' % (correct/len(test_loader.dataset)) )

 10000|10000test loss: {0.0417}
correct: {0.9858}


In [28]:
class FGSM:
    def __init__(self, model, criterion, epsilon, device):
        self.model = model
        self.criterion = criterion
        self.epsilon = epsilon
        self.device = device
        assert isinstance(model, torch.nn.Module), "Input parameter model is not nn.Module. Check the model"
        assert isinstance(criterion, torch.nn.Module), "Input parameter criterion is no Loss. Check the criterion"
        assert (0 <= epsilon <= 1), "episilon must be 0 <= epsilon <= 1"
        self.model.eval()


    def __call__(self, input, labels):
        # For calculating gradient
        input_for_gradient = Variable(input, requires_grad=True).to(self.device)
        out = self.model(input_for_gradient)
        loss = self.criterion(out, Variable(labels))

        # Calculate gradient
        loss.backward()

        # Calculate sign of gradient
        signs = torch.sign(input_for_gradient.grad.data)

        # Add
        input_for_gradient.data = input_for_gradient.data + (self.epsilon * signs)

        return input_for_gradient, signs

In [29]:
epsilon = 0.3
criterion = nn.CrossEntropyLoss()
fgsm = FGSM(model, criterion, epsilon, DEVICE)

## 生成对抗样本并训练

### 对抗攻击 对没有进行对抗训练的模型

In [30]:
# 准备模型
model.eval()
test_loss = 0
correct = 0

# 遍历数据集
target_arr = []
output_arr = []

for data, target in test_loader:
    data, target = data.to(DEVICE), target.to(DEVICE)

    data, sign = fgsm(data, target)

    output = model(data)
    test_loss += torch.nn.functional.cross_entropy(output, target, reduction='sum').item()
    pred = output.argmax(dim=1, keepdim=True)
    correct += pred.eq(target.view_as(pred)).sum().item()

    target_arr.append(target.cpu().data.numpy())
    output_arr.append(output.cpu().data.numpy())
    
    
test_loss /= len(test_loader.dataset)
    
print('test loss: {%0.4f}' % test_loss)
print('correct: {%0.4f}' % (correct/len(test_loader.dataset)) )

test loss: {10.7701}
correct: {0.0326}


In [31]:
model_adv = LeNet()
model_adv.load_state_dict(torch.load(model_path))
print('model_adv read the parameter of {}'.format(model_path))
model_adv = model_adv.to(DEVICE)

model_adv read the parameter of ./model/LeNet_MNIST_unNormalize.pt


In [32]:
optimizer = torch.optim.SGD(params=model_adv.parameters(), lr=0.01, momentum=0.5)
loss_F = torch.nn.functional.cross_entropy

In [33]:
# 准备模型
model.eval() # 产生对抗样本的模型
model_adv.train() # 进行对抗训练的模型

train_loss = []


# 遍历数据集

for epoch in range(10):
    tmp_loss = 0
    count = 0
    for data, target in train_loader:
        data, target = data.to(DEVICE), target.to(DEVICE)

        data, sign = fgsm(data, target)

        optimizer.zero_grad()
        output = model_adv(data)
        loss = loss_F(output, target)
        loss.backward()
        optimizer.step()
        
        tmp_loss += loss.item()
        train_loss.append(loss.item())
        count += len(data)
        print('\r {}|{}'.format(count, len(train_loader.dataset)), end='')
      
    # 测试
    correct = 0
    for data, target in test_loader:
        data, target = data.to(DEVICE), target.to(DEVICE)

        data, sign = fgsm(data, target)

        output = model_adv(data)
        pred = output.argmax(dim=1, keepdim=True)
        correct += pred.eq(target.view_as(pred)).sum().item()
    print('test correct: {}'.format(correct/len(test_loader.dataset)))    
    
    print('Epoch:{}, loss{}'.format(epoch, tmp_loss/len(train_loader.dataset)))
    

plt.plot(train_loss)
plt.show()

 60000|60000test correct: 0.9409
Epoch:0, loss0.008138871908870837
 60000|60000test correct: 0.9741
Epoch:1, loss0.001878611198440194
 60000|60000test correct: 0.9716
Epoch:2, loss0.0011520231779043873
 60000|60000test correct: 0.9808
Epoch:3, loss0.0008189000415926178
 60000|60000test correct: 0.9843
Epoch:4, loss0.0005971610893805822
 60000|60000test correct: 0.985
Epoch:5, loss0.000454725683790942
 60000|60000test correct: 0.9846
Epoch:6, loss0.00034746071957051755
 60000|60000test correct: 0.9885
Epoch:7, loss0.0002772072944790125
 60000|60000test correct: 0.9831
Epoch:8, loss0.00022108477900425593
 60000|60000test correct: 0.9863
Epoch:9, loss0.00016325688653935988


In [34]:
## 保存模型
if not os.path.exists('./model'):
    os.makedirs('./model')
if NORMALIZE:
    model_path = './model/LeNet_MNIST_adv.pt'
else:
    model_path = './model/LeNet_MNIST_unNormalize_adv.pt'
torch.save(model_adv.state_dict(), model_path)
print('save successful model:{}'.format(model_path))

save successful model:./model/LeNet_MNIST_unNormalize_adv.pt


In [35]:
model_adv_new = LeNet()
model_adv_new.load_state_dict(torch.load('./model/LeNet_MNIST_adv.pt'))
model_adv_new = model_adv_new.to(DEVICE)

model_adv_new.eval()
correct = 0
for data, target in test_loader:
    data, target = data.to(DEVICE), target.to(DEVICE)

    data, sign = fgsm(data, target)

    output = model_adv_new(data)
    pred = output.argmax(dim=1, keepdim=True)
    correct += pred.eq(target.view_as(pred)).sum().item()
print('test correct: {}'.format(correct/len(test_loader.dataset)))    


test correct: 0.5103
