In [2]:
import os

import torch
import torch.nn as nn
import torchvision
from tqdm import tqdm

import pickle
from matplotlib import pyplot as plt

## 1. Two-Step对抗训练

- 在前两周的实验中，你已经实现了简单的单步对抗攻击（FGSM）和迭代对抗攻击（PGD）；

- 在本周的第一个实验中，请实现一个Two-Step对抗训练防御算法，并测试其在训练集、测试集上的预测表现，以及其对FGSM、PGD的防御效果；

- 具体实验步骤如下：

  1. 将代码文件（Python文件与Notebook文件）上传到服务器端根目录；

  2. 将样本数据（Week567_img_label.pkl）上传至服务器端data/目录下；

  3. 将之前训练的模型参数（lenet5.pt）上传至服务器端model/目录下；

  4. 依照提示，完成**Python文件**与**Notebook文件**中的TODO内容；

In [3]:
#torch.autograd.set_detect_anomaly(True)
from Week567_General_Code_Question import LeNet5, load_mnist, fgsm, pgd
from Week567_General_Code_Question import evaluate

In [4]:
# Parameter
# Notebook-wide settings; several later cells rebind epsilon / iter / alpha,
# so the values seen by a cell depend on which cells ran before it.
batch_size = 128  # mini-batch size given to load_mnist; the paired loaders use 2 * batch_size
epsilon = 0.2  # perturbation budget passed to fgsm / pgd when building the adversarial training sets
iter = 20  # PGD iteration count; NOTE(review): shadows the builtin iter() in the notebook namespace
alpha = 0.07  # NOTE(review): presumably the PGD step size, but no call in this notebook passes it — confirm how pgd obtains it

In [5]:
# Model: the pretrained LeNet-5 that the adversarial training sets are crafted against.
model = LeNet5()
# map_location='cpu' lets the checkpoint load on a CPU-only machine even if it
# was saved from a GPU; load_state_dict then copies the values into `model`.
model.load_state_dict(torch.load('model/lenet5.pt', map_location='cpu'))
# Inference mode while the model is only used to generate attacks.
model.eval()

# Data: shared loss function and the MNIST train/test loaders.
criterion = nn.CrossEntropyLoss()
train_loader, test_loader = load_mnist(batch_size=batch_size)

### 生成对抗样本

In [6]:
# Build the paired training tensors. For every clean batch we store the clean
# images immediately followed by their adversarial counterparts (same size), so
# a loader with batch size 2 * batch_size and shuffle=False yields batches whose
# first half is benign and whose second half is adversarial.
fgsm_imgs, pgd_imgs, labels = [], [], []

for img, label in tqdm(train_loader):
    # benign imgs (one label entry for the benign half)
    fgsm_imgs.append(img)
    pgd_imgs.append(img)
    labels.append(label)

    # adv imgs
    # Defensive copies: `img` has just been stored as the benign half, so each
    # attack gets its own detached clone. Whatever the attack does to its input
    # (in-place updates, toggling requires_grad) cannot leak into the stored
    # clean batch or into the other attack.
    fgsm_img = fgsm(img.clone().detach(), epsilon, model, criterion, label)
    fgsm_imgs.append(fgsm_img)

    pgd_img = pgd(img.clone().detach(), epsilon, iter, model, criterion, label)
    pgd_imgs.append(pgd_img)
    # second label entry, for the adversarial half
    labels.append(label)

# Flatten the per-batch lists into single tensors that carry no autograd history.
fgsm_imgs = torch.cat(fgsm_imgs, dim=0).detach()
pgd_imgs = torch.cat(pgd_imgs, dim=0).detach()
labels = torch.cat(labels, dim=0).detach()

100%|██████████| 469/469 [01:50<00:00,  4.24it/s]


In [7]:
# FGSM pairs: dataset plus an order-preserving loader. Each loader batch spans
# one clean batch and the adversarial batch stored right after it.
fgsm_trainset = torch.utils.data.TensorDataset(fgsm_imgs, labels)
fgsm_trainloader = torch.utils.data.DataLoader(
    fgsm_trainset,
    batch_size=batch_size * 2,
    shuffle=False,
)

