In [1]:
import torch

import torch.nn as nn
import torch.nn.functional as F

from torch.utils.data import DataLoader
from torchvision.transforms import ToTensor
from torchvision import datasets
from torch.utils.data import random_split
from sklearn.metrics import classification_report

import matplotlib
matplotlib.use("Agg")
import matplotlib.pyplot as plt

import numpy as np

In [3]:
class NeuralNetwork(nn.Module):
    def __init__(self):
        super(NeuralNetwork, self).__init__()
        self.conv_layer1 = nn.Sequential(
            nn.Conv2d(in_channels=1, out_channels=32, kernel_size=3, stride=1),
            nn.BatchNorm2d(32),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=32, out_channels=64, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2)
        )

        self.conv_layer2 = nn.Sequential(
            nn.Conv2d(in_channels=64, out_channels=128, kernel_size=3, stride=1),
            nn.BatchNorm2d(128),
            nn.ReLU(inplace=True),
            nn.Conv2d(in_channels=128, out_channels=128, kernel_size=3, stride=1),
            nn.ReLU(inplace=True),
            nn.MaxPool2d(kernel_size=2, stride=2),
            nn.Dropout2d(p=0.1)
        )
    
    # For future use, not needed for MNIST dataset
#     self.conv_layer3 = nn.Sequential(
#         nn.Conv2d(in_channels=128, out_channels=256, kernel_size=3, stride=1),
#         nn.BatchNorm2d(256),
#         nn.ReLU(inplace=True),
#         nn.Conv2d(in_channels=256, out_channels=256, kernel_size=3, stride=1),
#         nn.ReLU(inplace=True)
#         nn.MaxPool2d(kernel_size=2, stride=2)
#         nn.Dropout2d(p=0.1)
#     )
    
    self.fc_layer = nn.Sequential(
        nn.Linear(in_features=28*28*4, out_features=512),
        nn.ReLU(inplace=True),
        nn.Linear(in_features=512, out_features=256),
        nn.ReLU(inplace=True),
        nn.Dropout(p=0.1),
        nn.Linear(in_feature=256, out_features=10)
    )

  def forward(self, x):
    x = self.conv_layer1(x)
    
    x = self.conv_layer2(x)
    
#     x = self.conv_layer3(x)
    
    x = nn.flatten(X) # Maybe replace with .view
    
    x = self.fc_layer(X)
    
    return F.log_softmax(x, dim=1)
        

model = NeuralNetwork()
print(model)

IndentationError: unindent does not match any outer indentation level (<tokenize>, line 43)

In [None]:
def train(model, train_dataloader, val_dataloader, loss_function, optimizer, train_steps, val_steps, history):
    model.train()

    total_train_loss = 0
    total_val_loss = 0

    total_train_correct = 0
    total_val_correct = 0

    for (X, y) in train_dataloader:
    (X, y) = (X.to(device), y.to(device))

    # Where to put...
    optimizer.zero_grad()

    pred = model(X)
    loss = loss_function(pred, y)

    loss.backward()
    optimizer.step()

    total_train_loss += loss
    total_train_correct += (pred.argmax(1) == y).type(torch.float).sum().item()

    with torch.no_grad():
    model.eval()

    for (X, y) in val_dataloader:
      (X, y) = (X.to(device), y.to(device))

      pred = model(X)
      loss = loss_function(pred, y)

      total_val_loss += loss
      total_val_correct += (pred.argmax(1) == y).type(torch.float).sum().item()


    avg_train_loss = total_train_loss / train_steps
    avg_val_loss = total_val_loss / val_steps

    train_correct = total_train_correct / len(train_dataloader.dataset)
    val_correct = total_val_correct / len(val_dataloader.dataset)

    history["train_loss"].append(avg_train_loss.cpu().detach().numpy())
    history["train_acc"].append(train_correct)
    history["val_loss"].append(avg_val_loss.cpu().detach().numpy())
    history["val_acc"].append(val_correct)

    return history

