In [None]:
# from google.colab import drive
# drive.mount('/content/drive')

## scale data method--我们在论文中提出的按照七种不同缩放方法和四个不同处理方向的预处理策略

In [None]:
import numpy as np

# #######################
from sklearn.preprocessing import (
    MaxAbsScaler,
    MinMaxScaler,
    Normalizer,
    PowerTransformer,
    QuantileTransformer,
    RobustScaler,
    StandardScaler,
)


def ScaleData(train_x, scaling_method, dimension, random_seed):
    """Rescale an array with a scikit-learn scaler fitted on the array itself.

    Parameters
    ----------
    train_x : numpy.ndarray
        Data to rescale. The "channels", "timesteps" and "both" modes index it
        as a 3-D array; "all" only reshapes it and accepts any shape.
    scaling_method : str
        "none" (return ``train_x`` untouched) or one of "minmax", "maxabs",
        "standard", "robust", "quantile", "powert", "normalize".
    dimension : str
        Which slices are fitted independently:
        "all"       - one fit over every value of the array;
        "channels"  - one fit per index along axis 1;
        "timesteps" - one fit per index along axis 2;
        "both"      - one fit per (axis 1, axis 2) index pair.
    random_seed : int
        Passed as ``random_state`` to ``QuantileTransformer``; the other
        scalers do not receive it.

    Returns
    -------
    numpy.ndarray
        Array with the same shape as ``train_x``. For "none" this is
        ``train_x`` itself, not a copy.

    Raises
    ------
    ValueError
        If the data contains infinity, or if ``scaling_method`` or
        ``dimension`` is not recognised. (Earlier revisions called
        ``exit(-1)`` here, which in a notebook attempts to stop the kernel
        rather than report the problem.)
    """
    if scaling_method == "none":
        return train_x
    if np.isinf(train_x).any():
        raise ValueError("Train or test set contains infinity.")

    # Factories instead of instances: only the requested scaler is created,
    # and argument validation does not depend on constructing all seven.
    scaler_factories = {
        "minmax": lambda: MinMaxScaler(),
        "maxabs": lambda: MaxAbsScaler(),
        "standard": lambda: StandardScaler(),
        "robust": lambda: RobustScaler(),
        "quantile": lambda: QuantileTransformer(random_state=random_seed),
        "powert": lambda: PowerTransformer(),
        "normalize": lambda: Normalizer(),
    }
    if scaling_method not in scaler_factories:
        raise ValueError(f"Scaling method {scaling_method} not found.")
    if dimension not in ("timesteps", "channels", "all", "both"):
        raise ValueError(f"Dimension {dimension} not found.")

    scaler = scaler_factories[scaling_method]()

    # Every slice is flattened before fitting. "normalize" gets a single row
    # (1, -1); all other scalers get a single column (-1, 1).
    flat_shape = (1, -1) if scaling_method == "normalize" else (-1, 1)

    def _rescale(values):
        """Fit the scaler on ``values`` and return the result in its shape."""
        return scaler.fit_transform(values.reshape(flat_shape)).reshape(values.shape)

    if dimension == "all":
        return _rescale(train_x)

    out_train_x = np.zeros_like(train_x, dtype=np.float64)
    if dimension == "channels":
        for i in range(train_x.shape[1]):
            out_train_x[:, i, :] = _rescale(train_x[:, i, :])
    elif dimension == "timesteps":
        for i in range(train_x.shape[2]):
            out_train_x[:, :, i] = _rescale(train_x[:, :, i])
    else:  # "both" - the only value left after the validation above
        for i in range(train_x.shape[1]):
            for j in range(train_x.shape[2]):
                out_train_x[:, i, j] = _rescale(train_x[:, i, j])
    return out_train_x

## Import necessary libraries

In [None]:
import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
from sklearn.metrics import (
    accuracy_score,
    classification_report,
    confusion_matrix,
    plot_roc_curve,
    roc_auc_score,
    roc_curve,
)
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import MinMaxScaler

# Importing libraries

import os

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import tensorflow.keras as keras

## Use the following code to get the ADNI Data and Label

In [None]:
import numpy as np
import scipy.io as io

# Load the ADNI .mat file; `y` is the dict returned by scipy.io.loadmat.
# NOTE(review): absolute, machine-specific path - adjust before running elsewhere.
y = io.loadmat("D:/机器学习前沿实验/实验课一/dataset/ADNI.mat")
# "AD" group -> class label 0 (one label row per data row)
AD = np.asarray(y["AD"])
AD_label = np.zeros([AD.shape[0], 1], dtype=int)
# "MCI", "MCIn" and "MCIp" arrays are stacked into a single MCI group -> class label 1
MCI = np.asarray(y["MCI"])
MCIn = np.asarray(y["MCIn"])
MCIp = np.asarray(y["MCIp"])
MCI = np.vstack((MCI, MCIn, MCIp))
MCI_label = np.ones([MCI.shape[0], 1], dtype=int)
# "NC" group -> class label 2
NC = np.asarray(y["NC"])
NC_label = np.full((NC.shape[0], 1), 2, dtype=int)