# PGD pairs: same layout, same label tensor.
pgd_trainset = torch.utils.data.TensorDataset(pgd_imgs, labels)
pgd_trainloader = torch.utils.data.DataLoader(
    pgd_trainset,
    batch_size=batch_size * 2,
    shuffle=False,
)

### 实现Two-Step对抗训练
- 请在下面的block中实现基于FGSM/PGD的Two-Step对抗训练函数
  - adv_train_two_step(data_loader, epoch, lr, criterion, adv_loss_weight=1)
- 算法流程
  - 从dataloader中取出成对的正常样本和对抗样本，分别计算loss然后求和，再反传梯度更新模型
  > tips: “分别计算loss”便于我们为不同的loss赋予不同的权重
    > - benign_loss前面乘上一个较大的系数，就会使模型更倾向于准确预测正常样本；
    > - adv_loss前面乘上一个较大的系数，就会使模型更倾向于准确预测对抗样本

In [8]:
def adv_train_two_step(data_loader, epoch, lr, criterion, adv_loss_weight=1):
    """Fine-tune the pretrained LeNet-5 on pre-generated benign/adversarial pairs.

    Every batch from ``data_loader`` is split in the middle: the first half is
    treated as benign samples and the second half as the adversarial samples.
    The halves are scored separately so the adversarial term can be weighted:

        loss = benign_loss + adv_loss_weight * adv_loss

    Args:
        data_loader: Iterable of ``(img, label)`` batches laid out as
            ``[benign | adversarial]`` along the first dimension.
        epoch: Number of passes over ``data_loader``.
        lr: Learning rate of the SGD optimizer.
        criterion: Loss callable invoked as ``criterion(model_output, label)``.
        adv_loss_weight: Multiplier applied to the adversarial loss term.

    Returns:
        The fine-tuned ``LeNet5`` instance, switched to eval mode because the
        caller uses it for evaluation and attack generation.
    """
    model = LeNet5()
    # map_location='cpu' keeps the load working when the checkpoint was saved
    # from a GPU but this code runs on a CPU-only machine.
    model.load_state_dict(torch.load('model/lenet5.pt', map_location='cpu'))
    model.train()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for e in range(epoch):
        t = tqdm(data_loader)
        for img, label in t:
            # Split the batch into its benign and adversarial halves.
            half = img.shape[0] // 2
            benign_img, benign_label = img[:half], label[:half]
            adv_img, adv_label = img[half:], label[half:]

            # Separate forward passes so each loss can be reported and weighted on its own.
            benign_loss = criterion(model(benign_img), benign_label)
            adv_loss = criterion(model(adv_img), adv_label)

            loss = benign_loss + adv_loss_weight * adv_loss
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            t.set_postfix(epoch=e, benign_loss=benign_loss.item(), adv_loss=adv_loss.item())

    # Same convention as the baseline model in this notebook: inference mode
    # before the model is attacked or evaluated.
    model.eval()
    return model

- 使用fgsm进行对抗训练

In [9]:
# Hyper-parameters for the two-step run (notebook-global; later cells set them again).
lr = 0.01
epoch = 20

# Fine-tune on the FGSM benign/adversarial pairs (adv_loss_weight left at its
# default) and save the resulting weights.
cnn_fgsm_two_step = adv_train_two_step(fgsm_trainloader, epoch, lr, criterion)
torch.save(cnn_fgsm_two_step.state_dict(), 'model/cnn_fgsm_two_step.pt')

