In [1]:
! pwd

/data/GDSC_AudioPoli


In [24]:
from keras.models import load_model
# from sklearn.metrics import classification_report
# from sklearn.metrics import confusion_matrix
from keras import backend as K
from keras.utils import to_categorical
from capsulelayers import CapsuleLayer, PrimaryCap, Distance, Mask
from keras.backend.tensorflow_backend import set_session
# import seaborn as sn
# import matplotlib.pyplot as plt
import tensorflow as tf
# import pandas as pd
import numpy as np
import sys; sys.argv=['']; del sys
import argparse
import h5py
import time
import re
import os

In [25]:
# CapsNet margin loss
def margin_loss(y_true, y_pred):
    L = y_true * K.square(K.maximum(0., 0.9 - y_pred)) + \
        0.5 * (1 - y_true) * K.square(K.maximum(0., y_pred - 0.1))
    return K.mean(K.sum(L, 1))

# predict
def test_one(model, data, args):
    x_test, y_test = data
    start = time.time()
    y_pred, x_recon = model.predict(x_test, batch_size=100)
    print('\nPrediction response time : ', time.time() - start)
    print('Test accuracy:', np.sum(np.argmax(y_pred, 1) == np.argmax(y_test, 1)) / y_test.shape[0])

# preprocess data
def detect_index(name):
    index = name.split('.')[1].split('_')[0]
    if index == '강제추행(성범죄)':
        index = 1
    elif index == '강도범죄':
        index = 2
    elif index == '절도범죄':
        index = 3
    elif index == '폭력범죄':
        index = 4
    elif index == '화재':
        index = 5
    elif index == '갇힘':
        index = 6
    elif index == '응급의료':
        index = 7
    elif index == '전기사고':
        index = 8
    elif index == '가스사고':
        index = 9
    elif index == '낙상':
        index = 10
    elif index == '붕괴사고':
        index = 11
    elif index == '태풍-강풍':
        index = 12
    elif index == '지진':
        index = 13
    elif index == '도움요청':
        index = 14
    elif index == '실내':
        index = 15
    elif index == '실외':
        index = 16
    else:
        pass
    return index

def test_graph(model, data, args):
    labels = ['A', 'B', 'C', 'D', 'E', 'F', 'G', 'H', 'I', 'J', 'K', 'L', 'M', 'N', 'O', 'P']
    ''' A:강제추행, B:강도범죄, C:절도범죄, D:폭력범죄, 
    E:화재, F:갇힘, G:응급의료, H:전기사고, 
    I:가스사고, J:낙상, K:붕괴사고, L:태풍/강풍, 
    M:지진, N:도움요청, O:실내, P:실외 
    '''
    x_test, y_test = data
    y_pred, x_recon = model.predict(x_test, batch_size=20)
    cm = confusion_matrix(np.argmax(y_test, 1), np.argmax(y_pred, 1))
    print('\nCategory Classification Report\n')
    print(classification_report(np.argmax(y_test, 1), np.argmax(y_pred, 1), target_names=labels))
    print('\nConfusion Matrix graph saved', model_save)
    return cm, labels

# read HDF5 file
def test_dataset():
    x_train_mfcc = []
    y_train_mfcc = []
    for i in range(0, label):  # 1~16 label
        for ds_name in ['mfcc', 'y']:
            if ds_name == 'mfcc':
                count = "mfcc_y_%ix%i_%i" % (height, width, i)
                mfcc = h5py.File(save_HDF + count + '.h5', 'r')
                x_train_mfcc.extend(mfcc[ds_name])
            if ds_name == 'y':
                count = "mfcc_y_%ix%i_%i" % (height, width, i)
                mfcc = h5py.File(save_HDF + count + '.h5', 'r')
                y_train_mfcc.extend(mfcc[ds_name])
    # reshape
    test_x = np.array(x_train_mfcc)
    test_y = np.array(y_train_mfcc)
    test_x = test_x.reshape(-1, height, width, 1)
    test_y = np.argmax(test_y, axis=1).reshape(-1)
    test_y = test_y[:, None]
    test_y = to_categorical(test_y.astype('float32'))
    return (test_x, test_y)

