In [1]:
# import os
# os.environ["CUDA_DEVICE_ORDER"] = "PCI_BUS_ID"   # see issue #152
# os.environ["CUDA_VISIBLE_DEVICES"] = "-1"

In [1]:
from keras import backend as K
from keras.engine.topology import Layer
from keras.models import Sequential
from keras.layers import Dense, Activation, InputLayer
import keras

from sklearn import datasets, model_selection, preprocessing, metrics

import numpy as np

import theano




Using Theano backend.


## Implementing circulant layer

In [2]:
a = np.array([[1,2,3], [2,3,4]])
a= a[:,None,:]
b = np.array([[[1,2,3], [3,4,5]],[[1,2,3], [3,4,5]]])
a*b

array([[[ 1,  4,  9],
        [ 3,  8, 15]],

       [[ 2,  6, 12],
        [ 6, 12, 20]]])

In [3]:
a=np.array([[1,2,3], [3,4,5]])
np.tile(a, (3, 1,1))

array([[[1, 2, 3],
        [3, 4, 5]],

       [[1, 2, 3],
        [3, 4, 5]],

       [[1, 2, 3],
        [3, 4, 5]]])

In [4]:
def circulant_convolution_op(r, X):
    rfft = np.fft.rfft(r)
    Xfft = np.fft.rfft(X)
    return np.fft.irfft(rfft*Xfft)

def circulant_convolution_grad_X(r, X):
    rfft = np.fft.rfft(r)
    Xfft = np.fft.rfft(X)
    return np.fft.irfft(rfft*Xfft)



In [79]:
class CirculantConvolutionGradientHelperOp(theano.Op):
    def __init__(self):
        super(CirculantConvolutionGradientHelperOp, self).__init__()

    def make_node(self, X, O):
        X = theano.tensor.as_tensor_variable(X)
        O = theano.tensor.as_tensor_variable(O)

        return theano.Apply(self, [X, O], [O.type()])

    def perform(self, node, inputs, output_storage):
        X = inputs[0][:, None, :]
        O = inputs[1]
        O = O.T
        
        Offt = np.fft.rfft(O)
        Offt = np.tile(Offt, (X.shape[0],1,1))
        
        Xfft = np.fft.rfft(X)
        
        XOprod = np.fft.irfft(Xfft*Offt)
        
        z[0] = np.swapaxes(XOprod, 1,2)

    def infer_shape(self, node, input_shapes):
        return [(input_shapes[0][0], input_shapes[0][1], input_shapes[1][1])]   

In [83]:
class CirculantConvolutionOp(theano.Op):
    def __init__(self):
        super(CirculantConvolutionOp, self).__init__()

    def make_node(self, r, X):
        r = theano.tensor.as_tensor_variable(r)
        X = theano.tensor.as_tensor_variable(X)
        return theano.Apply(self, [r, X], [X.type()])

    def perform(self, node, inputs, output_storage):
        r = inputs[0]
        X = inputs[1]
        
        rfft = np.fft.rfft(r)
        Xfft = np.fft.rfft(X)
        
        result = np.fft.irfft(rfft*Xfft)
        
        z = output_storage[0]
        z[0] = result

    def infer_shape(self, node, input_shapes):
        return [input_shapes[1]]

    def grad(self, inputs, output_grads):
        r = inputs[0]
        X = inputs[1]
        
        #grad for r
        grad_for_r_op = CirculantConvolutionGradientHelperOp()
        grad_r = grad_for_r_op(X, output_grads[0])
        
        #grad for X
        grad_for_X_op = CirculantConvolutionOp()
        grad_for_one_sample = grad_for_X_op(r, output_grads[0].T).T
        grad_X=theano.tensor.tile(grad_for_one_sample, (X.shape[0],1,1))
        
        return [grad_X, grad_r]

In [84]:
r = theano.tensor.dvector()
X = theano.tensor.dmatrix()
conv_op = CirculantConvolutionOp()
f = theano.function([r, X], conv_op(r, X))

print(f([1,2], [[1,2],[0,0]]))

[[ 5.  4.]
 [ 0.  0.]]


In [85]:
def test_grad():
    theano.tensor.verify_grad(conv_op,
                              [np.array([1,2], dtype=np.float32), np.array([[1,2],[0,0]], dtype=np.float32)],
                             rng=np.random.RandomState())
    
test_grad()

ValueError: <__main__.CirculantConvolutionOp object at 0x000001B653FD33C8>.grad returned a term with 3 dimensions, but 1 are required.

In [None]:
from theano.tests import unittest_tools as utt
from theano import config