# Merge the groups; print the per-class label shapes first as a sanity check.
print(AD_label.shape)
# print(EMCI_lable.shape)
# print(LMCI_lable.shape)
print(MCI_label.shape)
print(NC_label.shape)


# Stack rows in the order AD, MCI, NC for both arrays so Data and Label stay row-aligned.
Data = np.vstack((AD, MCI, NC))
Label = np.vstack((AD_label, MCI_label, NC_label))

print(Data.shape)
print(Label.shape)

## Use the following code to get the PPMI Data and Label

In [None]:
# import scipy.io as io
# import numpy as np
# y=io.loadmat("D:/机器学习前沿实验/实验课一/dataset/PPMI.mat")
# NC=np.asarray(y['NC'])
# NC_label=np.full((NC.shape[0],1),0,dtype=int)
# PD=np.asarray(y['PD'])
# PD_label=np.full((PD.shape[0],1),1,dtype=int)


# Label_1=np.vstack((NC_label,PD_label))
# Data=np.vstack((NC,PD))
# Label=Label_1.reshape(Label_1.shape[0])

# print(Data.shape)
# print(Label.shape)

## Transform Data to three dimensions so that ScaleData can be applied

In [None]:
# Turn the 2-D Data matrix into a 3-D array with a singleton middle axis:
# (rows, cols) -> (rows, cols, 1) -> (rows, 1, cols).
# Guarded so that re-running this cell does not stack another axis onto data
# that has already been reshaped.
if Data.ndim == 2:
    Data = np.expand_dims(Data, axis=-1)
    Data = Data.transpose(0, 2, 1)

# Convert the integer labels to one-hot vectors to fit the network's softmax
# output. Skipped when Label no longer looks like a column/vector of class
# ids, i.e. when this cell has already been run.
if Label.ndim == 1 or Label.shape[-1] == 1:
    Label = keras.utils.to_categorical(Label)

# Last expression of the cell, so the resulting shapes are actually displayed.
Data.shape, Label.shape

## Construct the FCN neural network

