In [1]:
import numpy as np 
import pandas as pd

In [2]:
import torch
import torchvision.datasets as datasets
import torchvision.transforms as transforms
from torch.utils.data.sampler import SubsetRandomSampler
import torch.utils.data as DataUtils
import numpy as np
import time
import sys
import torch.nn as nn
import torch.nn.functional as F

# Ready-made data loading helper
DATA_ROOT='./MNISTData/'
def getMNISTDataLoaders(batchSize=64, nTrain=50000, nVal=10000, nTest=10000, seed=None):
  """Build the train / validation / test ``DataLoader`` objects for MNIST.

  The train and validation loaders both read the MNIST training split
  (``train=True``); a shuffled index array is cut into the first ``nTrain``
  indices (train) and the remaining ``nVal`` indices (validation), so the two
  never overlap. The test loader reads the first ``nTest`` samples of the
  held-out split (``train=False``). Data is downloaded to ``DATA_ROOT`` when
  missing.

  Args:
    batchSize: Batch size used by all three loaders.
    nTrain: Number of training samples; ``nTrain + nVal`` must equal 60000.
    nVal: Number of validation samples.
    nTest: Number of test samples; must not exceed the size of the test split.
    seed: Optional integer. When given, the train/validation split is drawn
      from a private ``RandomState(seed)`` and is therefore reproducible.
      When ``None`` (default) the global NumPy RNG is used, as before.

  Returns:
    Tuple ``(trainLoader, valLoader, testLoader)``.

  Raises:
    AssertionError: If the requested split sizes are inconsistent.
  """
  nTrainSplit = 60000  # size of the MNIST training split
  assert nTrain >= 0 and nVal >= 0, 'nTrain and nVal must be non-negative'
  assert (nTrainSplit - nVal) == nTrain, 'nTrain + nVal must be equal to 60000'

  # ToTensor carries no state, so one transform instance can serve all three sets
  toTensor = transforms.Compose([transforms.ToTensor()])

  trainSet = datasets.MNIST(root=DATA_ROOT, download=True, train=True,
                            transform=toTensor)
  valSet = datasets.MNIST(root=DATA_ROOT, download=True, train=True,
                          transform=toTensor)
  testSet = datasets.MNIST(root=DATA_ROOT, download=True, train=False,
                           transform=toTensor)
  # Without this check an oversized nTest only fails later, while iterating
  assert 0 <= nTest <= len(testSet), \
      'nTest must be between 0 and {}'.format(len(testSet))

  indices = np.arange(0, nTrainSplit)
  if seed is None:
    np.random.shuffle(indices)
  else:
    # Private generator: reproducible split without touching the global RNG
    np.random.RandomState(seed).shuffle(indices)

  trainSampler = SubsetRandomSampler(indices[:nTrain])
  valSampler = SubsetRandomSampler(indices[nTrain:nTrain + nVal])
  testSampler = SubsetRandomSampler(np.arange(0, nTest))

  trainLoader = DataUtils.DataLoader(trainSet, batch_size=batchSize,
                                     sampler=trainSampler)
  valLoader = DataUtils.DataLoader(valSet, batch_size=batchSize,
                                   sampler=valSampler)
  testLoader = DataUtils.DataLoader(testSet, batch_size=batchSize,
                                    sampler=testSampler)
  return trainLoader, valLoader, testLoader

In [3]:
class Net(nn.Module):
    """Small convolutional classifier producing 10 logits per input image.

    Three convolutional stages (``conv1``, ``conv2``, ``conv3``) are followed
    by a single linear layer (``fc``). Each convolution is followed by ReLU and
    then batch normalisation, and every stage ends with 2x2 max-pooling and
    dropout (p=0.25). The linear layer expects 128 flattened features, which
    is what the stages yield for a 1x28x28 input.
    """

    def __init__(self):
        super().__init__()

        # Stage 1: 1 -> 32 channels, second convolution is strided
        stage1_layers = [
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.Conv2d(32, 32, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(32),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.25),
        ]
        self.conv1 = nn.Sequential(*stage1_layers)

        # Stage 2: 32 -> 64 channels, same layout as stage 1
        stage2_layers = [
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.Conv2d(64, 64, kernel_size=3, stride=2, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(64),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.25),
        ]
        self.conv2 = nn.Sequential(*stage2_layers)

        # Stage 3: 64 -> 128 channels, single convolution
        stage3_layers = [
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.ReLU(),
            nn.BatchNorm2d(128),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout(p=0.25),
        ]
        self.conv3 = nn.Sequential(*stage3_layers)

        # Classifier head: 128 features -> 10 class logits
        self.fc = nn.Sequential(nn.Linear(128, 10))

    def forward(self, x):
        """Run the three stages, flatten per sample, and return the logits."""
        for stage in (self.conv1, self.conv2, self.conv3):
            x = stage(x)
        batch_size = x.size(0)
        flat = x.view(batch_size, -1)
        return self.fc(flat)

