In [None]:
import os
import torch
import torch.nn as nn
from torch.utils.data import DataLoader
from torchvision import datasets, transforms
import numpy as np
import time
import torch.optim as optim
# from collections import OrderedDict
import torch.nn.functional as F

In [None]:
# Pick the GPU when PyTorch can see one, otherwise fall back to the CPU.
if torch.cuda.is_available():
    device = "cuda"
else:
    device = "cpu"
print(f"Using {device} device")

Using cpu device


In [None]:
# class NN(nn.Module):
#   '''
#   NN with 3 hidden layers each of 50 neurons each.
#   Input is 28*28 image
#   '''
#   def __init__(self):
#       super().__init__()
#       self.flatten = nn.Flatten()
#       self.linear_relu_stack = nn.Sequential(OrderedDict([
#           ('w01', nn.Linear(28*28, 50)),
#           ('r1', nn.ReLU()),
#           ('w12', nn.Linear(50, 50)),
#           ('r2', nn.ReLU()),
#           ('w23', nn.Linear(50, 50)),
#           ('r3', nn.ReLU()),
#           ('w34', nn.Linear(50, 10))
#       ])
#       )

#   def forward(self, x):
#       x = self.flatten(x)
#       logits = self.linear_relu_stack(x)
#       return logits


In [None]:
class NN(nn.Module):
    '''
    Fully connected classifier: 28*28 inputs, three hidden layers of 50
    units (each followed by ReLU) and a 10-unit linear output layer.
    '''
    def __init__(self):
        super().__init__()
        # Sub-modules are registered in the same order as they are applied.
        self.flatten = nn.Flatten()
        self.w01 = nn.Linear(28*28, 50)
        self.r1 = nn.ReLU()
        self.w12 = nn.Linear(50,50)
        self.r2 = nn.ReLU()
        self.w23 = nn.Linear(50,50)
        self.r3 = nn.ReLU()
        self.w34 = nn.Linear(50,10)

    def forward(self, x):
        '''
        Flatten x to rows of 28*28 values and return the output of the
        last linear layer (no activation is applied after w34).
        '''
        hidden = self.flatten(x).view((-1, 28*28))
        stages = (
            (self.w01, self.r1),
            (self.w12, self.r2),
            (self.w23, self.r3),
        )
        for affine, activation in stages:
            hidden = activation(affine(hidden))
        return self.w34(hidden)

class Normalize(nn.Module):
    '''
    Parameter-free layer that shifts its input by 0.1307 and scales it by
    1/0.3081 (presumably the MNIST mean / std - the constants are not
    explained in this notebook).
    '''
    def forward(self, x):
        centered = x - 0.1307
        return centered/0.3081

# Prepend the data normalization as the first "layer" of the network.
# This allows us to search for adversarial examples around the real image,
# rather than around the normalized image.
model = nn.Sequential(Normalize(), NN()).to(device)

# Switch to training mode; as the last expression of the cell this also
# displays the module structure.
model.train()


Sequential(
  (0): Normalize()
  (1): NN(
    (flatten): Flatten(start_dim=1, end_dim=-1)
    (w01): Linear(in_features=784, out_features=50, bias=True)
    (r1): ReLU()
    (w12): Linear(in_features=50, out_features=50, bias=True)
    (r2): ReLU()
    (w23): Linear(in_features=50, out_features=50, bias=True)
    (r3): ReLU()
    (w34): Linear(in_features=50, out_features=10, bias=True)
  )
)

In [None]:
# Rebind `model` to a bare NN (without the Normalize layer); box_nn reads
# model.w01 ... model.w34 directly, so it needs the NN module itself.
model = NN()
model = model.to(device)
print(model)

