In [None]:
# Keras for mnist datasat
from keras.datasets import mnist

# NumPy for data processing and handling
import numpy as np

# PyTorch for neural networks
import torch

# MatPlotLib for plotting
import matplotlib.pyplot as plt
%matplotlib inline  

In [None]:
# Load MNIST dataset from Keras
(train_x, train_y), (test_x, test_labels) = mnist.load_data()

In [None]:
# Separate train and validation sets
train_inputs = np.reshape(train_x[:-5000], (train_x[:-5000].shape[0], 1, train_x[:-5000].shape[1], train_x[:-5000].shape[2]))
val_inputs = np.reshape(train_x[-5000:], (train_x[-5000:].shape[0], 1, train_x[-5000:].shape[1], train_x[-5000:].shape[2]))
train_labels = train_y[:-5000]
val_labels = train_y[-5000:]
test_inputs = np.reshape(test_x, (test_x.shape[0], 1, test_x.shape[1], test_x.shape[2]))

print('Training Inputs shape: ' + str(train_inputs.shape))
print('Training Labels shape: ' + str(train_labels.shape))

print('Testing Inputs shape: ' + str(test_inputs.shape))
print('Testing Labels shape: ' + str(test_labels.shape))

print('Validation Inputs shape: ' + str(val_inputs.shape))
print('Validation Labels shape: ' + str(val_labels.shape))

# Cast numpy arrays as PyTorch tensors
x_train = torch.FloatTensor(train_inputs)
y_train = torch.LongTensor(train_labels)
x_test = torch.FloatTensor(test_inputs)
y_test = torch.LongTensor(test_labels)
x_val = torch.FloatTensor(val_inputs)
y_val = torch.LongTensor(val_labels)

In [None]:
sample = np.random.randint(train_inputs.shape[0])

print(sample, train_labels[sample])
plt.imshow(train_inputs[sample, 0], cmap='gray')

In [None]:
# Define our convolutional neural network as a sub class of torch.nn.Module
class CNN(torch.nn.Module):
    def __init__(self, p=0.5):
        super(CNN, self).__init__()

        # Dropout function with probability p
        self.dropout = torch.nn.Dropout(p=p)

        self.network = torch.nn.Sequential(torch.nn.Conv2d(1, 32, 5, stride=1),
                                           torch.nn.ReLU(),
                                           self.dropout,
                                           torch.nn.Conv2d(32, 32, 5, stride=1),
                                           torch.nn.ReLU(),
                                           torch.nn.MaxPool2d(2),
                                           self.dropout,
                                           torch.nn.Conv2d(32, 64, 5, stride=1),
                                           torch.nn.ReLU(),
                                           torch.nn.MaxPool2d(2),
                                           self.dropout,
                                           torch.nn.Flatten(),
                                           torch.nn.Linear(3*3*64, 10))

        self.loss_fct = torch.nn.CrossEntropyLoss()

    def forward(self, x):
        return self.network(x)

    def loss(self, y_pred, y_true):
        return self.loss_fct(y_pred, y_true)

In [None]:
# Initialize network
net = CNN(0.5)

# Reset epoch count
epoch = -1

# Array to keep track of the loss after epoch
loss_log = []
val_loss_log = []

In [None]:
#Initialize optimizer
optimizer = torch.optim.SGD(net.parameters(), lr=0.001, momentum=0.9)

# Number of training epochs
n_epoch = 5

# Batch sized used for each update
batch_size = 1000
# Number of batches per epoch
n_batchs = x_train.shape[0] // batch_size

# Create an array with all the indicies for the training data
ind = np.arange(x_train.shape[0])

