Separate the dataset

In [None]:
import os
import numpy as np
import pandas as pd
from scipy.stats import zscore
import dimCols

# Root folders, relative to the working directory: raw data package,
# per-run combined feature files, and model evaluation output.
dataDirDev = os.path.join(".", "dataPackage")
featDirDev = os.path.join(".", "data_feats", "combined")
modelDirDev = os.path.join(".", "model_eval")

expType = "task-ils"
dataDir = os.path.join(dataDirDev, expType)

# Paths of all regular, non-hidden files in the combined-feature folder.
featFiles = [
    entry.path
    for entry in os.scandir(featDirDev)
    if entry.is_file() and not entry.name.startswith('.')
]

# Empty frame holding one row per feature file.
column_names = ['data', 'difficulty', 'flag', 'cumulative_total_error','fd']
df = pd.DataFrame(columns=column_names)

# Performance metrics table for this experiment type.
perfMetricsDF = pd.read_csv(os.path.join(dataDir, "PerfMetrics.csv"))

# Frequency-domain features
featDirDev = os.path.join(".", "data_feats")
expType = "task-ils"
aggFeatFilePath = os.path.join(featDirDev, 'devSubjsFeatMat.csv')
featCols = dimCols.featCols

dim_16 = pd.read_csv(aggFeatFilePath)

# Remove surrounding whitespace from the header names.
dim_16.columns = [name.strip() for name in dim_16.columns]

# Remove surrounding whitespace from the values of every string column.
for col in dim_16.columns:
    if not pd.api.types.is_string_dtype(dim_16[col]):
        continue
    dim_16[col] = dim_16[col].str.strip()

# Replace the selected feature columns with their z-scores.
dim_16[featCols] = zscore(dim_16[featCols])

# Only feature files recorded at these difficulty levels are kept.
inclDiffLevels = [1,4]
for featFile in featFiles:
    # Parse the identifiers from the file's base name only, so the result does
    # not depend on the OS path separator or on underscores in directory names.
    # Assumes names shaped like "<subject>_<session>_<level>-run-<n>_..." where
    # each token is itself '-'-separated -- TODO confirm against the data set.
    name_list = os.path.basename(featFile).split("_")
    subject = name_list[0]
    session = name_list[1]
    run_1 = name_list[2]

    level_run_fields = run_1.split("-")
    # Second-to-last character of the level field is the numeric level.
    level = int(level_run_fields[1][-2])
    # Last three characters of the subject field are the subject number.
    sub = int(subject.split("-")[1][-3:])
    date = session.split("-")[1]
    run = int(level_run_fields[3])

    # Skip unwanted difficulty levels before doing any table lookups.
    if level not in inclDiffLevels:
        continue

    # Performance metrics row(s) for this subject / date / run.
    pdf = perfMetricsDF.query(f'subject == {sub} and date == {date} and run == {run}')

    # The aggregate feature table spells the run key with '_run', so replace
    # '-run' with '_run' before matching.
    run_1 = run_1.replace('-run', '_run')
    fd = dim_16.query(f'Subject == "{subject}" and Session == "{session}" and Run == "{run_1}"')

    feat = pd.read_csv(featFile)
    new_row = {
        'data': feat,
        'difficulty': level,  # difficulty level parsed from the file name
        'flag': featFile,  # path of the source feature file
        'cumulative_total_error': pdf['cumulative_total_error'],
        'fd': fd[featCols].values
    }
    # Append at the next positional label.
    new_index = len(df.index)
    df.loc[new_index] = new_row
# From here on, use the combined feature column list.
featCols = dimCols.featCols_combin

# Split the dataset: 25% validation, 75% training.
pctHoldout = 25  # percent of rows held out for validation
numDirs = len(df)
numVal = int(numDirs * float(pctHoldout) / 100)
numTrain = numDirs - numVal

# Shuffle the row positions, then cut the reordered frame at numTrain.
index_array = np.arange(numDirs)
np.random.shuffle(index_array)
df = df.iloc[index_array]
trainDF, valDF = df.iloc[:numTrain], df.iloc[numTrain:]

