<h3> ART SMOOTHADVERSARIAL </h3>

Provably robust deep learning via adversarially trained smoothed classifiers (SmoothAdv).

Link to the paper: [https://arxiv.org/abs/1906.04584](https://arxiv.org/abs/1906.04584)

In [33]:
import torch
import torchvision
import torchvision.transforms as transforms
from torchvision import datasets
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
from torch.utils.data import Dataset
import torch.optim as optim
from torch.optim.lr_scheduler import StepLR, MultiStepLR
from torch.utils.data import DataLoader
import numpy as np
import matplotlib.pyplot as plt
import random as random

from art.utils import load_dataset, random_targets, compute_accuracy,load_cifar10
from art.estimators.certification.randomized_smoothing import PyTorchRandomizedSmoothing
from torch.nn import CrossEntropyLoss
from art.estimators.classification.pytorch import PyTorchClassifier 
from art.data_generators import PyTorchDataGenerator

import math
from art.estimators.classification.pytorch import PyTorchClassifier 
from art.data_generators import PyTorchDataGenerator


In [None]:
# Query CUDA availability once and derive the target device from the result.
use_gpu = torch.cuda.is_available()
device = torch.device("cuda:0") if use_gpu else torch.device("cpu")
if use_gpu:
    print("Using CUDA")

<h3> Load Data </h3>

Load the data used to train and evaluate the model. This notebook uses the CIFAR-10 dataset: random-crop and horizontal-flip augmentation are applied to the training split, and both splits are converted to tensors.

Training data: 50000
Test data: 10000
Input Shape: (3,32,32)

In [None]:
batch_size = 64

# Training split: random 32x32 crops (after 4-pixel padding) and random
# horizontal flips, then conversion to tensors.
train_data = datasets.CIFAR10(
    root="./dataset_cache",
    train=True,
    download=True,
    transform=transforms.Compose(
        [
            transforms.RandomCrop(32, padding=4),
            transforms.RandomHorizontalFlip(),
            transforms.ToTensor(),
        ]
    ),
)
# Test split: tensor conversion only, no augmentation.
test_data = datasets.CIFAR10(
    root="./dataset_cache",
    train=False,
    download=True,
    transform=transforms.ToTensor(),
)

train_loader = DataLoader(
    dataset=train_data,
    batch_size=batch_size,
    shuffle=True,
    num_workers=1,
)
test_loader = DataLoader(
    dataset=test_data,
    batch_size=batch_size,
    shuffle=False,
    num_workers=1,
    pin_memory=True,
)

In [10]:
num_train_samples = 50000

# Pre-allocate dense tensors and copy the loader's batches into them one by one.
x_train = torch.zeros((num_train_samples, 3, 32, 32), dtype=torch.float32)
y_train = torch.zeros((num_train_samples,), dtype=torch.uint8)

for i, (data, labels) in enumerate(train_loader):
    offset = i * batch_size
    # The slice is clipped at the end of the tensor, so a shorter final batch fits.
    x_train[offset:offset + batch_size] = data
    y_train[offset:offset + batch_size] = labels

In [11]:
# NOTE(review): this reuses the name num_train_samples for the size of the
# *test* set; the name is kept here so the notebook's globals stay unchanged.
num_train_samples = 10000

# Pre-allocate dense tensors and copy the loader's batches into them one by one.
x_test = torch.zeros((num_train_samples, 3, 32, 32), dtype=torch.float32)
y_test = torch.zeros((num_train_samples,), dtype=torch.uint8)

for i, (data, labels) in enumerate(test_loader):
    offset = i * batch_size
    # The slice is clipped at the end of the tensor, so a shorter final batch fits.
    x_test[offset:offset + batch_size] = data
    y_test[offset:offset + batch_size] = labels

<h3> Train SmoothAdversarial Classifier </h3>

Train the smooth adversarial classifier. The model is first built using the ResNet-110 architecture, with a normalization layer prepended. The optimizer, learning-rate scheduler and loss are then defined and passed to the Randomized Smoothing classifier object. Finally, calling `fit` trains the model on the training dataset.

In [44]:
_CIFAR10_MEAN = [0.4914, 0.4822, 0.4465]
_CIFAR10_STDDEV = [0.2023, 0.1994, 0.2010]


class NormalizeLayer(torch.nn.Module):
    """Standardize the channels of a batch of images by subtracting the dataset mean
      and dividing by the dataset standard deviation.
      In order to certify radii in original coordinates rather than standardized coordinates, we
      add the Gaussian noise _before_ standardizing, which is why we have standardization be the first
      layer of the classifier rather than as a part of preprocessing as is typical.
      """

    def __init__(self, means, sds):
        """
        :param means: the channel means
        :param sds: the channel standard deviations
        """
        super().__init__()
        # Prefer the GPU when one is present, but do not require it: an
        # unconditional .cuda() raises on CPU-only machines.
        init_device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
        # Kept as plain attributes (not buffers or parameters) so the module's
        # state_dict contents are unchanged.
        self.means = torch.tensor(means).to(init_device)
        self.sds = torch.tensor(sds).to(init_device)

    def forward(self, input: torch.Tensor):
        """
        :param input: batch of images laid out as (batch, channels, height, width)
        :return: the standardized batch, same shape as ``input``
        """
        # Follow the input's device so the layer works wherever the batch lives,
        # and broadcast the per-channel statistics over batch and spatial dims.
        means = self.means.to(input.device).view(1, -1, 1, 1)
        sds = self.sds.to(input.device).view(1, -1, 1, 1)
        return (input - means) / sds
def get_num_classes(dataset: str) -> int:
    """Return the number of classes in the dataset.

    :param dataset: dataset name, either "imagenet" or "cifar10"
    :return: the number of classes of that dataset
    :raises ValueError: if ``dataset`` is not one of the supported names
    """
    class_counts = {"imagenet": 1000, "cifar10": 10}
    try:
        return class_counts[dataset]
    except KeyError:
        # Fail loudly instead of silently handing None to the caller.
        raise ValueError(
            f"Unknown dataset {dataset!r}; expected one of {sorted(class_counts)}"
        ) from None


def get_normalize_layer(dataset: str) -> torch.nn.Module:
    """Return the dataset's normalization layer.

    :param dataset: dataset name; only "cifar10" is supported here
    :return: a ``NormalizeLayer`` built from the CIFAR-10 channel statistics
    :raises ValueError: if no normalization layer is defined for ``dataset``
    """
    if dataset == "cifar10":
        return NormalizeLayer(_CIFAR10_MEAN, _CIFAR10_STDDEV)
    # Returning None here would only fail later, inside nn.Sequential.
    raise ValueError(f"No normalization layer is defined for dataset {dataset!r}")

In [45]:
def conv3x3(in_planes, out_planes, stride=1):
    """Build a bias-free 3x3 convolution with one pixel of padding.

    :param in_planes: number of input channels
    :param out_planes: number of output channels
    :param stride: convolution stride
    :return: the configured ``nn.Conv2d`` module
    """
    return nn.Conv2d(
        in_channels=in_planes,
        out_channels=out_planes,
        kernel_size=3,
        stride=stride,
        padding=1,
        bias=False,
    )


class BasicBlock(nn.Module):
    """Residual block: two 3x3 conv + batch-norm stages plus a shortcut connection."""

    expansion = 1

    def __init__(self, inplanes, planes, stride=1, downsample=None):
        """
        :param inplanes: number of input channels
        :param planes: number of output channels of both convolutions
        :param stride: stride of the first convolution
        :param downsample: optional module applied to the input to form the shortcut
        """
        super().__init__()
        self.conv1 = conv3x3(inplanes, planes, stride)
        self.bn1 = nn.BatchNorm2d(planes)
        # One in-place ReLU module is shared by both activation sites.
        self.relu = nn.ReLU(inplace=True)
        self.conv2 = conv3x3(planes, planes)
        self.bn2 = nn.BatchNorm2d(planes)
        self.downsample = downsample
        self.stride = stride

    def forward(self, x):
        """Apply the two conv stages, add the shortcut and return the activated sum."""
        out = self.relu(self.bn1(self.conv1(x)))
        out = self.bn2(self.conv2(out))

        # The shortcut is the input itself unless a downsample module was given.
        shortcut = x if self.downsample is None else self.downsample(x)

        out += shortcut
        return self.relu(out)


class ResNet_Cifar(nn.Module):
    """CIFAR-style ResNet: a 3x3 stem, three residual stages, average pooling and a linear head."""

    def __init__(self, block, layers, width=1, num_classes=10):
        """
        :param block: residual block class; must expose ``expansion`` and accept
                      ``(inplanes, planes, stride, downsample)``
        :param layers: number of blocks in each of the three stages
        :param width: multiplier applied to the stage channel counts (16, 32, 64)
        :param num_classes: size of the output layer
        """
        super().__init__()
        self.inplanes = 16
        self.conv1 = nn.Conv2d(3, 16, kernel_size=3, stride=1, padding=1, bias=False)
        self.bn1 = nn.BatchNorm2d(16)
        self.relu = nn.ReLU(inplace=True)
        self.layer1 = self._make_layer(block, 16 * width, layers[0])
        self.layer2 = self._make_layer(block, 32 * width, layers[1], stride=2)
        self.layer3 = self._make_layer(block, 64 * width, layers[2], stride=2)
        self.avgpool = nn.AvgPool2d(8, stride=1)
        self.fc = nn.Linear(64 * block.expansion * width, num_classes)

        # Re-initialize weights: convolutions get zero-mean normal weights with
        # std sqrt(2 / (kernel_h * kernel_w * out_channels)); batch-norm layers
        # get unit scale and zero shift.
        for module in self.modules():
            if isinstance(module, nn.Conv2d):
                kernel_h, kernel_w = module.kernel_size
                fan = kernel_h * kernel_w * module.out_channels
                module.weight.data.normal_(0, math.sqrt(2. / fan))
            elif isinstance(module, nn.BatchNorm2d):
                module.weight.data.fill_(1)
                module.bias.data.zero_()

    def _make_layer(self, block, planes, blocks, stride=1):
        """Build one stage of ``blocks`` residual blocks and update ``self.inplanes``."""
        out_planes = planes * block.expansion

        # A 1x1 conv + batch-norm shortcut is needed whenever the first block
        # changes the spatial size or the channel count.
        shortcut = None
        if stride != 1 or self.inplanes != out_planes:
            shortcut = nn.Sequential(
                nn.Conv2d(self.inplanes, out_planes, kernel_size=1, stride=stride, bias=False),
                nn.BatchNorm2d(out_planes),
            )

        stage = [block(self.inplanes, planes, stride, shortcut)]
        self.inplanes = out_planes
        stage.extend(block(self.inplanes, planes) for _ in range(1, blocks))

        return nn.Sequential(*stage)

    def forward(self, x):
        """Run the stem, the three stages, pooling and the linear head."""
        x = self.relu(self.bn1(self.conv1(x)))

        for stage in (self.layer1, self.layer2, self.layer3):
            x = stage(x)

        x = self.avgpool(x)
        x = x.view(x.size(0), -1)
        return self.fc(x)


def resnet110(**kwargs):
    """Build a ``ResNet_Cifar`` made of ``BasicBlock``s with 18 blocks per stage and width 1.

    :param kwargs: extra keyword arguments forwarded to ``ResNet_Cifar``
    :return: the constructed network
    """
    return ResNet_Cifar(BasicBlock, [18] * 3, width=1, **kwargs)

In [46]:
# Instantiate the ResNet-110 backbone with resnet110's default arguments.
model = resnet110()

In [47]:
# Put the CIFAR-10 normalization layer in front of the network so the wrapped
# model takes un-normalized images as input.
normalize_layer = get_normalize_layer("cifar10")
model = nn.Sequential(normalize_layer, model)

In [20]:
# SGD with momentum and weight decay; the learning rate is multiplied by 0.1
# every 50 scheduler steps.
optimizer = optim.SGD(
    params=model.parameters(),
    lr=0.1,
    momentum=0.9,
    weight_decay=1e-4,
)
scheduler = StepLR(optimizer=optimizer, step_size=50, gamma=0.1)
loss = CrossEntropyLoss()

In [23]:
sigma_1 = 0.25  # Noise scale handed to the smoothing classifier as `scale`
rs_smoothadv_classifier = PyTorchRandomizedSmoothing(
    model=model,
    # The images are produced by transforms.ToTensor(), which yields values in
    # [0, 1], so the valid feature range is (0, 1) rather than (0, 255).
    clip_values=(0.0, 1.0),
    optimizer=optimizer,
    scheduler=scheduler,
    loss=loss,
    input_shape=(3, 32, 32),
    nb_classes=10,
    scale=sigma_1,
    num_noise_vec=8,
    train_multi_noise=True,
    attack_type="PGD",
    epsilon=1.0,
    num_steps=10,
    warmup=10,
)

In [25]:
# Train with the 'smoothadv' method on the materialized training tensors.
rs_smoothadv_classifier.fit(
    x_train,
    y_train,
    nb_epochs=150,
    batch_size=256,
    train_method='smoothadv',
)

<h3> Predictions for Trained Model </h3>

Predict on the test dataset using the trained model and report the accuracy and coverage. Coverage is the proportion of instances for which the model returned a predicted label instead of abstaining.

In [28]:
# Pin the one-hot width to the 10 CIFAR-10 classes instead of letting one_hot
# infer it from the largest label present in y_test.
y_test_encoded = F.one_hot(y_test.to(torch.int64), num_classes=10)

In [None]:
# Evaluate on the first 500 test images only.
x_preds_rs_1 = rs_smoothadv_classifier.predict(x_test[:500])
acc_rs_1, cov_rs_1 = compute_accuracy(
    x_preds_rs_1,
    y_test_encoded[:500].numpy(),
)
print(f"\nSmoothedAdversarial Classifier, sigma={sigma_1}")
print(f"Accuracy: {acc_rs_1}")
print(f"Coverage: {cov_rs_1}")

In [30]:
# Certified accuracy as a function of the certified radius
def getCertAcc(radius, pred, y_test, rad_list=None):
    """Return the certified accuracy at each radius threshold.

    A sample counts at threshold ``r`` when its certified radius is at least
    ``r`` and its predicted label equals its true label. The count is divided
    by the total number of certified samples, ``len(radius)``.

    :param radius: certified radius of each sample
    :param pred: predicted label of each sample, aligned with ``radius``
    :param y_test: true labels; entry ``i`` must be the label of sample ``i``
                   (only the first ``len(radius)`` entries are used)
    :param rad_list: radius thresholds to evaluate; defaults to
                     ``np.linspace(0, 2.25, 201)``
    :return: list with one certified-accuracy value per threshold
    :raises ValueError: if ``pred`` or ``y_test`` do not cover every sample
    """
    # Accept plain lists as well as arrays.
    radius = np.asarray(radius)
    pred = np.asarray(pred)
    labels = np.asarray(y_test)
    if rad_list is None:
        thresholds = np.linspace(0, 2.25, 201)
    else:
        thresholds = np.atleast_1d(np.asarray(rad_list))

    num_cert = len(radius)
    if num_cert == 0:
        # Nothing was certified: report zero accuracy rather than dividing by zero.
        return [0.0] * len(thresholds)
    if len(pred) != num_cert or len(labels) < num_cert:
        raise ValueError("pred and y_test must provide one entry per certified sample")

    correct = pred == labels[:num_cert]
    # Row i marks the samples whose radius reaches thresholds[i].
    reached = radius[np.newaxis, :] >= thresholds[:, np.newaxis]
    return list((reached & correct[np.newaxis, :]).sum(axis=1) / num_cert)

In [31]:
def calculateACR(target, prediction, radius):
    """Return the Average Certified Radius (ACR).

    The radii of the correctly predicted samples are summed and divided by the
    total number of samples, so wrong predictions contribute a radius of zero.

    :param target: true label of each sample
    :param prediction: predicted label of each sample
    :param radius: certified radius of each sample
    :return: the average certified radius as a float; 0.0 for empty input
    :raises ValueError: if ``target`` or ``radius`` have fewer entries than ``prediction``
    """
    prediction = np.asarray(prediction)
    target = np.asarray(target)
    radius = np.asarray(radius, dtype=float)

    count = len(prediction)
    if count == 0:
        # No samples: avoid dividing by zero.
        return 0.0
    if len(target) < count or len(radius) < count:
        raise ValueError("target and radius must provide one entry per prediction")

    correct = prediction == target[:count]
    return float(radius[:count][correct].sum() / count)

<h3>Certified Radius for Single Image</h3>

Compute the certified radius for a single image. The image index is drawn at random from the 10000 test images; to certify a particular image, set `index` to that image's index below.

In [None]:
# Certify a single test image: `index` is drawn at random here, or can be set
# by hand to certify a specific image.
index = random.randint(0, 9999)
x_sample = x_test[index].expand(1, 3, 32, 32)
prediction, radius = rs_smoothadv_classifier.certify(x_sample, n=100000)
print(f"Prediction: {prediction} and Radius: {radius}")

<h3>Certification on Test Images</h3>

Run certification over the test images and inspect the results. By default only a subset is certified (`num_img` images starting at image `start_img`, taking every `skip`-th image) rather than all 10000 test images. The ACR (Average Certified Radius) is computed to summarize the certification results.

In [40]:
# Selection of the test images used for the ACR value and the plot
start_img = 500   # the slice of test images starts at index start_img - 1
num_img = 500     # number of images taken
skip = 1          # step between consecutive images taken
N = 100_000       # passed to certify() as n

In [41]:
# Certify every skip-th image of the chosen window of the test set.
prediction_1, radius_1 = rs_smoothadv_classifier.certify(
    x_test[slice(start_img - 1, (start_img - 1) + num_img * skip, skip)],
    n=N,
)

In [None]:
# The labels are taken from the same slice of the test set that was certified.
acr = calculateACR(
    target=np.array(y_test[slice(start_img - 1, (start_img - 1) + num_img * skip, skip)]),
    prediction=np.array(prediction_1),
    radius=np.array(radius_1),
)
print("ACR for Smooth Adversarial Classifier: ", acr)

In [None]:
rad_list = np.linspace(0, 2.25, 201)
# The labels must be those of the images that were actually certified. Passing
# the whole y_test would compare the predictions for the certified slice with
# the labels of the first images of the test set.
certified_labels = np.array(y_test[(start_img-1):(start_img-1)+(num_img*skip):skip])
# Raw string: '\s' is not a valid escape sequence in a normal string literal.
plt.plot(rad_list, getCertAcc(radius_1, prediction_1, certified_labels), 'r-', label=r'smoothed, $\sigma=$' + str(sigma_1))
plt.xlabel('l2 radius')
plt.ylabel('Certified Accuracy')
plt.legend()
plt.title('Average Certified Radius plot: ACR {}'.format(acr))
plt.show()