In [40]:
import torch
import torchvision.datasets as dset
import torchvision.transforms as transforms
import torch.nn.functional as F
import torch.nn as nn
from torch import nn, optim, no_grad
from sklearn import datasets
import numpy as np
from time import time
import os

torch.set_default_tensor_type(torch.DoubleTensor)

## Dataset

In [41]:
# We specify the location of the data to load and the transformation to apply to the data
_root = "./data"
_trans = transforms.Compose(
    [
        transforms.ToTensor(),
        transforms.Normalize((0.0,), (1.0,)),
    ]
)
# Now the data will be in the scale [0,1]

In [42]:
# Load the data! We use MNIST
train_dataset = dset.MNIST(root=_root, train=True, transform=_trans, download=True)
test_and_val_dataset = dset.MNIST(root=_root, train=False, transform=_trans, download=True)

# Now we separate the "test and val" dataset between validation and test datasets
val_dataset = list()
test_dataset = list()
validation_size = 1000
for i, x in enumerate(test_and_val_dataset):
    if i < validation_size:
        val_dataset.append(x)
    else:
        test_dataset.append(x)

In [43]:
# Create the loaders
train_loader = torch.utils.data.DataLoader(dataset=train_dataset, batch_size=128, shuffle=True)
test_loader = torch.utils.data.DataLoader(dataset=test_dataset, shuffle=True, batch_size=1)
val_loader = torch.utils.data.DataLoader(dataset=val_dataset, batch_size=1000, shuffle=True)

## Models

In [44]:
# We define a MNIST LeNet model
class LeNet(nn.Module):
    def __init__(self):
        super(LeNet, self).__init__()
        self.conv1 = nn.Conv2d(1, 10, kernel_size=5)
        self.conv2 = nn.Conv2d(10, 20, kernel_size=5)
        self.conv2_drop = nn.Dropout2d()
        self.fc1 = nn.Linear(320, 50)
        self.fc2 = nn.Linear(50, 10)

    def forward(self, x):
        #x = x.view(-1, 1, 28 * 28)
        x = F.relu(F.max_pool2d(self.conv1(x), 2))
        x = F.relu(F.max_pool2d(self.conv2_drop(self.conv2(x)), 2))
        x = x.view(-1, 320)
        x = F.relu(self.fc1(x))
        x = F.dropout(x, training=self.training)
        x = self.fc2(x)
        return F.log_softmax(x)

## Training

In [53]:
def compute_accuracy(model, loader):
    correct = 0
    model.eval()
    with no_grad():
        for data, target in loader:
            data = data.double()
            output = model(data)
            pred = output.argmax(dim=1, keepdim=True)
            correct += pred.eq(target.view_as(pred)).sum().item()
    acc = correct / len(loader.dataset)
    return acc

def train_network(model, train_loader, val_loader, loss_func, epochs, adv=False):
    lr = 0.001
    patience = 20
    optimizer = optim.Adam(model.parameters(), lr=lr, betas=(0.9, 0.99))
    scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer, mode="min", patience=patience, verbose=True, factor=0.5)
    
    loss_history = []
    t = time()
    for epoch in range(epochs):
        print(f"Starting epoch {epoch} ({time()-t} secs) and lr = {[param['lr'] for param in optimizer.param_groups]}")
        t = time()
        
        for x, y in train_loader:
            x = x.double()
            optimizer.zero_grad()
            y_pred = model(x)
            loss = loss_func(y_pred, y)
            loss.backward()
            optimizer.step()
        
        for x_val, y_val in val_loader:
            x_val = x_val.double()
            y_val_pred = model(x_val)
            val_loss = loss_func(y_val_pred, y_val)
            print(f"Validation loss = {np.around(val_loss.item(), decimals=4)}")
            loss_history.append(val_loss.item())
        scheduler.step(val_loss)
        
        if epoch % 10 == 9:
            print(f"Validation accuracy = {compute_accuracy(model, val_loader)}")
    
    return model, loss_history

def get_deep_model(epochs, path=None, adv=False):
    loss_func = nn.CrossEntropyLoss()
    if adv=False:
        adv_message = ""
    else:
        adv_message = "_adv_training"
    if path == None:
        model_path = os.getcwd()+"/trained_models"+"/MNIST_LeNet_"+str(epochs)+"_epochs"+adv_message+".model"
    else:
        model_path = path
    if not os.path.exists(os.getcwd()+"/trained_models"):
        os.mkdir(os.getcwd()+"/trained_models")
    try:
        model = torch.load(model_path)
        print(f"Successfully loaded {model_path}")
    except:
        print(f"Retraining {model_path}")
        model = LeNet()
        model, loss_history = train_network(model, train_loader, val_loader, loss_func, epochs, adv)
        torch.save(model, model_path)
    print(f"Test accuracy = {compute_accuracy(model, test_loader)}")
    
    return model

In [54]:
model = get_deep_model(11)

Successfully loaded /Users/m.goibert/Documents/Criteo/TP_adv_Robustness/trained_models/MNIST_LeNet_11_epochs.model


  return F.log_softmax(x)


Test accuracy = 0.9898888888888889