NN(
  (flatten): Flatten(start_dim=1, end_dim=-1)
  (w01): Linear(in_features=784, out_features=50, bias=True)
  (r1): ReLU()
  (w12): Linear(in_features=50, out_features=50, bias=True)
  (r2): ReLU()
  (w23): Linear(in_features=50, out_features=50, bias=True)
  (r3): ReLU()
  (w34): Linear(in_features=50, out_features=10, bias=True)
)


Data


In [None]:
batch_size = 64
# np.random.seed(42)
# torch.manual_seed(42)

# The only transform is ToTensor; no normalization is applied at load time.
to_tensor = transforms.Compose([transforms.ToTensor()])

# MNIST is downloaded into mnist_data/ when it is not already there.
train_dataset = datasets.MNIST('mnist_data/', train=True, download=True, transform=to_tensor)
test_dataset = datasets.MNIST('mnist_data/', train=False, download=True, transform=to_tensor)

# Only the training batches are shuffled; test batches keep dataset order.
train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True)
test_loader = DataLoader(test_dataset, batch_size=batch_size, shuffle=False)

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz to mnist_data/MNIST/raw/train-images-idx3-ubyte.gz


100%|██████████| 9912422/9912422 [00:00<00:00, 88310148.10it/s]

Extracting mnist_data/MNIST/raw/train-images-idx3-ubyte.gz to mnist_data/MNIST/raw






Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz to mnist_data/MNIST/raw/train-labels-idx1-ubyte.gz


100%|██████████| 28881/28881 [00:00<00:00, 87525790.34it/s]


Extracting mnist_data/MNIST/raw/train-labels-idx1-ubyte.gz to mnist_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz to mnist_data/MNIST/raw/t10k-images-idx3-ubyte.gz


100%|██████████| 1648877/1648877 [00:00<00:00, 26412661.92it/s]


Extracting mnist_data/MNIST/raw/t10k-images-idx3-ubyte.gz to mnist_data/MNIST/raw

Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz to mnist_data/MNIST/raw/t10k-labels-idx1-ubyte.gz


100%|██████████| 4542/4542 [00:00<00:00, 5372399.54it/s]


Extracting mnist_data/MNIST/raw/t10k-labels-idx1-ubyte.gz to mnist_data/MNIST/raw



Training

In [None]:
# Loss = nn.CrossEntropyLoss()
# optimizer = optim.SGD(model.parameters(), lr=0.001, momentum=0.9)



def train_model(model, num_epochs, enable_defense=True):
    '''
    Train `model` with Adam (lr = 0.0001) and cross-entropy loss on the
    notebook-level `train_loader`, and report accuracy on `test_loader`
    after every epoch.

    model: module mapping an image batch to class scores.
    num_epochs: number of passes over the training data.
    enable_defense: accepted for compatibility; currently ignored.

    Returns nothing. Prints one line per epoch with the test accuracy and
    the wall-clock time of the epoch (training + evaluation). The
    train/eval mode the model had on entry is restored on exit.
    '''
    learning_rate = 0.0001

    opt = optim.Adam(params=model.parameters(), lr=learning_rate)

    ce_loss = torch.nn.CrossEntropyLoss()

    was_training = model.training

    for epoch in range(1,num_epochs+1):
        t1 = time.time()

        # Training pass: one optimizer step per batch.
        model.train()
        for x_batch, y_batch in train_loader:
            x_batch, y_batch = x_batch.to(device), y_batch.to(device)
            opt.zero_grad()
            out = model(x_batch)
            batch_loss = ce_loss(out, y_batch)
            batch_loss.backward()
            opt.step()

        # Evaluation pass: inference mode and no autograd bookkeeping, so
        # no computation graph is kept for the test batches.
        model.eval()
        tot_test, tot_acc = 0.0, 0.0
        with torch.no_grad():
            for x_batch, y_batch in test_loader:
                x_batch, y_batch = x_batch.to(device), y_batch.to(device)
                out = model(x_batch)
                pred = torch.max(out, dim=1)[1]
                tot_acc += pred.eq(y_batch).sum().item()
                tot_test += x_batch.size()[0]
        t2 = time.time()

        # An empty test loader yields nan instead of a ZeroDivisionError.
        accuracy = tot_acc/tot_test if tot_test else float('nan')
        print('Epoch %d: Accuracy %.5lf [%.2lf seconds]' % (epoch, accuracy, t2-t1))

    model.train(was_training)

