In [1]:
import os

import torch
import torch.nn as nn
import torchvision
from tqdm import tqdm

import pickle
from matplotlib import pyplot as plt

## 1. Two-Step对抗训练

- 在前两周的实验中，你已经实现了简单的单步对抗攻击（FGSM）和迭代对抗攻击（PGD）；

- 在本周的第一个实验中，请实现一个Two-Step对抗训练防御算法，并测试其在训练集、测试集上的预测表现，以及其对FGSM、PGD的防御效果；

- 具体实验步骤如下：

  1. 将代码文件（Python文件与Notebook文件）上传到服务器端根目录；

  2. 将样本数据（Week567_img_label.pkl）上传至服务器端data/目录下；

  3. 将之前训练的模型参数（lenet5.pt）上传至服务器端model/目录下；

  4. 依照提示，完成**Python文件**与**Notebook文件**中的TODO内容；

In [2]:
from Week567_General_Code_Question import LeNet5, load_mnist, fgsm, pgd
from Week567_General_Code_Question import evaluate

In [3]:
# Hyperparameters shared by the data loading and attack cells below.
batch_size = 128            # mini-batch size handed to load_mnist
epsilon, alpha = 0.2, 0.07  # epsilon / alpha arguments forwarded to fgsm and pgd
iter = 20                   # iteration count forwarded to pgd (NOTE: shadows the builtin `iter`)

In [4]:
# Model: restore the pretrained LeNet5 checkpoint and switch it to eval mode.
# weights_only=True limits unpickling to tensors and plain containers, which is all a
# state_dict needs, and avoids the torch.load FutureWarning.
model = LeNet5()
model.load_state_dict(torch.load('model/lenet5.pt', weights_only=True))
model.eval()

# Data: loss function plus the MNIST train/test loaders.
criterion = nn.CrossEntropyLoss()
train_loader, test_loader = load_mnist(batch_size=batch_size)

  model.load_state_dict(torch.load('model/lenet5.pt'))


### 生成对抗样本

In [5]:
# Build one flat tensor per attack in which every benign mini-batch is immediately
# followed by its adversarial counterpart. `labels` repeats each label batch twice so
# that it lines up with either image tensor.
fgsm_imgs, pgd_imgs, labels = [], [], []

for clean_batch, target_batch in tqdm(train_loader):
    # Adversarial versions of the current batch, crafted against the pretrained model.
    fgsm_batch = fgsm(clean_batch, epsilon, model, criterion, target_batch)
    pgd_batch = pgd(clean_batch, epsilon, alpha, iter, model, criterion, target_batch)

    # Benign batch first, adversarial batch second -- the training loop depends on this order.
    fgsm_imgs.extend([clean_batch, fgsm_batch])
    pgd_imgs.extend([clean_batch, pgd_batch])
    labels.extend([target_batch, target_batch])

# Concatenate along the batch dimension and detach from any autograd graph.
fgsm_imgs = torch.cat(fgsm_imgs, dim=0).detach()
pgd_imgs = torch.cat(pgd_imgs, dim=0).detach()
labels = torch.cat(labels, dim=0).detach()

100%|██████████| 469/469 [01:48<00:00,  4.33it/s]


In [6]:
# Every loader batch has to contain one benign mini-batch plus its adversarial twin,
# so the batch size is doubled and shuffling stays off to keep the interleaved order.
paired_batch_size = batch_size * 2

fgsm_trainset = torch.utils.data.TensorDataset(fgsm_imgs, labels)
fgsm_trainloader = torch.utils.data.DataLoader(
    fgsm_trainset, batch_size=paired_batch_size, shuffle=False
)

pgd_trainset = torch.utils.data.TensorDataset(pgd_imgs, labels)
pgd_trainloader = torch.utils.data.DataLoader(
    pgd_trainset, batch_size=paired_batch_size, shuffle=False
)

### 实现Two-Step对抗训练
- 请在下面的block中实现基于FGSM/PGD的Two-Step对抗训练（防御）算法
  - adv_train_two_step(data_loader, epoch, lr, criterion, adv_loss_weight=1)
