# Convolution for Augmented Data

The MNIST data is augmented to give a better understanding of how convolution works. Each 28X28 image is added to a 56X56 array of zeros at a chosen offset; see the `shuffle` function for details.
After augmentation, the 28X28 digit can be located anywhere on the 56X56 grid. We then explore which configurations and intuitive factors help the convolutional neural network identify the digit in the input image.

In [0]:
from random import randrange
import torch
from torch.autograd import Variable
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from torchvision import datasets, transforms
import numpy as np
import matplotlib.pyplot as plt

# Mini-batch size used by both data loaders.
batch_size = 200
# Negative log-likelihood loss; the network's forward pass ends in log_softmax.
criterion = nn.NLLLoss()

# Preprocessing shared by the training and test splits: convert each image to
# a tensor, then normalize it with mean 0.1307 and standard deviation 0.3081.
_mnist_transform = transforms.Compose([
    transforms.ToTensor(),
    transforms.Normalize((0.1307,), (0.3081,)),
])

# Training split: downloaded into '../data' if it is not already there.
_train_set = datasets.MNIST('../data', train=True, download=True,
                            transform=_mnist_transform)
# Test split: read from the same directory (no download requested here).
_test_set = datasets.MNIST('../data', train=False,
                           transform=_mnist_transform)

# The DataLoader wrappers take care of batching and of reshuffling the samples
# every time they are iterated.
train_loader = torch.utils.data.DataLoader(
    _train_set, batch_size=batch_size, shuffle=True)

test_loader = torch.utils.data.DataLoader(
    _test_set, batch_size=batch_size, shuffle=True)

# Request GPU execution; the code that uses this also checks
# torch.cuda.is_available() before moving anything to the GPU.
use_cuda = True

def shuffle(rndx,rndy,image):
    """Place a 28-pixel-wide image on a blank 56X56 grid.

    Args:
        rndx: Row offset of the image's top-left corner inside the grid.
        rndy: Column offset of the image's top-left corner inside the grid.
        image: Tensor whose elements can be reshaped to ``(-1, 28)``, e.g. a
            ``(1, 28, 28)`` MNIST sample.

    Returns:
        A ``(56, 56)`` float32 CPU tensor that is zero everywhere except for
        the region starting at ``(rndx, rndy)``, which holds the image.

    Raises:
        ValueError: If the image would not fit entirely inside the grid at
            the requested offset.
    """
    grid_size = 56
    # detach()/cpu() let the helper accept tensors that track gradients or
    # live on the GPU; the grid itself is always built on the CPU.
    patch = image.detach().cpu().reshape(-1, 28)
    rows, cols = patch.shape

    # Fail with a clear message instead of an opaque shape-mismatch error when
    # the slice would run off the edge of the grid.
    fits_rows = 0 <= rndx <= grid_size - rows
    fits_cols = 0 <= rndy <= grid_size - cols
    if not (fits_rows and fits_cols):
        raise ValueError(
            "offset ({}, {}) does not keep a {}X{} image inside the {}X{} "
            "grid".format(rndx, rndy, rows, cols, grid_size, grid_size))

    canvas = torch.zeros(grid_size, grid_size, dtype=torch.float32)
    canvas[rndx:rndx + rows, rndy:rndy + cols] = patch
    return canvas

# Module-level switch read by Net.forward: while it is truthy, every forward
# pass plots channel 0 of the first sample's layer-1 output.
flag = False


def create_nn(learning_rate=0.01, epochs=10,
              log_interval=10):
    """Build the convolutional network, train it on randomly placed digits, and return it.

    Every training image is placed at a random offset on a 56X56 grid (via
    ``shuffle``) before being fed to the network. Training uses SGD with
    momentum 0.9 and the module-level ``criterion`` and ``train_loader``.

    Args:
        learning_rate: Learning rate passed to the SGD optimizer.
        epochs: Number of full passes over ``train_loader``.
        log_interval: A progress line is printed every ``log_interval`` batches.

    Returns:
        The trained network (on the GPU when ``use_cuda`` is set and CUDA is
        available, otherwise on the CPU).
    """
    class Net(nn.Module):
        """Two convolution stages followed by three fully connected layers.

        For a (N, 1, 56, 56) input:
          layer1: 28X28 convolution, stride 1, no padding -> 15 channels of
                  29X29; 2X2 max-pool, stride 2           -> 15 channels of 14X14
          layer2: 4X4 convolution, stride 1, padding 3    -> 32 channels of
                  17X17; 2X2 max-pool, stride 2           -> 32 channels of 8X8
          fc1:    2048 (= 32 * 8 * 8) -> 200
          fc2:    200 -> 200
          f7:     200 -> 10 (one output per digit 0 to 9)
        """

        def __init__(self):
            super(Net, self).__init__()
            self.layer1 = nn.Sequential(
                nn.Conv2d(1, 15, kernel_size=28, stride=1, padding=0),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            )
            self.layer2 = nn.Sequential(
                nn.Conv2d(15, 32, kernel_size=4, stride=1, padding=3),
                nn.ReLU(),
                nn.MaxPool2d(kernel_size=2, stride=2),
            )
            self.drop_out = nn.Dropout()

            self.fc1 = nn.Linear(2048, 200)
            self.fc2 = nn.Linear(200, 200)
            self.f7 = nn.Linear(200, 10)

        def forward(self, x):
            """Return per-class log-probabilities for a batch of 56X56 grids."""
            x = self.layer1(x)
            if flag:
                # Debug view: channel 0 of the first sample after layer1.
                img = x
                plt.imshow(img.cpu().detach().numpy()[0][0])
                plt.show()
            x = self.layer2(x)
            # Flatten everything except the batch dimension for the linear layers.
            x = x.reshape(x.size(0), -1)
            x = self.drop_out(x)
            x = F.relu(self.fc1(x))
            x = F.relu(self.fc2(x))
            x = self.f7(x)
            # dim=1 normalizes over the 10 class scores of each sample; leaving
            # the dimension implicit is deprecated.
            return F.log_softmax(x, dim=1)

    # Pick the device once instead of re-checking CUDA for every batch.
    device = torch.device(
        "cuda" if use_cuda and torch.cuda.is_available() else "cpu")
    net = Net().to(device)
    print(net)

    optimizer = optim.SGD(net.parameters(), lr=learning_rate, momentum=0.9)

    # run the main training loop
    net.train()  # make sure dropout is active while training
    for epoch in range(epochs):
        for batch_idx, (data, target) in enumerate(train_loader):
            # Expand each (1, 28, 28) image into a (1, 56, 56) grid with the
            # digit at a random offset. The buffer is sized from the actual
            # batch so a different batch size or a short last batch works.
            canvases = torch.zeros(data.size(0), 1, 28 * 2, 28 * 2)
            for i, image in enumerate(data):
                # randrange(29) yields 0..28, the offsets that keep a 28X28
                # image inside the 56X56 grid.
                rndx = randrange(29)
                rndy = randrange(29)
                canvases[i, 0] = shuffle(rndx, rndy, image)
            data = canvases.to(device)
            target = target.to(device)

            optimizer.zero_grad()
            net_out = net(data)
            loss = criterion(net_out, target)
            loss.backward()
            optimizer.step()
            if batch_idx % log_interval == 0:
                print('Train Epoch: {} [{}/{} ({:.0f}%)]\tLoss: {:.6f}'.format(
                    epoch, batch_idx * len(data), len(train_loader.dataset),
                           100. * batch_idx / len(train_loader), loss.item()))
    return net

    # run a test loop