In [27]:
from keras import initializers, layers


class Distance(layers.Layer):
    def call(self, inputs, **kwargs):
        return K.sqrt(K.sum(K.square(inputs), -1) + K.epsilon())

    def compute_output_shape(self, input_shape):
        return input_shape[:-1]

    def get_config(self):
        config = super(Distance, self).get_config()
        return config


class Mask(layers.Layer):
    def call(self, inputs, **kwargs):
        if type(inputs) is list:  # true label is provided with shape = [None, n_classes], i.e. one-hot code.
            assert len(inputs) == 2
            inputs, mask = inputs
        else:  # if no true label, mask by the max length of capsules. Mainly used for prediction
            # compute lengths of capsules
            x = K.sqrt(K.sum(K.square(inputs), -1))
            # generate the mask which is a one-hot code.
            # mask.shape=[None, n_classes]=[None, num_capsule]
            mask = K.one_hot(indices=K.argmax(x, 1), num_classes=x.get_shape().as_list()[1])

        # inputs.shape=[None, num_capsule, dim_capsule]
        # mask.shape=[None, num_capsule]
        # masked.shape=[None, num_capsule * dim_capsule]
        masked = K.batch_flatten(inputs * K.expand_dims(mask, -1))
        return masked

    def compute_output_shape(self, input_shape):
        if type(input_shape[0]) is tuple:  # true label provided
            return tuple([None, input_shape[0][1] * input_shape[0][2]])
        else:  # no true label provided
            return tuple([None, input_shape[1] * input_shape[2]])

    def get_config(self):
        config = super(Mask, self).get_config()
        return config


def squash(vectors, axis=-1):
    s_squared_norm = K.sum(K.square(vectors), axis, keepdims=True)
    scale = s_squared_norm / (1 + s_squared_norm) / K.sqrt(s_squared_norm + K.epsilon())
    return scale * vectors


class CapsuleLayer(layers.Layer):
    def __init__(self, num_capsule, dim_capsule, routings=3,
                 kernel_initializer='glorot_uniform',
                 **kwargs):
        super(CapsuleLayer, self).__init__(**kwargs)
        self.num_capsule = num_capsule
        self.dim_capsule = dim_capsule
        self.routings = routings
        self.kernel_initializer = initializers.get(kernel_initializer)

    def build(self, input_shape):
        assert len(input_shape) >= 3, "The input Tensor should have shape=[None, input_num_capsule, input_dim_capsule]"
        self.input_num_capsule = input_shape[1]
        self.input_dim_capsule = input_shape[2]

        # Transform matrix
        self.W = self.add_weight(shape=[self.num_capsule, self.input_num_capsule,
                                        self.dim_capsule, self.input_dim_capsule],
                                 initializer=self.kernel_initializer,
                                 name='W')

        self.built = True

    def call(self, inputs, training=None):
        # inputs.shape=[None, input_num_capsule, input_dim_capsule]
        # inputs_expand.shape=[None, 1, input_num_capsule, input_dim_capsule]
        inputs_expand = K.expand_dims(inputs, 1)

        # Replicate num_capsule dimension to prepare being multiplied by W
        # inputs_tiled.shape=[None, num_capsule, input_num_capsule, input_dim_capsule]
        inputs_tiled = K.tile(inputs_expand, [1, self.num_capsule, 1, 1])

        # Compute `inputs * W` by scanning inputs_tiled on dimension 0.
        # x.shape=[num_capsule, input_num_capsule, input_dim_capsule]
        # W.shape=[num_capsule, input_num_capsule, dim_capsule, input_dim_capsule]
        # Regard the first two dimensions as `batch` dimension,
        # then matmul: [input_dim_capsule] x [dim_capsule, input_dim_capsule]^T -> [dim_capsule].
        # inputs_hat.shape = [None, num_capsule, input_num_capsule, dim_capsule]
        inputs_hat = K.map_fn(lambda x: K.batch_dot(x, self.W, [2, 3]), elems=inputs_tiled)

        # Begin: Routing algorithm ---------------------------------------------------------------------#
        # The prior for coupling coefficient, initialized as zeros.
        # b.shape = [None, self.num_capsule, self.input_num_capsule].
        b = tf.zeros(shape=[K.shape(inputs_hat)[0], self.num_capsule, self.input_num_capsule])

        assert self.routings > 0, 'The routings should be > 0.'
        for i in range(self.routings):
            # c.shape=[batch_size, num_capsule, input_num_capsule]
            c = tf.nn.softmax(b, dim=1)

            # c.shape =  [batch_size, num_capsule, input_num_capsule]
            # inputs_hat.shape=[None, num_capsule, input_num_capsule, dim_capsule]
            # The first two dimensions as `batch` dimension,
            # then matmal: [input_num_capsule] x [input_num_capsule, dim_capsule] -> [dim_capsule].
            # outputs.shape=[None, num_capsule, dim_capsule]
            outputs = squash(K.batch_dot(c, inputs_hat, [2, 2]))  # [None, 10, 16]

            if i < self.routings - 1:
                # outputs.shape =  [None, num_capsule, dim_capsule]
                # inputs_hat.shape=[None, num_capsule, input_num_capsule, dim_capsule]
                # The first two dimensions as `batch` dimension,
                # then matmal: [dim_capsule] x [input_num_capsule, dim_capsule]^T -> [input_num_capsule].
                # b.shape=[batch_size, num_capsule, input_num_capsule]
                b += K.batch_dot(outputs, inputs_hat, [2, 3])
        # End: Routing algorithm -----------------------------------------------------------------------#

        return outputs

    def compute_output_shape(self, input_shape):
        return tuple([None, self.num_capsule, self.dim_capsule])

    def get_config(self):
        config = {
            'num_capsule': self.num_capsule,
            'dim_capsule': self.dim_capsule,
            'routings': self.routings
        }
        base_config = super(CapsuleLayer, self).get_config()
        return dict(list(base_config.items()) + list(config.items()))


