# Copyright (c) 2018-present, Royal Bank of Canada.
# All rights reserved.
# This source code is licensed under the license found in the
# LICENSE file in the root directory of this source tree.
from __future__ import print_function
import os
import argparse
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from advertorch.context import ctx_noparamgrad_and_eval
from advertorch.test_utils import LeNet5
from advertorch_examples.utils import get_mnist_train_loader
from advertorch_examples.utils import get_mnist_test_loader
from advertorch_examples.utils import TRAINED_MODEL_PATH
if __name__ == '__main__':
parser = argparse.ArgumentParser(description='Train MNIST')
parser.add_argument('--seed', default=0, type=int)
parser.add_argument('--mode', default="cln", help="cln | adv")
parser.add_argument('--train_batch_size', default=50, type=int)
parser.add_argument('--test_batch_size', default=1000, type=int)
parser.add_argument('--log_interval', default=200, type=int)
args = parser.parse_args()
use_cuda = torch.cuda.is_available()
device = torch.device("cuda" if use_cuda else "cpu")
if args.mode == "cln":
flag_advtrain = False
nb_epoch = 10
model_filename = ""
elif args.mode == "adv":
flag_advtrain = True
nb_epoch = 90
model_filename = ""
train_loader = get_mnist_train_loader(
batch_size=args.train_batch_size, shuffle=True)
test_loader = get_mnist_test_loader(
batch_size=args.test_batch_size, shuffle=False)
model = LeNet5()
optimizer = optim.Adam(model.parameters(), lr=1e-4)
if flag_advtrain:
from advertorch.attacks import LinfPGDAttack
adversary = LinfPGDAttack(
model, loss_fn=nn.CrossEntropyLoss(reduction="sum"), eps=0.3,
nb_iter=40, eps_iter=0.01, rand_init=True, clip_min=0.0,
clip_max=1.0, targeted=False)
for epoch in range(nb_epoch):
for batch_idx, (data, target) in enumerate(train_loader):
data, target =,
ori = data
if flag_advtrain:
# when performing attack, the model needs to be in eval mode
# also the parameters should be accumulating gradients
with ctx_noparamgrad_and_eval(model):
data = adversary.perturb(data, target)
output = model(data)
loss = F.cross_entropy(
output, target, reduction='elementwise_mean')
if batch_idx % args.log_interval == 0:
print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
epoch, batch_idx *
len(data), len(train_loader.dataset),
100. * batch_idx / len(train_loader), loss.item()))
test_clnloss = 0
clncorrect = 0
if flag_advtrain:
test_advloss = 0
advcorrect = 0
for clndata, target in test_loader:
clndata, target =,
with torch.no_grad():
output = model(clndata)
test_clnloss += F.cross_entropy(
output, target, reduction='sum').item()
pred = output.max(1, keepdim=True)[1]
clncorrect += pred.eq(target.view_as(pred)).sum().item()
if flag_advtrain:
advdata = adversary.perturb(clndata, target)
with torch.no_grad():
output = model(advdata)
test_advloss += F.cross_entropy(
output, target, reduction='sum').item()
pred = output.max(1, keepdim=True)[1]
advcorrect += pred.eq(target.view_as(pred)).sum().item()
test_clnloss /= len(test_loader.dataset)
print('\nTest set: avg cln loss: {:.4f},'
' cln acc: {}/{} ({:.0f}%)\n'.format(
test_clnloss, clncorrect, len(test_loader.dataset),
100. * clncorrect / len(test_loader.dataset)))
if flag_advtrain:
test_advloss /= len(test_loader.dataset)
print('Test set: avg adv loss: {:.4f},'
' adv acc: {}/{} ({:.0f}%)\n'.format(
test_advloss, advcorrect, len(test_loader.dataset),
100. * advcorrect / len(test_loader.dataset)))
os.path.join(TRAINED_MODEL_PATH, model_filename))