Box operations:

In [None]:
def box_add(*boxes):
  '''
  Interval sum: add up all lower ends and all upper ends separately.
  Each argument is a (lower, upper) pair; no arguments gives (0, 0).
  '''
  total_lo, total_hi = 0, 0
  for box in boxes:
    lo, hi = box
    total_lo, total_hi = total_lo + lo, total_hi + hi
  return (total_lo, total_hi)

def box_scalarmult(alpha, box):
  '''
  Scale the interval `box` = (lower, upper) by the scalar alpha.
  A negative alpha swaps the ends so the result is still (lower, upper).
  '''
  lo, hi = box
  if alpha >= 0:
    return (alpha*lo, alpha*hi)
  return (alpha*hi, alpha*lo)

def box_affine(layer, boxes):
  '''
  Push a list of input boxes through the affine map of `layer`.

  layer: module with a `weight` of shape (out_features, in_features) and
         an optional `bias` of shape (out_features,), e.g. nn.Linear.
  boxes: one (lower, upper) pair per input neuron; the ends may be Python
         numbers or 0-dim tensors.

  For output neuron i, every weight wi[j] scales box j (a negative weight
  swaps the ends of the box), the scaled boxes are summed over ALL input
  neurons j, and the bias (if any) is added to both ends.

  Returns a list with one (lower, upper) pair of 0-dim tensors per output
  neuron. Raises ValueError if the number of boxes does not match the
  number of input features of the layer.
  '''
  weights = layer.weight
  if len(boxes) != weights.shape[1]:
    raise ValueError(
      f'expected {weights.shape[1]} input boxes, got {len(boxes)}')

  def _stack(ends):
    # as_tensor keeps existing tensors (and their autograd history) intact.
    return torch.stack([
      torch.as_tensor(end, dtype=weights.dtype, device=weights.device)
      for end in ends])

  lows = _stack([lo for lo, _ in boxes])
  highs = _stack([hi for _, hi in boxes])

  # Split the weights by sign: a non-negative weight maps lower -> lower,
  # a negative weight maps upper -> lower (and vice versa).
  w_pos = weights.clamp(min=0)
  w_neg = weights.clamp(max=0)
  next_lows = w_pos @ lows + w_neg @ highs
  next_highs = w_pos @ highs + w_neg @ lows

  bias = getattr(layer, 'bias', None)
  if bias is not None:
    next_lows = next_lows + bias
    next_highs = next_highs + bias

  return list(zip(next_lows.unbind(), next_highs.unbind()))

def box_relu(boxes):
  '''
  Apply ReLU to every box: any end that is <= 0 is replaced by 0,
  positive ends are kept unchanged. Returns a new list of (lower, upper).
  '''
  clipped = []
  for lo, hi in boxes:
    if lo <= 0:
      lo = 0
    if hi <= 0:
      hi = 0
    clipped.append((lo, hi))
  return clipped


# Named 0 / 1 constants. Presumably meant as the `mode` values of
# box_lu_tensorify (0 = lower bounds, 1 = upper bounds) - they are not
# referenced anywhere else in the visible notebook.
lower, upper = 0, 1

def box_lu_tensorify(boxes, mode = 0):
  '''
  (...,[li,ui],...) -> tensor([...li...]) or tensor([...ui...])
  mode == 0 selects the lower bounds,
  any other mode selects the upper bounds.
  '''
  want_lower = (mode == 0)
  picked = [(lo if want_lower else hi) for (lo, hi) in boxes]
  return torch.FloatTensor(picked)


