In [None]:
import warnings
warnings.filterwarnings("ignore")

import numpy as np
import pandas as pd
import time
import h5py
from scipy.ndimage.interpolation import rotate

import matplotlib
import matplotlib.pyplot as plt
import matplotlib.image as mpimg
import matplotlib.gridspec as gridspec

import seaborn as sns
#%matplotlib inline

import torch
import torchvision
from torchvision import datasets
from torchvision import transforms
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torch.utils.data.sampler import SubsetRandomSampler

In [None]:
class Flatten(nn.Module):
    """Collapse every dimension after the first into a single one.

    The first dimension is kept as-is; the remaining ones are merged with
    ``Tensor.view``, so the input must be viewable in that shape.
    """

    def __init__(self):
        super().__init__()

    def forward(self, x):
        leading = x.size(0)
        return x.view(leading, -1)

class LeNet(nn.Module):
    """LeNet-style CNN with dropout after each convolution and after the hidden dense layer.

    Parameters
    ----------
    droprate1 : float
        Drop probability of the two ``Dropout2d`` layers that follow ``conv1``
        and ``conv2`` (both use this same value).
    droprate2 : float
        Drop probability of the ``Dropout`` layer that follows the hidden
        dense layer.
    """

    def __init__(self, droprate1=0.5, droprate2=0.5):
        super().__init__()
        # (name, layer) pairs in forward order; names become the state-dict keys.
        stages = [
            ('conv1', nn.Conv2d(1, 20, kernel_size=5, padding=2)),
            ('dropout1', nn.Dropout2d(p=droprate1)),
            ('maxpool1', nn.MaxPool2d(2, stride=2)),
            ('conv2', nn.Conv2d(20, 50, kernel_size=5, padding=2)),
            ('dropout2', nn.Dropout2d(p=droprate1)),
            ('maxpool2', nn.MaxPool2d(2, stride=2)),
            ('flatten', Flatten()),
            # dense3 expects 50 channels * 7 * 7 flattened features.
            ('dense3', nn.Linear(50 * 7 * 7, 500)),
            ('relu3', nn.ReLU()),
            ('dropout3', nn.Dropout(p=droprate2)),
            ('final', nn.Linear(500, 10)),
        ]
        self.model = nn.Sequential()
        for stage_name, stage in stages:
            self.model.add_module(stage_name, stage)

    def forward(self, x):
        """Run ``x`` through the sequential stack and return the 10 output scores per sample."""
        scores = self.model(x)
        return scores


In [None]:
# ---------------------------------------------------------------------------------------------------
# ---------------------------------------------------------------------------------------------------

class LeNetClassifier:
    """Train a ``LeNet`` with SGD and track test performance after every epoch.

    Two kinds of test predictions are recorded per epoch:

    * the standard prediction (dropout disabled, single forward pass), and
    * the "ep" prediction: the argmax of the softmax output averaged over
      ``T`` stochastic forward passes with dropout left active.

    Parameters
    ----------
    droprate1, droprate2 : float
        Dropout probabilities forwarded to ``LeNet``.
    batch_size : int
        Mini-batch size used for the training loader.
    max_epoch : int
        Number of passes over the training set performed by ``fit``.
    lr : float
        Learning rate of the SGD optimizer.
    device : str or torch.device, optional
        Where the model and data live. Defaults to CUDA when available and
        falls back to CPU otherwise.

    Attributes
    ----------
    loss_ : list of float
        Mean training loss per epoch.
    test_accuracy, test_accuracy_ep : list of float
        Per-epoch test accuracy for the standard / averaged predictions.
    test_error, test_error_ep : list of int
        Per-epoch number of misclassified test samples.
    """

    def __init__(self, droprate1=0.5, droprate2=0.5, batch_size=128, max_epoch=300, lr=0.01,
                 device=None):
        if device is None:
            # Prefer the GPU but keep the class usable on CPU-only machines.
            device = 'cuda' if torch.cuda.is_available() else 'cpu'
        self.device = torch.device(device)
        self.batch_size = batch_size
        self.max_epoch = max_epoch
        self.lr = lr
        self.model = LeNet(droprate1, droprate2)
        self.model.to(self.device)
        self.criterion = nn.CrossEntropyLoss().to(self.device)
        self.optimizer = optim.SGD(self.model.parameters(), lr=lr)
        self.loss_ = []
        self.test_error = []
        self.test_error_ep = []
        self.test_accuracy = []
        self.test_accuracy_ep = []

    def fit(self, trainset, testset, verbose=True):
        """Train for ``max_epoch`` epochs, evaluating on ``testset`` after each one.

        Parameters
        ----------
        trainset, testset : torch.utils.data.Dataset
            Datasets yielding ``(input, label)`` pairs. The whole test set is
            loaded as a single batch.
        verbose : bool
            Print the loss and test metrics after every epoch.

        Returns
        -------
        LeNetClassifier
            ``self``, to allow chaining.
        """
        trainloader = torch.utils.data.DataLoader(trainset, batch_size=self.batch_size, shuffle=True)
        testloader = torch.utils.data.DataLoader(testset, batch_size=len(testset), shuffle=False)
        # Built-in next(): DataLoader iterators do not expose a .next() method.
        X_test, y_test = next(iter(testloader))
        X_test = X_test.to(self.device)
        y_true = y_test.cpu().numpy()

        for epoch in range(self.max_epoch):
            self.model.train()
            running_loss = 0
            for inputs, labels in trainloader:
                inputs, labels = inputs.to(self.device), labels.to(self.device)
                self.optimizer.zero_grad()
                outputs = self.model(inputs)
                loss = self.criterion(outputs, labels)
                loss.backward()
                self.optimizer.step()
                running_loss += loss.item()
            self.loss_.append(running_loss / len(trainloader))
            if verbose:
                print('Epoch {} loss: {}'.format(epoch+1, self.loss_[-1]))

            y_test_pred, y_test_pred_ep = self.predict_ep(X_test)

            self.test_accuracy.append(np.mean(y_true == y_test_pred))
            self.test_accuracy_ep.append(np.mean(y_true == y_test_pred_ep))

            # Count mismatches directly: int(n * (1 - accuracy)) truncates
            # floating-point noise (e.g. 100.99999 -> 100) and undercounts.
            self.test_error.append(int(np.sum(y_true != y_test_pred)))
            self.test_error_ep.append(int(np.sum(y_true != y_test_pred_ep)))

            if verbose:
                print('Test error: {}; test accuracy: {:4f}; test accuracy ep: {:4f}'.format(self.test_error[-1], self.test_accuracy[-1], self.test_accuracy_ep[-1]))
        return self

    def predict(self, x):
        """Return the predicted class index per sample with dropout disabled.

        The model is switched to eval mode for the forward pass and then put
        back into whichever mode it was in before the call.

        Returns
        -------
        torch.Tensor
            Class indices, located on ``self.device``.
        """
        was_training = self.model.training
        self.model.eval()
        try:
            with torch.no_grad():  # inference only, no autograd graph needed
                outputs = self.model(x.to(self.device))
        finally:
            self.model.train(was_training)
        _, pred = torch.max(outputs, 1)
        return pred

    def predict_ep(self, x, T=100):
        """Return both the standard and the dropout-averaged predictions.

        Parameters
        ----------
        x : torch.Tensor
            Batch of inputs.
        T : int
            Number of stochastic forward passes (dropout active) to average.

        Returns
        -------
        standard_pred : numpy.ndarray
            Class indices from a single pass with dropout disabled.
        pred : numpy.ndarray
            Argmax of the softmax probabilities averaged over the ``T`` passes.

        Raises
        ------
        ValueError
            If ``T`` is smaller than 1 (there would be nothing to average).
        """
        if T < 1:
            raise ValueError('T must be at least 1, got {}'.format(T))

        x = x.to(self.device)
        # predict() returns a tensor; convert it to NumPy exactly once.
        standard_pred = self.predict(x).cpu().numpy()

        was_training = self.model.training
        self.model.train()  # dropout must stay active for the sampled passes
        try:
            with torch.no_grad():
                probs = [F.softmax(self.model(x), dim=1).cpu().numpy() for _ in range(T)]
        finally:
            self.model.train(was_training)

        pred = np.argmax(np.mean(np.array(probs), axis=0), axis=1)

        return standard_pred, pred

