[View in Colaboratory](https://colab.research.google.com/github/byebye/MNIST/blob/master/MNIST.ipynb)

# Install PyTorch

In [1]:
# https://pytorch.org/
!pip3 install torch torchvision



# Import PyTorch and check CUDA availability

In [0]:
import torch
import torch.nn as nn
import torchvision.datasets as dsets
import torchvision.transforms as transforms
from torch import optim
from torch.autograd import Variable

use_cuda = True
if use_cuda:
  assert(torch.cuda.is_available())

# Initialize parameters

In [0]:
input_size    = 784   # The image size = 28 x 28 = 784
num_classes   = 10    # The number of output classes. In this case, from 0 to 9
num_epochs    = 5     # The number of times entire dataset is trained
train_size = 38000
validation_size = 4000
batch_size    = 64    # The size of input data took for one iteration
learning_rate = 1e-3  # The speed of convergence

# Download MNIST Dataset

In [5]:
train_dataset = dsets.MNIST(root='./data',
                           train=True,
                           transform=transforms.ToTensor(),
                           download=True)

test_dataset = dsets.MNIST(root='./data',
                           train=False,
                           transform=transforms.ToTensor())

Downloading http://yann.lecun.com/exdb/mnist/train-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/train-labels-idx1-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-images-idx3-ubyte.gz
Downloading http://yann.lecun.com/exdb/mnist/t10k-labels-idx1-ubyte.gz
Processing...
Done!


# Load the Dataset

**Note**: We shuffle the loading process of `train_dataset` to make the learning process independent of data order, but the order of `test_loader` remains so as to examine whether we can handle unspecified bias order of inputs.


In [0]:
train_loader = torch.utils.data.DataLoader(dataset=train_dataset,
                                          batch_size=batch_size,
                                          shuffle=True)

test_loader = torch.utils.data.DataLoader(dataset=test_dataset,
                                          batch_size=batch_size,
                                          shuffle=False)

# Build Neural Network

In [0]:
class Flatten(nn.Module):
    def forward(self, input):
        return input.view(input.size(0), -1)


def build_cnn():
    return nn.Sequential(
#         nn.BatchNorm2d(1),
        nn.Conv2d(1, 32, 5),
        nn.ReLU(),
        nn.Conv2d(32, 32, 5),
        nn.ReLU(),
        nn.MaxPool2d((2, 2)),
        nn.Dropout(0.25),

        nn.Conv2d(32, 64, 3),
        nn.ReLU(),
        nn.Conv2d(64, 64, 3),
        nn.ReLU(),
        nn.MaxPool2d((2, 2)),
        nn.Dropout(0.25),

        Flatten(),
        nn.Linear(576, 256),
        nn.ReLU(),
        nn.Dropout(p=0.5),
        nn.Linear(256, 10),
        nn.Softmax(dim=1))

# Instantiate the CNN

In [0]:
net = build_cnn()
if use_cuda:
  net.cuda()

# Choose the Loss Function and Optimizer

Loss function (**criterion**) decides how the output can be compared to a class, which determines how good or bad the neural network performs. And the **optimizer** chooses a way to update the weight in order to converge to find the best weights in this neural network.

In [0]:
criterion = nn.CrossEntropyLoss()
# optimizer = torch.optim.Adam(net.parameters(), lr=learning_rate)

optimizer = optim.RMSprop(params=net.parameters(), lr=learning_rate)

lr_scheduler = optim.lr_scheduler.ReduceLROnPlateau(optimizer)

# Training the CNN Model

In [0]:
def is_last_epoch(epoch):
    return epoch + 1 == num_epochs
  
predicted_train = []
true_train = []
predicted_test = []
true_test = []

In [50]:
for epoch in range(num_epochs):
    net.train()
    for i, (images, labels) in enumerate(train_loader):   # Load a batch of images with its (index, data, class)
        # Conv2d expects data of shape (N, C, H, W) where N is batch size, C is channels, H is height, W is width
        images = images.view(-1, 1, 28, 28) # change images dimension from [64, 768] to a matrix [64, 1, 28, 28]
        images = Variable(images)
        labels = Variable(labels)
        
        if use_cuda:
            images = images.cuda()
            labels = labels.cuda()
        
        optimizer.zero_grad()                             # Intialize the hidden weight to all zeros
        outputs = net(images)                             # Forward pass: compute the output class given a image
        loss = criterion(outputs, labels)                 # Compute the loss: difference between the output class and the pre-given label
        loss.backward()                                   # Backward pass: compute the weight
        optimizer.step()                                  # Optimizer: update the weights of hidden nodes
        
        if (i+1) % 100 == 0:                              # Logging
            print('Epoch [%d/%d], Step [%d/%d], Loss: %.4f'
                 %(epoch+1, num_epochs, i+1, len(train_dataset)//batch_size, loss.data[0]))
    
    net.eval() 
    correct = 0
    total = 0
    for images, labels in test_loader:
        images.view(-1, 1, 28, 28)
        images = Variable(images)

        if use_cuda:
            images = images.cuda()
            labels = labels.cuda()


        outputs = net(images)
        _, predicted = torch.max(outputs.data, 1) # Choose the best class from the output: The class with the best score
        total += labels.size(0)                    # Increment the total count
        correct += (predicted == labels).sum()     # Increment the correct count
        
        if is_last_epoch(epoch):
            global predicted_test
            global true_test
            true_test += labels.cpu().numpy().tolist()
            predicted_test += predicted.data.numpy().tolist()
    
    test_acc = correct / total
    lr_scheduler.step(test_acc)
    print(test_acc)


Epoch [1/5], Step [100/937], Loss: 1.5550
Epoch [1/5], Step [200/937], Loss: 1.4924
Epoch [1/5], Step [300/937], Loss: 1.5237
Epoch [1/5], Step [400/937], Loss: 1.5080
Epoch [1/5], Step [500/937], Loss: 1.4768
Epoch [1/5], Step [600/937], Loss: 1.4924
Epoch [1/5], Step [700/937], Loss: 1.5080
Epoch [1/5], Step [800/937], Loss: 1.4768
Epoch [1/5], Step [900/937], Loss: 1.4768
0.9831
Epoch [2/5], Step [100/937], Loss: 1.5080
Epoch [2/5], Step [200/937], Loss: 1.4612
Epoch [2/5], Step [300/937], Loss: 1.4768
Epoch [2/5], Step [400/937], Loss: 1.4924
Epoch [2/5], Step [500/937], Loss: 1.4768
Epoch [2/5], Step [600/937], Loss: 1.4924
Epoch [2/5], Step [700/937], Loss: 1.5080
Epoch [2/5], Step [800/937], Loss: 1.4926
Epoch [2/5], Step [900/937], Loss: 1.4924
0.9801
Epoch [3/5], Step [100/937], Loss: 1.5080
Epoch [3/5], Step [200/937], Loss: 1.5237
Epoch [3/5], Step [300/937], Loss: 1.5237
Epoch [3/5], Step [400/937], Loss: 1.5080
Epoch [3/5], Step [500/937], Loss: 1.4924
Epoch [3/5], Step [6

RuntimeError: ignored

# Plot Confusion Matrix

In [0]:
def plot_confusion_matrix(cm, classes, normalize=False,
                          title='Confusion matrix', cmap=plt.cm.Blues,
                          filesave=None):
    """
    This function prints and plots the confusion matrix.
    Normalization can be applied by setting `normalize=True`.
    """
    np.set_printoptions(precision=2)
    plt.figure(figsize=(7, 7))

    if normalize:
        cm = cm.astype('float') / cm.sum(axis=1)[:, np.newaxis]
        print("Normalized confusion matrix")
    else:
        print('Confusion matrix, without normalization')

    plt.imshow(cm, interpolation='nearest', cmap=cmap)
    plt.title(title)
    plt.colorbar()
    tick_marks = np.arange(len(classes))

    plt.xticks(tick_marks, classes)

    yticks = []
    for i in (range(cm.shape[0])):
        acc = cm[i, i] / np.sum(cm[i])
        yticks.append("{} (acc={:.10f})".format(i, acc))

    plt.yticks(tick_marks, yticks)

    fmt = '.2f' if normalize else 'd'
    thresh = cm.max() / 2.
    for i, j in itertools.product(range(cm.shape[0]), range(cm.shape[1])):
        plt.text(j, i, format(cm[i, j], fmt),
                 horizontalalignment="center",
                 color="white" if cm[i, j] > thresh else "black")

    plt.ylabel('True label')
    plt.xlabel('Predicted label')

    if filesave is None:
        plt.tight_layout()
#         plt.show()
    else:
        plt.savefig(filesave, bbox_inches='tight')

In [0]:
# cnf_matrix = confusion_matrix(true_train, predicted_train)
# plot_confusion_matrix(cm=cnf_matrix, classes=list(range(10)),
#                       title='Train data confusion matrix')
    
cnf_matrix = confusion_matrix(true_test, predicted_test)
plot_confusion_matrix(cm=cnf_matrix, classes=list(range(10)),
                      title='Test data confusion matrix')

# Testing the CNN Model

Similar to training the neural network, we also need to load batches of test images and collect the outputs. The differences are that:

1. No loss & weights calculation
2. No weights update
3. Has correct prediction calculation


In [0]:
net.eval() 
correct = 0
total = 0
for images, labels in test_loader:
    images = Variable(images.view(-1, 28*28))
    
    if use_cuda and torch.cuda.is_available():
        images = images.cuda()
        labels = labels.cuda()
    
    
    outputs = net(images)
    _, predicted = torch.max(outputs.data, 1)  # Choose the best class from the output: The class with the best score
    total += labels.size(0)                    # Increment the total count
    correct += (predicted == labels).sum()     # Increment the correct count
    
print('Accuracy of the network on the 10K test images: %d %%' % (100 * correct / total))

# Save the trained FNN Model for future use

We save the trained model as a pickle that can be loaded and used later.

In [0]:
torch.save(net.state_dict(), 'cnn_model.pkl')

# Google Drive connection

In [4]:
# Install the PyDrive wrapper & import libraries.
# This only needs to be done once per notebook.
# !pip install -U -q PyDrive
# from pydrive.auth import GoogleAuth
# from pydrive.drive import GoogleDrive
# from google.colab import auth
# from oauth2client.client import GoogleCredentials

# # Authenticate and create the PyDrive client.
# # This only needs to be done once per notebook.
# auth.authenticate_user()
# gauth = GoogleAuth()
# gauth.credentials = GoogleCredentials.get_application_default()
# drive = GoogleDrive(gauth)

# # Download a file based on its file ID.
# train_file_id = '12WsWKOlQ-IHlU4jK35xNJ3koG_CL6l5z'
# test_file_id = '12MIA-FGEU4Ahtyqd_lDarSF0_cNenOj1'

# train_file = drive.CreateFile({'id': train_file_id})
# test_file = drive.CreateFile({'id': test_file_id})

#print('Downloaded content "{}"'.format(train_file.GetContentString()[:100]))
# print('Downloaded content "{}"'.format(test_file.GetContentString()))

Downloaded content "label,pixel0,pixel1,pixel2,pixel3,pixel4,pixel5,pixel6,pixel7,pixel8,pixel9,pixel10,pixel11,pixel12,"
