# Code for capsule_layers.py

In [1]:
"""
Some key layers used for constructing a Capsule Network. These layers can be used to construct CapsNet on other datasets,
not just MNIST.
*NOTE*: Some functions may be implemented in multiple ways; I keep all of them. You can try them for yourself just by
uncommenting them and commenting out their counterparts.
"""

import keras.backend as K
import tensorflow as tf
from keras import initializers, layers

  from ._conv import register_converters as _register_converters
Using Theano backend.


In [3]:
def squash(vectors, axis=-1):
    """
    The non-linear activation used in Capsule. It drives the length of a large vector to near 1 and small vector to 0
    :param vectors: some vectors to be squashed, N-dim tensor
    :param axis: the axis to squash
    :return: a Tensor with same shape as input vectors
    """
    # Squared L2 norm along `axis`; keepdims=True keeps that axis so the result broadcasts against `vectors`.
    # (The backend is imported as `K`; the previous lowercase `k` was an undefined name.)
    s_squared_norm = K.sum(K.square(vectors), axis=axis, keepdims=True)
    # |s|^2 / (1 + |s|^2) is the output length; dividing by |s| normalises `vectors` to a direction.
    # K.epsilon() is added under the square root so an all-zero vector does not divide by zero.
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors

In [None]:
class CapsuleLayer(layers.Layer):
    # NOTE(review): the class body is missing from this cell, so the definition is incomplete as shown.
    # `CapsNet` instantiates it with num_capsule, dim_capsule, routings and name keyword arguments —
    # TODO restore the implementation before running.
    

In [2]:
def primaryCap(inputs, dim_capsule, n_channels, kernel_size, strides, padding):
    """
    Apply Conv2D `n_channels` times and concatenate all capsules
    :param inputs: 4D tensor, shape=[None, width, height, channels]
    :param dim_capsule: the dim of the output vector of capsule
    :param n_channels: the number of types of capsules
    :param kernel_size: kernel size passed through to the Conv2D layer
    :param strides: strides passed through to the Conv2D layer
    :param padding: padding mode passed through to the Conv2D layer
    :return: output tensor, shape = [None, num_capsule, dim_capsule]
    """
    # A single convolution with dim_capsule * n_channels filters yields every capsule type at once.
    conv_out = layers.Conv2D(filters=dim_capsule * n_channels, kernel_size=kernel_size, strides=strides,
                             padding=padding, name='primarycap_conv2d')(inputs)
    # Regroup the convolution output into vectors of length dim_capsule; -1 lets Keras infer the capsule count.
    # (`layers` is the imported module; the previous `layer.Reshape` referenced an undefined name.)
    capsules = layers.Reshape(target_shape=[-1, dim_capsule], name='primarycap_reshape')(conv_out)
    # Squash each capsule vector (default axis=-1, i.e. along dim_capsule).
    return layers.Lambda(squash, name='primarycap_squash')(capsules)

# Code for capsule_net.py

In [None]:
import numpy as np
from keras import backend as K
from keras import layers, models, optimizers
from keras.utils import to_categorical
import matplotlib.pyplot as plt
from PIL import Image

# Use the channels-last layout: `CapsNet` documents its input as [width, height, channels] and
# `load_fashion_mnist` reshapes the images to (-1, 28, 28, 1).
K.set_image_data_format("channels_last")

In [None]:
def CapsNet(input_shape, n_class, routings):
    """
    A capsule network on fashion MNIST
    :param input_shape: data shape, 3d, [width, height, channels]
    :param n_class: number of classes
    :param routings: number of routing iterations
    :return: Three Keras Models:
            `train_model` takes [image, label] and outputs [capsule lengths, reconstruction];
            `eval_model` takes the image only and outputs [capsule lengths, reconstruction];
            `manipulate_model` takes [image, label, noise] and outputs the reconstruction decoded from the
            noise-perturbed capsules.
            `eval_model` can also be used for training
    """
    x = layers.Input(shape=input_shape)

    # Layer 1: just a convolutional Conv2D layer
    conv1 = layers.Conv2D(filters=256, kernel_size=9, strides=1, padding='valid', activation='relu', name='conv1')(x)

    # Layer 2: Conv2D layer with `squash` activation, then reshape to [None, num_capsule, dim_capsule]
    # (the helper is defined as `primaryCap`; the previous `PrimaryCap` spelling was an undefined name)
    primarycaps = primaryCap(conv1, dim_capsule=8, n_channels=32, kernel_size=9, strides=2, padding='valid')

    # Layer 3: Capsule layer. Routing algorithm works here
    digitcaps = CapsuleLayer(num_capsule=n_class, dim_capsule=16, routings=routings, name='digitcaps')(primarycaps)

    # Layer 4: This is an auxiliary layer to replace each capsule with its length. Just to match the true label's shape.
    # If using TensorFlow, this will not be necessary. :)
    out_caps = Length(name='capsnet')(digitcaps)

    # Decoder network.
    y = layers.Input(shape=(n_class,))
    masked_by_y = Mask()([digitcaps, y])  # The true label is used to mask the output of capsule layer. (for training)
    masked = Mask()(digitcaps)  # Mask using the capsule with maximum length. (for prediction)

    # Shared Decoder Model in training and prediction
    # input_dim is 16 * n_class because digitcaps holds n_class capsules of dim_capsule=16.
    decoder = models.Sequential(name='decoder')
    decoder.add(layers.Dense(512, activation='relu', input_dim=16 * n_class))
    decoder.add(layers.Dense(1024, activation='relu'))
    decoder.add(layers.Dense(np.prod(input_shape), activation='sigmoid'))
    decoder.add(layers.Reshape(target_shape=input_shape, name='out_recon'))

    # Models for training and evaluation (prediction)
    train_model = models.Model([x, y], [out_caps, decoder(masked_by_y)])
    eval_model = models.Model(x, [out_caps, decoder(masked)])

    # manipulate model: add `noise` to the capsule outputs and decode the result
    noise = layers.Input(shape=(n_class, 16))  # same per-sample shape as digitcaps
    noised_digitcaps = layers.Add()([digitcaps, noise])
    # Mask with the label `y`, the same way `masked_by_y` does. Masking with `noise` (as before) left the
    # declared `y` input unconnected and passed a (n_class, 16) tensor where a (n_class,) label is used above.
    masked_noised_y = Mask()([noised_digitcaps, y])
    manipulate_model = models.Model([x, y, noise], decoder(masked_noised_y))
    return train_model, eval_model, manipulate_model

