# **Using CapsNet for Video Classification**

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import tensorflow as tf

## **Load the datasets**

In [None]:
# Load every image whose parent folder name is one of LABELS into memory.
# NOTE(review): `paths` (presumably imutils.paths) and `cv2` are not imported in the
# visible cells -- confirm they are imported before this cell runs.
import os  # needed for os.path.sep below; the notebook only imports it in a later cell

# Initialize labels
LABELS = {"Abuse", "Assault", "Fighting", "Normal", "Robbery", "Vandalism"}

# Initialize the list of images
print("Loading images:")
imagePaths = list(paths.list_images(r'C:\Users\Yash Umale\Documents\6th Sem\Open Lab\Python Files\Crime Detection\Datasets'))

data = []
labels = []

# Loop over the image paths
for imagePath in imagePaths:
    # The class label is the name of the directory that contains the image
    label = imagePath.split(os.path.sep)[-2]

    if label not in LABELS:
        continue

    image = cv2.imread(imagePath)
    if image is None:
        # cv2.imread signals an unreadable file by returning None; skip it
        # instead of crashing inside cvtColor.
        print("Skipping unreadable image:", imagePath)
        continue

    image = cv2.cvtColor(image, cv2.COLOR_BGR2RGB)
    image = cv2.resize(image, (224, 224))

    data.append(image)
    labels.append(label)

# Keep the converted arrays: the original cell built them and threw them away.
labels = np.array(labels)
data = np.array(data)

In [None]:
# Number of training epochs
n_epochs = 50

# ImageNet mean (in RGB order), attached to both generators below
mean = np.array([123.68, 116.779, 103.939], dtype="float32")

# Stratified 75% / 25% train/test split with a fixed seed
trainX, testX, trainY, testY = train_test_split(
    data,
    labels,
    test_size=0.25,
    stratify=labels,
    random_state=42,
)

# Training generator: random rotation, zoom, shifts, shear and horizontal flips
trainAug = ImageDataGenerator(
    rotation_range=30,
    zoom_range=0.15,
    width_shift_range=0.2,
    height_shift_range=0.2,
    shear_range=0.15,
    horizontal_flip=True,
    fill_mode="nearest",
)

# Validation/testing generator: no augmentation options are set
valAug = ImageDataGenerator()

# Store the mean on both generator objects
trainAug.mean = valAug.mean = mean

In [None]:
# Graph input: float32 batches of 224 x 224 images with 3 channels (batch size left open)
X = tf.placeholder(tf.float32, shape=[None, 224, 224, 3], name="X")

### The first layer will be composed of 256 maps of 104 x 104 capsules each.
Each capsule will output a 128 dimensional vector.

In [None]:
# Dimensions of the first capsule layer.
caps1_n_maps = 256                       # number of capsule maps
caps_n_maps = caps1_n_maps               # alias for the name the original cell defined
caps1_n_caps = caps1_n_maps * 104 * 104  # 2768896 capsules (104 x 104 positions per map)
caps1_n_dims = 128                       # length of each capsule's output vector

In [None]:
# Two convolutions that feed the first capsule layer.
# The functional `tf.layers.conv2d` takes the input tensor as its first argument;
# `tf.layers.Conv2D` is the layer class, whose first positional argument is `filters`,
# so passing the tensor there collides with the `filters=` keyword.
conv1 = tf.layers.conv2d(X, name="conv1",
                         filters=4096,
                         kernel_size=9,
                         strides=1,
                         padding="valid",
                         activation=tf.nn.relu)

# One output channel per (capsule map, capsule dimension) pair
conv2 = tf.layers.conv2d(conv1, name="conv2",
                         filters=caps1_n_maps * caps1_n_dims,
                         kernel_size=9,
                         strides=2,
                         padding="valid",
                         activation=tf.nn.relu)

Since the kernel size is 9, the image is shrunk by (9 - 1 = 8) pixels after each Conv2D layer.

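Hence, with stride 1 the two convolution layers would shrink the spatial size as 224 x 224 -> 216 x 216 -> 208 x 208 (the number of channels becomes the number of filters of each layer, not 3).\
Moreover, as the second layer uses stride = 2, its output is halved to 104 x 104: floor((216 - 9) / 2) + 1 = 104.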

### Output of the Conv2D layer:

Number of maps (256) * Vector dimensions per capsule (128) = 32768 feature maps in total.\
Each feature map is 104 * 104.

In [None]:
# Regroup the conv2 activations into one caps1_n_dims-long vector per capsule
caps1_raw = tf.reshape(
    conv2,
    shape=[-1, caps1_n_caps, caps1_n_dims],
    name="caps1_raw",
)

In [None]:
def squash(s, axis = -1, epsilon = 1e-7, name = None):
    """Squash the vectors in `s` along `axis`.

    Each vector is rescaled by ||s||^2 / (1 + ||s||^2) in the direction of s.

    Args:
        s: tensor holding the vectors to squash.
        axis: axis along which each vector lies (default: last axis).
        epsilon: small constant added under the square root so the norm is
            never exactly zero when it is used as a divisor.
        name: optional name for the op scope (defaults to "squash").

    Returns:
        A tensor with the same shape as `s`.
    """
    with tf.name_scope(name, default_name="squash"):
        # `keepdims` replaces the deprecated `keep_dims` spelling and matches
        # the other `squash` defined in this file.
        squared_norm = tf.reduce_sum(tf.square(s), axis=axis,
                                     keepdims=True)
        safe_norm = tf.sqrt(squared_norm + epsilon)
        squash_factor = squared_norm / (1. + squared_norm)
        unit_vector = s / safe_norm
        return squash_factor * unit_vector

In [None]:
# First capsule layer output: the raw capsule vectors passed through `squash`
caps1_output = squash(
    caps1_raw,
    name="caps1_output",
)

## Double Digit Capsule Layer

In [2]:
import os
import argparse

import numpy as np
import matplotlib.pyplot as plt
import tensorflow as tf
from tensorflow import keras
from PIL import Image

from capsnet.two_digit_capsules.capsule_layers import CapsuleLayer, PrimaryCap, Length, Mask
from capsnet.two_digit_capsules.utils import combine_images, plot_log

# Tell the Keras backend to use the channels-last image data format.
keras.backend.set_image_data_format('channels_last')

### Helper Functions (for CapsNet)

In [3]:
class Length(keras.layers.Layer):
    """Layer that replaces each vector on the last axis by its Euclidean length."""

    def call(self, inputs, **kwargs):
        """Return the length of the vectors stored along the last axis of `inputs`."""
        # The backend epsilon keeps the argument of sqrt strictly positive: the
        # gradient of sqrt at 0 is infinite, which would turn into NaNs during
        # training if a capsule vector were exactly zero.
        return tf.sqrt(tf.reduce_sum(tf.square(inputs), -1) + keras.backend.epsilon())

    def compute_output_shape(self, input_shape):
        """Return `input_shape` without its last axis (also prints the input shape)."""
        print('Length input_shape:', input_shape)
        return input_shape[:-1]

    def get_config(self):
        """Return the base layer config; this layer adds no parameters of its own."""
        config = super(Length, self).get_config()
        return config


class Mask(keras.layers.Layer):
    """Keep one capsule per sample, zero the others, and flatten the result.

    Called with a list `[capsules, mask]`, the given mask is applied. Called
    with a single tensor, the mask is a one-hot vector selecting the capsule
    with the largest length.
    """

    def call(self, inputs, **kwargs):
        """Return the masked capsule tensor flattened per sample."""
        if type(inputs) is not list:
            # No mask supplied: build one from the longest capsule
            lengths = tf.sqrt(tf.reduce_sum(tf.square(inputs), -1))
            n_capsules = lengths.get_shape().as_list()[1]
            mask = tf.one_hot(indices=tf.argmax(lengths, 1), depth=n_capsules)
        else:
            assert len(inputs) == 2
            inputs, mask = inputs

        # Broadcast the mask over the capsule dimension, then flatten
        return keras.backend.batch_flatten(inputs * tf.expand_dims(mask, -1))

    def compute_output_shape(self, input_shape):
        """Return (None, axis-1 size * axis-2 size) of the capsule input (also prints the shape)."""
        print('Mask input shape:', input_shape)
        # With two inputs the first entry is itself a shape tuple
        capsule_shape = input_shape[0] if type(input_shape[0]) is tuple else input_shape
        return tuple([None, int(capsule_shape[1]) * int(capsule_shape[2])])

    def get_config(self):
        """Return the base layer config; this layer adds no parameters of its own."""
        return super(Mask, self).get_config()

In [4]:
def squash(vectors, axis=-1):
    """Scale `vectors` along `axis` by ||v||^2 / (1 + ||v||^2) / sqrt(||v||^2 + epsilon).

    The backend epsilon keeps the divisor away from zero.
    """
    sq_norm = tf.reduce_sum(tf.square(vectors), axis, keepdims=True)
    shrink = sq_norm / (1 + sq_norm)
    norm = tf.sqrt(sq_norm + keras.backend.epsilon())
    return (shrink / norm) * vectors

In [None]:
class CapsuleLayer(keras.layers.Layer):
    """Capsule layer whose outputs are computed by iterative routing.

    Args:
        num_capsule: number of output capsules.
        dim_capsule: length of each output capsule vector.
        routings: number of routing iterations (must be > 0).
        kernel_initializer: initializer (name or object) for the transform matrix.
        **kwargs: forwarded to `keras.layers.Layer`.

    Input shape: [None, input_num_capsule, input_dim_capsule] (checked in `build`).
    Output shape: [None, num_capsule, dim_capsule] (see `compute_output_shape`).
    """

    def __init__(self, num_capsule, dim_capsule, routings=3,
                 kernel_initializer='glorot_uniform',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.kernel_initializer = keras.initializers.get(kernel_initializer)

    def build(self, input_shape):
        """Create the transform matrix `W` from the input capsule count and size."""
        assert len(input_shape) >= 3, "The input Tensor should have shape=[None, input_num_capsule, input_dim_capsule]"
        self.input_num_capsule = int(input_shape[1])
        self.input_dim_capsule = int(input_shape[2])

        # Transform matrix
        self.W = self.add_weight(shape=[self.num_capsule, self.input_num_capsule,
                                        self.dim_capsule, self.input_dim_capsule],
                                 initializer=self.kernel_initializer,
                                 name='W')

        self.built = True

    def call(self, inputs, training=None):
        """Run `self.routings` routing iterations and return the output capsules."""
        # Add an output-capsule axis and repeat the inputs once per output capsule
        inputs_expand = tf.expand_dims(inputs, 1)
        inputs_tiled = tf.tile(inputs_expand, [1, self.num_capsule, 1, 1])

        # Apply the transform matrix to every sample in the batch.
        # NOTE(review): the result of `batch_dot` on these ranks differs between
        # Keras versions -- confirm the shapes with the installed version.
        inputs_hat = tf.map_fn(lambda x: keras.backend.batch_dot(x, self.W, [2, 3]), elems=inputs_tiled)

        # Routing logits start at zero
        b = tf.zeros(shape=[tf.shape(inputs_hat)[0], self.num_capsule, self.input_num_capsule])

        assert self.routings > 0, 'The routings should be > 0.'
        for i in range(self.routings):
            # `axis` replaces the deprecated `dim` argument; the softmax runs
            # over the output-capsule axis.
            c = tf.nn.softmax(b, axis=1)
            outputs = squash(keras.backend.batch_dot(c, inputs_hat, [2, 2]))  # [None, num_capsule, dim_capsule]

            # The logits are not needed after the last iteration
            if i < self.routings - 1:
                b += keras.backend.batch_dot(outputs, inputs_hat, [2, 3])

        return outputs

    def compute_output_shape(self, input_shape):
        """Return (None, num_capsule, dim_capsule)."""
        return tuple([None, self.num_capsule, self.dim_capsule])

    def get_config(self):
        """Return the constructor arguments merged into the base layer config."""
        config = {
            'num_capsule': self.num_capsule,
            'dim_capsule': self.dim_capsule,
            'routings': self.routings,
            # Serialized so a layer rebuilt from its config keeps the initializer
            'kernel_initializer': keras.initializers.serialize(self.kernel_initializer),
        }
        base_config = super(CapsuleLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))

### Initializing a Primary Capsule

In [None]:
def PrimaryCap(inputs, dim_capsule, n_channels, kernel_size, strides, padding, do_reshape=False):
    """Conv2D + BatchNormalization, reshaped into capsule vectors and squashed.

    Args:
        inputs: input tensor (batch normalization is applied on axis 3).
        dim_capsule: length of each capsule vector.
        n_channels: number of capsule channels; the convolution has
            `dim_capsule * n_channels` filters.
        kernel_size, strides, padding: forwarded to `keras.layers.Conv2D`.
        do_reshape: if True, reshape to [num_capsule, dim_capsule]; otherwise
            reshape to a square grid [side, side, dim_capsule] with
            side = sqrt(num_capsule).

    Returns:
        The squashed capsule tensor.

    Raises:
        ValueError: if `do_reshape` is False and the number of capsules is not
            a perfect square, so no square grid exists.
    """
    import math  # local import: `math` is not imported by the notebook's import cells

    conv = keras.layers.Conv2D(filters=dim_capsule*n_channels, kernel_size=kernel_size, strides=strides, padding=padding,
                               kernel_regularizer=keras.regularizers.l2(1.e-4))(inputs)
    output = keras.layers.BatchNormalization(axis=3)(conv)

    conv_shape = output.get_shape()
    # Integer arithmetic throughout: the channel count is a multiple of dim_capsule
    num_capsule = int(conv_shape[1]) * int(conv_shape[2]) * int(conv_shape[3]) // dim_capsule

    if not do_reshape:
        side = int(math.sqrt(num_capsule))
        if side * side != num_capsule:
            raise ValueError('Cannot arrange %d capsules on a square grid.' % num_capsule)
        desired_shape = [side, side, dim_capsule]
        outputs = keras.layers.Reshape(target_shape=desired_shape, name='primarycap_reshape_1')(output)

        return keras.layers.Lambda(squash, name='primarycap_squash_1')(outputs)

    desired_shape = [num_capsule, dim_capsule]
    outputs = keras.layers.Reshape(target_shape=desired_shape, name='primarycap_reshape_2')(output)

    return keras.layers.Lambda(squash, name='primarycap_squash_2')(outputs)

In [None]:
def CapsNet(input_shape, n_class, routings, primary_capsules=16, number_of_primary_channels=32, digit_capsules=16):
    """Build the capsule network and return its three Keras models.

    Args:
        input_shape: shape of one input sample (without the batch axis).
        n_class: number of classes / output capsules.
        routings: number of routing iterations of the capsule layer.
        primary_capsules, number_of_primary_channels: accepted but not used;
            both primary capsule blocks are built with dim_capsule=32 and
            n_channels=16. NOTE(review): confirm whether this is intended.
        digit_capsules: length of each output capsule vector.

    Returns:
        (train_model, eval_model, manipulate_model):
        train_model maps [image, label] to [capsule lengths, reconstruction],
        eval_model maps image to [capsule lengths, reconstruction], and
        manipulate_model maps [image, label, noise] to a reconstruction.
    """
    image_in = keras.layers.Input(shape=input_shape)

    # Layer 1: conventional Conv2D, then batch normalization and ReLU
    features = keras.layers.Conv2D(
        filters=256, kernel_size=5, strides=2, padding='valid', name='conv1',
        kernel_regularizer=keras.regularizers.l2(1.e-4),
    )(image_in)
    features = keras.layers.BatchNormalization(axis=3)(features)
    features = keras.layers.Activation('relu')(features)

    # Layer 2a: first primary capsule block, kept as a square grid
    caps_grid = PrimaryCap(features, dim_capsule=32, n_channels=16, kernel_size=5, strides=2, padding='valid')

    # Layer 2b: second primary capsule block, reshaped to [None, num_capsule, dim_capsule]
    caps_flat = PrimaryCap(caps_grid, dim_capsule=32, n_channels=16, kernel_size=5, strides=2, padding='valid',
                           do_reshape=True)

    # Layer 3: capsule layer; the routing algorithm runs here
    class_caps = CapsuleLayer(num_capsule=n_class, dim_capsule=digit_capsules, routings=routings,
                              name='digitcaps')(caps_flat)

    # Layer 4: replace each output capsule by its length
    caps_lengths = Length(name='capsnet')(class_caps)

    # Decoder inputs: mask by the true label (training) or by the longest capsule (prediction)
    label_in = keras.layers.Input(shape=(n_class,))
    masked_by_label = Mask()([class_caps, label_in])
    masked_by_length = Mask()(class_caps)

    # Decoder shared between training and prediction
    decoder = keras.models.Sequential(name='decoder')
    decoder.add(keras.layers.Dense(1024, activation='relu', input_dim=digit_capsules*n_class))
    decoder.add(keras.layers.Dense(1024, activation='relu'))
    decoder.add(keras.layers.Dense(np.prod(input_shape), activation='sigmoid'))
    decoder.add(keras.layers.Reshape(target_shape=input_shape, name='out_recon'))

    # Models for training and evaluation (prediction)
    train_model = keras.models.Model([image_in, label_in], [caps_lengths, decoder(masked_by_label)])
    eval_model = keras.models.Model(image_in, [caps_lengths, decoder(masked_by_length)])

    # Manipulation model: add noise to the output capsules before decoding
    noise_in = keras.layers.Input(shape=(n_class, digit_capsules))
    noisy_caps = keras.layers.Add()([class_caps, noise_in])
    masked_noisy = Mask()([noisy_caps, label_in])
    manipulate_model = keras.models.Model([image_in, label_in, noise_in], decoder(masked_noisy))

    return train_model, eval_model, manipulate_model

In [None]:
def margin_loss(y_true, y_pred):
    """Margin loss: sum the per-class terms over axis 1, then average.

    For each class the term is
    y_true * max(0, 0.9 - y_pred)^2 + 0.5 * (1 - y_true) * max(0, y_pred - 0.1)^2.
    """
    # Penalty weighted by y_true: prediction below 0.9
    present_term = y_true * tf.square(tf.maximum(0., 0.9 - y_pred))
    # Penalty weighted by (1 - y_true), down-weighted by 0.5: prediction above 0.1
    absent_term = 0.5 * (1 - y_true) * tf.square(tf.maximum(0., y_pred - 0.1))
    per_class = present_term + absent_term

    return tf.reduce_mean(tf.reduce_sum(per_class, 1))

In [None]:
def train(model, save_dir, batch_size, debug, learning_rate, lr_decay, lam_recon, epochs=50):
    """Compile and train `model`, saving logs and weights under `save_dir`.

    Args:
        model: two-output Keras model (class output named 'capsnet', then reconstruction).
        save_dir: directory for the CSV log, TensorBoard logs and weight files.
        batch_size: batch size for the training generator and TensorBoard.
        debug: truthy to write TensorBoard histograms every epoch.
        learning_rate: initial Adam learning rate.
        lr_decay: per-epoch multiplicative decay; the rate at epoch e is
            learning_rate * lr_decay ** e.
        lam_recon: weight of the reconstruction ('mse') loss.
        epochs: number of training epochs.

    Returns:
        The trained model.

    NOTE(review): `cifar10` is not defined or imported in the visible cells and
    must provide the generator/validation helpers used below -- confirm.
    """
    # Setup callbacks
    log = keras.callbacks.CSVLogger(save_dir + '/log.csv')
    tb = keras.callbacks.TensorBoard(log_dir=save_dir + '/tensorboard-logs',
                                     batch_size=batch_size, histogram_freq=int(debug))
    checkpoint = keras.callbacks.ModelCheckpoint(save_dir + '/weights-{epoch:02d}.h5', monitor='val_capsnet_acc',
                                                 save_best_only=True, save_weights_only=True, verbose=1)
    # Separate name for the callback: rebinding `lr_decay` would make the
    # schedule below raise the callback object, not the decay factor, to a power.
    lr_scheduler = keras.callbacks.LearningRateScheduler(
        schedule=lambda epoch: learning_rate * (lr_decay ** epoch))

    # Compile the model
    model.compile(optimizer=keras.optimizers.Adam(lr=learning_rate),
                  loss=[margin_loss, 'mse'],
                  loss_weights=[1., lam_recon],
                  metrics={'capsnet': 'accuracy'})

    # Training
    model.fit_generator(generator=cifar10.get_train_generator_for_capsnet(batch_size),
                        steps_per_epoch=int(cifar10.TRAIN_SIZE / batch_size),
                        epochs=epochs,
                        validation_data=cifar10.get_validation_data_for_capsnet(),
                        callbacks=[log, tb, checkpoint, lr_scheduler])

    model.save_weights(save_dir + '/trained_model.h5')
    print('Trained model saved to \'%s/trained_model.h5\'' % save_dir)

    plot_log(save_dir + '/log.csv', show=True)

    return model

In [None]:
def test(model, save_dir):
    """Print test accuracy and save/show real vs. reconstructed images.

    The first 50 test images and their reconstructions are combined into one
    picture written to `save_dir`/real_and_recon.png.
    """
    x_test, y_test = cifar10.get_test_data()
    y_pred, x_recon = model.predict(x_test, batch_size=100)

    rule = '-' * 30
    print(rule + 'Begin: test' + rule)

    # Fraction of samples whose predicted class equals the true class
    n_correct = np.sum(np.argmax(y_pred, 1) == np.argmax(y_test, 1))
    print('Test acc:', n_correct / y_test.shape[0])

    # Combine originals and reconstructions and scale by 255 before saving
    stacked = np.concatenate([x_test[:50], x_recon[:50]])
    image = combine_images(stacked) * 255
    out_path = save_dir + "/real_and_recon.png"
    Image.fromarray(image.astype(np.uint8)).save(out_path)

    print()
    print('Reconstructed images are saved to %s/real_and_recon.png' % save_dir)
    print(rule + 'End: test' + rule)

    plt.imshow(plt.imread(out_path))
    plt.show()

In [None]:
def manipulate_latent(model, digit, save_dir, dim_capsule=16):
    """Perturb each capsule dimension of one random test sample and save the reconstructions.

    Args:
        model: model taking [x, y, noise] and returning a reconstruction.
        digit: class index; the sample is drawn from test items of this class.
        save_dir: directory receiving `manipulate-<digit>.png`.
        dim_capsule: length of the output capsule vectors (size of the last
            noise axis and number of rows in the saved image grid).

    Raises:
        ValueError: if no test sample belongs to class `digit`.

    NOTE(review): `cifar10` is not defined or imported in the visible cells -- confirm.
    """
    print('-'*30 + 'Begin: manipulate' + '-'*30)
    x_test, y_test = cifar10.get_test_data()

    # Use the `digit` argument (the original read the global `args.digit`)
    index = np.argmax(y_test, 1) == digit
    n_matches = int(np.sum(index))
    if n_matches == 0:
        raise ValueError('No test sample found for class %d.' % digit)

    # `high` is exclusive, so this can pick any matching sample, including the last
    number = np.random.randint(low=0, high=n_matches)
    x, y = x_test[index][number], y_test[index][number]
    x, y = np.expand_dims(x, 0), np.expand_dims(y, 0)

    # One noise vector per class, sized from the one-hot labels instead of a fixed 10
    noise = np.zeros([1, y_test.shape[1], dim_capsule])
    x_recons = []
    for dim in range(dim_capsule):
        for r in [-0.25, -0.2, -0.15, -0.1, -0.05, 0, 0.05, 0.1, 0.15, 0.2, 0.25]:
            tmp = np.copy(noise)
            tmp[:,:,dim] = r
            x_recon = model.predict([x, y, tmp])
            x_recons.append(x_recon)

    x_recons = np.concatenate(x_recons)

    img = combine_images(x_recons, height = dim_capsule)
    image = img * 255
    Image.fromarray(image.astype(np.uint8)).save(save_dir + '/manipulate-%d.png' % digit)
    print('manipulated result saved to %s/manipulate-%d.png' % (save_dir, digit))
    print('-' * 30 + 'End: manipulate' + '-' * 30)

In [None]:
# Define model
# NOTE(review): `args` and the names `save_dir`, `batch_size`, `debug`, `learning_rate`,
# `lr_decay` and `lam_recon` are not defined in the visible cells; they must exist
# before this cell runs.
model, eval_model, manipulate_model = CapsNet(input_shape=(224, 224, 3),  # images are resized to 224 x 224 and converted to RGB when loaded
                                              n_class=len(LABELS),        # one class per entry of LABELS
                                              routings=args.routings,
                                              primary_capsules=args.primary_capsules,
                                              number_of_primary_channels=args.number_of_primary_channels,
                                              digit_capsules=args.digit_capsules)

model.summary()

# Train or test
if args.weights is not None:
    model.load_weights(args.weights)
if not args.testing:
    # Keyword arguments throughout: positional arguments may not follow `model=...`
    train(model=model, save_dir=save_dir, batch_size=batch_size, debug=debug,
          learning_rate=learning_rate, lr_decay=lr_decay, lam_recon=lam_recon)
else:
    if args.weights is None:
        print('No weights are provided. Will test using random initialized weights.')
    manipulate_latent(manipulate_model, args.digit, save_dir)
    test(model=eval_model, save_dir=save_dir)