def box_indexed_tensorify(boxes, index):
  '''
  Return a tensor holding the upper bound of every box whose position is
  not 'index', and the lower bound of the box at position 'index'.
  '''
  return torch.FloatTensor([
    box[1] if i != index else box[0]
    for i, box in enumerate(boxes)
  ])


def tensor_boxify(tensor, eps):
  '''
  tensor([...x...]) -> [...(x-eps, x+eps)...]
  Every entry of the (1-D) tensor becomes a box of radius eps around it.
  '''
  return [(value-eps, value+eps) for value in tensor.tolist()]


# def box_isindex(boxes, index):
#   '''
#   To check if classified as index.
#   Checks if min(box[i])>max(box[j]) for every j != i
#   '''
#   min_index = boxes[i][0]
#   is_index = True

#   for i in range(len(boxes)):
#     if i != index:
#       if min_index <= boxes[i][1]:
#             is_index = False
#             break
#   return is_index

In [None]:
def box_nn(input_boxes, model):
  '''
  Propagate input boxes through the layers of the NN.

  input_boxes: one (lower, upper) pair per input of model.w01.
  model: object exposing the linear layers w01, w12, w23 and w34.

  Mirrors NN.forward on boxes: ReLU follows w01, w12 and w23, while the
  output layer w34 is left without activation, so the returned boxes
  bound the raw class scores (modulo softmax). The flatten step of
  NN.forward is not performed here; the caller passes flat boxes.
  '''
  boxes = input_boxes
  for hidden_layer in (model.w01, model.w12, model.w23):
    boxes = box_relu(box_affine(hidden_layer, boxes))

  # No ReLU here: NN.forward returns the output of w34 unchanged.
  return box_affine(model.w34, boxes)

2(a)

In [None]:
def robustness(model, test_set, eps):
  '''
  Given a model and a test set of (input, ground truth) pairs (x, y),
  return the percentage of inputs for which the box abstraction proves
  that the whole L_infty eps ball around x is classified as y.

  model: network understood by box_nn.
  test_set: sized iterable of (input_tensor, ground_index) pairs.
  eps: radius of the L_infty ball around every input.

  An input counts as robust only if the lower bound of the score of y is
  strictly greater than the upper bound of every other score, i.e. even
  in the worst case of the abstraction y still wins; ties do not count.

  Returns a float in [0, 100]; 0.0 for an empty test set.
  TODO: the worst-case loss is not computed, only the robust percentage.
  '''
  total = len(test_set)
  if total == 0:
    return 0.0

  number_of_robust_inputs = 0
  # Only bound values are needed, so autograd bookkeeping is switched off.
  with torch.no_grad():
    for input_tensor, ground_index in test_set:
      input_boxes = tensor_boxify(torch.flatten(input_tensor), eps)

      output_boxes = box_nn(input_boxes, model)

      # Worst case: score of the true class as low as possible, all other
      # scores as high as possible.
      true_lower = float(output_boxes[ground_index][0])
      rival_uppers = (float(box[1]) for i, box in enumerate(output_boxes)
                      if i != ground_index)
      if all(true_lower > rival for rival in rival_uppers):
        number_of_robust_inputs += 1

  return (number_of_robust_inputs/total)*100
      # , (sum_loss/len(test_set))*100 # robustness and expected max loss


In [None]:
# Train the current `model` for three epochs (prints accuracy per epoch).
train_model(model, num_epochs=3)


Epoch 1: Accuracy 0.86440 [11.29 seconds]
Epoch 2: Accuracy 0.89900 [11.59 seconds]
Epoch 3: Accuracy 0.90880 [10.84 seconds]


In [None]:
# Report the certified robustness on the test set for eps = 0.1, 0.2, ..., 1.0.
for step in range(1, 11):
  eps = 0.1*step
  print(f'Robustness {robustness(model, test_dataset, eps)} for eps = {eps}')