In [None]:
import tensorflow.keras as keras
from tensorflow.keras.models import Model
from tensorflow.keras import layers
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.constraints import max_norm
from tensorflow.keras import backend as K
import tensorflow.keras.models as models
import tensorflow.compat.v1 as tf
import numpy as np
from tslearn.metrics import soft_dtw
import os
from tensorflow.keras.models import Model
from tensorflow.keras.layers import Dense, Activation, Permute, Dropout, Input
from tensorflow.keras.layers import Conv2D, MaxPooling2D, AveragePooling2D
from tensorflow.keras.layers import SeparableConv2D, DepthwiseConv2D
from tensorflow.keras.layers import BatchNormalization
from tensorflow.keras.layers import SpatialDropout2D
from tensorflow.keras.regularizers import l1_l2
from tensorflow.keras.layers import Input, Flatten
from tensorflow.keras.constraints import max_norm
from tensorflow.keras import backend as K
from scipy.signal import butter, filtfilt
from sklearn.model_selection import KFold
from scipy import io, signal
import math
import numpy as np
os.environ["CUDA_VISIBLE_DEVICES"] = "0"

In [None]:
def buffer(data, duration):
    number_segments = int(len(data) / duration)
    temp_buf = [data[i:i + duration] for i in range(0, len(data), duration)]
    segmented_data = np.vstack(temp_buf[0:number_segments])
    return segmented_data


def get_segmented_data(data, window_len, sample_rate):
    num_classes = data.shape[0]
    num_chan = data.shape[1]
    num_trials = data.shape[3]

    duration = int(window_len * sample_rate)

    number_segments = int(data.shape[2] / duration)
    
    segmented_data = np.zeros((data.shape[0], data.shape[1],
                               data.shape[3], number_segments, duration))

    for target in range(0, num_classes):
        for channel in range(0, num_chan):
            for trial in range(0, num_trials):
                segmented_data[target, channel, trial, :, :] = buffer(data[target, channel, :, trial],
                                                                      duration)

    return segmented_data



def get_filtered_eeg(eeg, lowcut, highcut, order, sample_rate):
    '''
    Returns bandpass filtered eeg for all channels and trials.

    Args:
        eeg (numpy.ndarray): raw eeg data of shape (num_classes, num_channels, num_samples, num_trials).
        lowcut (float): lower cutoff frequency (Hz).
        highcut (float): lower cutoff frequency (Hz).
        order (int): order of the bandpass filter.
        sample_rate (float): sampling rate (Hz).

    Returns:
        (numpy.ndarray): bandpass filtered eeg of shape (num_classes, num_channels, num_samples, num_trials).
    '''
    
    num_classes = eeg.shape[0]
    num_chan = eeg.shape[1]
    total_trial_len = eeg.shape[2]
    num_trials = eeg.shape[3]
    
    filtered_data = np.zeros((eeg.shape[0], eeg.shape[1], eeg.shape[2], eeg.shape[3]))

    for target in range(0, num_classes):
        for channel in range(0, num_chan):
            for trial in range(0, num_trials):
                signal_to_filter = np.squeeze(eeg[target, channel, :, 
                                               trial])
                filtered_data[target, channel, :, trial] = butter_bandpass_filter(signal_to_filter, lowcut, 
                                                                                  highcut, sample_rate, order)
    return filtered_data

def butter_bandpass_filter(data, lowcut, highcut, sample_rate, order):
    '''
    Returns bandpass filtered data between the frequency ranges specified in the input.

    Args:
        data (numpy.ndarray): array of samples. 
        lowcut (float): lower cutoff frequency (Hz).
        highcut (float): lower cutoff frequency (Hz).
        sample_rate (float): sampling rate (Hz).
        order (int): order of the bandpass filter.

    Returns:
        (numpy.ndarray): bandpass filtered data.
    '''
    
    nyq = 0.5 * sample_rate
    low = lowcut / nyq
    high = highcut / nyq
    b, a = butter(order, [low, high], btype='band')
    y = filtfilt(b, a, data)
    return y