def PrimaryCap(inputs, dim_capsule, n_channels, kernel_size, strides, padding):
    output = layers.Conv2D(filters=dim_capsule*n_channels, kernel_size=kernel_size, strides=strides, padding=padding,
                           name='primarycap_conv2d')(inputs)
    outputs = layers.Reshape(target_shape=[-1, dim_capsule], name='primarycap_reshape')(output)
    return layers.Lambda(squash, name='primarycap_squash')(outputs)

In [29]:
eval_model = load_model('./eval.h5',
                            custom_objects={'CapsuleLayer': CapsuleLayer, 'Mask': Mask, 'Distance': Distance,
                                            'PrimaryCab': PrimaryCap, 'margin_loss': margin_loss})
eval_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])
eval_model.summary()
eval_model.save('./eval_2')



__________________________________________________________________________________________________
Layer (type)                    Output Shape         Param #     Connected to                     
input_1 (InputLayer)            (None, 48, 173, 1)   0                                            
__________________________________________________________________________________________________
conv1 (Conv2D)                  (None, 24, 58, 8)    208         input_1[0][0]                    
__________________________________________________________________________________________________
conv2 (Conv2D)                  (None, 20, 27, 256)  51456       conv1[0][0]                      
__________________________________________________________________________________________________
primarycap_conv2d (Conv2D)      (None, 6, 10, 256)   5308672     conv2[0][0]                      
__________________________________________________________________________________________________
primaryc

In [31]:
import pickle
with open('eval.pkl', 'wb') as f:
    pickle.dump(eval_model, f)

In [19]:
import tensorflow as tf
from tensorflow.keras.models import load_model
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
from tensorflow.keras import backend as K
from tensorflow.keras.utils import to_categorical
from capsulelayers import CapsuleLayer, PrimaryCap, Distance, Mask
from tensorflow import keras
import seaborn as sn
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np
import sys; sys.argv=['']; del sys
import argparse
import h5py
import time
import re
import os

In [20]:
! pip list | grep -E '(keras|tensorflow)'

keras                        2.14.0
tensorflow                   2.14.0
tensorflow-estimator         2.14.0
tensorflow-io-gcs-filesystem 0.35.0


In [21]:
with open('eval.pkl', 'rb') as f:
    model = pickle.load(f)

ModuleNotFoundError: No module named 'keras.engine'