# Keras-AdaCos
---
## References
* Original paper: [AdaCos: Adaptively Scaling Cosine Logits for Effectively Learning Deep Face Representations](https://arxiv.org/abs/1905.00292)
* ArcFace in Keras: https://github.com/4uiiurz1/keras-arcface
* AdaCos in TensorFlow: https://github.com/taekwan-lee/adacos-tensorflow
---


In [0]:
import numpy as np

import keras, math, os
from keras.layers import Layer, Input, Dense, Flatten, BatchNormalization, Conv2D, Activation, MaxPooling2D, Dropout
from keras.models import Model
from keras import regularizers
from keras.datasets import mnist
from keras.optimizers import Adam
from keras.preprocessing.image import ImageDataGenerator
from keras.callbacks import Callback, ModelCheckpoint, TerminateOnNaN
from keras import backend as K
import tensorflow as tf

In [0]:
class AdaCos(Layer):
    """Cosine-softmax classification head with an adaptively computed scale.

    The layer is called on ``[x, y]``: ``x`` holds the features and ``y`` the
    labels. ``y`` is compared element-wise against 1 to separate the target
    class from the other classes, so it must be one-hot encoded with shape
    ``(batch, n_classes)``. The layer returns softmax probabilities of shape
    ``(batch, n_classes)``.

    Args:
        n_classes: Number of classes; must be at least 2.
        m: Kept for interface compatibility; it is stored but not used by
            ``call``.
        regularizer: Regularizer (or its identifier/config) for the class
            weight matrix ``W``.
        **kwargs: Forwarded to ``keras.layers.Layer``.

    Raises:
        ValueError: If ``n_classes`` is smaller than 2.
    """

    def __init__(self, n_classes=10, m=0.50, regularizer=None, **kwargs):
        if n_classes < 2:
            raise ValueError(
                'AdaCos needs n_classes >= 2, got {}'.format(n_classes))
        super(AdaCos, self).__init__(**kwargs)
        self.n_classes = n_classes
        # Fixed initial scale sqrt(2) * log(C - 1). It stays a Python float;
        # the per-batch scale is a local tensor inside call().
        self.s = math.sqrt(2)*math.log(n_classes-1)
        self.m = m
        self.regularizer = regularizers.get(regularizer)

    def build(self, input_shape):
        """Create the class weight matrix of shape (feature_dim, n_classes).

        ``input_shape`` is the list of shapes for ``[x, y]``; only the feature
        shape (first entry) determines the weight shape.
        """
        super(AdaCos, self).build(input_shape[0])
        self.W = self.add_weight(name='W',
                                shape=(input_shape[0][-1], self.n_classes),
                                initializer='glorot_uniform',
                                trainable=True,
                                regularizer=self.regularizer)

    def call(self, inputs):
        """Return softmax probabilities of the adaptively scaled cosine logits."""
        x, y = inputs
        # Normalize features (per row) and class weights (per column) so that
        # their product is the cosine between each feature and each class.
        x = tf.nn.l2_normalize(x, axis=1)
        W = tf.nn.l2_normalize(self.W, axis=0)
        logits = x @ W
        # Clip before acos: its derivative is unbounded at +/-1, which would
        # otherwise cause a division by zero in the backward pass.
        theta = tf.acos(K.clip(logits, -1.0 + K.epsilon(), 1.0 - K.epsilon()))

        # Batch average of the summed exponentiated non-target logits,
        # evaluated with the fixed initial scale.
        non_target = y < 1
        B_avg = tf.where(non_target, tf.exp(self.s*logits), tf.zeros_like(logits))
        B_avg = tf.reduce_mean(tf.reduce_sum(B_avg, axis=1), name='B_avg')

        # Angle between every sample and its own class centre. y is one-hot,
        # so the target entries are selected with a mask; using y as gather
        # indices would pick whole rows 0/1 of theta instead.
        theta_class = tf.boolean_mask(theta, tf.logical_not(non_target),
                                      name='theta_class')
        theta_med = tf.contrib.distributions.percentile(theta_class, q=50)

        # Dynamic scale for this batch. It is kept local so the layer's
        # Python-float attribute `s` is not replaced by a graph tensor.
        # NOTE(review): gradients flow through this scale as written; confirm
        # whether it should be wrapped in tf.stop_gradient.
        s = tf.log(B_avg) / tf.cos(tf.minimum(math.pi/4, theta_med))
        return tf.nn.softmax(s * logits)

    def compute_output_shape(self, input_shape):
        """Output is (batch, n_classes); batch size comes from the features."""
        return (input_shape[0][0], self.n_classes)

    def get_config(self):
        """Return the constructor arguments so a saved model can be rebuilt."""
        config = super(AdaCos, self).get_config()
        config.update({
            'n_classes': self.n_classes,
            'm': self.m,
            'regularizer': regularizers.serialize(self.regularizer),
        })
        return config

In [0]:
# L2 regularization factor shared by the convolutional and dense layers.
weight_decay = 1e-1


def vgg_block(x, filters, layers):
    """Apply `layers` consecutive Conv(3x3) -> BatchNorm -> ReLU units to `x`.

    Every convolution uses `filters` output channels, 'same' padding,
    He-normal initialization and L2 kernel regularization with the
    module-level `weight_decay`. Returns the resulting tensor.
    """
    tensor = x
    for _unit in range(layers):
        conv = Conv2D(
            filters,
            (3, 3),
            padding='same',
            kernel_initializer='he_normal',
            kernel_regularizer=regularizers.l2(weight_decay),
        )
        tensor = conv(tensor)
        tensor = BatchNormalization()(tensor)
        tensor = Activation('relu')(tensor)
    return tensor

In [0]:
def vgg8_adacos(input_size=None, num_class=10, num_features=3):
    """Build a small VGG-style network that ends in an AdaCos head.

    The model takes two inputs, the image tensor of shape `input_size` and
    a label tensor of shape `(num_class,)`, and outputs the AdaCos layer's
    result. `num_features` is the width of the dense embedding that feeds
    the AdaCos layer.
    """
    image_in = Input(shape=input_size)
    label_in = Input(shape=(num_class,))

    # Three stages of two conv units each; every stage ends in 2x2 max pooling.
    net = image_in
    for stage_filters in (16, 32, 64):
        net = vgg_block(net, stage_filters, 2)
        net = MaxPooling2D(pool_size=(2, 2))(net)

    net = BatchNormalization()(net)
    net = Dropout(0.5)(net)
    net = Flatten()(net)

    # Low-dimensional embedding followed by batch normalization.
    embedding = Dense(
        num_features,
        kernel_initializer='he_normal',
        kernel_regularizer=regularizers.l2(weight_decay),
    )
    net = embedding(net)
    net = BatchNormalization()(net)

    head = AdaCos(num_class, regularizer=regularizers.l2(weight_decay))
    probabilities = head([net, label_in])

    return Model([image_in, label_in], probabilities)

In [0]:
# Load MNIST; keep an untouched copy of the integer test labels, which the
# plotting cell uses to select samples per class.
(X, y), (X_test, y_test) = mnist.load_data()
y_test_org = y_test.copy()

# Append a trailing channel axis and scale pixel values by 1/255 as float32.
X = np.expand_dims(X, axis=-1).astype('float32') / 255
X_test = np.expand_dims(X_test, axis=-1).astype('float32') / 255

# One-hot encode the labels over 10 classes.
y = keras.utils.to_categorical(y, num_classes=10)
y_test = keras.utils.to_categorical(y_test, num_classes=10)

In [0]:
# Adam optimizer with a learning rate of 0.001.
optimizer = Adam(lr=0.001)

In [0]:
# 28x28 single-channel inputs, 10 classes, 3-dimensional embedding
# (three dimensions so the features can be drawn in a 3-D plot).
model = vgg8_adacos(
    input_size=(28, 28, 1),
    num_class=10,
    num_features=3,
)

In [0]:
# The model output is already a softmax distribution, so plain categorical
# cross-entropy is applied on top of it.
model.compile(
    optimizer=optimizer,
    loss='categorical_crossentropy',
    metrics=['accuracy'],
)

In [0]:
# Image augmentation settings: all normalization, whitening, shift, shear,
# zoom and channel-shift options are switched off; only random rotation and
# random flips are enabled.
# NOTE(review): the training cell in this file calls model.fit on the raw
# arrays, so this generator does not appear to be consumed by the visible code.
datagen = ImageDataGenerator(
    # --- centering / scaling / whitening (all disabled) ---
    featurewise_center=False,
    samplewise_center=False,
    featurewise_std_normalization=False,
    samplewise_std_normalization=False,
    zca_whitening=False,
    zca_epsilon=1e-06,
    rescale=None,
    preprocessing_function=None,
    # --- geometric transforms ---
    rotation_range=180,        # random rotation range, in degrees
    width_shift_range=0.,
    height_shift_range=0.,
    shear_range=0.,
    zoom_range=0.,
    horizontal_flip=True,      # random left/right flips
    vertical_flip=True,        # random up/down flips
    # --- pixel handling ---
    channel_shift_range=0.,
    fill_mode='nearest',       # how points outside the input are filled
    cval=0.,                   # fill value, only used by fill_mode="constant"
    # --- data layout / splitting ---
    data_format=None,
    validation_split=0.0,
)

# Compute the dataset statistics needed by the feature-wise options
# (mean, std, and ZCA components when those options are enabled).
datagen.fit(X)

In [0]:
# Ensure the checkpoint directory exists. exist_ok=True replaces the
# check-then-create sequence, so there is no window in which the directory
# can appear between the check and the mkdir call.
os.makedirs('arcface_model', exist_ok=True)

In [0]:
# Training callbacks: write checkpoints to arcface_model/model.hdf5 (only when
# the monitored quantity improves) and abort training on a NaN loss.
callbacks = [
    ModelCheckpoint(
        os.path.join('arcface_model', 'model.hdf5'),
        verbose=1,
        save_best_only=True,
    ),
    TerminateOnNaN(),
]

In [0]:
# The one-hot labels are passed twice: as the model's second input (consumed
# by the AdaCos layer) and as the training targets. The test split doubles as
# validation data.
model.fit(
    [X, y],
    y,
    batch_size=256,
    epochs=100,
    verbose=1,
    callbacks=callbacks,
    validation_data=([X_test, y_test], y_test),
)

In [0]:
# Reload the saved checkpoint (the custom layer class has to be supplied for
# deserialization) and report loss and accuracy on the test split.
model = keras.models.load_model(
    'arcface_model/model.hdf5',
    custom_objects={'AdaCos': AdaCos},
)
score = model.evaluate([X_test, y_test], y_test, verbose=1)
print("loss:{}, accuracy:{}".format(*score[:2]))

In [0]:
# Re-wire the trained network into a feature extractor: image input only,
# output taken from the third-to-last layer.
# NOTE(review): presumably layers[-3] is the embedding feeding AdaCos; confirm
# against model.summary().
model = Model(
    inputs=model.input[0],
    outputs=model.layers[-3].output,
)
features = model.predict(X_test, verbose=1)
# Scale every feature vector to unit L2 norm.
features = features / np.linalg.norm(features, axis=1, keepdims=True)

In [0]:
import matplotlib.pyplot as plt
from mpl_toolkits.mplot3d import Axes3D

In [0]:
# Scatter the normalized 3-D test features, one plot call per class.
fig = plt.figure()
# Create the 3-D axes through the figure so they are attached to it on every
# Matplotlib version; constructing Axes3D(fig) directly no longer adds the
# axes to the figure on newer releases, which leaves the figure empty.
ax = fig.add_subplot(111, projection='3d')
# Iterate over the label values actually present instead of assuming they
# are exactly 0..k-1.
for c in np.unique(y_test_org):
    class_mask = y_test_org == c
    ax.plot(features[class_mask, 0], features[class_mask, 1],
            features[class_mask, 2], '.', alpha=0.1)
ax.set_title('Features distribution with AdaCos')