In [55]:
import torch
import torchvision
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import matplotlib.pyplot as plt
from advertorch.attacks import GradientSignAttack

# Make CUDA float tensors the process-wide default, so tensors created without
# an explicit device/dtype are allocated on the GPU.
# NOTE(review): this requires a CUDA-capable runtime. Newer PyTorch releases
# reportedly deprecate this call in favour of torch.set_default_device /
# torch.set_default_dtype — confirm against the installed version.
torch.set_default_tensor_type('torch.cuda.FloatTensor')

In [56]:
# Fix the PyTorch RNG seed up front so runs are repeatable.
torch.manual_seed(1)

# Optimisation schedule: number of passes over the training set and the
# SGD step size / momentum handed to the optimizer.
n_epochs, learning_rate, momentum = 15, 0.01, 0.9

# Mini-batch sizes for the training and evaluation data loaders.
batch_size_train, batch_size_test = 64, 1000

# Log the training loss every `log_interval` batches.
log_interval = 150

### Define data loaders and data preprocessing steps

In [57]:
# Preprocessing pipeline: convert each image to a tensor, then standardise it
# with the MNIST pixel statistics (mean .1307, stddev .3081).
data_preprocess = torchvision.transforms.Compose([
    torchvision.transforms.ToTensor(),
    torchvision.transforms.Normalize((0.1307,), (0.3081,)),
])

# Training split, served in shuffled mini-batches of `batch_size_train`.
train_loader = torch.utils.data.DataLoader(
    dataset=torchvision.datasets.MNIST(
        root='./data',
        train=True,
        download=True,
        transform=data_preprocess,
    ),
    batch_size=batch_size_train,
    shuffle=True,
)

# Held-out split, served in (also shuffled) mini-batches of `batch_size_test`.
test_loader = torch.utils.data.DataLoader(
    dataset=torchvision.datasets.MNIST(
        root='./data',
        train=False,
        download=True,
        transform=data_preprocess,
    ),
    batch_size=batch_size_test,
    shuffle=True,
)

### Define the model

In [58]:
class LeNet(nn.Module):
    """LeNet-5 style convolutional classifier adapted to MNIST.

    Two convolution + max-pool stages feed three fully connected layers,
    with dropout (p=0.5) after each of the first two. ``forward`` returns
    per-class log-probabilities (log-softmax over dimension 1).
    """

    def __init__(self):
        super().__init__()
        # Convolutional feature extractor.
        self.conv1 = nn.Conv2d(in_channels=1, out_channels=6,
                               kernel_size=5, stride=1, padding=2)
        self.pool1 = nn.MaxPool2d(kernel_size=2)
        self.conv2 = nn.Conv2d(in_channels=6, out_channels=16,
                               kernel_size=5, stride=1, padding=0)
        self.pool2 = nn.MaxPool2d(kernel_size=2)
        # Fully connected classifier head; fc1 consumes the flattened
        # 16 x 5 x 5 feature map.
        self.fc1 = nn.Linear(in_features=16 * 5 * 5, out_features=120)
        self.fc1_drop = nn.Dropout(p=0.5)
        self.fc2 = nn.Linear(in_features=120, out_features=84)
        self.fc2_drop = nn.Dropout(p=0.5)
        self.fc3 = nn.Linear(in_features=84, out_features=10)

    def forward(self, x):
        """Map a batch of images to a (batch, 10) tensor of log-probabilities."""
        feats = F.relu(self.conv1(x))
        feats = self.pool1(feats)
        feats = F.relu(self.conv2(feats))
        feats = self.pool2(feats)

        # Collapse everything except the batch dimension.
        flat = feats.view(feats.size(0), -1)

        hidden = self.fc1_drop(F.relu(self.fc1(flat)))
        hidden = self.fc2_drop(F.relu(self.fc2(hidden)))
        logits = self.fc3(hidden)
        return F.log_softmax(logits, dim=1)

### Define adversarial example generation function