Training the LSTM model

In [ ]:
def trainDiffPredictionModel_LSTM(trainDF, valDF, inclDiffLevels, featCols):
    """Train a single-layer LSTM binary classifier on variable-length sequences.

    Args:
        trainDF: DataFrame of training samples; must have a 'difficulty'
            column and whatever ``scaler.get_lstm_data_standard`` reads.
        valDF: DataFrame of validation samples with the same layout.
        inclDiffLevels: Accepted for interface compatibility; not used here.
        featCols: Feature column names; ``len(featCols)`` sets the LSTM
            input width.

    Returns:
        The fitted Keras ``Sequential`` model.

    Neither input frame is modified: labels are taken from a copy.
    """

    def _binary_labels(frame):
        # Copy first: writing through ``.values`` would alter the caller's
        # DataFrame, and fails outright on the read-only arrays pandas returns
        # under Copy-on-Write.
        labels = frame['difficulty'].to_numpy(copy=True)
        labels[labels == 4] = 0  # level 4 becomes class 0; other values unchanged
        # A numeric dtype avoids handing Keras an object-dtype label array.
        return labels.astype(np.float32)

    # Prepare the training data.
    # presumably returns (list of per-sample sequences, step counts) -- verify
    x_train_list, train_steps = scaler.get_lstm_data_standard(trainDF, featCols)
    y_train = _binary_labels(trainDF)
    x_train_ragged = tf.ragged.constant(x_train_list, dtype=np.float32)

    # Prepare the validation data.
    x_valid_list, valid_steps = scaler.get_lstm_data_standard(valDF, featCols)
    y_valid = _binary_labels(valDF)
    x_valid_ragged = tf.ragged.constant(x_valid_list, dtype=np.float32)

    # Build the model.
    model = Sequential()
    # input_shape=(None, n_features) lets the LSTM accept sequences of any length.
    model.add(LSTM(128, activation='tanh', return_sequences=False, input_shape=(None, len(featCols))))
    # Dropout to reduce overfitting.
    model.add(Dropout(0.5))
    # Single sigmoid unit for the binary prediction.
    model.add(Dense(1, activation='sigmoid', kernel_regularizer='l2'))

    # Compile with binary cross-entropy ('categorical_crossentropy' would be
    # the multi-class alternative).
    optimizer = Adam(learning_rate=0.0003, clipnorm=1.0)

    model.compile(optimizer=optimizer, loss='binary_crossentropy', metrics=['acc'])
    # NOTE(review): early stopping watches the *training* loss even though
    # validation data is supplied; confirm 'val_loss' is not what was intended.
    early_stopping = EarlyStopping(monitor='loss', patience=3, restore_best_weights=True)

    # Train the model.
    history = model.fit(x_train_ragged, y_train, epochs=20, batch_size=16, verbose=1, callbacks=[early_stopping],
                        validation_data=(x_valid_ragged, y_valid))
    plothisAccuracy(history)
    return model

def plothisAccuracy(history):
    """Show training vs. validation loss and accuracy curves side by side.

    Args:
        history: Object whose ``history`` attribute maps the keys 'loss',
            'val_loss', 'acc' and 'val_acc' to per-epoch values.
    """
    curves = history.history
    # (train key, val key, train label, val label, title, y-axis label)
    panels = (
        ('loss', 'val_loss', 'Training Loss', 'Validation Loss',
         'Loss Over Epochs', 'Loss'),
        ('acc', 'val_acc', 'Training Accuracy', 'Validation Accuracy',
         'Accuracy Over Epochs', 'Accuracy'),
    )

    plt.figure(figsize=(12, 6))

    # Left panel: loss curves; right panel: accuracy curves.
    for slot, (train_key, val_key, train_label, val_label, title, y_label) in enumerate(panels, start=1):
        plt.subplot(1, 2, slot)
        plt.plot(curves[train_key], label=train_label)
        plt.plot(curves[val_key], label=val_label)
        plt.title(title)
        plt.xlabel('Epochs')
        plt.ylabel(y_label)
        plt.legend()

    plt.tight_layout()
    plt.show()