In [4]:
# Prefer the GPU whenever CUDA is usable; otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = 'cuda'
else:
    device = 'cpu'
print('Notebook will use PyTorch Device: ' + device.upper())

Notebook will use PyTorch Device: CUDA


In [5]:
# Utility Progress Bar Function
def progress(curr, total, suffix=''):
  """Draw a single-line text progress bar on stdout.

  The bar is redrawn in place (leading carriage return) and is always exactly
  48 characters wide. When ``curr == total`` a final, fully filled bar
  followed by ``.. Completed`` and a newline is written.

  Args:
    curr: Number of items processed so far.
    total: Total number of items. A non-positive total is drawn as a full bar
      instead of raising ``ZeroDivisionError``.
    suffix: Text printed after the bar.
  """
  bar_len = 48
  if total > 0:
    filled = int(round(bar_len * curr / float(total)))
  else:
    # Nothing to process: show a full bar rather than dividing by zero
    filled = bar_len
  # Clamp to [1, bar_len]: at least the '>' head is shown, and an
  # out-of-range curr can no longer stretch the bar beyond its fixed width
  filled = max(1, min(bar_len, filled))
  bar = '=' * (filled - 1) + '>' + '-' * (bar_len - filled)
  sys.stdout.write('\r[%s] .. %s' % (bar, suffix))
  sys.stdout.flush()
  if curr == total:
    bar = bar_len * '='
    sys.stdout.write('\r[%s] .. %s .. Completed\n' % (bar, suffix))

In [6]:
# Hyper-parameters and the step counter shared with the following cells.
learning_rate = 1e-2
lr = learning_rate  # same value, kept under both names
n_epochs = 1
step = 0

# Network, loss and optimiser for the baseline model.
model = Net().to(device)
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(params=model.parameters(), lr=learning_rate)

# Switch to training mode; as the cell's last expression this also echoes the model.
model.train()