In [59]:
def generate_adversarial_samples(og_samples, true_labels, adversary, num_per_samp=1):
    """Build adversarial counterparts for a batch of samples.

    ``adversary.perturb(og_samples, true_labels)`` is called ``num_per_samp``
    times and the resulting batches are concatenated along dimension 0.
    The labels are tiled the same number of times so they line up with the
    returned samples.

    Returns:
        (adv_samples, adv_labels): the concatenated perturbed samples and
        their matching labels.
    """
    perturbed_batches = [
        adversary.perturb(og_samples, true_labels) for _ in range(num_per_samp)
    ]
    tiled_labels = [true_labels for _ in range(num_per_samp)]
    return torch.cat(perturbed_batches, 0), torch.cat(tiled_labels, 0)

### Define my loss function (cross-entropy plus L2 and input-output Jacobian regularization)


In [68]:
def my_loss(output, labels, l2_reg=True, alpha1=.005, jac_reg=False, alpha2=.005,
            model=None, samples=None):
    """Cross-entropy loss with optional L2 and input-output Jacobian penalties.

    Check https://arxiv.org/abs/1908.02729 for alpha1, alpha2 suggestions.

    Args:
        output: model outputs for the batch, one row per sample and one
            column per class.
        labels: target class indices for the batch.
        l2_reg: if True, add ``alpha1 * sum(p**2)`` over all model parameters.
        alpha1: weight of the L2 penalty.
        jac_reg: if True, add ``alpha2 * ||J||`` where ``J`` stacks the
            gradients of every output column with respect to ``samples``.
        alpha2: weight of the Jacobian penalty.
        model: module whose parameters are L2-penalised. Defaults to the
            notebook-level ``lenet`` when omitted.
        samples: the input tensor that produced ``output``. Required when
            ``jac_reg`` is True, and it must have had ``requires_grad=True``
            before the forward pass.

    Returns:
        A scalar loss tensor.

    Raises:
        ValueError: if ``jac_reg`` is True but ``samples`` was not supplied.
    """
    loss = F.cross_entropy(output, labels)

    # add l2 regularization
    if l2_reg:
        net = lenet if model is None else model
        l2 = sum(p.pow(2).sum() for p in net.parameters())
        loss = loss + alpha1 * l2

    # add input-output jacobian regularization
    if jac_reg:
        if samples is None:
            raise ValueError(
                "jac_reg=True requires `samples`: the input tensor (with "
                "requires_grad=True) that produced `output`."
            )
        per_class_grads = []
        for i in range(output.shape[1]):
            # Summing column i over the batch gives each sample's own
            # d(output_i)/d(input), assuming samples do not interact inside
            # the model (no cross-batch layers).
            # create_graph=True keeps the penalty differentiable with respect
            # to the model parameters.
            (grad_i,) = torch.autograd.grad(
                output[:, i].sum(), samples, create_graph=True
            )
            per_class_grads.append(grad_i)
        jacobian = torch.stack(per_class_grads, dim=1)
        loss = loss + alpha2 * torch.norm(jacobian)

    return loss

### Define train and test functions 

In [69]:
def train(epoch, loss_fn=F.cross_entropy, adversary=None):
    """Run one training epoch of the notebook-level ``lenet`` over ``train_loader``.

    Relies on these notebook globals: ``lenet``, ``optimizer``, ``device``,
    ``train_loader``, ``log_interval``, ``train_losses`` and ``train_counter``.

    Args:
        epoch: 1-based epoch number, used for logging and for the
            "examples seen" counter.
        loss_fn: callable ``loss_fn(output, labels)`` returning a scalar
            loss tensor.
        adversary: optional attack object exposing ``perturb(samples, labels)``.
            When given, each batch is extended with adversarial examples
            generated from it before the forward pass.

    Side effects:
        Updates the model through ``optimizer``. Every ``log_interval``
        batches it prints the loss and appends to ``train_losses`` /
        ``train_counter``.
    """
    lenet.train()

    # Take the batch size from the loader instead of hard-coding it, so the
    # progress counter stays right if the configured batch size changes.
    samples_per_batch = train_loader.batch_size
    seen_before_epoch = (epoch - 1) * len(train_loader.dataset)

    for batch_idx, (samples, labels) in enumerate(train_loader):
        # send inputs and labels to the training device
        samples, labels = samples.to(device), labels.to(device)

        # expand the batch with adversarial examples if an adversary is given
        if adversary is not None:
            adv_samples, adv_labels = generate_adversarial_samples(samples, labels, adversary)
            samples = torch.cat([samples, adv_samples], 0)
            labels = torch.cat([labels, adv_labels], 0)

        # needed in order to calculate the input-output Jacobian
        samples.requires_grad = True

        optimizer.zero_grad()

        output = lenet(samples)
        loss = loss_fn(output, labels)
        loss.backward()

        optimizer.step()

        if batch_idx % log_interval == 0:
            print(f'Train Epoch: {epoch} \tLoss: {loss.item():.6f}')
            train_losses.append(loss.item())
            train_counter.append(batch_idx * samples_per_batch + seen_before_epoch)