In [None]:
def test_normal(model, test_dataloader, loss_function, test_data):
    model.eval()

    total_test_loss = 0
    total_test_correct = 0
    preds = []
    
    examples = []
    
    for (X, y) in test_dataloader:
        (X, y) = (X.to(device), y.to(device))

        pred = model(X)
        loss = loss_function(pred, y)

        total_test_loss += loss
        total_test_correct += (pred.argmax(1) == y).type(torch.float).sum().item()

        preds.extend(pred.argmax(axis=1).cpu().numpy())
        
        if len(examples) < 5:
            examples.append((pred, X))

    #cr = classification_report(test_data.targets.cpu().numpy(), np.array(preds), target_names=test_data.classes)
    cr = classification_report(test_data.targets, np.array(preds), target_names=test_data.classes)

    return cr, examples

In [None]:
def fgsm_attack(image, epsilon, data_grad):
    sign_data_grad = data_grad.sign()

    perturbed_image = image + epsilon*sign_data_grad
    perturbed_image = torch.clamp(perturbed_image, 0, 1)

    return perturbed_image

# restores the tensors to their original scale
def denorm(batch, mean=[0.1307], std=[0.3081]):
    if isinstance(mean, list):
        mean = torch.tensor(mean).to(device)
    if isinstance(std, list):
        std = torch.tensor(std).to(device)

    return batch * std.view(1, -1, 1, 1) + mean.view(1, -1, 1, 1)

In [None]:
#def test_attack

In [None]:
LEARNING_RATE = 1e-4
BATCH_SIZE = 64
EPOCHS = 5

TRAIN_SPLIT = 0.8
VAL_SPLIT = 1 - TRAIN_SPLIT

device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Device: {device}")

In [None]:
# For now remove SSL certification because is not working
# Remove when SSL cert is working
import ssl
ssl._create_default_https_context = ssl._create_unverified_context

In [None]:
train_data_init = datasets.MNIST(root="data", train=True, download=True, transform=ToTensor())
test_data = datasets.MNIST(root="data", train=True, download=True, transform=ToTensor())

train_sample_size = int(len(train_data_init) * TRAIN_SPLIT)
#val_sample_size = int(len(train_data_init) * VAL_SPLIT)
val_sample_size = len(train_data_init) - train_sample_size
train_data, val_data = random_split(train_data_init, [train_sample_size, val_sample_size], generator=torch.Generator().manual_seed(42)) # manual seed for reproducability

In [None]:
train_dataloader = DataLoader(train_data, batch_size = BATCH_SIZE)
val_dataloader = DataLoader(val_data, batch_size = BATCH_SIZE)
test_dataloader = DataLoader(test_data, batch_size = BATCH_SIZE)

train_steps = len(train_dataloader.dataset) // BATCH_SIZE
val_steps = len(val_dataloader.dataset) // BATCH_SIZE
test_steps = len(test_dataloader.dataset) // BATCH_SIZE

classes = ("0", "1", "2", "3", "4", "5", "6", "7", "8", "9")

In [None]:
for X, y in train_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

print("-"*50)

for X, y in val_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break

print("-"*50)

for X, y in test_dataloader:
    print(f"Shape of X [N, C, H, W]: {X.shape}")
    print(f"Shape of y: {y.shape} {y.dtype}")
    break


In [None]:
def show_images(img, one_channel=True):
  if one_channel:
    img = img.mean(dim=0)

  img = img / 2 + 0.5 # unnormalize? (what does it mean)
  #np_img = img.numpy()

  if one_channel:
    plt.imshow(img, cmap="Greys")
  else:
    plt.imshow(np.transpose(img, (1, 2, 0)))

dataiter = iter(train_dataloader)
images, labels = next(dataiter)
images = images.numpy()

%matplotlib inline

fig = plt.figure(figsize=(25, 4))
for idx in np.arange(20):
    ax = fig.add_subplot(2, 10, idx+1, xticks=[], yticks=[])
    show_images(images[idx])
    ax.set_title(classes[labels[idx]])

In [None]:
for e in range(EPOCHS):
  print(f"Epoch {e+1}")
  print(train(model, train_dataloader, val_dataloader, loss_function, optimizer, train_steps, val_steps, history))
  print("-"*50)

print(test(model, test_dataloader, loss_function, test_data))