- 算法流程
  - 从dataloader中取出成对的正常样本和对抗样本，分别计算loss然后求和，再反传梯度更新模型
  > tips: “分别计算loss”便于我们为不同的loss赋予不同的权重
    > - benign_loss前面乘上一个较大的系数，就会使模型更倾向于准确预测正常样本；
    > - adv_loss前面乘上一个较大的系数，就会使模型更倾向于准确预测对抗样本

In [7]:
def adv_train_two_step(data_loader, epoch, lr, criterion, adv_loss_weight=1):
    """Fine-tune the pretrained LeNet5 on pre-generated benign/adversarial pairs.

    Every batch from ``data_loader`` is split in the middle: the first half is
    treated as benign samples and the second half as adversarial samples. The two
    losses are computed separately so the adversarial term can be re-weighted.

    Args:
        data_loader: Iterable yielding ``(img, label)`` batches laid out as
            ``[benign..., adversarial...]``.
        epoch: Number of passes over ``data_loader``.
        lr: Learning rate for the SGD optimizer.
        criterion: Loss function called as ``criterion(model_output, label)``.
        adv_loss_weight: Multiplier applied to the adversarial loss.

    Returns:
        The fine-tuned ``LeNet5`` instance (still in training mode).
    """
    model = LeNet5()
    # weights_only=True restricts unpickling to what a state_dict needs and avoids
    # the torch.load FutureWarning.
    model.load_state_dict(torch.load('model/lenet5.pt', weights_only=True))
    model.train()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for e in range(epoch):
        t = tqdm(data_loader)
        for img, label in t:
            # Benign samples occupy the first half of the batch, adversarial the rest.
            half = img.shape[0] // 2
            benign_img, benign_label = img[:half], label[:half]
            adv_img, adv_label = img[half:], label[half:]

            optimizer.zero_grad()

            # Separate forward passes keep the two loss terms individually weightable.
            benign_loss = criterion(model(benign_img), benign_label)
            adv_loss = criterion(model(adv_img), adv_label)

            loss = benign_loss + adv_loss_weight * adv_loss
            loss.backward()
            optimizer.step()

            t.set_postfix(epoch=e, benign_loss=benign_loss.item(), adv_loss=adv_loss.item())

    return model

- 使用fgsm进行对抗训练

In [8]:
# Two-step adversarial training on the FGSM pairs; weights are saved for later checks.
lr, epoch = 0.01, 20

cnn_fgsm_two_step = adv_train_two_step(
    data_loader=fgsm_trainloader,
    epoch=epoch,
    lr=lr,
    criterion=criterion,
)
torch.save(cnn_fgsm_two_step.state_dict(), 'model/cnn_fgsm_two_step.pt')

  model.load_state_dict(torch.load('model/lenet5.pt'))
