In [3]:

"""
Created on Sat Dec 28 23:24:05 2019

Implemented using TensorFlow 1.0.1 and Keras 2.2.1
 
M. Zhao, S. Zhong, X. Fu, et al., Deep Residual Shrinkage Networks for Fault Diagnosis, 
IEEE Transactions on Industrial Informatics, 2019, DOI: 10.1109/TII.2019.2943898

@author: zhihu: https://zhuanlan.zhihu.com/p/115241985
"""

import tensorflow.keras as keras
import numpy as np
import h5py
from tensorflow.keras.datasets import mnist
from tensorflow.keras.layers import Dense, Conv1D, Conv2D, BatchNormalization, Activation
from tensorflow.keras.layers import Input, AveragePooling1D, Input, GlobalAveragePooling1D
from tensorflow.keras.optimizers import Adam
from tensorflow.keras.regularizers import l2
from tensorflow.keras import backend as K
from tensorflow.keras.models import Model, load_model, save_model
from tensorflow.keras.layers import Lambda
K.set_learning_phase(1)

Instructions for updating:
Simply pass a True/False value to the `training` argument of the `__call__` method of your layer or model.


In [2]:
def abs_backend(inputs):
    return K.abs(inputs)

def expand_dim_backend(inputs):
    return K.expand_dims(inputs,-1)

def sign_backend(inputs):
    return K.sign(inputs)

def pad_backend(inputs, in_channels, out_channels):
    pad_dim = K.abs(out_channels - in_channels)//2
    inputs = K.expand_dims(inputs, -1)
    inputs = K.spatial_2d_padding(inputs, ((0,0),(pad_dim,pad_dim)), 'channels_last')
    return K.squeeze(inputs, -1)

In [3]:
data = h5py.File('./CNN_samples/test_20.mat')
train_x = np.transpose(data['train_x'])
train_y = np.transpose(data['train_y'])
test_x = np.transpose(data['test_x'])
test_y = np.transpose(data['test_y'])
print('train_x.shape', train_x.shape)
print('train_y.shape', train_y.shape)
print('test_x.shape', test_x.shape)
print('test_y.shape', test_y.shape)

index = np.arange(train_x.shape[0])
np.random.shuffle(index)
train_x = train_x[index,:]
train_y = train_y[index]

train_y -= 1
test_y -= 1
train_y = keras.utils.to_categorical(train_y, 31)
test_y = keras.utils.to_categorical(test_y, 31)