In [None]:
def load_fashion_mnist():
    """
    Load Fashion-MNIST through `keras.datasets` and prepare it for the network.
    Images are reshaped to (-1, 28, 28, 1), cast to float32 and divided by 255; labels are one-hot encoded
    with `to_categorical`.
    :return: ((x_train, y_train), (x_test, y_test))
    """
    # Imported here so the dataset module is only loaded when the data is actually requested.
    from keras.datasets import fashion_mnist

    def _prepare(split):
        # Apply the same image/label preprocessing to one (images, labels) pair.
        images, labels = split
        images = images.reshape(-1, 28, 28, 1).astype('float32') / 255.0
        labels = to_categorical(labels.astype('float32'))
        return images, labels

    train_split, test_split = fashion_mnist.load_data()
    return _prepare(train_split), _prepare(test_split)

In [None]:
import os
import argparse
from keras.preprocessing.image import ImageDataGenerator
from keras import callbacks

# setting the hyper parameters
parser = argparse.ArgumentParser(description="Capsule network on Fashion MNIST")
parser.add_argument('--epochs', default=50, type=int)
parser.add_argument('--batch_size', default=100, type=int)
parser.add_argument('--lr', default=0.001, type=float, help="Initial learning rate")
parser.add_argument('--lr_decay', default=0.9, type=float, help="The value multiplied by lr at each epoch. Set a larger value for larger epochs")
parser.add_argument('--lam_recon', default=0.392, type=float, help="The cofficient for the loss of decoder")
parser.add_argument('-r', '--routings', default=3, type=int, help="Number of iterations used in routing algorithm. Should > 0")
parser.add_argument('--shift_fraction', default=0.1, type=float, help="Faction of pixels to shift at most in each direction.")
parser.add_argument('--debug', action='store_true', help="Save weights by TensorBoard")
parser.add_argument('--save_dir', default='./result')
parser.add_argument('-t', '--testing', action='store_true', help="Test the trained model on testing dataset")
parser.add_argument('--digit', default=5, type=int, help="Digit to manipulate")
parser.add_argument('-w', '--weights', default=None, help="The path of the saved weights. Should be specified when testing.")
# An explicit argument list is parsed instead of sys.argv, so this run always uses 2 epochs and the defaults above.
# NOTE(review): presumably because this runs as a notebook cell — switch to parser.parse_args() for command-line use.
args = parser.parse_args(["--epochs", "2"])
print(args)

# exist_ok=True creates the output directory only when it is missing, without a separate check-then-create step.
os.makedirs(args.save_dir, exist_ok=True)

# load the data
(x_train, y_train), (x_test, y_test) = load_fashion_mnist()

# define the model
# n_class is the number of distinct labels present in the one-hot encoded training targets.
model, eval_model, manipulate_model = CapsNet(input_shape=x_train.shape[1:],
                                              n_class=len(np.unique(np.argmax(y_train, 1))),
                                              routings=args.routings)
model.summary()

# NOTE(review): `train`, `manipulate_latent` and `test` are not defined in the cells shown here —
# confirm they are defined elsewhere before running.
if args.weights is not None:   # init the model weights with provided one
    model.load_weights(args.weights)
if not args.testing:
    train(model=model, data=((x_train, y_train), (x_test, y_test)), args=args)
else:
    if args.weights is None:
        print("No weights provided. Will test using random initialized weights.")
    manipulate_latent(manipulate_model, (x_test, y_test), args)
    test(model=eval_model, data=(x_test, y_test), args=args)