In [9]:
import numpy as np
from sklearn.datasets        import fetch_mldata
from sklearn.model_selection import train_test_split
from sklearn.preprocessing   import label_binarize

PIXEL_RANGE = range(0, 255 + 1)
IMAGE_RANGE = range(0, 9 + 1)

def process(mnist):
    # Process dataset to return features and labels for CNN
    def features():
        # Transform features to be float32 sets of 1x28x28 
        # tensors with normalized pixel values.
        return np.divide(
            mnist.data, PIXEL_RANGE[-1]
        ).astype(np.float32).reshape((-1, 1, 28, 28))
    def labels():
        # Transform labels to be float32 sets of 1x10
        # tensors that are one-hot encoded.
        return label_binarize(
            mnist.target, classes=IMAGE_RANGE
        ).astype(np.float32).reshape((-1, 10))
    return features(), labels()


# Download the MNIST dataset
mnist            = fetch_mldata('MNIST original', data_home='.')
features, labels = process(mnist)

# Split train and test data
train_features, test_features, train_labels, test_labels = train_test_split(features, labels)


In [10]:
# Import CNTK
import cntk  as c

# Display CNTK version
' '.join([
    c.__name__.upper(),
    c.__version__,
    str(c.device.all_devices()[0])
])

'CNTK 2.0 CPU'

In [11]:
class ConvNet():
    IMAGE_SHAPE        = (1, 28, 28)
    IMAGE_CLASSES      = [n for n in IMAGE_RANGE]
    IMAGE_CLASS_COUNT  = len(IMAGE_CLASSES)
    LEARNING_RATE      = 0.001
    EPOCH_COUNT        = 20
    BATCH_SIZE         = 64
    DROP_RATE          = 0.5
    HIDDEN_LAYER_COUNT = 96
    
    def __init__(self):
        self._build_inputs()
        self._build_layers()
        self._build_stack()
        self._build_trainer()
    
    def evaluate(self, fn, feeds):
        c.logging.log_number_of_parameters(self.stack); print()
        for epoch in range(self.EPOCH_COUNT):
            batch = self.batch(epoch, feeds)
            fn(epoch, batch)
    
    def train(self, data):
        self.trainer.train_minibatch(data)
    
    def summarize(self):
        self.trainer.summarize_training_progress()
    
    def checkpoint(self, version):
        self.stack.save(
            os.path.join(
                '.', '_'.join('ConvNet', 'MNIST', '{}.dnn'.format(version))
            )
        )
    
    def batch(self, epoch, data):
        def chunk(data):
            slice_begin = epoch * self.BATCH_SIZE
            slice_end   = slice_begin + self.BATCH_SIZE
            return data[slice_begin:slice_end]
        return {key: chunk(value) for key, value in data.items()}
    
    def _build_inputs(self):
        self.inputs = c.input_variable(self.IMAGE_SHAPE,       np.float32, name='inputs')
        self.labels = c.input_variable(self.IMAGE_CLASS_COUNT, np.float32, name='labels')
    
    def _build_layers(self):
        with c.layers.default_options(activation=c.ops.relu, pad=False):
            self.layers  = [
                c.layers.Convolution2D((5,5), 32, pad=True),
                c.layers.MaxPooling((3,3), (2,2)),
                c.layers.Convolution2D((3,3), 48),
                c.layers.MaxPooling((3,3), (2,2)),
                c.layers.Convolution2D((3,3), 64),
                c.layers.Dense(self.HIDDEN_LAYER_COUNT),
                c.layers.Dropout(self.DROP_RATE),
                c.layers.Dense(self.IMAGE_CLASS_COUNT, activation=None)
            ]
    
    def _build_stack(self):
        self.stack = self.inputs
        for layer in self.layers: self.stack = layer(self.stack)
        self.loss  = c.losses.cross_entropy_with_softmax(self.stack, self.labels)
        self.error = c.metrics.classification_error(self.stack, self.labels)
    
    def _build_trainer(self):
        schedule     = c.learning_rate_schedule(self.LEARNING_RATE, c.UnitType.minibatch)
        learner      = c.learners.sgd(self.stack.parameters, schedule)
        printer      = c.logging.ProgressPrinter(tag='Training', num_epochs=self.EPOCH_COUNT)
        self.trainer = c.Trainer(self.stack, (self.loss, self.error), learner, printer)