In [None]:
# Normalize with mean 0 and std 1 leaves the ToTensor output unchanged.
transform = transforms.Compose([transforms.ToTensor(),
                                transforms.Normalize((0,), (1,))]) # change

trainset = datasets.MNIST(root='data/', train=True, download=True, transform=transform)
testset = datasets.MNIST(root='data/', train=False, transform=transform)


# Define networks: no dropout, dropout on the dense layer only, dropout everywhere.
lenet1 = [LeNetClassifier(droprate1=0, droprate2=0, max_epoch=600),
          LeNetClassifier(droprate1=0, droprate2=0.5, max_epoch=600),
          LeNetClassifier(droprate1=0.5, droprate2=0.5, max_epoch=600)]

# Training, set verbose=True to see loss after each epoch.
for lenet in lenet1:
    lenet.fit(trainset, testset, verbose=False)

# Save torch models
for ind, lenet in enumerate(lenet1):
    torch.save(lenet.model, 'mnist_lenet1_'+str(ind)+'.pth')
    # Prepare to save errors (the error lists hold strings from here on).
    lenet.test_error = list(map(str, lenet.test_error))
    lenet.test_error_ep = list(map(str, lenet.test_error_ep))

# Save test errors to plot figures: one comma-separated line per network.
# Context managers make sure the files are flushed and closed.
with open("lenet1_test_errors.txt", "w") as errors_file:
    errors_file.write('\n'.join([','.join(lenet.test_error) for lenet in lenet1]))
with open("lenet1_test_errors_ep.txt", "w") as errors_ep_file:
    errors_ep_file.write('\n'.join([','.join(lenet.test_error_ep) for lenet in lenet1]))

In [None]:
# Load saved test errors to plot figures (one comma-separated line per network).
with open("lenet1_test_errors.txt", "r") as errors_file:
    lenet1_test_errors = [error_array.split(',') for error_array in errors_file.read().split('\n')]
lenet1_test_errors = np.array(lenet1_test_errors, dtype='f')


labels = ['LeNet no dropout',
          'LeNet IP dropout',
          'LeNet All dropout']

plt.figure(figsize=(8, 7))
# Only the LeNet runs are plotted: this notebook never defines any MLP
# results, and `labels` describes exactly the three LeNet variants.
for i, r in enumerate(lenet1_test_errors.tolist()):
    plt.plot(range(1, len(r)+1), r, '.-', label=labels[i], alpha=0.6);
plt.ylim([50, 250]);
plt.legend(loc=1);
plt.xlabel('Epochs');
plt.ylabel('Number of errors in test set');
plt.title('Test Error on MNIST Dataset for All Networks')
plt.show()