100%|██████████| 469/469 [00:10<00:00, 44.74it/s, adv_loss=0.0485, benign_loss=0.0664, epoch=0]
100%|██████████| 469/469 [00:10<00:00, 44.25it/s, adv_loss=0.0202, benign_loss=0.0443, epoch=1] 
100%|██████████| 469/469 [00:10<00:00, 46.14it/s, adv_loss=0.0115, benign_loss=0.0329, epoch=2] 
100%|██████████| 469/469 [00:10<00:00, 45.14it/s, adv_loss=0.00797, benign_loss=0.0264, epoch=3]
100%|██████████| 469/469 [00:10<00:00, 44.87it/s, adv_loss=0.00618, benign_loss=0.0223, epoch=4] 
100%|██████████| 469/469 [00:10<00:00, 44.97it/s, adv_loss=0.0048, benign_loss=0.0188, epoch=5]  
100%|██████████| 469/469 [00:10<00:00, 45.26it/s, adv_loss=0.00383, benign_loss=0.0159, epoch=6]  
100%|██████████| 469/469 [00:10<00:00, 45.74it/s, adv_loss=0.00321, benign_loss=0.0142, epoch=7]  
100%|██████████| 469/469 [00:10<00:00, 46.10it/s, adv_loss=0.00277, benign_loss=0.0128, epoch=8]  
100%|██████████| 469/469 [00:10<00:00, 44.11it/s, adv_loss=0.0023

- 使用pgd进行对抗训练

In [9]:
# Two-step adversarial training on the PGD pairs.
lr, epoch = 0.01, 20

cnn_pgd_two_step = adv_train_two_step(
    data_loader=pgd_trainloader,
    epoch=epoch,
    lr=lr,
    criterion=criterion,
)
# Save the trained weights so results can be re-checked quickly without retraining.
torch.save(cnn_pgd_two_step.state_dict(), 'model/cnn_pgd_two_step.pt')

  model.load_state_dict(torch.load('model/lenet5.pt'))
100%|██████████| 469/469 [00:10<00:00, 44.09it/s, adv_loss=0.0641, benign_loss=0.0613, epoch=0]
100%|██████████| 469/469 [00:10<00:00, 45.75it/s, adv_loss=0.0365, benign_loss=0.042, epoch=1] 
100%|██████████| 469/469 [00:10<00:00, 44.65it/s, adv_loss=0.0263, benign_loss=0.0311, epoch=2]
100%|██████████| 469/469 [00:10<00:00, 46.06it/s, adv_loss=0.0211, benign_loss=0.0267, epoch=3] 
100%|██████████| 469/469 [00:10<00:00, 43.91it/s, adv_loss=0.0162, benign_loss=0.0221, epoch=4] 
100%|██████████| 469/469 [00:10<00:00, 45.62it/s, adv_loss=0.0137, benign_loss=0.02, epoch=5]    
100%|██████████| 469/469 [00:10<00:00, 43.46it/s, adv_loss=0.0113, benign_loss=0.0184, epoch=6]  
100%|██████████| 469/469 [00:11<00:00, 40.95it/s, adv_loss=0.0095, benign_loss=0.0167, epoch=7]  
100%|██████████| 469/469 [00:11<00:00, 42.35it/s, adv_loss=0.00802, benign_loss=0.0152, epoch=8] 
100%|██████████| 469/469 [00:10<00:00, 43.69it/s, adv_loss=0.00687, ben

### 评测模型性能
- 请在Python文件Week567_General_Code_Question.py中补全函数如下：
  - 在`evaluate_dataloader(dataloader, model)`函数实现模型测试过程

In [10]:
from Week567_General_Code_Question import evaluate_dataloader

- 测试基于FGSM执行Two-Step对抗训练的CNN的预测质量

In [11]:
# Run the FGSM two-step model over the test loader; the progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_fgsm_two_step)

100%|██████████| 79/79 [00:00<00:00, 104.58it/s, test_acc=0.913]


- 测试基于PGD执行Two-Step对抗训练的CNN的预测质量

In [12]:
# Run the PGD two-step model over the test loader; the progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_pgd_two_step)

100%|██████████| 79/79 [00:00<00:00, 101.82it/s, test_acc=0.961]


### 评测防御效果

In [13]:
# Load the fixed evaluation samples used for the attack/defense comparison.
# NOTE: pickle.load executes arbitrary code from the file -- only use trusted data.
# NOTE: `labels` is rebound here and no longer refers to the training labels built earlier.
with open('data/Week567_img_label.pkl', 'rb') as f:
    data = pickle.load(f)

imgs = data['img']
labels = data['label']

- 评测基于FGSM执行Two-Step对抗训练的模型针对FGSM/PGD攻击的防御效果

In [14]:
# Attack settings for this evaluation cell.
# TODO: also test epsilon=0.03 and epsilon=0.2
epsilon = 0.08
alpha, iter = 0.07, 30

print("For FGSM Two-Step.\n")

# FGSM crafted against the defended model itself, then scored by evaluate().
print("Against FGSM:")
adv_xs = fgsm(imgs, epsilon, cnn_fgsm_two_step, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_fgsm_two_step)

# Same procedure with PGD.
print("Against PGD:")
adv_xs = pgd(imgs, epsilon, alpha, iter, cnn_fgsm_two_step, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_fgsm_two_step)

For FGSM Two-Step.

Against FGSM:
match rate: 0.1
Against PGD:
match rate: 0.1


- 评测基于PGD执行Two-Step对抗训练的模型针对FGSM/PGD攻击的防御效果

In [15]:
# Attack settings for this evaluation cell.
# TODO: also test epsilon=0.03 and epsilon=0.2
epsilon = 0.08
alpha, iter = 0.07, 30

print("For PGD Two-Step.\n")

# FGSM crafted against the defended model itself, then scored by evaluate().
print("Against FGSM:")
adv_xs = fgsm(imgs, epsilon, cnn_pgd_two_step, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_pgd_two_step)

# Same procedure with PGD.
print("Against PGD:")
adv_xs = pgd(imgs, epsilon, alpha, iter, cnn_pgd_two_step, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_pgd_two_step)

For PGD Two-Step.

Against FGSM:
match rate: 0.45
Against PGD:
match rate: 0.3


---

## 2. 迭代对抗训练

- 在上一部分中，你已经实现了第一个对抗防御算法；
- 接下来，请模仿**Two-Step对抗训练**算法实现**迭代对抗训练**算法，并测试其在训练集、测试集上的预测表现，以及其对FGSM、PGD的防御效果；

- 具体实验步骤如下：

  1. 将代码文件（Python文件与Notebook文件）上传到服务器端根目录；

  2. 将样本数据（Week567_img_label.pkl）上传至服务器端data/目录下；

  3. 将之前训练的模型参数（lenet5.pt）上传至服务器端model/目录下；

  4. 依照提示，完成**Notebook文件**中的TODO内容；

### 实现迭代对抗训练
- 请在下面的block中分别实现基于FGSM和PGD的迭代对抗训练函数：
  - adv_train_iter_fgsm(data_loader, epoch, lr, criterion, epsilon, adv_loss_weight=1.)
  - adv_train_iter_pgd(data_loader, epoch, lr, criterion, epsilon, alpha, iter=20, adv_loss_weight=1.)
- 算法流程
  1. 从data_loader中取出正常样本对(img,label)
  2. 使用之前实现的FGSM/PGD算法，基于(img,label)生成对抗样本(adv_img,label)
      > tips: 之前版本实现的FGSM/PGD算法最后包含了`.detach()`操作，因此梯度不会传递到adv_img上
  3. 基于正常样本和对抗样本分别计算loss然后求和，再反传梯度更新模型

- 基于FGSM迭代对抗训练cnn模型

In [16]:
def adv_train_iter_fgsm(data_loader, epoch, lr, criterion, epsilon, adv_loss_weight=1.):
    """Fine-tune the pretrained LeNet5 with FGSM examples regenerated at every step.

    For each batch, adversarial images are crafted against the current weights with
    ``fgsm`` and the model is updated on
    ``benign_loss + adv_loss_weight * adv_loss``.

    Args:
        data_loader: Iterable yielding ``(img, label)`` batches of benign samples.
        epoch: Number of passes over ``data_loader``.
        lr: Learning rate for the SGD optimizer.
        criterion: Loss function called as ``criterion(model_output, label)``.
        epsilon: Forwarded unchanged to ``fgsm``.
        adv_loss_weight: Multiplier applied to the adversarial loss.

    Returns:
        The fine-tuned ``LeNet5`` instance (still in training mode).
    """
    model = LeNet5()
    # weights_only=True restricts unpickling to what a state_dict needs and avoids
    # the torch.load FutureWarning.
    model.load_state_dict(torch.load('model/lenet5.pt', weights_only=True))
    model.train()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for e in range(epoch):
        t = tqdm(data_loader)
        for img, label in t:
            # Craft the attack BEFORE clearing gradients: fgsm may call backward()
            # internally (its body is not visible here), which would leave attack
            # gradients in the parameters' .grad and pollute the optimizer step.
            adv_img = fgsm(img, epsilon, model, criterion, label)

            optimizer.zero_grad()

            # Separate forward passes keep the two loss terms individually weightable.
            benign_loss = criterion(model(img), label)
            adv_loss = criterion(model(adv_img), label)

            loss = benign_loss + adv_loss_weight * adv_loss
            loss.backward()
            optimizer.step()

            t.set_postfix(epoch=e, benign_loss=benign_loss.item(), adv_loss=adv_loss.item())

    return model

In [17]:
# Hyperparameters for FGSM-based iterative adversarial training.
lr, epoch = 0.01, 20
adv_loss_weight = 1.0
epsilon = 0.2

cnn_fgsm_iter = adv_train_iter_fgsm(
    data_loader=train_loader,
    epoch=epoch,
    lr=lr,
    criterion=criterion,
    epsilon=epsilon,
    adv_loss_weight=adv_loss_weight,
)
torch.save(cnn_fgsm_iter.state_dict(), 'model/cnn_fgsm_iter.pt')

  model.load_state_dict(torch.load('model/lenet5.pt'))
100%|██████████| 469/469 [00:30<00:00, 15.41it/s, adv_loss=0.598, benign_loss=0.109, epoch=0] 
100%|██████████| 469/469 [00:30<00:00, 15.47it/s, adv_loss=0.659, benign_loss=0.122, epoch=1] 
100%|██████████| 469/469 [00:29<00:00, 15.79it/s, adv_loss=0.532, benign_loss=0.0911, epoch=2]
100%|██████████| 469/469 [00:30<00:00, 15.29it/s, adv_loss=0.514, benign_loss=0.0705, epoch=3]
100%|██████████| 469/469 [00:29<00:00, 15.89it/s, adv_loss=0.441, benign_loss=0.0818, epoch=4]
100%|██████████| 469/469 [00:30<00:00, 15.43it/s, adv_loss=0.343, benign_loss=0.0685, epoch=5]
100%|██████████| 469/469 [00:29<00:00, 15.93it/s, adv_loss=0.515, benign_loss=0.0644, epoch=6]
100%|██████████| 469/469 [00:28<00:00, 16.47it/s, adv_loss=0.385, benign_loss=0.0531, epoch=7]
100%|██████████| 469/469 [00:30<00:00, 15.47it/s, adv_loss=0.542, benign_loss=0.141, epoch=8] 
100%|██████████| 469/469 [00:29<00:00, 16.05it/s, adv_loss=0.484, benign_loss=0.0456, epoc

- 基于PGD迭代对抗训练cnn模型

In [18]:
def adv_train_iter_pgd(data_loader, epoch, lr, criterion, epsilon, alpha, iter=20, adv_loss_weight=1.):
    """Fine-tune the pretrained LeNet5 with PGD examples regenerated at every step.

    For each batch, adversarial images are crafted against the current weights with
    ``pgd`` and the model is updated on
    ``benign_loss + adv_loss_weight * adv_loss``.

    Args:
        data_loader: Iterable yielding ``(img, label)`` batches of benign samples.
        epoch: Number of passes over ``data_loader``.
        lr: Learning rate for the SGD optimizer.
        criterion: Loss function called as ``criterion(model_output, label)``.
        epsilon: Forwarded unchanged to ``pgd``.
        alpha: Forwarded unchanged to ``pgd``.
        iter: Iteration count forwarded to ``pgd`` (name shadows the builtin; kept
            for backward compatibility with existing callers).
        adv_loss_weight: Multiplier applied to the adversarial loss.

    Returns:
        The fine-tuned ``LeNet5`` instance (still in training mode).
    """
    model = LeNet5()
    # weights_only=True restricts unpickling to what a state_dict needs and avoids
    # the torch.load FutureWarning.
    model.load_state_dict(torch.load('model/lenet5.pt', weights_only=True))
    model.train()

    optimizer = torch.optim.SGD(model.parameters(), lr=lr)

    for e in range(epoch):
        t = tqdm(data_loader)
        for img, label in t:
            # Craft the attack BEFORE clearing gradients: pgd may call backward() on
            # each of its iterations (its body is not visible here), which would
            # leave attack gradients in the parameters' .grad and pollute the step.
            adv_img = pgd(img, epsilon, alpha, iter, model, criterion, label)

            optimizer.zero_grad()

            # Separate forward passes keep the two loss terms individually weightable.
            benign_loss = criterion(model(img), label)
            adv_loss = criterion(model(adv_img), label)

            loss = benign_loss + adv_loss_weight * adv_loss
            loss.backward()
            optimizer.step()

            t.set_postfix(epoch=e, benign_loss=benign_loss.item(), adv_loss=adv_loss.item())

    return model

In [19]:
# Hyperparameters for PGD-based iterative adversarial training.
lr = 0.01
epoch = 20
adv_loss_weight = 1.0

epsilon = 0.2
# Set alpha explicitly instead of relying on the value left behind by an earlier
# evaluation cell; 0.07 matches what that cell assigned.
alpha = 0.07
iter = 20

cnn_pgd_iter = adv_train_iter_pgd(train_loader, epoch, lr, criterion, epsilon, alpha, iter, adv_loss_weight)
# Save the trained weights so results can be re-checked quickly without retraining.
torch.save(cnn_pgd_iter.state_dict(), 'model/cnn_pgd_iter.pt')

  model.load_state_dict(torch.load('model/lenet5.pt'))
100%|██████████| 469/469 [01:56<00:00,  4.03it/s, adv_loss=0.44, benign_loss=0.135, epoch=0]  
100%|██████████| 469/469 [01:56<00:00,  4.01it/s, adv_loss=0.452, benign_loss=0.0864, epoch=1]
100%|██████████| 469/469 [01:56<00:00,  4.02it/s, adv_loss=0.24, benign_loss=0.0796, epoch=2] 
100%|██████████| 469/469 [01:56<00:00,  4.02it/s, adv_loss=0.454, benign_loss=0.092, epoch=3] 
100%|██████████| 469/469 [01:57<00:00,  4.00it/s, adv_loss=0.39, benign_loss=0.0988, epoch=4] 
100%|██████████| 469/469 [01:56<00:00,  4.01it/s, adv_loss=0.359, benign_loss=0.0598, epoch=5]
100%|██████████| 469/469 [01:55<00:00,  4.05it/s, adv_loss=0.398, benign_loss=0.0535, epoch=6] 
100%|██████████| 469/469 [01:56<00:00,  4.03it/s, adv_loss=0.306, benign_loss=0.0698, epoch=7]
100%|██████████| 469/469 [01:57<00:00,  3.98it/s, adv_loss=0.348, benign_loss=0.0514, epoch=8]
100%|██████████| 469/469 [01:56<00:00,  4.02it/s, adv_loss=0.475, benign_loss=0.148, epoc

### 评测模型性能

- 测试基于FGSM执行迭代对抗训练的CNN的预测质量

In [20]:
# Run the FGSM iteratively-trained model over the test loader; the progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_fgsm_iter)

100%|██████████| 79/79 [00:00<00:00, 106.96it/s, test_acc=0.99] 


- 测试基于PGD执行迭代对抗训练的CNN的预测质量

In [21]:
# Run the PGD iteratively-trained model over the test loader; the progress bar reports test_acc.
evaluate_dataloader(test_loader, cnn_pgd_iter)

100%|██████████| 79/79 [00:00<00:00, 104.98it/s, test_acc=0.983]


### 评测防御效果

In [22]:
# Reload the fixed evaluation samples so this section can run on its own.
# NOTE: pickle.load executes arbitrary code from the file -- only use trusted data.
with open('data/Week567_img_label.pkl', 'rb') as f:
    data = pickle.load(f)

imgs = data['img']
labels = data['label']

- 评测基于FGSM执行迭代对抗训练的模型针对FGSM/PGD攻击的防御效果

In [23]:
# Attack settings for this evaluation cell.
# TODO: also test epsilon=0.08 and epsilon=0.3
epsilon = 0.2
alpha, iter = 0.07, 30

print("For FGSM Iterative.\n")

# FGSM crafted against the defended model itself, then scored by evaluate().
print("Against FGSM:")
adv_xs = fgsm(imgs, epsilon, cnn_fgsm_iter, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_fgsm_iter)

# Same procedure with PGD.
print("Against PGD:")
adv_xs = pgd(imgs, epsilon, alpha, iter, cnn_fgsm_iter, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_fgsm_iter)

For FGSM Iterative.

Against FGSM:
match rate: 0.8
Against PGD:
match rate: 0.6


- 评测基于PGD执行迭代对抗训练的模型针对FGSM/PGD攻击的防御效果

In [24]:
# Attack settings for this evaluation cell.
# TODO: also test epsilon=0.08 and epsilon=0.3
epsilon = 0.2
alpha, iter = 0.07, 30

print("For PGD Iterative.\n")

# FGSM crafted against the defended model itself, then scored by evaluate().
print("Against FGSM:")
adv_xs = fgsm(imgs, epsilon, cnn_pgd_iter, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_pgd_iter)

# Same procedure with PGD.
print("Against PGD:")
adv_xs = pgd(imgs, epsilon, alpha, iter, cnn_pgd_iter, criterion, labels)
pred_label = evaluate(adv_xs, labels, cnn_pgd_iter)

For PGD Iterative.

Against FGSM:
match rate: 0.9
Against PGD:
match rate: 0.8