# Loop over training algorithm for n_epochs
for e in range(n_epoch):
    # Increase epoch count
    epoch += 1
    # Shuffle training data indices for SGD
    shuffled_ind = np.random.permutation(ind)
    # Initialize variable to keep track of average epoch loss
    epoch_loss = 0.0
    
    # Put network in inference mode
    #net.eval()

    # Calculate validation loss
    x_val_t = torch.autograd.Variable(x_val)
    y_val_true = torch.autograd.Variable(y_val)
    y_val_pred = net(x_val)
    val_loss_log.append(net.loss(y_val_pred, y_val).item())
    
    # Put network in training mode
    net.train()

    # Loop over training algorithm for each batch in a single epoch
    for i in range(n_batchs):
        # Collect batch of training data and labels using shuffled indicies
        x = torch.autograd.Variable(x_train[shuffled_ind[batch_size*i: batch_size*(i+1)]])
        y_true = torch.autograd.Variable(y_train[shuffled_ind[batch_size*i: batch_size*(i+1)]])

        # Set gradients to zero
        optimizer.zero_grad()

        # Propogate the training data through the network
        y_pred = net(x)

        # Calculate the loss from the network outputs
        loss = net.loss(y_pred, y_true)
        
        # Perform backpropagation
        loss.backward()
        # Update weights using backpropagation
        optimizer.step()
        # Add batch loss to epoch loss
        epoch_loss += loss.item()

    # Record the average loss for the epoch
    loss_log.append(epoch_loss / n_batchs)
    
    print('\n' + 'Epoch: ' + str(epoch) + '\n' + 'Training Loss: ' + str(loss_log[epoch]) + '\n' + 'Validation Loss: ' + str(val_loss_log[epoch]))

print('\n' + 'Training finished.')

In [None]:
# Put network in inference mode
net.eval()

# Initialize variable to keep track of average epoch loss
epoch_loss = 0.0

# Calculate training loss
# Must evaluate in batchs due to memory limits
eval_batch_size = 1000
n_batch = x_train.shape[0] // eval_batch_size
for i in range(n_batch):
    x = torch.autograd.Variable(x_train[eval_batch_size*i: eval_batch_size*(i+1)])
    y_true = torch.autograd.Variable(y_train[eval_batch_size*i: eval_batch_size*(i+1)])
    y_pred = net(x)
    epoch_loss += net.loss(y_pred, y_true).item()

epoch_loss = epoch_loss / n_batch

# Calculate validation loss
x_val_t = torch.autograd.Variable(x_val)
y_val_true = torch.autograd.Variable(y_val)
y_val_pred = net(x_val)
val_loss = net.loss(y_val_pred, y_val).item()

# Calculate test loss
x_test_t = torch.autograd.Variable(x_test)
y_test_true = torch.autograd.Variable(y_test)
y_test_pred = net(x_test_t)
test_loss = net.loss(y_test_pred, y_test).item()

# Calculate model accuracy
train_values, train_indices = torch.max(y_pred, 1)
train_correct = y_true[train_indices == y_true].shape[0]

val_values, val_indices = torch.max(y_val_pred, 1)
val_correct = y_val_true[val_indices == y_val_true].shape[0]

test_values, test_indices = torch.max(y_test_pred, 1)
test_correct = y_test_true[test_indices == y_test_true].shape[0]

# Record the average loss for the epoch
print('\n' + 'Epoch: %d \n' % (9+1))
print('  Training Loss: %.4f, Accuracy: %.4f' % (epoch_loss, train_correct / y_true.shape[0]))
print('Validation Loss: %.4f, Accuracy: %.4f' % (val_loss, val_correct / y_val_true.shape[0]))
print('      Test Loss: %.4f, Accuracy: %.4f' % (test_loss, test_correct / y_test_true.shape[0]))

In [None]:
plt.figure()
plt.plot(np.arange(len(loss_log)), loss_log, label='Training')
plt.plot(np.arange(len(val_loss_log)), val_loss_log, label='Validation')
plt.xlabel('Epoch')
plt.ylabel('Cross Entropy Loss')
plt.xlim([0, len(loss_log)-1])
plt.ylim([0, 1])
plt.legend()
plt.show()

In [None]:
# Save model
torch.save(net.state_dict(), './mnist_model.pkl')

In [None]:
# Load model
net = CNN(0.5)

net.load_state_dict(torch.load('./mnist_model.pkl'))

In [None]:
sample = np.random.randint(test_inputs.shape[0])

x_sample = torch.autograd.Variable(np.reshape(x_test[sample], (1, x_test.shape[1], x_test.shape[2], x_test.shape[3])))
y_sample_true = torch.autograd.Variable(y_test[sample])
value, y_sample_pred = torch.max(loaded_net(x_sample), 1)
print(y_sample_pred)
print('     True Label: %d' % (y_sample_true))
print('Predicted Label: %d' % (y_sample_pred))

plt.imshow(test_inputs[sample, 0], cmap='gray')