In [None]:
import keras
import random
import tensorflow as tf
import numpy as np
from tensorflow.keras import layers            
from tensorflow.keras.layers import AveragePooling2D,Dense,Dropout
from tensorflow.keras.models import Model
from tensorflow.keras import regularizers
from tensorflow.keras import backend as K
from sklearn.model_selection import KFold
from tensorflow.keras.callbacks import EarlyStopping
from tensorflow.keras import layers,optimizers,losses
from sklearn.metrics import accuracy_score,precision_score,recall_score,f1_score
from keras.layers import Activation
from tensorflow.keras.layers import Input,Flatten,LSTM
from tensorflow.keras.layers import Embedding, Layer 

In [None]:
# Configure how TensorFlow uses the GPU(s).
# Look up the GPUs that TensorFlow can see.
gpus = tf.config.experimental.list_physical_devices('GPU')
try:
    # Enable memory growth on every GPU found; with an empty list the loop
    # body never runs, so nothing is configured on CPU-only machines.
    for device in gpus:
        tf.config.experimental.set_memory_growth(device, True)
except RuntimeError as err:
    # Print the exception and continue.
    print(err)

In [None]:
batchsz = 16
# Seed shared by the pre-shuffle and by KFold so that fold membership is the
# same on every "Restart & Run All".
SEED = 1
# Number of samples labelled 0 and 1. Labels are assigned purely by position:
# the first N_CLASS0 rows get label 0 and the remaining N_CLASS1 rows label 1.
# NOTE(review): this assumes the .npy file stores the samples in that order.
N_CLASS0, N_CLASS1 = 4549, 4788

data_all=np.load('F:/An/脑电/数据处理/epoch/训练改/xunlian.npy')
n=len(data_all)
# Fail early if the file does not match the hard-coded class counts; otherwise
# the fancy-indexing below would either raise a less helpful IndexError or
# silently drop labels.
if n != N_CLASS0 + N_CLASS1:
    raise ValueError(
        f"expected {N_CLASS0 + N_CLASS1} samples ({N_CLASS0} + {N_CLASS1}), "
        f"but the data file contains {n}"
    )

# Integer class ids, then one-hot rows of shape (n, 2).
label_all=np.concatenate((np.zeros(N_CLASS0,dtype=int),np.ones(N_CLASS1,dtype=int)))
label_all=np.eye(2)[label_all]

# Shuffle samples and labels with the same seeded permutation.
A = np.random.default_rng(SEED).permutation(n)
data_all=data_all[A]
label_all=label_all[A]

kfold=KFold(n_splits=10, shuffle=True, random_state=SEED)
# Per-fold accumulators filled by the cross-validation loop.
historys,test_pred,test_real,accuracy,precision,recall,f1score=([] for _ in range(7))