In [None]:
class Classifier_FCN:
    """1-D fully convolutional classifier built with Keras.

    The network is three Conv1D -> BatchNormalization -> ReLU blocks followed
    by global average pooling and a softmax layer. The class also owns the
    model files it writes under ``output_directory``.
    """

    # File names of the artifacts written under ``output_directory``.
    # The best-model name is shared by the checkpoint writer and by every
    # loader; previously the loaders looked for "best_FCN_model.hdf5", a file
    # the checkpoint callback never wrote.
    INIT_WEIGHTS_FILE = "ECG_FCN_model_init.hdf5"
    BEST_MODEL_FILE = "ECG_best_FCN_model.hdf5"
    LAST_MODEL_FILE = "last_FCN_model.hdf5"

    def __init__(self, output_directory, input_shape, nb_classes, verbose=False, build=True):
        """Store the configuration and, if ``build`` is true, build the model.

        Args:
            output_directory: Directory in which model files are written; a
                trailing path separator is optional.
            input_shape: Shape of one sample, passed to ``keras.layers.Input``.
            nb_classes: Number of units of the softmax output layer.
            verbose: Print the model summary after building; also forwarded
                to ``model.fit`` as its ``verbose`` argument.
            build: When false, no model is created and ``self.model`` is None.
        """
        self.output_directory = output_directory
        # Set unconditionally: fit() reads self.verbose even if build=False.
        self.verbose = verbose
        self.model = None
        self.callbacks = []
        if build:
            self.model = self.build_model(input_shape, nb_classes)
            if verbose:
                self.model.summary()
            self.model.save_weights(self._artifact_path(self.INIT_WEIGHTS_FILE))

    def _artifact_path(self, file_name):
        """Return the path of ``file_name`` inside the output directory."""
        # os.path.join copes with output_directory with or without a trailing
        # separator; plain string concatenation did not.
        return os.path.join(self.output_directory, file_name)

    def build_model(self, input_shape, nb_classes):
        """Create and compile the network and set ``self.callbacks``.

        Returns:
            The compiled ``keras.models.Model`` (categorical cross-entropy
            loss, Adam optimizer with default settings, accuracy metric).
        """
        input_layer = keras.layers.Input(input_shape)

        conv1 = keras.layers.Conv1D(filters=128, kernel_size=8, padding="same")(input_layer)
        conv1 = keras.layers.BatchNormalization()(conv1)
        conv1 = keras.layers.Activation(activation="relu")(conv1)

        conv2 = keras.layers.Conv1D(filters=256, kernel_size=5, padding="same")(conv1)
        conv2 = keras.layers.BatchNormalization()(conv2)
        conv2 = keras.layers.Activation("relu")(conv2)

        conv3 = keras.layers.Conv1D(128, kernel_size=3, padding="same")(conv2)
        conv3 = keras.layers.BatchNormalization()(conv3)
        conv3 = keras.layers.Activation("relu")(conv3)

        gap_layer = keras.layers.GlobalAveragePooling1D()(conv3)

        output_layer = keras.layers.Dense(nb_classes, activation="softmax")(gap_layer)

        model = keras.models.Model(inputs=input_layer, outputs=output_layer)

        model.compile(
            loss="categorical_crossentropy",
            optimizer=keras.optimizers.Adam(),
            metrics=["accuracy"],
        )

        # Keep only the model with the lowest monitored "loss" seen so far.
        model_checkpoint = keras.callbacks.ModelCheckpoint(
            filepath=self._artifact_path(self.BEST_MODEL_FILE),
            monitor="loss",
            save_best_only=True,
        )

        # Checkpointing is the only active callback; learning-rate reduction
        # and early stopping were disabled in the original code.
        self.callbacks = [model_checkpoint]

        return model

    def fit(self, x_train, y_train, x_val, y_val, batch_size, nb_epochs):
        """Train the model, save the final model and score the best checkpoint.

        Args:
            x_train, y_train: Training inputs and one-hot targets.
            x_val, y_val: Validation inputs and one-hot targets, passed to
                Keras as ``validation_data``.
            batch_size: Upper bound on the mini-batch size.
            nb_epochs: Number of training epochs.

        Returns:
            Predicted class indices (argmax over the model outputs) for
            ``x_val``, computed with the best checkpoint reloaded from disk.
            Earlier revisions computed this value and discarded it.
        """
        # A tenth of the training set, capped at batch_size; never below 1 so
        # that very small training sets do not produce a batch size of 0.
        mini_batch_size = max(1, int(min(x_train.shape[0] / 10, batch_size)))
        self.model.fit(
            x_train,
            y_train,
            batch_size=mini_batch_size,
            epochs=nb_epochs,
            verbose=self.verbose,
            validation_data=(x_val, y_val),
            callbacks=self.callbacks,
        )

        self.model.save(self._artifact_path(self.LAST_MODEL_FILE))

        best_model = keras.models.load_model(self._artifact_path(self.BEST_MODEL_FILE))

        # Convert the per-class outputs to integer class indices.
        y_pred = np.argmax(best_model.predict(x_val), axis=1)

        keras.backend.clear_session()

        return y_pred

    def predict(self, x_train, y_train, x_test, y_test):
        """Run the best saved checkpoint on the training and test inputs.

        ``y_train`` and ``y_test`` are accepted for interface compatibility
        but are not used.

        Returns:
            Tuple ``(y_pred_train, y_pred_test)`` of raw model outputs.
        """
        model = keras.models.load_model(self._artifact_path(self.BEST_MODEL_FILE))
        y_pred_train = model.predict(x_train, verbose=1)
        y_pred_test = model.predict(x_test, verbose=1)

        return y_pred_train, y_pred_test

## 五折交叉验证，利用注释处不同代码获得不同效果/不同数据集/是否使用scaledata策略，scaledata策略参数等等

In [None]:
from sklearn import metrics
from sklearn.model_selection import KFold

# Tunable settings for the cross-validation run.
N_SPLITS = 5
SEED = 100
BATCH_SIZE = 16
NB_EPOCHS = 3000

kf = KFold(n_splits=N_SPLITS, shuffle=True, random_state=SEED)  # 5-fold cross-validation

# Current working directory WITH a trailing separator, so the classifier's
# model files land inside it whether file names are appended by string
# concatenation or by os.path.join.
output_directory = os.path.join(os.getcwd(), "")

a = []  # best validation accuracy of each fold; averaged in the next cell
# NOTE(review): the scaler is fitted on the full data set before the split, so
# each held-out fold contributes to the scaling statistics - confirm this is intended.
X_1 = ScaleData(Data, "quantile", "all", SEED)  # use the ScaleData strategy
# X_1 = Data  # do not use the ScaleData strategy
for i, (train_index, test_index) in enumerate(kf.split(X_1, Label), start=1):
    print("\n{} of kfold {}".format(i, kf.n_splits))
    X_train, X_test = X_1[train_index], X_1[test_index]
    y_train, y_test = Label[train_index], Label[test_index]
    fcn_classifier = Classifier_FCN(
        output_directory, X_train.shape[1:], nb_classes=y_train.shape[1], verbose=True
    )
    fcn_classifier.fit(X_train, y_train, X_test, y_test, BATCH_SIZE, NB_EPOCHS)
    # Highest per-epoch validation accuracy recorded during this fold's training.
    best_val_accuracy = np.asarray(fcn_classifier.model.history.history["val_accuracy"])
    print("best performance of this circle:)", best_val_accuracy.max())
    a.append(best_val_accuracy.max())
#     model.save(f"1dConv-Fold{i}")

## 打印输出五折交叉验证结果

In [None]:
# Average the per-fold best validation accuracies collected in `a`.
b = np.array(a)
print("五折交叉验证结果为:", np.mean(b))