读取数据

In [17]:
import numpy as np
import pandas as pd
from sklearn.metrics import accuracy_score, precision_score, recall_score, roc_auc_score, f1_score, roc_curve, auc
import matplotlib.pyplot as plt
from keras.models import Sequential
import tensorflow as tf
from keras.layers import Embedding, LSTM, Dense, Attention,Input
from keras.src.callbacks import LambdaCallback
import torch
from keras.models import Model

In [2]:
from imblearn.over_sampling import SMOTE

# Oversampling for imbalanced classes
def smote(X_train, y_train):
    """Oversample a dataset with SMOTE.

    Args:
        X_train: Feature matrix to resample.
        y_train: Class labels aligned with ``X_train``.

    Returns:
        The resampled ``(X, y)`` pair produced by ``SMOTE.fit_resample``.
    """
    # A fixed random_state keeps the synthetic samples reproducible across runs.
    return SMOTE(random_state=10).fit_resample(X_train, y_train)

In [4]:
# Load the local dataset.
# The raw-text columns below are disabled (kept for reference only).
# X_train = pd.read_csv(r"scale/X_train_minmaxscaler.csv")['ChatGPT回答']
# X_test = pd.read_csv(r"scale/X_test_minmaxscaler.csv")['ChatGPT回答']
# Target tables for the train and test splits, read as DataFrames.
# NOTE(review): the whole DataFrame is passed on as the target later in the
# notebook — confirm these CSVs contain only the label column.
y_train = pd.read_csv(r"data/y_train_minmaxscaler.csv")
y_test = pd.read_csv(r"data/y_test_minmaxscaler.csv")

In [5]:
# Load the saved embedding tensors onto the CPU and convert them to NumPy arrays.
# NOTE(review): torch.load unpickles the file contents — only load .pt files
# that come from a trusted source.
X_train_embedding = torch.load('data_embedding/train_embedding.pt',map_location=torch.device('cpu')).numpy()
X_test_embedding = torch.load('data_embedding/test_embedding.pt',map_location=torch.device('cpu')).numpy()

In [None]:
# Balance the training classes with SMOTE.
# Only the training split is oversampled: the test split must keep its real
# samples and class distribution, otherwise the evaluation metrics are computed
# partly on synthetic data and no longer reflect real-world performance.
X_train_embedding, y_train = smote(X_train_embedding, y_train)

# SMOTE appends the synthetic samples after the original ones, and Keras'
# `validation_split` takes the last fraction of the data before any shuffling.
# Shuffle once with a fixed seed so the validation slice is not dominated by
# synthetic minority samples.
shuffle_order = np.random.default_rng(10).permutation(len(X_train_embedding))
X_train_embedding = np.asarray(X_train_embedding)[shuffle_order]
y_train = np.asarray(y_train)[shuffle_order]

In [6]:
# Insert a length-1 axis at position 1 so each sample becomes a one-step
# sequence. np.expand_dims does this directly on the NumPy array, without the
# NumPy -> torch -> NumPy round trip (and its extra copy).
# NOTE: this cell is not idempotent — re-running it adds another axis.
X_train_embedding = np.expand_dims(X_train_embedding, axis=1)
X_test_embedding = np.expand_dims(X_test_embedding, axis=1)

TextRNN模型

In [1]:
# # 构建RNN模型
# model = Sequential()
# model.add(Input(shape=(1, 768)))
# model.add(LSTM(256))
# model.add(Dense(128, activation='relu'))
# model.add(Dense(64, activation='relu'))
# model.add(Dense(1, activation='sigmoid'))
# model.summary()

In [2]:
# Build the LSTM + attention model with the Keras functional API

inputs = Input(shape=(1, 768))  # assumes each sample has shape (1, 768)
# LSTM layer; return_sequences=True keeps the time axis for the attention layer
lstm_layer = LSTM(256, return_sequences=True)(inputs)
# Dense projections of the LSTM output, used as the attention query and value
query = Dense(128)(lstm_layer)
value = Dense(128)(lstm_layer)
# Attention layer applied to [query, value]
# NOTE(review): the input has a single time step, so there is only one position
# to attend to — confirm attention is intended for length-1 sequences.
attention = Attention()([query, value])

# Fully connected layers
dense_layer = Dense(128, activation='relu')(attention)
output_layer = Dense(64, activation='relu')(dense_layer)
# Output layer: a single sigmoid unit
# NOTE(review): no layer above removes the length-1 time axis, so the model
# output presumably has shape (batch, 1, 1); the evaluation cell squeezes
# axis 1 — confirm.
output = Dense(1, activation='sigmoid')(output_layer)
# Assemble the model
model = Model(inputs=inputs, outputs=output)

# Model structure
# model.summary()

In [19]:
# Compile the model.
# The legacy Adam optimizer is used on purpose (presumably because it still
# accepts the `decay` argument — verify before switching optimizers).
optimizer = tf.keras.optimizers.legacy.Adam(learning_rate=0.0001, decay=1e-6)
# The model ends in a single sigmoid unit for binary classification, so binary
# cross-entropy is the matching loss; mean squared error on sigmoid outputs
# gives weak gradients for confidently wrong predictions and slows training.
model.compile(loss='binary_crossentropy', optimizer=optimizer, metrics=['accuracy'])