In [None]:
def EEGNet(input_shape):
    model = models.Sequential()
    model.add(layers.Conv2D(72, kernel_size=(1, 250), 
                     kernel_regularizer=keras.regularizers.l2(0.0001), 
                     kernel_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None),
                     use_bias=False,
                     input_shape=input_shape, 
                     padding="same"))
    model.add(layers.BatchNormalization())
    
    model.add(layers.DepthwiseConv2D((8, 1), 
                                     kernel_regularizer=keras.regularizers.l2(0.0001), 
                                     kernel_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None),
                                     depth_multiplier=1, depthwise_constraint=max_norm(1.),  use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation('elu'))
    model.add(layers.AvgPool2D(pool_size=(1, 4), padding="same"))
    model.add(layers.Dropout(0.5))  
    
    
    model.add(layers.SeparableConv2D(filters=72,
                                     kernel_regularizer=keras.regularizers.l2(0.0001),
                                     kernel_initializer=keras.initializers.RandomNormal(mean=0.0, stddev=0.01, seed=None),
                                     kernel_size=(1, 32), padding="same", use_bias=False))
    model.add(layers.BatchNormalization())
    model.add(layers.Activation(activation="elu"))
    model.add(layers.AvgPool2D(pool_size=(1, 8), padding="same"))
    model.add(layers.Dropout(0.5))
    model.add(layers.Flatten())
    model.add(layers.Dense(9,  activation="sigmoid"))
    return model



In [None]:
all_acc_1 = np.zeros((10, 1))
all_acc_fusion = np.zeros((10, 1))