In [None]:
class TimeEmbeddingLayer(Layer):
    """Adds a learned embedding, indexed by position along axis 2, to its input.

    An ``Embedding`` table with ``sequence_length`` rows of width
    ``output_dim`` is looked up for every index ``0 .. sequence_length-1``,
    given the shape ``(1, 1, sequence_length, output_dim)`` and added to the
    input, so it broadcasts over axes 0 and 1.

    Args:
        num_channels: Stored and returned by ``get_config``; not used in
            ``call``.
        sequence_length: Number of positions (rows of the embedding table).
        output_dim: Width of each embedding vector.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, num_channels, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_channels = num_channels
        self.sequence_length = sequence_length
        self.output_dim = output_dim
        self.time_embedding_layer = Embedding(input_dim=sequence_length, output_dim=output_dim)

    def call(self, inputs):
        """Return ``inputs`` plus the broadcast position embedding."""
        time_steps = tf.range(self.sequence_length)
        table = self.time_embedding_layer(time_steps)
        # (sequence_length, output_dim) -> (1, 1, sequence_length, output_dim)
        return inputs + table[tf.newaxis, tf.newaxis, :, :]

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            "num_channels": self.num_channels,
            "sequence_length": self.sequence_length,
            "output_dim": self.output_dim,
        }

class ChannelEmbeddingLayer(Layer):
    """Adds a learned embedding, indexed by position along axis 1, to its input.

    An ``Embedding`` table with ``num_channels`` rows of width ``output_dim``
    is looked up for every index ``0 .. num_channels-1``, given the shape
    ``(1, num_channels, 1, output_dim)`` and added to the input, so it
    broadcasts over axes 0 and 2.

    Args:
        num_channels: Number of channels (rows of the embedding table).
        sequence_length: Stored and returned by ``get_config``; not used in
            ``call``.
        output_dim: Width of each embedding vector.
        **kwargs: Forwarded to the base ``Layer`` constructor.
    """

    def __init__(self, num_channels, sequence_length, output_dim, **kwargs):
        super().__init__(**kwargs)
        self.num_channels = num_channels
        self.sequence_length = sequence_length
        self.output_dim = output_dim
        self.channel_embedding_layer = Embedding(input_dim=num_channels, output_dim=output_dim)

    def call(self, inputs):
        """Return ``inputs`` plus the broadcast channel embedding."""
        channel_ids = tf.range(self.num_channels)
        table = self.channel_embedding_layer(channel_ids)
        # (num_channels, output_dim) -> (1, num_channels, 1, output_dim)
        return inputs + table[tf.newaxis, :, tf.newaxis, :]

    def get_config(self):
        """Return the base layer config extended with the constructor arguments."""
        base_config = super().get_config()
        return {
            **base_config,
            "num_channels": self.num_channels,
            "sequence_length": self.sequence_length,
            "output_dim": self.output_dim,
        }

def mymodel1(num_channels=30, sequence_length=36, feature_dim=40):
    """Build the time-embedding branch as a standalone Keras ``Model``.

    Pipeline: time embedding -> Conv2D whose kernel spans all
    ``num_channels`` rows -> BatchNorm -> ReLU -> reshape to a 3-D sequence ->
    self-attention with a residual connection -> LSTM returning the full
    sequence.

    Args:
        num_channels: Size of input axis 1 (default 30).
        sequence_length: Size of input axis 2 (default 36).
        feature_dim: Size of the last input axis; also the embedding width
            (default 40).

    Returns:
        A ``Model`` mapping ``(num_channels, sequence_length, feature_dim)``
        inputs to the output of ``LSTM(units=100, return_sequences=True)``.
        With the default arguments the model is identical to the original
        hard-coded (30, 36, 40) version.
    """
    # Named `inputs` rather than `input` so the builtin is not shadowed.
    inputs = layers.Input(shape=(num_channels, sequence_length, feature_dim))
    x = TimeEmbeddingLayer(
        num_channels=num_channels,
        sequence_length=sequence_length,
        output_dim=feature_dim,
    )(inputs)
    # The kernel covers the whole channel axis, mixing all channels per step.
    x = layers.Conv2D(40, kernel_size=(num_channels, 1), strides=(1, 1))(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    # Keep axes 2 and 3 only; the Reshape succeeds only if axis 1 has size 1.
    x = layers.Reshape((x.shape[2], x.shape[3]))(x)
    attn_input = x
    x = layers.MultiHeadAttention(num_heads=8, key_dim=40)(x, x)
    # Residual connection around the attention block.
    x = layers.Add()([x, attn_input])
    x = layers.LSTM(units=100, return_sequences=True)(x)
    return Model(inputs=inputs, outputs=x)

def mymodel2(num_channels=30, sequence_length=36, feature_dim=40):
    """Build the channel-embedding branch as a standalone Keras ``Model``.

    Pipeline: channel embedding -> Conv2D whose kernel spans all
    ``sequence_length`` columns -> BatchNorm -> ReLU -> reshape to a 3-D
    sequence -> self-attention with a residual connection -> LSTM returning
    the full sequence.

    Args:
        num_channels: Size of input axis 1 (default 30).
        sequence_length: Size of input axis 2 (default 36).
        feature_dim: Size of the last input axis; also the embedding width
            (default 40).

    Returns:
        A ``Model`` mapping ``(num_channels, sequence_length, feature_dim)``
        inputs to the output of ``LSTM(units=100, return_sequences=True)``.
        With the default arguments the model is identical to the original
        hard-coded (30, 36, 40) version.
    """
    # Named `inputs` rather than `input` so the builtin is not shadowed.
    inputs = layers.Input(shape=(num_channels, sequence_length, feature_dim))
    x = ChannelEmbeddingLayer(
        num_channels=num_channels,
        sequence_length=sequence_length,
        output_dim=feature_dim,
    )(inputs)
    # The kernel covers the whole time axis, mixing all steps per channel.
    x = layers.Conv2D(40, kernel_size=(1, sequence_length), strides=(1, 1))(x)
    x = layers.BatchNormalization()(x)
    x = layers.ReLU()(x)
    # Keep axes 1 and 3 only; the Reshape succeeds only if axis 2 has size 1.
    x = layers.Reshape((x.shape[1], x.shape[3]))(x)
    attn_input = x
    x = layers.MultiHeadAttention(num_heads=8, key_dim=40)(x, x)
    # Residual connection around the attention block.
    x = layers.Add()([x, attn_input])
    x = layers.LSTM(units=100, return_sequences=True)(x)
    return Model(inputs=inputs, outputs=x)

def siamese_network(inp_shape=(30,200,1)):
    """Build the feature extractor that is applied to each half of a pair.

    A temporal convolution stem (L2-regularised Conv2D -> BatchNorm -> ReLU ->
    average pooling) feeds two parallel branches, ``mymodel1`` and
    ``mymodel2``. Their sequence outputs are concatenated along axis 1,
    flattened and reduced by two dense layers to a 50-dimensional feature
    vector.

    Args:
        inp_shape: Shape of one sample, without the batch axis. The pooled
            stem output must match the input shape declared by
            ``mymodel1``/``mymodel2``.

    Returns:
        A ``Model`` mapping ``inp_shape`` inputs to 50 ReLU features.
    """
    net_input = layers.Input(shape=inp_shape)

    # Temporal stem: convolve and pool along axis 2 only.
    stem = layers.Conv2D(
        filters=40,
        kernel_size=(1, 15),
        strides=(1, 1),
        kernel_regularizer=regularizers.l2(0.01),
        bias_regularizer=regularizers.l2(0.01),
        name='conv_1d_temporal',
    )(net_input)
    stem = layers.BatchNormalization()(stem)
    stem = layers.ReLU()(stem)
    stem = layers.AveragePooling2D(pool_size=(1, 10), strides=(1, 5))(stem)

    # Two branches over the same stem output.
    time_branch = mymodel1()(stem)
    channel_branch = mymodel2()(stem)

    # Join both sequences along axis 1, then project to the feature vector.
    head = layers.Concatenate(axis=1)([time_branch, channel_branch])
    head = layers.Flatten()(head)
    head = layers.Dense(100, activation="relu")(head)
    head = layers.Dropout(0.5)(head)
    head = layers.Dense(50, activation="relu")(head)
    return Model(inputs=net_input, outputs=head)

def _euclidean_distance(feature_pair):
    """Return the row-wise Euclidean distance of two tensors, shape (batch, 1)."""
    feature_a, feature_b = feature_pair
    sum_sq = tf.reduce_sum(tf.square(feature_a - feature_b), axis=1, keepdims=True)
    # sqrt has an unbounded gradient at 0; identical feature vectors would
    # otherwise put NaNs into the weights, so clamp to a tiny positive value.
    return tf.sqrt(tf.maximum(sum_sq, K.epsilon()))


def mymodel(dropoutRate=0.5, nb_classes=2, inp_shape=(2,30,200,1)):
    """Build the two-output siamese model.

    The input holds a pair of samples stacked on axis 1. Both halves go
    through the same ``siamese_network`` instance (shared weights). The model
    returns the Euclidean distance between the two feature vectors and a
    softmax classification computed from both feature vectors plus that
    distance.

    Args:
        dropoutRate: Dropout rate of the classification head.
        nb_classes: Number of softmax units of ``classification_output``.
        inp_shape: Input shape without the batch axis; its first entry must
            be 2 (the pair axis).

    Returns:
        A ``Model`` with outputs ``[distance_output, classification_output]``
        (these layer names are the keys used for per-output losses).

    Raises:
        ValueError: If ``inp_shape`` does not start with a pair axis of size 2.
    """
    if len(inp_shape) < 2 or inp_shape[0] != 2:
        raise ValueError(f"inp_shape must start with a pair axis of size 2, got {inp_shape}")

    inp = layers.Input(shape=inp_shape)
    # Split the pair; slicing a symbolic tensor drops axis 1, like tf.unstack.
    split1 = inp[:, 0]
    split2 = inp[:, 1]

    # One extractor instance called twice, so both halves share weights.
    siamese = siamese_network(inp_shape=tuple(inp_shape[1:]))
    feature1 = siamese(split1)
    feature2 = siamese(split2)

    distance_output = layers.Lambda(_euclidean_distance, name='distance_output')(
        [feature1, feature2]
    )

    concat_features = layers.Concatenate()([feature1, feature2, distance_output])
    x = layers.Dense(100, activation='relu')(concat_features)
    x = layers.Dropout(dropoutRate)(x)
    x = layers.Dense(50, activation='relu')(x)
    classification_output = layers.Dense(
        nb_classes, activation='softmax', name='classification_output'
    )(x)

    # Same tensorflow.keras Model class as the other builders in this file.
    return Model(inputs=inp, outputs=[distance_output, classification_output])

In [None]:
def contrastive_loss(margin=2.0):
    """Create a contrastive loss on a predicted distance.

    Args:
        margin: Distance below which samples with label 0 are penalised.

    Returns:
        ``loss_fn(y_true, y_pred)`` where ``y_true`` is one-hot with shape
        (batch, 2) and ``y_pred`` holds one distance per sample, shaped
        (batch, 1) or (batch,). The label is ``argmax(y_true)``: samples with
        label 1 contribute ``d**2`` and samples with label 0 contribute
        ``max(margin - d, 0)**2``. The result is the mean over the batch.
    """
    def loss_fn(y_true, y_pred):
        label = tf.cast(tf.argmax(y_true, axis=1), tf.float32)
        # Flatten (batch, 1) to (batch,) so it lines up element-wise with
        # `label`; multiplying (batch,) by (batch, 1) would broadcast to a
        # (batch, batch) matrix pairing every label with every distance.
        distance = tf.reshape(tf.cast(y_pred, tf.float32), [-1])
        same_class_loss = label * tf.square(distance)
        diff_class_loss = (1.0 - label) * tf.square(tf.maximum(margin - distance, 0.0))
        return tf.reduce_mean(same_class_loss + diff_class_loss)
    return loss_fn

# Counter showing which cross-validation fold is currently running.
ind_fold=0
# Run the 10-fold cross-validation.
for train_ind,test_ind in kfold.split(data_all,label_all):
    ind_fold=ind_fold+1
    print('fold hao:',ind_fold)
    # Drop Keras global state left by the previous fold's model so memory and
    # auto-generated layer names do not accumulate over the ten folds.
    K.clear_session()

    n=len(train_ind)
    # Seeded per-fold permutation, so the 80/20 train/validation split is the
    # same every time the notebook is re-run.
    A=np.random.default_rng(ind_fold).permutation(n)
    n_fit=int(0.8*n)
    fit_ind=train_ind[A[:n_fit]]
    val_ind=train_ind[A[n_fit:]]

    # Build the training, validation and test sets.
    epoch_train=data_all[fit_ind]
    epoch_val=data_all[val_ind]
    epoch_test=data_all[test_ind]
    label_train=label_all[fit_ind]
    label_val=label_all[val_ind]
    label_test=label_all[test_ind]
    print(epoch_train.shape)
    print(label_train.shape)
    label_train = tf.cast(label_train, tf.float32)
    label_val = tf.cast(label_val, tf.float32)
    label_test = tf.cast(label_test, tf.float32)

    # Both model outputs are trained against the same one-hot label; the dict
    # keys must match the output layer names of the model.
    db_train=tf.data.Dataset.from_tensor_slices((
        epoch_train,
        {
            'distance_output': label_train,
            'classification_output': label_train
        }
    ))
    db_val=tf.data.Dataset.from_tensor_slices((
        epoch_val,
        {
            'distance_output': label_val,
            'classification_output': label_val
        }
    ))
    db_train=db_train.shuffle(1000).batch(batchsz)
    # Validation metrics do not depend on sample order, so no shuffle here.
    db_val=db_val.batch(batchsz)

    # mymodel() takes no `nb_classes` argument in this file; passing it raised
    # a TypeError before training could start.
    model=mymodel(dropoutRate=0.3)
    model.summary()

    # Both callbacks watch validation accuracy, so "better" means larger.
    early_stopping = EarlyStopping(
        monitor='val_classification_output_accuracy',
        mode='max',
        min_delta=0.001,
        patience=10,
        restore_best_weights=True
    )
    reduce_lr = tf.keras.callbacks.ReduceLROnPlateau(
        monitor='val_classification_output_accuracy',
        mode='max',
        factor=0.5,
        patience=5,
        min_lr=1e-6
    )
    # `learning_rate` replaces the deprecated `lr` keyword.
    model.compile(optimizer=optimizers.Adam(learning_rate=1e-4),
                   loss={
                       'distance_output': contrastive_loss(margin=2.0),
                       'classification_output': 'categorical_crossentropy'
                   },
                   loss_weights={
                       'distance_output': 0.3,  # weight of the contrastive loss
                       'classification_output': 1.0  # weight of the classification loss
                   },
                   metrics={
                       'classification_output': 'accuracy'
                   })

    print('开始训练!!')
    # No `shuffle` argument: Keras ignores it for Dataset input, and the
    # dataset pipeline above already shuffles the training data.
    history=model.fit(db_train,
                     validation_data=db_val,
                     validation_freq=1,
                     epochs=100,
                     callbacks=[early_stopping, reduce_lr])

    history = history.history
    historys.append(history)

    # The model returns [distance, class probabilities]; only the
    # probabilities are needed for the test metrics.
    [_, pred_probs] = model.predict(epoch_test)
    pred_test = np.argmax(pred_probs, axis=1)
    label_test_indices = np.argmax(label_test, axis=1)
    test_pred.append(pred_test)
    test_real.append(label_test_indices)

    # Compute accuracy, precision, recall and F1 score on the test fold.
    acc=accuracy_score(label_test_indices, pred_test)
    pre=precision_score(label_test_indices, pred_test, average='macro')
    rec=recall_score(label_test_indices, pred_test, average='macro')
    f1=f1_score(label_test_indices, pred_test, average='macro')
    accuracy.append(acc)
    precision.append(pre)
    recall.append(rec)
    f1score.append(f1)
    print(f"$$ 测试集准确率为 accuracy:{acc}")
    print(f"$$ 测试集精确率为 precision:{pre}")
    print(f"$$ 测试集召回率为 recall:{rec}")
    print(f"$$ 测试集f1评分为 f1_score:{f1}")
# Gather each fold's loss curves from its history dict (training and
# validation loss), to reflect how training progressed.
loss_val = [fold_history['val_loss'] for fold_history in historys]
loss_train = [fold_history['loss'] for fold_history in historys]

# Report each test metric averaged over all folds.
for caption, fold_scores in (
    ("$$ 测试集准确率为 accuracy:", accuracy),
    ("$$ 测试集精确率为 precision:", precision),
    ("$$ 测试集召回率为 recall:", recall),
    ("$$ 测试集f1评分为 f1_score:", f1score),
):
    print(f"{caption}{np.mean(fold_scores)}")