In [70]:
def test(loss_fn=F.cross_entropy):
    lenet.eval()
    test_loss = 0
    correct = 0
    
    with torch.no_grad():
        for samples, labels in test_loader:
            samples, labels = samples.to(device), labels.to(device)
            output = lenet(samples)
            # negative log-likelihood loss
            test_loss += loss_fn(output, labels).item()
            # output is a tensor, .data retrieves its data, max must return the index of the highest valued element
            preds = output.data.max(1, keepdim=True)[1]
            correct += preds.eq(labels.data.view_as(preds)).sum()
                
    test_loss /= len(test_loader.dataset)
    accuracy = 100. * correct / len(test_loader.dataset)
    
    print(f'Test set: \n\tAvg. loss: {test_loss:.4f}\n\tAccuracy: ({accuracy:.2f}%)')
    
    test_accuracies.append(test_accuracy)
    test_losses.append(test_loss)

### Instantiate model and train it

In [71]:
# Build the network, move it to the GPU, and attach SGD with momentum.
lenet = LeNet().cuda()
optimizer = optim.SGD(lenet.parameters(), lr=learning_rate, momentum=momentum)
device = 0  # passed to .to(device) in train()/test() when moving batches

# Histories filled in by train() and test(), and read by the plotting cell.
train_losses, train_counter = [], []
test_accuracies, test_losses = [], []

# Train with the regularised loss; test() runs with its default loss
# (plain cross-entropy) after every epoch.
for epoch in range(1, n_epochs + 1):
    train(epoch, loss_fn=my_loss)
    test()

Train Epoch: 1 	Loss: 2.716683
Train Epoch: 1 	Loss: 0.943496


KeyboardInterrupt: 

### View performance 

In [None]:
fig = plt.figure(figsize=(20,15))

# Coerce the logged metrics to plain floats so entries stored as 0-dim
# (possibly CUDA) tensors can still be plotted.
acc_history = [float(a) for a in test_accuracies]
loss_history = [float(l) for l in test_losses]

# Top-left: training loss against the number of training examples seen.
ax1 = fig.add_subplot(2,2,1)
ax1.plot(train_counter, train_losses, color='blue')
ax1.legend(['Train Loss'], loc='upper right')
ax1.set_xlabel('number of training examples seen')
ax1.set_ylabel('cross-entropy loss')

# Top-right: test accuracy per completed epoch. The x-axis comes from the
# recorded history rather than n_epochs, so the plot still works when
# training was stopped early.
ax2 = fig.add_subplot(2,2,2)
ax2.plot(list(range(len(acc_history))), acc_history, 'green')
ax2.legend(['Test accuracy'], loc='upper right')
ax2.set_xlabel('epoch')
ax2.set_ylabel('test accuracy')

# Bottom-left: test loss per completed epoch (same x-axis reasoning).
ax3 = fig.add_subplot(2,2,3)
ax3.plot(list(range(len(loss_history))), loss_history, 'red')
ax3.legend(['Test loss'], loc='upper right')
ax3.set_xlabel('epoch')
ax3.set_ylabel('cross-entropy loss')

plt.show()