#train_x: (3000,1)
train_x = K.expand_dims(train_x, -1)
test_x = K.expand_dims(test_x, -1)
print('After shuffle train_x.shape', train_x.shape)
print('After shuffle test_x.shape', test_x.shape)
print('After shuffle train_y.shape', train_y.shape)
print('After shuffle test_y.shape', test_y.shape)

  """Entry point for launching an IPython kernel.


train_x.shape (62000, 3000)
train_y.shape (62000, 1)
test_x.shape (15500, 3000)
test_y.shape (15500, 1)
After shuffle train_x.shape (62000, 3000, 1)
After shuffle test_x.shape (15500, 3000, 1)
After shuffle train_y.shape (62000, 31)
After shuffle test_y.shape (15500, 31)


In [5]:
# Residual Shrinakge Block
def residual_shrinkage_block(incoming, nb_blocks, out_channels, downsample=False,
                             downsample_strides=2):
    
    residual = incoming
    in_channels = incoming.get_shape().as_list()[-1]
    
    residual = BatchNormalization()(residual)
    residual = Activation('relu')(residual)
    residual = Conv1D(out_channels, 3, padding='causal', dilation_rate=3, kernel_initializer='he_normal', 
                        kernel_regularizer=l2(1e-4))(residual)
    
    residual = BatchNormalization()(residual)
    residual = Activation('relu')(residual)
    residual = Conv1D(out_channels, 3, padding='causal', dilation_rate=3, kernel_initializer='he_normal', 
                          kernel_regularizer=l2(1e-4))(residual)   
    for i in range(nb_blocks):
        
        identity = residual
        print('identityfirst.shape',identity.shape)
        if not downsample:
            downsample_strides = 1
        
        residual = Conv1D(out_channels, 3, strides=downsample_strides, 
                          padding='same', kernel_initializer='he_normal', 
                          kernel_regularizer=l2(1e-4))(residual)
        
        residual = BatchNormalization()(residual)
        residual = Activation('relu')(residual)
        residual = Conv1D(out_channels, 3, padding='same', kernel_initializer='he_normal', 
                          kernel_regularizer=l2(1e-4))(residual)
        
        print('residual.shape', residual.shape)
        # Calculate global means
        residual_abs = Lambda(abs_backend)(residual)
        abs_mean = GlobalAveragePooling1D()(residual_abs)
        print('abs_mean.shape', abs_mean.shape)
        # Calculate scaling coefficients
        scales = Dense(out_channels, activation=None, kernel_initializer='he_normal', 
                       kernel_regularizer=l2(1e-4))(abs_mean)
        scales = BatchNormalization()(scales)
        scales = Activation('relu')(scales)
        scales = Dense(out_channels, activation='sigmoid', kernel_regularizer=l2(1e-4))(scales)
        #scales = Lambda(expand_dim_backend)(scales)
        print('scales.shape', scales.shape)
        # Calculate thresholds
        thres = keras.layers.multiply([abs_mean, scales])
        print('thres', thres.shape)
        # Soft thresholding
        sub = keras.layers.subtract([residual_abs, thres])
        zeros = keras.layers.subtract([sub, sub])
        n_sub = keras.layers.maximum([sub, zeros])
        residual = keras.layers.multiply([Lambda(sign_backend)(residual), n_sub])
        print('residualsign.shape',residual.shape)

        #Downsampling (it is important to use the pooL-size of (1, 1))
        if downsample_strides > 1:    # casual conv do not need downstride
            identity = AveragePooling1D(pool_size=1, strides=2)(identity)
            print('identitypool.shape',identity.shape)
            
        #Zero_padding to match channels (it is important to use zero padding rather than 1by1 convolution)
        if in_channels != out_channels:
            identity = Lambda(pad_backend, arguments={'in_channels':in_channels,'out_channels':out_channels})(identity)

        print('identity.shape',identity.shape)
        residual = keras.layers.add([residual, identity])
    
    return residual


# define and train a model
inputs = Input(shape=(3000,1))
net = Conv1D(10, 3, padding='same', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(inputs)
net = residual_shrinkage_block(net, 5, 10, downsample=True)
#net = residual_shrinkage_block(net, 1, 10, downsample=True)
net = BatchNormalization()(net)
net = Activation('relu')(net)
net = GlobalAveragePooling1D()(net)
net = Dense(300, activation='elu', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)
net = BatchNormalization()(net)
net = Dense(200, activation='elu', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)
net = BatchNormalization()(net)
net = Dense(100, activation='elu', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)
net = BatchNormalization()(net)
outputs = Dense(31, activation='softmax', kernel_initializer='he_normal', kernel_regularizer=l2(1e-4))(net)
model = Model(inputs=inputs, outputs=outputs)
model.compile(loss='categorical_crossentropy', optimizer=Adam(), metrics=['accuracy'])

identityfirst.shape (None, 3000, 10)
residual.shape (None, 1500, 10)
abs_mean.shape (None, 10)
scales.shape (None, 10)
thres (None, 10)
residualsign.shape (None, 1500, 10)
identitypool.shape (None, 1500, 10)
identity.shape (None, 1500, 10)
identityfirst.shape (None, 1500, 10)
residual.shape (None, 750, 10)
abs_mean.shape (None, 10)
scales.shape (None, 10)
thres (None, 10)
residualsign.shape (None, 750, 10)
identitypool.shape (None, 750, 10)
identity.shape (None, 750, 10)
identityfirst.shape (None, 750, 10)
residual.shape (None, 375, 10)
abs_mean.shape (None, 10)
scales.shape (None, 10)
thres (None, 10)
residualsign.shape (None, 375, 10)
identitypool.shape (None, 375, 10)
identity.shape (None, 375, 10)
identityfirst.shape (None, 375, 10)
residual.shape (None, 188, 10)
abs_mean.shape (None, 10)
scales.shape (None, 10)
thres (None, 10)
residualsign.shape (None, 188, 10)
identitypool.shape (None, 188, 10)
identity.shape (None, 188, 10)
identityfirst.shape (None, 188, 10)
residual.shape (No

KeyboardInterrupt: 

In [None]:
times = 10
for i in arange(1, times):
    fileName = 'test-15_5s'+ str(i)+'.mat'
    data = h6py.File('./CNN_samples/'+fileName)
    data = h5py.File('./CNN_samples/test_20.mat')
    train_x = np.transpose(data['train_x'])
    train_y = np.transpose(data['train_y'])
    test_x = np.transpose(data['test_x'])
    test_y = np.transpose(data['test_y'])
    print('train_x.shape', train_x.shape)
    print('train_y.shape', train_y.shape)
    print('test_x.shape', test_x.shape)
    print('test_y.shape', test_y.shape)

    index = np.arange(train_x.shape[0])
    np.random.shuffle(index)
    train_x = train_x[index,:]
    train_y = train_y[index]

    train_y -= 1
    test_y -= 1
    train_y = keras.utils.to_categorical(train_y, 31)
    test_y = keras.utils.to_categorical(test_y, 31)

    #train_x: (3000,1)
    train_x = K.expand_dims(train_x, -1)
    test_x = K.expand_dims(test_x, -1)
    print('After shuffle train_x.shape', train_x.shape)
    print('After shuffle test_x.shape', test_x.shape)
    print('After shuffle train_y.shape', train_y.shape)
    print('After shuffle test_y.shape', test_y.shape)

    model.fit(train_x, train_y, batch_size=128, epochs=10, verbose=1, validation_split=0.2)
    DRSN_train_score = model.evaluate(train_x, train_y, batch_size=100, verbose=0)
    print('Train loss:', DRSN_train_score[0])
    print('Train accuracy:', DRSN_train_score[1])
    print('Test loss:', DRSN_test_score[0])
    print('Test accuracy:', DRSN_test_score[1])
# get results
#K.set_learning_phase(0)


# DRSN_test_score = model.evaluate(test_x, test_y, batch_size=100, verbose=0)

model.save('./modelcheckpoint/resnet.h5')

In [None]:
model.load_model('./modelcheckpoint/resnet.h5')

0
1
2
3
4
5
6
7
8
9


In [None]:
#需要的对比实验：snr下整体的变化，每一类的识别率随snr的变化（一类到5类），混淆矩阵？
from matplotlib import pyplot as plt

#数据导入
begin_snr = 0
end_snr = 15
test_loss = []
test_acc = []
for snr in begin_snr:end_snr:
    fileName = 'test'+ str(snr)+'.mat'
    data = h5py.File('./CNN_samples/'+fileName)
    test_x = np.transpose(data['test_x'])
    test_y = np.transpose(data['test_y'])
    test_y -= 1
    test_y = keras.utils.to_categorical(test_y, 31)
    test_x = K.expand_dims(test_x, -1)
    test_score = model.evaluate(test_x, test_y, batch_size=100, verbose=0)
    test_loss.append(test_score[0])
    test_acc.append(test_score[1])
    
n = len(test_acc)
x = np.arange(n) 
plt.title("accuary") 
plt.xlabel("SNR") 
plt.ylabel("accuracy") 
plt.plot(x,y) 
plt.show()

In [1]:
#每一大类的数目
classes = 200
begin_snr = 0
end_snr = 15
test_loss1 = []
test_acc1 = []
test_loss2 = []
test_acc2 = []
test_loss3 = []
test_acc3 = []
test_loss4 = []
test_acc4 = []
test_loss5 = []
test_acc5 = []
for snr in begin_snr:end_snr:
    fileName = 'test'+ str(snr)+'.mat'
    data = h5py.File('./CNN_samples/'+fileName)
    test_x = np.transpose(data['test_x'])
    test_y = np.transpose(data['test_y'])
    test_y -= 1
    test_y = keras.utils.to_categorical(test_y, 31)
    test_x = K.expand_dims(test_x, -1)
    
    test_x1 = test_x[:5*classes, :]
    test_y1 = test_y[:5*classes, :]
    test_x2 = test_x[5*classes: 15*classes, :]
    test_y2 = test_y[5*classes: 15*classes, :]
    test_x3 = test_x[15*classes: 25*classes, :]
    test_y3 = test_y[15*classes: 25*classes, :]
    test_x4 = test_x[25*classes: 30*classes, :]
    test_y4 = test_y[25*classes: 30*classes, :]
    test_x5 = test_x[30*classes: 31*classes, :]
    test_y5 = test_y[30*classes: 31*classes, :]
    
    test_score1 = model.evaluate(test_x1, test_y1, batch_size=128, verbose=0)
    test_loss1.append(test_score1[0])
    test_acc1.append(test_score1[1])
    
    test_score2 = model.evaluate(test_x2, test_y2, batch_size=128, verbose=0)
    test_loss2.append(test_score2[0])
    test_acc2.append(test_score2[1])
    
    test_score3 = model.evaluate(test_x3, test_y3, batch_size=128, verbose=0)
    test_loss3.append(test_score3[0])
    test_acc3.append(test_score3[1])
    
    test_score4 = model.evaluate(test_x4, test_y4, batch_size=128, verbose=0)
    test_loss4.append(test_score4[0])
    test_acc4.append(test_score4[1])
    
    test_score5 = model.evaluate(test_x5, test_y5, batch_size=128, verbose=0)
    test_loss5.append(test_score5[0])
    test_acc5.append(test_score5[1])
    

x = np.range(begin_snr,end_snr)
fig,ax = plt.subplots()
ax.plot(x,test_loss1,label='loss1')
ax.plot(x,test_loss2,label='loss2')
ax.plot(x,test_loss3,label='loss3')
ax.plot(x,test_loss4,label='loss4')
ax.plot(x,test_loss5,label='loss5')
ax.set_xlabel('SNR')
ax.set_ylabel('loss')
ax.set_title('Loss')
ax.legend()

x = np.range(begin_snr,end_snr)
fig,ax = plt.subplots()
ax.plot(x,test_acc1,label='accuarcy1')
ax.plot(x,test_acc2,label='accuarcy2')
ax.plot(x,test_acc3,label='accuarcy3')
ax.plot(x,test_acc4,label='accuarcy4')
ax.plot(x,test_acc5,label='accuarcy5')
ax.set_xlabel('SNR')
ax.set_ylabel('accuarcy')
ax.set_title('accuarcy')
ax.legend()

SyntaxError: invalid syntax (<ipython-input-1-366731d62933>, line 14)