100%|██████████| 469/469 [00:10<00:00, 43.88it/s, adv_loss=0.586, benign_loss=0.291, epoch=0] 
100%|██████████| 469/469 [00:10<00:00, 42.88it/s, adv_loss=0.271, benign_loss=0.225, epoch=1]  
100%|██████████| 469/469 [00:11<00:00, 40.87it/s, adv_loss=0.155, benign_loss=0.179, epoch=2]  
100%|██████████| 469/469 [00:11<00:00, 39.08it/s, adv_loss=0.103, benign_loss=0.153, epoch=3]  
100%|██████████| 469/469 [00:11<00:00, 41.26it/s, adv_loss=0.0784, benign_loss=0.141, epoch=4] 
100%|██████████| 469/469 [00:10<00:00, 44.43it/s, adv_loss=0.0589, benign_loss=0.127, epoch=5]  
100%|██████████| 469/469 [00:10<00:00, 44.49it/s, adv_loss=0.0448, benign_loss=0.118, epoch=6]   
100%|██████████| 469/469 [00:10<00:00, 44.90it/s, adv_loss=0.0358, benign_loss=0.108, epoch=7]   
100%|██████████| 469/469 [00:10<00:00, 43.79it/s, adv_loss=0.0297, benign_loss=0.102, epoch=8]   
100%|██████████| 469/469 [00:10<00:00, 44.07it/s, adv_loss=0.0244, benign_loss=0.0932, epoch=9]  
100%|██████████| 469/469 [00:10<

- 使用pgd进行对抗训练

In [10]:
# Same hyper-parameters as the FGSM two-step run.
lr = 0.01
epoch = 20

# Fine-tune on the PGD benign/adversarial pairs (adv_loss_weight left at its
# default) and save the resulting weights.
cnn_pgd_two_step = adv_train_two_step(pgd_trainloader, epoch, lr, criterion)
torch.save(cnn_pgd_two_step.state_dict(), 'model/cnn_pgd_two_step.pt')

100%|██████████| 469/469 [00:10<00:00, 43.28it/s, adv_loss=0.548, benign_loss=0.302, epoch=0]
100%|██████████| 469/469 [00:10<00:00, 46.28it/s, adv_loss=0.284, benign_loss=0.217, epoch=1]  
100%|██████████| 469/469 [00:10<00:00, 46.19it/s, adv_loss=0.188, benign_loss=0.165, epoch=2]  
100%|██████████| 469/469 [00:10<00:00, 43.65it/s, adv_loss=0.134, benign_loss=0.141, epoch=3]  
100%|██████████| 469/469 [00:12<00:00, 37.21it/s, adv_loss=0.101, benign_loss=0.127, epoch=4]  
100%|██████████| 469/469 [00:10<00:00, 45.32it/s, adv_loss=0.0847, benign_loss=0.12, epoch=5]  
100%|██████████| 469/469 [00:10<00:00, 43.52it/s, adv_loss=0.0667, benign_loss=0.113, epoch=6]  
100%|██████████| 469/469 [00:10<00:00, 46.16it/s, adv_loss=0.0575, benign_loss=0.111, epoch=7]  
100%|██████████| 469/469 [00:10<00:00, 45.73it/s, adv_loss=0.0473, benign_loss=0.103, epoch=8]   
100%|██████████| 469/469 [00:10<00:00, 46.00it/s, adv_loss=0.0378, benign_loss=0.0957, epoch=9]  
100%|██████████| 469/469 [00:10<00:0

### 评测模型性能
- 请在Python文件Week567_General_Code_Question.py中补全函数如下：
  - 在`evaluate_dataloader(dataloader, model)`函数实现模型测试过程

In [11]:
from Week567_General_Code_Question import evaluate_dataloader

- 测试基于FGSM执行Two-Step对抗训练的CNN的预测质量

In [12]:
# Clean test-set check of the FGSM two-step model; the recorded progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_fgsm_two_step)

100%|██████████| 79/79 [00:01<00:00, 57.12it/s, test_acc=0.984]


- 测试基于PGD执行Two-Step对抗训练的CNN的预测质量

In [13]:
# Clean test-set check of the PGD two-step model; the recorded progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_pgd_two_step)

100%|██████████| 79/79 [00:01<00:00, 59.39it/s, test_acc=0.985]


### 评测防御效果

In [14]:
# Load the small labelled sample set used for the robustness checks below.
# NOTE(review): pickle.load can execute arbitrary code while deserializing —
# only open this file if it comes from a trusted source.
with open('data/Week567_img_label.pkl', 'rb') as f:
    data = pickle.load(f)
    # NOTE: this rebinds `labels`, which until now named the concatenated
    # training labels; re-running the dataset-construction cell after this one
    # would pair the training images with these evaluation labels.
    imgs, labels = data['img'], data['label']
    print(imgs.shape, labels.shape)

torch.Size([20, 1, 28, 28]) torch.Size([20])


- 评测基于FGSM执行Two-Step对抗训练的模型针对FGSM/PGD攻击的防御效果

In [15]:
# Robustness check of the FGSM two-step model: adversarial examples are crafted
# against the defended model itself and then classified by that same model.
print("For FGSM Two-Step.\n")
print("Against FGSM:")
# NOTE(review): 0.08 here, whereas the adversarial training sets were generated
# with epsilon = 0.2 and the iterative-training evaluation further down uses
# 0.2 — confirm this is intended before comparing match rates across sections.
epsilon = 0.08

print(imgs.shape)  # shape printout of the evaluation images
adv_xs = fgsm(imgs, epsilon, cnn_fgsm_two_step, criterion, labels)
# evaluate prints the match rate (see recorded output); pred_label is not used afterwards in this cell.
pred_label = evaluate(adv_xs, labels, cnn_fgsm_two_step)


print("Against PGD:")
alpha = 0.07  # NOTE(review): never passed to pgd below — confirm whether it has any effect
iter = 30  # PGD iteration count for this evaluation; rebinds the notebook-level `iter` (shadows the builtin)

adv_xs = pgd(imgs, epsilon, iter, cnn_fgsm_two_step, criterion, labels)

pred_label = evaluate(adv_xs, labels, cnn_fgsm_two_step)

For FGSM Two-Step.

Against FGSM:
torch.Size([20, 1, 28, 28])
match rate: 0.7
Against PGD:
match rate: 0.3


- 评测基于PGD执行Two-Step对抗训练的模型针对FGSM/PGD攻击的防御效果

In [16]:
# Robustness check of the PGD two-step model: adversarial examples are crafted
# against the defended model itself and then classified by that same model.
print("For PGD Two-Step.\n")
print("Against FGSM:")
# NOTE(review): 0.08 here, whereas the adversarial training sets were generated
# with epsilon = 0.2 and the iterative-training evaluation further down uses
# 0.2 — confirm this is intended before comparing match rates across sections.
epsilon = 0.08

adv_xs = fgsm(imgs, epsilon, cnn_pgd_two_step, criterion, labels)
# evaluate prints the match rate (see recorded output); pred_label is not used afterwards in this cell.
pred_label = evaluate(adv_xs, labels, cnn_pgd_two_step)


print("Against PGD:")
alpha = 0.07  # NOTE(review): never passed to pgd below — confirm whether it has any effect
iter = 30  # PGD iteration count for this evaluation; rebinds the notebook-level `iter` (shadows the builtin)

adv_xs = pgd(imgs, epsilon, iter, cnn_pgd_two_step, criterion, labels)

pred_label = evaluate(adv_xs, labels, cnn_pgd_two_step)

For PGD Two-Step.

Against FGSM:
match rate: 0.65
Against PGD:
match rate: 0.4


---

## 2. 迭代对抗训练

- 在上一部分中，你已经实现了第一个对抗防御算法；
- 接下来，请模仿**Two-Step对抗训练**算法实现**迭代对抗训练**算法，并测试其在训练集、测试集上的预测表现，以及其对FGSM、PGD的防御效果；

- 具体实验步骤如下：

  1. 将代码文件（Python文件与Notebook文件）上传到服务器端根目录；

  2. 将样本数据（Week567_img_label.pkl）上传至服务器端data/目录下；

  3. 将之前训练的模型参数（lenet5.pt）上传至服务器端model/目录下；

  4. 依照提示，完成**Notebook文件**中的TODO内容；

### 实现迭代对抗训练
- 请在下面的block中分别实现基于FGSM和PGD的迭代对抗训练函数：
  - adv_train_iter_fgsm(data_loader, epoch, lr, criterion, epsilon, adv_loss_weight=1.)
  - adv_train_iter_pgd(data_loader, epoch, lr, criterion, epsilon, iter=20, adv_loss_weight=1.)
- 算法流程
  1. 从data_loader中取出正常样本对(img,label)
  2. 使用之前实现的FGSM/PGD算法，基于(img,label)生成对抗样本(adv_img,label)
      > tips: 之前版本实现的FGSM/PGD算法最后包含了`.detach()`操作，因此梯度不会传递到adv_img上
  3. 基于正常样本和对抗样本分别计算loss然后求和，再反传梯度更新模型

- 基于FGSM迭代对抗训练cnn模型

In [17]:
def adv_train_iter_fgsm(data_loader, epoch, lr, criterion, epsilon, adv_loss_weight=1.):
    """Adversarially fine-tune the pretrained LeNet-5 with FGSM examples crafted on the fly.

    For every clean batch, FGSM examples are generated against the current
    weights and the model is updated on

        loss = benign_loss + adv_loss_weight * adv_loss

    Args:
        data_loader: Iterable of clean ``(img, label)`` batches.
        epoch: Number of passes over ``data_loader``.
        lr: Learning rate of the SGD optimizer.
        criterion: Loss callable invoked as ``criterion(model_output, label)``;
            also forwarded to ``fgsm``.
        epsilon: Perturbation budget forwarded to ``fgsm``.
        adv_loss_weight: Multiplier applied to the adversarial loss term.

    Returns:
        The fine-tuned ``LeNet5`` instance, switched to eval mode because the
        caller uses it for evaluation and attack generation.
    """
    model = LeNet5()
    # map_location='cpu' keeps the load working when the checkpoint was saved
    # from a GPU but this code runs on a CPU-only machine.
    model.load_state_dict(torch.load('model/lenet5.pt', map_location='cpu'))
    model.train()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for e in range(epoch):
        t = tqdm(data_loader)
        for img, label in t:
            # Benign forward pass.
            benign_loss = criterion(model(img), label)

            # The attack works on its own detached copy: `img` is already part
            # of the benign graph, so an in-place change to it would invalidate
            # loss.backward(). This mirrors what the PGD variant does.
            adv_img = fgsm(img.clone().detach(), epsilon, model, criterion, label)
            adv_loss = criterion(model(adv_img), label)

            loss = benign_loss + adv_loss_weight * adv_loss
            # Zero the gradients only now, so any parameter gradients left
            # behind while crafting the attack are discarded before the update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()
            t.set_postfix(epoch=e, benign_loss=benign_loss.item(), adv_loss=adv_loss.item())

    # Same convention as the baseline model in this notebook: inference mode
    # before the model is attacked or evaluated.
    model.eval()
    return model

In [18]:
# Hyper-parameters for FGSM iterative adversarial training (notebook-global).
lr = 0.01
epoch = 20
adv_loss_weight = 1.0  # benign and adversarial losses weighted equally

# Restores the training-time budget; the evaluation cells above set epsilon to 0.08.
epsilon = 0.2

# Train on the clean loader (adversarial examples are crafted inside the loop) and save the weights.
cnn_fgsm_iter = adv_train_iter_fgsm(train_loader, epoch, lr, criterion, epsilon, adv_loss_weight)
torch.save(cnn_fgsm_iter.state_dict(), 'model/cnn_fgsm_iter.pt')

100%|██████████| 469/469 [00:19<00:00, 23.52it/s, adv_loss=1.21, benign_loss=0.134, epoch=0] 
100%|██████████| 469/469 [00:19<00:00, 23.52it/s, adv_loss=1.18, benign_loss=0.219, epoch=1]  
100%|██████████| 469/469 [00:19<00:00, 23.92it/s, adv_loss=0.838, benign_loss=0.0958, epoch=2]
100%|██████████| 469/469 [00:19<00:00, 23.97it/s, adv_loss=0.736, benign_loss=0.0857, epoch=3]
100%|██████████| 469/469 [00:21<00:00, 22.20it/s, adv_loss=0.761, benign_loss=0.135, epoch=4] 
100%|██████████| 469/469 [00:20<00:00, 22.53it/s, adv_loss=0.89, benign_loss=0.188, epoch=5]  
100%|██████████| 469/469 [00:19<00:00, 24.05it/s, adv_loss=0.613, benign_loss=0.0912, epoch=6]
100%|██████████| 469/469 [00:19<00:00, 23.84it/s, adv_loss=0.532, benign_loss=0.0809, epoch=7]
100%|██████████| 469/469 [00:19<00:00, 24.00it/s, adv_loss=0.503, benign_loss=0.0348, epoch=8]
100%|██████████| 469/469 [00:20<00:00, 22.93it/s, adv_loss=0.643, benign_loss=0.142, epoch=9]  
100%|██████████| 469/469 [00:20<00:00, 23.32it/s, 

- 基于PGD迭代对抗训练cnn模型

In [23]:
def adv_train_iter_pgd(data_loader, epoch, lr, criterion, epsilon, iter=20, adv_loss_weight=1.):
    """Adversarially fine-tune the pretrained LeNet-5 with PGD examples crafted on the fly.

    For every clean batch, PGD examples are generated against the current
    weights and the model is updated on

        loss = benign_loss + adv_loss_weight * adv_loss

    Args:
        data_loader: Iterable of clean ``(img, label)`` batches.
        epoch: Number of passes over ``data_loader``.
        lr: Learning rate of the SGD optimizer.
        criterion: Loss callable invoked as ``criterion(model_output, label)``;
            also forwarded to ``pgd``.
        epsilon: Perturbation budget forwarded to ``pgd``.
        iter: Iteration count forwarded to ``pgd``. The name shadows the
            builtin inside this function; it is kept for caller compatibility.
        adv_loss_weight: Multiplier applied to the adversarial loss term.

    Returns:
        The fine-tuned ``LeNet5`` instance, switched to eval mode because the
        caller uses it for evaluation and attack generation.
    """
    model = LeNet5()
    # map_location='cpu' keeps the load working when the checkpoint was saved
    # from a GPU but this code runs on a CPU-only machine.
    model.load_state_dict(torch.load('model/lenet5.pt', map_location='cpu'))
    model.train()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for e in range(epoch):
        t = tqdm(data_loader)
        for img, label in t:
            # Benign forward pass.
            benign_loss = criterion(model(img), label)

            # The attack gets a detached, grad-enabled copy: `img` is already
            # part of the benign graph, so an in-place change to it would
            # invalidate loss.backward().
            attack_input = img.clone().detach().requires_grad_(True)
            adv_img = pgd(attack_input, epsilon, iter, model, criterion, label)
            adv_loss = criterion(model(adv_img), label)

            loss = benign_loss + adv_loss_weight * adv_loss
            # Zero the gradients only now, so any parameter gradients left
            # behind while crafting the attack are discarded before the update.
            optimizer.zero_grad()
            loss.backward()
            optimizer.step()

            t.set_postfix(epoch=e, benign_loss=benign_loss.item(), adv_loss=adv_loss.item())

    # Same convention as the baseline model in this notebook: inference mode
    # before the model is attacked or evaluated.
    model.eval()
    return model

In [24]:
# Hyper-parameters for PGD iterative adversarial training (notebook-global).
lr = 0.01
epoch = 20
adv_loss_weight = 1.0  # benign and adversarial losses weighted equally

epsilon = 0.2
iter = 20  # PGD iterations per batch during training; rebinds the notebook-level `iter` (shadows the builtin)
# Train on the clean loader (adversarial examples are crafted inside the loop) and save the weights.
cnn_pgd_iter = adv_train_iter_pgd(train_loader, epoch, lr, criterion, epsilon, iter, adv_loss_weight)
torch.save(cnn_pgd_iter.state_dict(), 'model/cnn_pgd_iter.pt')
# Takes roughly 40 min to run.

100%|██████████| 469/469 [01:58<00:00,  3.97it/s, adv_loss=1.39, benign_loss=0.213, epoch=0]
100%|██████████| 469/469 [01:55<00:00,  4.08it/s, adv_loss=1.3, benign_loss=0.256, epoch=1]  
100%|██████████| 469/469 [01:54<00:00,  4.11it/s, adv_loss=0.95, benign_loss=0.126, epoch=2]  
100%|██████████| 469/469 [01:52<00:00,  4.17it/s, adv_loss=0.757, benign_loss=0.122, epoch=3] 
100%|██████████| 469/469 [01:52<00:00,  4.18it/s, adv_loss=0.724, benign_loss=0.113, epoch=4] 
100%|██████████| 469/469 [01:51<00:00,  4.19it/s, adv_loss=0.78, benign_loss=0.0971, epoch=5] 
100%|██████████| 469/469 [01:51<00:00,  4.19it/s, adv_loss=0.917, benign_loss=0.12, epoch=6]  
100%|██████████| 469/469 [01:52<00:00,  4.19it/s, adv_loss=0.676, benign_loss=0.104, epoch=7] 
100%|██████████| 469/469 [01:51<00:00,  4.20it/s, adv_loss=0.535, benign_loss=0.0654, epoch=8]
100%|██████████| 469/469 [01:51<00:00,  4.21it/s, adv_loss=0.522, benign_loss=0.0514, epoch=9]
100%|██████████| 469/469 [01:51<00:00,  4.20it/s, adv

### 评测模型性能

- 测试基于FGSM执行迭代对抗训练的CNN的预测质量

In [25]:
# Clean test-set check of the FGSM iteratively trained model; the recorded progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_fgsm_iter)

  0%|          | 0/79 [00:00<?, ?it/s, test_acc=0.983]

100%|██████████| 79/79 [00:01<00:00, 55.77it/s, test_acc=0.986]


- 测试基于PGD执行迭代对抗训练的CNN的预测质量

In [26]:
# Clean test-set check of the PGD iteratively trained model; the recorded progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_pgd_iter)

100%|██████████| 79/79 [00:01<00:00, 58.16it/s, test_acc=0.983]


### 评测防御效果

In [27]:
# Reload the labelled sample set for the robustness checks below (the same
# file is also loaded in the two-step section of this notebook).
# NOTE(review): pickle.load can execute arbitrary code while deserializing —
# only open this file if it comes from a trusted source.
with open('data/Week567_img_label.pkl', 'rb') as f:
    data = pickle.load(f)
    # Rebinds the notebook-level `imgs` and `labels`.
    imgs, labels = data['img'], data['label']

- 评测基于FGSM执行迭代对抗训练的模型针对FGSM/PGD攻击的防御效果

In [28]:
# Robustness check of the FGSM iteratively trained model: adversarial examples
# are crafted against the defended model itself and then classified by it.
print("For FGSM Iterative.\n")
print("Against FGSM:")
# Same budget as used during iterative training. NOTE(review): the two-step
# models earlier in this notebook were evaluated with epsilon = 0.08, so match
# rates are not directly comparable across the two sections.
epsilon = 0.2

adv_xs = fgsm(imgs, epsilon, cnn_fgsm_iter, criterion, labels)
# evaluate prints the match rate (see recorded output); pred_label is not used afterwards in this cell.
pred_label = evaluate(adv_xs, labels, cnn_fgsm_iter)


print("Against PGD:")
alpha = 0.07  # NOTE(review): never passed to pgd below — confirm whether it has any effect
iter = 30  # PGD iteration count for this evaluation; rebinds the notebook-level `iter` (shadows the builtin)

adv_xs = pgd(imgs, epsilon,iter, cnn_fgsm_iter, criterion, labels)

pred_label = evaluate(adv_xs, labels, cnn_fgsm_iter)

For FGSM Iterative.

Against FGSM:
match rate: 0.85
Against PGD:
match rate: 0.55


- 评测基于PGD执行迭代对抗训练的模型针对FGSM/PGD攻击的防御效果

In [29]:
# Robustness check of the PGD iteratively trained model: adversarial examples
# are crafted against the defended model itself and then classified by it.
print("For PGD Iterative.\n")
print("Against FGSM:")
# Same budget as used during iterative training. NOTE(review): the two-step
# models earlier in this notebook were evaluated with epsilon = 0.08, so match
# rates are not directly comparable across the two sections.
epsilon = 0.2

adv_xs = fgsm(imgs, epsilon, cnn_pgd_iter, criterion, labels)
# evaluate prints the match rate (see recorded output); pred_label is not used afterwards in this cell.
pred_label = evaluate(adv_xs, labels, cnn_pgd_iter)


print("Against PGD:")
alpha = 0.07  # NOTE(review): never passed to pgd below — confirm whether it has any effect
iter = 30  # 30 PGD iterations here versus 20 during training; rebinds the notebook-level `iter` (shadows the builtin)

adv_xs = pgd(imgs, epsilon, iter, cnn_pgd_iter, criterion, labels)

pred_label = evaluate(adv_xs, labels, cnn_pgd_iter)

For PGD Iterative.

Against FGSM:
match rate: 0.85
Against PGD:
match rate: 0.85