Test and verify

In [ ]:
import tensorflow as tf

def validateDiffPredictionModel_lstm(valMatDF, inclDiffLevels, featCols, clf):
    """Score a trained classifier on the validation set and report metrics.

    Prints the raw predicted probabilities, thresholds them at 0.5, then
    passes the results to ``my_plot`` (precision / recall / F1) and
    ``roc_auc`` (ROC curve).

    Args:
        valMatDF: DataFrame of validation samples with a 'difficulty' column.
        inclDiffLevels: Difficulty levels in play; forwarded to ``my_plot``.
        featCols: Feature column names handed to the sequence builder.
        clf: Fitted model exposing ``predict``.

    ``valMatDF`` is not modified: labels are taken from a copy.
    """
    # Copy first: writing through ``.values`` would alter the caller's
    # DataFrame, and fails on the read-only arrays pandas returns under
    # Copy-on-Write.
    y_actual = valMatDF['difficulty'].to_numpy(copy=True)
    y_actual[y_actual == 4] = 0  # level 4 becomes class 0; other values unchanged
    # Integer dtype so the metric functions do not receive an object array.
    y_actual = y_actual.astype(int)

    # presumably returns (list of per-sample sequences, step counts) -- verify
    x_list, steps = scaler.get_lstm_data_standard(valMatDF, featCols)

    x_ragged = tf.ragged.constant(x_list, dtype=np.float32)
    y_p = clf.predict(x_ragged)
    print("combined2  y_p = " + str(y_p))
    # Keep the first output column as the score for class 1.
    y_p = y_p[:, 0]
    optimal_threshold_1 = 0.5
    y_pred = np.where(y_p > optimal_threshold_1, 1, 0)
    my_plot("combined2", y_pred, y_actual, inclDiffLevels)
    roc_auc(y_actual, y_p)
    
def my_plot(method, y_pred, y_actual, inclDiffLevels):
    """Print actual vs. predicted labels plus precision, recall and F1.

    Args:
        method: Text prefix for every printed line.
        y_pred: Predicted class labels.
        y_actual: True class labels.
        inclDiffLevels: Difficulty levels in play; with more than two, the
            scores are macro-averaged, otherwise the scorers' defaults apply.
    """
    print(method + " Actual = " + str(y_actual))
    print(method + " Predicted = " + str(y_pred))

    # Choose the averaging mode once and reuse it for all three scores.
    score_kwargs = {'average': 'macro'} if len(inclDiffLevels) > 2 else {}
    precision = precision_score(y_actual, y_pred, **score_kwargs)
    recall = recall_score(y_actual, y_pred, **score_kwargs)
    f1 = f1_score(y_actual, y_pred, **score_kwargs)

    # Scores are shown with four decimal places.
    print(method + f" 精确值: {precision:.4f}")
    print(method + f" 召回率: {recall:.4f}")
    print(method + f" F1分数: {f1:.4f}")


def roc_auc(y_actual, y_p):
    """Plot the ROC curve for the given scores, with the AUC in the title.

    Args:
        y_actual: True binary labels.
        y_p: Predicted scores for the positive class.
    """
    # The thresholds returned by roc_curve are not needed for the plot.
    fpr, tpr, _ = roc_curve(y_actual, y_p)
    # Computed once, and named so the local does not shadow this function.
    auc_value = auc(fpr, tpr)

    # Draw the ROC curve with a small margin around the unit square.
    plt.figure(figsize=(5, 5))
    plt.plot(fpr, tpr, lw=2)
    plt.xlim([-0.05, 1.05])
    plt.ylim([-0.05, 1.05])
    plt.xlabel('False Positive Rate')
    plt.ylabel('True Positive Rate')
    plt.title('Multimodal Data Fusion LSTM \n ROC Curve for Validation Set. AUC = ' + str(auc_value))
    plt.show()