Net(
  (conv1): Sequential(
    (0): Conv2d(1, 32, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(32, 32, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (4): ReLU()
    (5): BatchNorm2d(32, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout(p=0.25, inplace=False)
  )
  (conv2): Sequential(
    (0): Conv2d(32, 64, kernel_size=(3, 3), stride=(1, 1), padding=(1, 1))
    (1): ReLU()
    (2): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (3): Conv2d(64, 64, kernel_size=(3, 3), stride=(2, 2), padding=(1, 1))
    (4): ReLU()
    (5): BatchNorm2d(64, eps=1e-05, momentum=0.1, affine=True, track_running_stats=True)
    (6): MaxPool2d(kernel_size=2, stride=2, padding=0, dilation=1, ceil_mode=False)
    (7): Dropout(p

In [7]:
# Build the MNIST loaders with the helper's default split sizes and start the
# wall-clock timer.
train_loader, val_loader, test_loader = getMNISTDataLoaders()
start_time = time.time()
# NOTE: the plain (non-adversarial) training loop below is disabled by wrapping
# it in a string literal, so no training happens in this cell. Because the
# string is the cell's last expression, the notebook echoes it as the output.
"""
for i in range(n_epochs):
  for j, (images, labels) in enumerate(train_loader):
    images, labels = images.to(device), labels.to(device)
    optimizer.zero_grad()
    logits = model(images)
    loss = criterion(logits, labels)
    loss.backward()
    optimizer.step()
    if j % 8 == 0:
      progress(j+1, len(train_loader), 'Batch [{}/{}] Epoch [{}/{}] Loss = {:.3f}'.format(j+1, len(train_loader), i+1, n_epochs, loss.item()))
    step += 1
end_time = time.time()
print('\nTotal training steps = {}'.format(step))
print('Total time taken = {}'.format(end_time - start_time))
"""


0it [00:00, ?it/s]

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw/train-images-idx3-ubyte.gz


9920512it [00:01, 9136025.22it/s]                            


Extracting ./MNISTData/MNIST/raw/train-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw


0it [00:00, ?it/s]

Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw/train-labels-idx1-ubyte.gz


32768it [00:00, 88371.31it/s]            
  0%|          | 0/1648877 [00:00<?, ?it/s]

Extracting ./MNISTData/MNIST/raw/train-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw/t10k-images-idx3-ubyte.gz


1654784it [00:00, 2320725.21it/s]                           
0it [00:00, ?it/s]

Extracting ./MNISTData/MNIST/raw/t10k-images-idx3-ubyte.gz to ./MNISTData/MNIST/raw
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw/t10k-labels-idx1-ubyte.gz


8192it [00:00, 29756.34it/s]            

Extracting ./MNISTData/MNIST/raw/t10k-labels-idx1-ubyte.gz to ./MNISTData/MNIST/raw
Processing...
Done!





"\nfor i in range(n_epochs):\n  for j, (images, labels) in enumerate(train_loader):\n    images, labels = images.to(device), labels.to(device)\n    optimizer.zero_grad()\n    logits = model(images)\n    loss = criterion(logits, labels)\n    loss.backward()\n    optimizer.step()\n    if j % 8 == 0:\n      progress(j+1, len(train_loader), 'Batch [{}/{}] Epoch [{}/{}] Loss = {:.3f}'.format(j+1, len(train_loader), i+1, n_epochs, loss.item()))\n    step += 1\nend_time = time.time()\nprint('\nTotal training steps = {}'.format(step))\nprint('Total time taken = {}'.format(end_time - start_time))\n"

In [8]:
!pip install advertorch > /dev/null
import advertorch
print(advertorch.__version__)

0.1.5


In [9]:

# Evaluating against FGSM attack
# The import is live: GradientSignAttack is used by the adversarial-training
# cell further down.
from advertorch.attacks import GradientSignAttack
# NOTE: the baseline evaluation below is disabled by wrapping it in a string
# literal; the cell only echoes that string as its output.
"""
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack
adversary_1 = GradientSignAttack(model, eps=0.3)
correct = 0
model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  adv_images_1 = adversary_1.perturb(images, labels) # This is extra step as compared to normal clean accuracy testing
  logits = model(adv_images_1)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
model.train()
print('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / 10000))
"""

"\n# Documentation for this attack can be found at the link below\n# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack\nadversary_1 = GradientSignAttack(model, eps=0.3)\ncorrect = 0\nmodel.eval()\nfor j, (images, labels) in enumerate(test_loader):\n  images, labels = images.to(device), labels.to(device)\n  adv_images_1 = adversary_1.perturb(images, labels) # This is extra step as compared to normal clean accuracy testing\n  logits = model(adv_images_1)\n  _, preds = torch.max(logits, 1)\n  correct += (preds == labels).sum().item()\n  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))\nmodel.train()\nprint('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / 10000))\n"

In [10]:

# Evaluating against I-FGSM attack
# The import is live: LinfBasicIterativeAttack is used by the
# adversarial-training cell further down.
from advertorch.attacks import LinfBasicIterativeAttack
# NOTE: the baseline evaluation below is disabled by wrapping it in a string
# literal; the cell only echoes that string as its output.
"""
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack
adversary_2 = LinfBasicIterativeAttack(model, eps=0.1 , nb_iter=40)
correct = 0
model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  adv_images_2 = adversary_2.perturb(images, labels) # This is extra step as compared to normal clean accuracy testing
  logits = model(adv_images_2)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
model.train()
print('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / 10000))
"""

"\n# Documentation for this attack can be found at the link below\n# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack\nadversary_2 = LinfBasicIterativeAttack(model, eps=0.1 , nb_iter=40)\ncorrect = 0\nmodel.eval()\nfor j, (images, labels) in enumerate(test_loader):\n  images, labels = images.to(device), labels.to(device)\n  adv_images_2 = adversary_2.perturb(images, labels) # This is extra step as compared to normal clean accuracy testing\n  logits = model(adv_images_2)\n  _, preds = torch.max(logits, 1)\n  correct += (preds == labels).sum().item()\n  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))\nmodel.train()\nprint('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / 10000))\n"

In [11]:

# Evaluating against PGD attack
# The import is live: LinfPGDAttack is used by the adversarial-training cell
# further down.
from advertorch.attacks import LinfPGDAttack
# NOTE: the baseline evaluation below is disabled by wrapping it in a string
# literal; the cell only echoes that string as its output.
"""
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack
adversary_3 = LinfPGDAttack(model, eps=0.3 , nb_iter = 40)
correct = 0
model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  adv_images_3 = adversary_3.perturb(images, labels) # This is extra step as compared to normal clean accuracy testing
  logits = model(adv_images_3)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
model.train()
print('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / 10000))
"""

"\n# Documentation for this attack can be found at the link below\n# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack\nadversary_3 = LinfPGDAttack(model, eps=0.3 , nb_iter = 40)\ncorrect = 0\nmodel.eval()\nfor j, (images, labels) in enumerate(test_loader):\n  images, labels = images.to(device), labels.to(device)\n  adv_images_3 = adversary_3.perturb(images, labels) # This is extra step as compared to normal clean accuracy testing\n  logits = model(adv_images_3)\n  _, preds = torch.max(logits, 1)\n  correct += (preds == labels).sum().item()\n  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))\nmodel.train()\nprint('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / 10000))\n"

In [12]:
# Hyper-parameters and step counter for adversarial training.
n_epochs, lr, step = 70, 1e-2, 0

# adv_model is an alias for `model` (same object, not a copy), so training it
# updates `model` as well.
adv_model = model
adv_model.train()
xent_loss = nn.CrossEntropyLoss()
optimizer = torch.optim.SGD(params=adv_model.parameters(), momentum=0.9, lr=lr)

# Fresh loaders (this re-draws the shuffled train/validation split), then
# start the wall-clock timer for the training cell that follows.
(train_loader, val_loader, test_loader) = getMNISTDataLoaders()
start_time = time.time()

In [13]:
"""
Although not officially mentioned, making `size_average=False` for the loss 
function improves reliability of the result in PyTorch 0.4.0. This is required
since we are taking step against the gradient for "every" image in the batch.
So reducing them to a single value won't cut it.
"""
#training on FSGM
advertorch_loss_fn = nn.CrossEntropyLoss(size_average=False)
for i in range(n_epochs):
  for j, (images, labels) in enumerate(train_loader):
    images, labels = images.to(device), labels.to(device)
    """
    Creating the adversary :
    ------------------------
    Adversarial examples should be typically generated when model parameters are not 
    changing i.e. model parameters are frozen. This step may not be required for very
    simple linear models, but is a must for models using components such as dropout 
    or batch normalization.
    """
    adv_model.eval() # Freezes the model parameters
    """
    The `clip` values here determine the clipping range after taking the adversarial step
    The clipping is essential to keep the domain of input images within the range
    MNIST images for this notebook are normalized to [0, 1]. If you're using something else, 
    make sure to modify these values accordingly. The `eps` value decides the magnitude
    of the attack. For all MNIST models, the threat model advises to stick to maximum eps of 0.3 
    for input in range [0, 1]
    """
    fgsm_adversary = GradientSignAttack(adv_model, advertorch_loss_fn, eps=0.3, clip_min=0., \
                    clip_max=1., targeted=False)
    adv_images = fgsm_adversary.perturb(images, labels) # Generate adversarial samples
    ifgsm_adversary = LinfBasicIterativeAttack(adv_model, advertorch_loss_fn, eps=0.1, clip_min=0., \
                    clip_max=1., targeted=False , nb_iter=40)
    adv_images_2 = ifgsm_adversary.perturb(images, labels) # Generate adversarial samples
    pgd_adversary = LinfPGDAttack(adv_model, advertorch_loss_fn, eps=0.3, clip_min=0., \
                    clip_max=1., targeted=False , nb_iter = 40)
    adv_images_3 = pgd_adversary.perturb(images, labels)
    adv_model.train() # Allows model parameters to be changed again
    optimizer.zero_grad()
    logits = adv_model(images)
    loss = criterion(logits, labels)
    loss.backward()
    optimizer.step()
    train_images = adv_images 
    train_labels = labels
    optimizer.zero_grad()
    logits = adv_model(train_images)
    loss = xent_loss(logits, train_labels)
    loss.backward()
    optimizer.step()
    train_images = adv_images_2 
    train_labels = labels
    optimizer.zero_grad()
    logits = adv_model(train_images)
    loss = xent_loss(logits, train_labels)
    loss.backward()
    optimizer.step()
    train_images = adv_images_3 
    train_labels = labels
    optimizer.zero_grad()
    logits = adv_model(train_images)
    loss = xent_loss(logits, train_labels)
    loss.backward()
    optimizer.step()
    if j % 8 == 0:
      progress(j+1, len(train_loader), 'Batch [{}/{}] Epoch [{}/{}] Loss = {:.3f}'.format(j+1, len(train_loader), i+1, n_epochs, loss.item()))
    step += 1

    end_time = time.time()
print('\nTotal training steps = {}'.format(step))
print('Total time taken = {}'.format(end_time - start_time))



Total training steps = 54740
Total time taken = 16093.067623853683


In [14]:

# Accuracy on the clean (unperturbed) test set.
correct = 0
total = 0  # number of samples actually seen, instead of assuming 10000
model.eval()
with torch.no_grad():  # pure inference: no gradients are needed
  for j, (images, labels) in enumerate(test_loader):
    images, labels = images.to(device), labels.to(device)
    logits = model(images)
    _, preds = torch.max(logits, 1)
    correct += (preds == labels).sum().item()
    total += labels.size(0)
    progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
model.train()
# max(..., 1) keeps an empty test loader from dividing by zero
print('Accuracy = {}%'.format(float(correct) * 100 / max(total, 1)))


Accuracy = 99.35%


In [15]:
# Evaluating against FGSM attack

from advertorch.attacks import GradientSignAttack
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.GradientSignAttack
adversary_1 = GradientSignAttack(adv_model, eps=0.3)
correct = 0
total = 0  # number of samples actually seen, instead of assuming 10000
adv_model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  # Crafting the adversarial batch needs gradients, so it stays outside no_grad
  adv_images_1 = adversary_1.perturb(images, labels)
  with torch.no_grad():  # scoring the adversarial batch is pure inference
    logits = adv_model(adv_images_1)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  total += labels.size(0)
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
adv_model.train()
# max(..., 1) keeps an empty test loader from dividing by zero
print('Accuracy on FGSM adversarial samples = {}%'.format(float(correct) * 100 / max(total, 1)))


Accuracy on FGSM adversarial samples = 96.81%


In [16]:
# Evaluating against I-FGSM attack

from advertorch.attacks import LinfBasicIterativeAttack
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.LinfBasicIterativeAttack
adversary_2 = LinfBasicIterativeAttack(adv_model, eps=0.1 , nb_iter=40)
correct = 0
total = 0  # number of samples actually seen, instead of assuming 10000
adv_model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  # Crafting the adversarial batch needs gradients, so it stays outside no_grad
  adv_images_2 = adversary_2.perturb(images, labels)
  with torch.no_grad():  # scoring the adversarial batch is pure inference
    logits = adv_model(adv_images_2)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  total += labels.size(0)
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
adv_model.train()
# The label names the attack actually evaluated here (it previously said FGSM);
# max(..., 1) keeps an empty test loader from dividing by zero
print('Accuracy on I-FGSM adversarial samples = {}%'.format(float(correct) * 100 / max(total, 1)))


Accuracy on FGSM adversarial samples = 98.45%


In [17]:
# Evaluating against PGD attack

from advertorch.attacks import LinfPGDAttack
# Documentation for this attack can be found at the link below
# https://advertorch.readthedocs.io/en/latest/advertorch/attacks.html#advertorch.attacks.LinfPGDAttack
# NOTE(review): nb_iter is left at the library default here, whereas training
# passed nb_iter=40 explicitly; confirm the default matches.
adversary_3 = LinfPGDAttack(adv_model, eps=0.3)
correct = 0
total = 0  # number of samples actually seen, instead of assuming 10000
adv_model.eval()
for j, (images, labels) in enumerate(test_loader):
  images, labels = images.to(device), labels.to(device)
  # Crafting the adversarial batch needs gradients, so it stays outside no_grad
  adv_images_3 = adversary_3.perturb(images, labels)
  with torch.no_grad():  # scoring the adversarial batch is pure inference
    logits = adv_model(adv_images_3)
  _, preds = torch.max(logits, 1)
  correct += (preds == labels).sum().item()
  total += labels.size(0)
  progress(j+1, len(test_loader), 'Batch [{}/{}]'.format(j+1, len(test_loader)))
adv_model.train()
# The label names the attack actually evaluated here (it previously said FGSM);
# max(..., 1) keeps an empty test loader from dividing by zero
print('Accuracy on PGD adversarial samples = {}%'.format(float(correct) * 100 / max(total, 1)))


Accuracy on FGSM adversarial samples = 95.13%


In [18]:
# Persist only the learned weights (the state dict), not the whole module object.
torch.save(obj=adv_model.state_dict(), f='model.pt')