In [1]:
import chainer
import chainer.links as L
import chainer.functions as F
from chainer import iterators, datasets, optimizers
from chainer.dataset import concat_examples
import numpy as np
import math

In [2]:
# Parameters
learning_rate = 0.01
training_epochs = 100
batch_size = 128
display_step = 1

# Network Parameters
channels = [1, 32, 64] # 2nd layer number of neurons
k_size = 5
stride = 1
pad = 1
pool_size = 2
pool_stride = 2
n_hiddens_1 = 1024
num_classes = 10 # MNIST total classes (0-9 digits)
dropout = 0.75 # Dropout, probability to keep units

In [3]:
# Load data
train, test = chainer.datasets.get_mnist(withlabel=True, ndim=3, scale=1.)
reduced_train = chainer.iterators.SerialIterator(dataset=train, batch_size=5000, repeat=True, shuffle=True).next()
reduced_test = chainer.iterators.SerialIterator(dataset=train, batch_size=1000, repeat=True, shuffle=True).next()
train_iter = chainer.iterators.SerialIterator(dataset=reduced_train, batch_size=batch_size, 
                                              repeat=True, shuffle=True)
test_iter = chainer.iterators.SerialIterator(dataset=reduced_test, batch_size=batch_size, 
                                             repeat=False, shuffle=False)

### Convolutional layer
Input: (batch_size, channels, height, width)

In [4]:
class CNN(chainer.Chain):
    def __init__(self):
        super(CNN, self).__init__()
        with self.init_scope():
            self.conv1 = L.Convolution2D(in_channels=channels[0], 
                                         out_channels=channels[1], 
                                         ksize=k_size, 
                                         stride=stride, pad=pad)
            self.conv2 = L.Convolution2D(in_channels=channels[1], 
                                         out_channels=channels[2], 
                                         ksize=k_size, 
                                         stride=stride, pad=pad)
            self.fc1 = L.Linear(in_size=None, out_size=n_hiddens_1)
            self.fc2 = L.Linear(in_size=n_hiddens_1, out_size=num_classes)
            
    def __call__(self, x, training=False):
        z1 = F.relu(self.conv1(x))
        max_z1 = F.max_pooling_2d(z1, ksize=pool_size, stride=pool_stride)
        z2 = F.relu(self.conv2(max_z1))
        max_z2 = F.max_pooling_2d(z2, ksize=pool_size, stride=pool_stride)
        z3 = F.relu(self.fc1(max_z2))
        h = self.fc2(z3)
        if chainer.config.train:
            h = F.dropout(h, dropout)
        return h

In [5]:
model = CNN()
optimiser = chainer.optimizers.SGD(lr=learning_rate)
optimiser.setup(model)

In [6]:
def reset_iter_state(iterator):
    iterator.epoch = 0
    iterator.current_position = 0
    iterator.is_new_epoch = False
    iterator._pushed_position = None

In [7]:
reset_iter_state(train_iter)

while train_iter.epoch < training_epochs:
    train_batch = train_iter.next()
    image_train, target_train = concat_examples(train_batch)
    
    # Calculate the prediction of the network
    prediction_train = model(image_train)

    # Calculate the loss with softmax_cross_entropy
    loss = F.softmax_cross_entropy(prediction_train, target_train)

    # Calculate the gradients in the network
    model.cleargrads()
    loss.backward()
    
    # Update all the trainable paremters
    optimiser.update()
    # --------------------- until here ---------------------

    # Check the validation accuracy of prediction after every epoch
    if train_iter.is_new_epoch and (train_iter.epoch + 1) % display_step == 0:
        # If this iteration is the final iteration of the current epoch
        with chainer.using_config('train', False):
            test_losses = []
            test_accuracies = []
            while True:
                test_batch = test_iter.next()
                image_test, target_test = concat_examples(test_batch)

                # Forward the test data
                prediction_test = model(image_test)

                # Calculate the loss
                loss_test = F.softmax_cross_entropy(prediction_test, target_test)
                test_losses.append(loss_test.data)

                # Calculate the accuracy
                accuracy = F.accuracy(prediction_test, target_test)
                test_accuracies.append(accuracy.data)

                if test_iter.is_new_epoch:
                    reset_iter_state(test_iter)
                    break

            print('epoch={:03d} train_loss={:.05f} val_loss={:.05f} val_accuracy={:.05f}'.format(
                train_iter.epoch + 1, float(loss.data), np.mean(test_losses), np.mean(test_accuracies)))

epoch=002 train_loss=2.02196 val_loss=1.89633 val_accuracy=0.71387
epoch=003 train_loss=1.90810 val_loss=1.61309 val_accuracy=0.76660
epoch=004 train_loss=1.73206 val_loss=1.41903 val_accuracy=0.86426
epoch=005 train_loss=1.73137 val_loss=1.27605 val_accuracy=0.88379
epoch=006 train_loss=1.78970 val_loss=1.18834 val_accuracy=0.89941
epoch=007 train_loss=1.77280 val_loss=1.12085 val_accuracy=0.91406
epoch=008 train_loss=1.76641 val_loss=1.04981 val_accuracy=0.91602
epoch=009 train_loss=1.63876 val_loss=0.99821 val_accuracy=0.90234
epoch=010 train_loss=1.54371 val_loss=0.95356 val_accuracy=0.91797
epoch=011 train_loss=1.67924 val_loss=0.91687 val_accuracy=0.92285
epoch=012 train_loss=1.58991 val_loss=0.87299 val_accuracy=0.92676
epoch=013 train_loss=1.65448 val_loss=0.86471 val_accuracy=0.91992
epoch=014 train_loss=1.60676 val_loss=0.82400 val_accuracy=0.93457
epoch=015 train_loss=1.60100 val_loss=0.81788 val_accuracy=0.92969
epoch=016 train_loss=1.46830 val_loss=0.77951 val_accuracy=0.9