def test_nn(model=None):
    """Evaluate a trained network on the test set and print loss and accuracy.

    Every test image is placed at the fixed offset (0, 0) of the 56X56 grid,
    i.e. in the top-left corner.

    Args:
        model: Network to evaluate. When omitted, the module-level ``net``
            is used.

    The network is switched to evaluation mode (so dropout is disabled) and
    gradients are not tracked while the test set is processed; the previous
    training/evaluation mode is restored afterwards.
    """
    model = net if model is None else model
    # Feed the data on whatever device the model's parameters live on.
    device = next(model.parameters()).device

    # NOTE: the original assigned ``flag = True`` here, which only created an
    # unused local and never reached Net.forward. A ``global flag`` declaration
    # would be needed to actually enable the layer-1 plots during testing.
    test_loss = 0.0
    correct = 0
    was_training = model.training
    model.eval()
    try:
        with torch.no_grad():
            for data, target in test_loader:
                # Size the buffer from the actual batch rather than assuming 200.
                canvases = torch.zeros(data.size(0), 1, 28 * 2, 28 * 2)
                for i, image in enumerate(data):
                    rndx = 0#randrange(29)
                    rndy = 0#randrange(29)
                    canvases[i, 0] = shuffle(rndx, rndy, image)
                data = canvases.to(device)
                target = target.to(device)

                net_out = model(data)
                # criterion returns the mean loss of the batch; weight it by
                # the batch size so the division below gives a true
                # per-sample average.
                test_loss += criterion(net_out, target).item() * data.size(0)
                pred = net_out.max(1)[1]  # get the index of the max log-probability
                correct += pred.eq(target).sum().item()
    finally:
        model.train(was_training)

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))


if __name__ == "__main__":
    # run_opt selects what the script does:
    #   2 -> train the convolutional network, then evaluate it
    #   1 -> call simple_gradient()
    # NOTE(review): simple_gradient is not defined in the visible part of this
    # file - confirm it exists before switching run_opt to 1.
    run_opt = 2
    if run_opt == 2:
        # test_nn() reads the module-level name `net` assigned here.
        net = create_nn()
        test_nn()
    elif run_opt == 1:
        simple_gradient()

In [0]:
    # Evaluate the trained `net` on the test set with every digit placed at a
    # random offset on the 56X56 grid, then print the average loss and accuracy.
    test_loss = 0.0
    correct = 0
    # When this cell runs at module scope, this makes Net.forward plot the
    # layer-1 output on every forward pass.
    flag = True
    # Evaluation mode disables dropout; `net` stays in evaluation mode after
    # this cell. no_grad() skips gradient tracking.
    net.eval()
    with torch.no_grad():
        for data, target in test_loader:
            # Size the buffer from the actual batch rather than assuming 200.
            temp = torch.zeros(data.size(0), 1, 28*2, 28*2)
            for i, image in enumerate(data):
                # randrange(29) yields 0..28, the offsets that keep a 28X28
                # image inside the 56X56 grid.
                rndx = randrange(29)
                rndy = randrange(29)
                temp[i, 0] = shuffle(rndx, rndy, image)
            data = temp

            if use_cuda and torch.cuda.is_available():
                data = data.cuda()
                target = target.cuda()
            net_out = net(data)
            # criterion returns the mean loss of the batch; weight it by the
            # batch size so the division below gives a per-sample average.
            test_loss += criterion(net_out, target).item() * data.size(0)
            pred = net_out.max(1)[1]  # get the index of the max log-probability
            correct += pred.eq(target).sum().item()

    test_loss /= len(test_loader.dataset)
    print('\nTest set: Average loss: {:.4f}, Accuracy: {}/{} ({:.0f}%)\n'.format(
        test_loss, correct, len(test_loader.dataset),
        100. * correct / len(test_loader.dataset)))

# torch.save(model.state_dict(), MODEL_STORE_PATH + 'conv_net_model.ckpt')