for n in range(1, 1):
    print(f"subject{n} is training......." , flush=True)
    data = io.loadmat(f'./SSVEP数据集/9目标数据集/S{n}.mat')['eeg']
    data = data[:, :, :]
    data = get_segmented_data(data, window_length, 250)
    print(data.shape)
    cv_acc_1 = np.zeros((15, 1))
    cv_acc_fusion = np.zeros((20, 1))
    for block in range(0, 20):
        train_i = []
        for k in range(15):
            if k != block:
                train_i.append(k)
        test_i = [block]

        train_data = data[:, :, train_i, :, :]
        targets = train_data.shape[0]
        channels = train_data.shape[1]
        trails = train_data.shape[2]
        segments = train_data.shape[3]
        samples = train_data.shape[4]
        train_data = train_data.reshape(targets, channels, trails*segments, samples)
        train_data = train_data.transpose(0, 1, 3, 2)

        train_data_1 = get_filtered_eeg(train_data, 8, 55, 2, 250)
        temp = train_data_1[:, :, :, 0]
        for i in range(1, trails*segments):
            temp = np.append(temp, train_data_1[:, :, :, i], axis=0)
        train_data_1 = temp
        train_data_1 = np.expand_dims(train_data_1, 3)

        train_data_2 = get_filtered_eeg(train_data, 16, 55, 2, 250)
        temp = train_data_2[:, :, :, 0]
        for i in range(1, trails*segments):
            temp = np.append(temp, train_data_2[:, :, :, i], axis=0)
        train_data_2 = temp
        train_data_2 = np.expand_dims(train_data_2, 3)


        train_data_3 = get_filtered_eeg(train_data, 24, 55, 2, 250)
        temp = train_data_3[:, :, :, 0]
        for i in range(1, trails*segments):
            temp = np.append(temp, train_data_3[:, :, :, i], axis=0)
        train_data_3 = temp
        train_data_3 = np.expand_dims(train_data_3, 3)

        label = np.arange(12)
        label = np.tile(label, trails*segments)
        label = keras.utils.to_categorical(label)  
        for l in label:
            for j in range(12):
                if l[j] == 0:
                    l[j] = 0.25
                else:
                    l[j] = 1
        fusion_label = np.arange(12)
        fusion_label = np.tile(fusion_label, trails*segments)
        fusion_label = keras.utils.to_categorical(fusion_label)
        
        
        test_data = data[:, :, test_i, :, :]
        targets = test_data.shape[0]
        channels = test_data.shape[1]
        trails = test_data.shape[2]
        segments = test_data.shape[3]
        samples = test_data.shape[4]

        test_data = test_data.reshape(targets, channels, trails*segments, samples)
        test_data = test_data.transpose(0, 1, 3, 2)

        test_label = np.arange(12)
        test_label = np.tile(test_label, trails*segments)
        test_label = keras.utils.to_categorical(test_label)  

        test_data_1 = get_filtered_eeg(test_data, 8, 55, 2, 250)
        temp = test_data_1[:, :, :, 0]
        for i in range(1, trails*segments):
            temp = np.append(temp, test_data_1[:, :, :, i], axis=0)
        test_data_1 = temp
        test_data_1 = np.expand_dims(test_data_1, 3)

        test_data_2 = get_filtered_eeg(test_data, 16, 55, 2, 250)
        temp = test_data_2[:, :, :, 0]
        for i in range(1, trails*segments):
            temp = np.append(temp, test_data_2[:, :, :, i], axis=0)
        test_data_2 = temp
        test_data_2 = np.expand_dims(test_data_2, 3)

        test_data_3 = get_filtered_eeg(test_data, 24, 55, 2, 250)
        temp = test_data_3[:, :, :, 0]
        for i in range(1, trails*segments):
            temp = np.append(temp, test_data_3[:, :, :, i], axis=0)
        test_data_3 = temp
        test_data_3 = np.expand_dims(test_data_3, 3)


        model_1 = EEGNet(train_data_1.shape[1:])
        model_2 = EEGNet(train_data_2.shape[1:])
        model_3 = EEGNet(train_data_3.shape[1:])

        opt = keras.optimizers.Adam(learning_rate = 0.0005)

        model_1.compile(loss='categorical_crossentropy', optimizer = "Adam", metrics = ["accuracy"])
        model_1.fit(train_data_1, label, batch_size=64, epochs=500, shuffle=True, verbose=1, validation_data=(test_data_1, test_label))
        score_1 = model_1.evaluate(test_data_1, test_label, verbose=0)
        print(f"EEGNET1 acc: {score_1[1]}", flush=True)
        cv_acc_1[block-1, :] = score_1[1]
        model_2.compile(loss='categorical_crossentropy', optimizer = "Adam", metrics = ["accuracy"])
        model_2.fit(train_data_2, label, batch_size=64, epochs=500, shuffle=True, verbose=1, validation_data=(test_data_2, test_label))
        score_2 = model_2.evaluate(test_data_2, test_label, verbose=0)
        print(f"EEGNET2 acc: {score_2[1]}", flush=True)    


        model_3.compile(loss='categorical_crossentropy', optimizer = "Adam", metrics = ["accuracy"])
        model_3.fit(train_data_3, label, batch_size=64, epochs=500, shuffle=True, verbose=1, validation_data=(test_data_3, test_label))
        score_3 = model_3.evaluate(test_data_3, test_label, verbose=0)
        print(f"EEGNET3 acc: {score_3[1]}", flush=True)

        model_1.trainable = False
        model_2.trainable = False
        model_3.trainable = False

        features = layers.concatenate([model_1.layers[-2].output, model_2.layers[-2].output, model_3.layers[-2].output])
        features = layers.Dense(12, activation="softmax")(features)
        
        fusion_model = models.Model([model_1.input, model_2.input, model_3.input], features)

        fusion_model.compile(loss='categorical_crossentropy', optimizer = "Adam", metrics = ["accuracy"])
        fusion_model.fit([train_data_1, train_data_2, train_data_3], fusion_label, batch_size=64, epochs=300, shuffle=True, verbose=1, validation_data=([test_data_1, test_data_2, test_data_3], test_label))
        score_fusion = fusion_model.evaluate([test_data_1, test_data_2, test_data_3], test_label, verbose=0)
        print(f"FB-EEGNET acc: {score_fusion[1]}", flush=True)
        cv_acc_fusion[block-1, :] = score_fusion[1]
    print(f"subject{n} EEGNet acc: {np.mean(cv_acc_1)}", flush=True)
    print(f"subject{n} FB-EEGNet acc: {np.mean(cv_acc_fusion)}", flush=True)
    all_acc_1[n-1, :] = np.mean(cv_acc_1)
    all_acc_fusion[n-1, :] = np.mean(cv_acc_fusion)
print(f"EEGNet all-subject mean acc: {np.mean(all_acc_1)}", flush=True)
print(f"FB-EEGNet all-subject mean acc: {np.mean(all_acc_fusion)}", flush=True)