In [11]:
class CirculantLayer(Layer):

    def __init__(self, output_dim,  **kwargs):
        self.output_dim = output_dim
        super(CirculantLayer, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create a trainable weight variable for this layer.
        self.sign_flipping = np.random.randint(0,2, size = input_shape[1])
        self.sign_flipping = (self.sign_flipping*2-1).astype(np.float32)
        
        self.input_dim = input_shape[1]
        
        circulant_vec_shape = (self.input_dim,)
        if self.input_dim<self.output_dim:
            circulant_vec_shape = (self.output_dim,)
        
        self.circulant_vec = self.add_weight(name='circulant_vec', 
                                      shape=circulant_vec_shape,
                                      initializer='uniform',
                                      trainable=True)
        super(CirculantLayer, self).build(input_shape)  # Be sure to call this somewhere!

    def call(self, x):
        x = K.tf.multiply(x, K.tf.constant(self.sign_flipping))  
        
        if self.input_dim<self.output_dim:
            paddings = K.tf.constant([[0, 0], [0, self.output_dim-self.input_dim]])
            x = K.tf.pad(x, paddings)
        
        r = self.circulant_vec
        
        xfft = K.tf.spectral.rfft(x)
        Rfft = K.tf.spectral.rfft(self.circulant_vec)
        
        Rx = K.tf.spectral.irfft(K.tf.multiply(xfft, Rfft))
        
        if self.input_dim>self.output_dim:
            Rx = Rx[:, :self.output_dim]
        
        return Rx

    def compute_output_shape(self, input_shape):
        return (input_shape[0], self.output_dim)

In [4]:
# class CirculantLayer(Layer):

#     def __init__(self, output_dim,  **kwargs):
#         self.output_dim = output_dim
#         super(CirculantLayer, self).__init__(**kwargs)

#     def build(self, input_shape):
#         # Create a trainable weight variable for this layer.
#         self.sign_flipping = np.random.randint(0,2, size = input_shape[1])
#         self.sign_flipping = (self.sign_flipping*2-1).astype(np.float32)
        
#         self.input_dim = input_shape[1]
        
#         circulant_vec_shape = (self.input_dim,)
#         if self.input_dim<self.output_dim:
#             circulant_vec_shape = (self.output_dim,)
        
#         self.circulant_vec = self.add_weight(name='circulant_vec', 
#                                       shape=circulant_vec_shape,
#                                       initializer='uniform',
#                                       trainable=True)
#         super(CirculantLayer, self).build(input_shape)  # Be sure to call this somewhere!

#     def call(self, x):
#         x =  K.T.as_tensor_variable(self.sign_flipping) * x  
        
#         if self.input_dim<self.output_dim:
#             zeros = K.T.zeros((x.shape[0], self.output_dim))
#             x = K.T.set_subtensor(zeros[:,:self.input_dim], x)
        
#         r = self.circulant_vec
        
#         xfft = K.T.fft.rfft(x)
#         Rfft = K.T.fft.rfft(self.circulant_vec.reshape((1,-1)))
        
#         Rx = K.T.fft.irfft(Rfft[0] * xfft)
        
#         if self.input_dim>self.output_dim:
#             Rx = Rx[:, :self.output_dim]
        
#         return Rx

#     def compute_output_shape(self, input_shape):
#         return (input_shape[0], self.output_dim)

## Testing on digits dataset

In [12]:
digits = datasets.load_digits()
n_samples = len(digits.images)
X = digits.images.reshape((n_samples, -1))
le = preprocessing.OneHotEncoder()
y = le.fit_transform(digits.target.reshape(-1, 1)).todense()

In [13]:
X_train, X_test, y_train, y_test = model_selection.train_test_split(X, y, test_size=0.33, random_state=42)

In [35]:
def fit_circulant():
    model = Sequential()
    model.add(Dense(1000, input_shape=[X.shape[1]]))
    model.add(Activation('relu'))
    model.add(CirculantLayer(1000))
    model.add(Activation('relu'))
    model.add(CirculantLayer(1000))
    model.add(Activation('relu'))
    model.add(Dense(10))
    model.add(Activation('sigmoid'))

    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    model.fit(X_train, y_train, verbose = 0)
    return model

In [36]:
def fit_dense():
    model = Sequential()
    model.add(Dense(1000, input_shape=[X.shape[1]]))
    model.add(Activation('relu'))
    model.add(Dense(1000))
    model.add(Activation('relu'))
    model.add(Dense(1000))
    model.add(Activation('relu'))
    model.add(Dense(10))
    model.add(Activation('sigmoid'))

    model.compile(optimizer='adam',
                  loss='categorical_crossentropy',
                  metrics=['accuracy'])
    model.fit(X_train, y_train, verbose=0)
    return model

In [37]:
%timeit -n 2 fit_dense()

2 loops, best of 3: 9.02 s per loop


In [38]:
%timeit -n 2 fit_circulant()

2 loops, best of 3: 11.1 s per loop


In [30]:
dense_model = fit_dense()
circulant_model = fit_circulant()

In [39]:
%timeit dense_model.predict(X_test)

10 loops, best of 3: 32.2 ms per loop


In [40]:
%timeit circulant_model.predict(X_test)

10 loops, best of 3: 92.5 ms per loop


In [None]:
K.eval(K.mean(keras.metrics.categorical_accuracy(K.variable(y_test), K.variable(dense_model.predict(X_test)))))

In [None]:
K.eval(K.mean(keras.metrics.categorical_accuracy(K.variable(y_test), K.variable(circulant_model.predict(X_test)))))