In [1]:
from tensorflow import keras
from keras.layers import Input, Conv1D, Flatten, Dropout, MaxPooling1D, Dense, BatchNormalization, Activation, UpSampling1D, Concatenate
from keras.initializers import TruncatedNormal
from keras.utils.np_utils import to_categorical   
from keras.models import Model
from keras import backend as K
from keras.engine.topology import Layer
from keras.layers import Lambda
from keras import optimizers
from keras import layers
import numpy as np
import h5py

Using TensorFlow backend.


In [2]:
hf = h5py.File('/scratck/kjakkala/neuralwave/data/pca_data.h5', 'r')
X_train = np.expand_dims(hf.get('X_train'), axis=-1)
X_test = np.expand_dims(hf.get('X_test'), axis=-1)
y_train = np.eye(30)[hf.get('y_train')]
y_test = np.eye(30)[hf.get('y_test')]
hf.close()

(1107, 361, 1) (1107, 30)


In [3]:
def identity_block(input_tensor, kernel_size, filters, stage, block):
    filters1, filters2, filters3 = filters
        
    bn_axis = -1
        
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    x = layers.Conv1D(filters1, 1,
                      kernel_initializer='he_normal',
                      name=conv_name_base + '2a')(input_tensor)
    x = layers.BatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv1D(filters2, kernel_size,
                      padding='same',
                      kernel_initializer='he_normal',
                      name=conv_name_base + '2b')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv1D(filters3, 1,
                      kernel_initializer='he_normal',
                      name=conv_name_base + '2c')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)

    x = layers.add([x, input_tensor])
    x = layers.Activation('relu')(x)
    return x


def conv_block(input_tensor,
               kernel_size,
               filters,
               stage,
               block,
               strides=2):
    filters1, filters2, filters3 = filters

    bn_axis = -1
    
    conv_name_base = 'res' + str(stage) + block + '_branch'
    bn_name_base = 'bn' + str(stage) + block + '_branch'

    x = layers.Conv1D(filters1, 1, strides=strides,
                      kernel_initializer='he_normal',
                      name=conv_name_base + '2a')(input_tensor)
    x = layers.BatchNormalization(axis=bn_axis, name=bn_name_base + '2a')(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv1D(filters2, kernel_size, padding='same',
                      kernel_initializer='he_normal',
                      name=conv_name_base + '2b')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=bn_name_base + '2b')(x)
    x = layers.Activation('relu')(x)

    x = layers.Conv1D(filters3, 1,
                      kernel_initializer='he_normal',
                      name=conv_name_base + '2c')(x)
    x = layers.BatchNormalization(axis=bn_axis, name=bn_name_base + '2c')(x)

    shortcut = layers.Conv1D(filters3, 1, strides=strides,
                             kernel_initializer='he_normal',
                             name=conv_name_base + '1')(input_tensor)
    shortcut = layers.BatchNormalization(
        axis=bn_axis, name=bn_name_base + '1')(shortcut)

    x = layers.add([x, shortcut])
    x = layers.Activation('relu')(x)
    return x

In [4]:
class AMSoftmax(Layer):

    def __init__(self, output_dim, s, m, **kwargs):
        self.output_dim = output_dim
        self.s = s
        self.m = m
        super(AMSoftmax, self).__init__(**kwargs)

    def build(self, input_shape):
        # Create a trainable weight variable for this layer.
        self.kernel = self.add_weight(name='kernel', 
                                      shape=(input_shape[0][-1], self.output_dim),
                                      initializer=TruncatedNormal(mean=0.0, stddev=1.0),
                                      trainable=True)

        super(AMSoftmax, self).build(input_shape)
        
    def call(self, inputs): 
        x = inputs[0]
        y = inputs[1]
        kernel_norm = K.l2_normalize(self.kernel, 0)
        cos_theta = K.dot(x, kernel_norm)
        cos_theta = K.clip(cos_theta, -1,1) # for numerical steady
        phi = cos_theta - self.m 
        adjust_theta = self.s * K.tf.where(K.tf.equal(y, 1), phi, cos_theta)
        return K.softmax(adjust_theta)

    def compute_output_shape(self, input_shape):
        return (input_shape[0][0], self.output_dim)

In [5]:
inputs = Input(shape=(X_train.shape[-2], 1), name='input')
labels = Input(shape=(30,), name='labels')

x = conv_block(inputs, 3, [4, 4, 8], stage=1, block='a', strides=2)
x = identity_block(x, 3, [4, 4, 8], stage=1, block='b')

x = conv_block(inputs, 3, [8, 8, 16], stage=2, block='a', strides=2)
x = identity_block(x, 3, [8, 8, 16], stage=2, block='b')

x = Flatten()(x)
x = Dense(64, activation='relu')(x)
x = AMSoftmax(30, s=30, m=0.55)([x, labels])

model = Model(inputs=[inputs, labels], outputs=x)
model.summary()
model.compile(loss='categorical_crossentropy', optimizer=optimizers.Adam(lr=1e-4), metrics=['acc'])
model.fit(x=[X_train, y_train], y=y_train, epochs=100, validation_data=([X_test, y_test], y_test), verbose=2)

__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input (InputLayer)              (None, 361, 1)       0                                            
__________________________________________________________________________________________________
res2a_branch2a (Conv1D)         (None, 181, 8)       16          input[0][0]                      
__________________________________________________________________________________________________
bn2a_branch2a (BatchNormalizati (None, 181, 8)       32          res2a_branch2a[0][0]             
__________________________________________________________________________________________________
activation_7 (Activation)       (None, 181, 8)       0           bn2a_branch2a[0][0]              
__________________________________________________________________________________________________
res2a_bran

ERROR:root:Internal Python error in the inspect module.
Below is the traceback from this internal error.



Traceback (most recent call last):
  File "/usr/local/lib/python3.6/dist-packages/IPython/core/interactiveshell.py", line 2961, in run_code
    exec(code_obj, self.user_global_ns, self.user_ns)
  File "<ipython-input-5-40380e8991ce>", line 17, in <module>
    model.fit(x=[X_train, y_train], y=y_train, epochs=100, validation_data=([X_test, y_test], y_test), verbose=2)
  File "/usr/local/lib/python3.6/dist-packages/keras/engine/training.py", line 1037, in fit
    validation_steps=validation_steps)
  File "/usr/local/lib/python3.6/dist-packages/keras/engine/training_arrays.py", line 199, in fit_loop
    outs = f(ins_batch)
  File "/usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py", line 2658, in __call__
    if hasattr(get_session(), '_make_callable_from_options'):
  File "/usr/local/lib/python3.6/dist-packages/keras/backend/tensorflow_backend.py", line 207, in get_session
    if not hasattr(session, 'list_devices'):
KeyboardInterrupt

During handling of the above

KeyboardInterrupt: 