In [20]:
def on_epoch_end(epoch, logs):
    """Print the loss and accuracy recorded for one finished epoch.

    Args:
        epoch: Zero-based epoch index (printed one-based).
        logs: Mapping of metric name to value for the epoch. The keys read are
            ``loss``, ``val_loss``, ``accuracy`` and ``val_accuracy``. May be
            ``None`` or lack some keys (e.g. when training without validation
            data); missing values are printed as ``n/a``.
    """
    logs = logs or {}

    def _fmt(key):
        # A missing metric must not crash training with a format TypeError.
        value = logs.get(key)
        return 'n/a' if value is None else f'{value:.4f}'

    print(f'Epoch {epoch + 1}')
    print(f'Epoch {epoch + 1}, Loss: {_fmt("loss")}, Val Loss: {_fmt("val_loss")}, '
          f'Acc: {_fmt("accuracy")}, Val Acc: {_fmt("val_accuracy")}')
    print("\n")

In [3]:
# Train the model; the Lambda callback prints the metrics after every epoch
callback = LambdaCallback(on_epoch_end=on_epoch_end)
history = model.fit(
    X_train_embedding,
    y_train,
    batch_size=32,
    epochs=64,
    validation_split=0.2,
    callbacks=[callback],
)

In [4]:
# 评估模型
# score = model.evaluate(X_test_embedding, y_test, verbose=0)  
# print('Test loss:', score[0])  
# print('Test accuracy:', score[1])

In [None]:
#  # history.history 字典将包含每个epoch的loss和val_loss值  
# loss = history.history['loss']  
# val_loss = history.history['val_loss']
#  
# # 绘制训练和验证loss曲线  
# plt.figure(figsize=(10, 6))
# plt.plot(loss, label='Training Loss')  
# plt.plot(val_loss, label='Validation Loss')  
# plt.title('Loss Curve')  
# plt.xlabel('Epoch')  
# plt.ylabel('Loss')  
# plt.legend()  
# 
# #保存loss曲线
# plt.savefig('txtRNN_loss_curve.png')
# plt.show()

In [None]:
#绘制准确率曲线
# plt.figure(figsize=(10, 6))
# plt.plot(history.history['accuracy'])
# plt.plot(history.history['val_accuracy'])
# plt.title('Model Accuracy')
# plt.ylabel('Accuracy')
# plt.xlabel('Epoch')
# plt.legend(['Train', 'Validation'], loc='lower right')
# plt.savefig('txtRNN_accuracy.png')
# plt.show()

In [None]:
# Model evaluation
def evaluate_textRNN_model(model, X_test, y_test, threshold=0.5):
    """Compute binary-classification metrics for a trained model.

    Args:
        model: Object exposing ``predict(X)`` that returns one probability per
            sample (any shape with exactly one value per sample, e.g. ``(N,)``,
            ``(N, 1)`` or ``(N, 1, 1)``).
        X_test: Inputs passed unchanged to ``model.predict``.
        y_test: True binary labels, one per sample (array-like or a
            single-column table).
        threshold: Probability above which a sample is labelled 1.
            Defaults to 0.5.

    Returns:
        ``[accuracy, precision, recall, f1, auc, fpr, tpr]`` where the first
        five entries are floats and ``fpr`` / ``tpr`` are lists describing the
        ROC curve.

    Raises:
        ValueError: If the number of predictions differs from the number of
            labels.
    """
    # Flatten both sides to 1-D so scikit-learn receives plain vectors instead
    # of column vectors, regardless of the model's output rank.
    y_true = np.asarray(y_test).reshape(-1)
    y_pred_proba = np.asarray(model.predict(X_test)).reshape(-1)
    if y_pred_proba.shape[0] != y_true.shape[0]:
        raise ValueError(
            f'Got {y_pred_proba.shape[0]} predictions for {y_true.shape[0]} labels'
        )

    # Hard labels from the probabilities
    y_pred = (y_pred_proba > threshold).astype(int)

    # Threshold-dependent metrics
    accuracy = accuracy_score(y_true, y_pred)
    precision = precision_score(y_true, y_pred)
    recall = recall_score(y_true, y_pred)
    f1 = f1_score(y_true, y_pred)
    # Named roc_auc so the imported sklearn.metrics.auc function is not shadowed.
    roc_auc = roc_auc_score(y_true, y_pred_proba)

    # ROC curve points (the thresholds themselves are not needed)
    fpr, tpr, _ = roc_curve(y_true, y_pred_proba)

    # Built-in floats and lists keep the result JSON-serialisable.
    return [float(accuracy), float(precision), float(recall), float(f1),
            float(roc_auc), fpr.tolist(), tpr.tolist()]

In [None]:
import json

# # 加载模型
# def load_model(model_name):
#     model = tf.keras.models.load_model(model_name)
#     return model

# Metric names, in the order returned by evaluate_textRNN_model
# (the BP and SVM models are evaluated separately).
metrics_name = ['accuracy', 'precision', 'recall', 'f1-score','auc','fpr-score','tpr-score']

# Evaluate the model on the test set and pair each metric name with its value.
metrics = evaluate_textRNN_model(model, tf.convert_to_tensor(X_test_embedding), y_test)
textRNN_metrics_dict = dict(zip(metrics_name, metrics))

In [None]:
# Persist the metrics dictionary as a JSON file
with open('textRNN_metrics_dict.json', 'w') as metrics_file:
    json.dump(textRNN_metrics_dict, metrics_file)

In [None]:
# 保存模型
# model.save('textRNN_model.h5')