In [1]:
import os
os.environ["KERAS_BACKEND"] = "torch"

import torch
import pandas as pd
import librosa
import numpy as np
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.metrics import accuracy_score
from sklearn.preprocessing import LabelEncoder
from keras import models, layers, callbacks
from keras.utils import plot_model

# Select the GPU when torch can see one, otherwise fall back to the CPU, and report the choice.
device = torch.device("cuda" if torch.cuda.is_available() else "cpu")
print(f"Using device: {device}")

Using device: cuda


In [2]:
# Filesystem locations used throughout the notebook (all relative paths).
CONFIG = {
  "dataset_path": "../datasets/",  # dataset root directory
  "story_path": "../datasets/CBU0521DD_stories/",  # directory of audio files handed to load_data
  "label_path": "../datasets/CBU0521DD_stories_attributes.csv",  # CSV read into labels_df
  "model_path": "../models/",  # directory where model checkpoints are saved and reloaded
}

In [3]:
# Read the per-recording attribute table and preview its first rows.
labels_df = pd.read_csv(CONFIG["label_path"])
labels_df.head()

Unnamed: 0,filename,Language,Story_type
0,00001.wav,Chinese,True Story
1,00002.wav,Chinese,True Story
2,00003.wav,Chinese,True Story
3,00004.wav,Chinese,True Story
4,00005.wav,Chinese,True Story


In [4]:
def extract_features(file_path, sr=16000, n_mels=128, duration=240, hop_length=512):
    """Build a fixed-size, frame-level feature matrix for one audio file.

    The rows are stacked in this order: log-mel spectrogram (``n_mels`` rows),
    MFCC (20 rows), chroma, zero-crossing rate, spectral centroid, spectral
    roll-off (85%), onset strength, and finally two rows that repeat
    whole-clip scalars (mean signal energy and clip length in seconds) across
    every time step.

    Args:
        file_path: Path of the audio file, loaded with ``librosa.load``.
        sr: Sampling rate the audio is resampled to.
        n_mels: Number of mel bands in the mel spectrogram.
        duration: Length in seconds that the time axis is padded or cropped to.
        hop_length: Samples between successive frames. It is passed to every
            framewise extractor and used to compute the target number of time
            steps, so the two always agree. The default of 512 reproduces the
            previously hard-coded value.

    Returns:
        A 2-D array of shape ``(n_rows, int(sr * duration / hop_length))``.

    Raises:
        ValueError: If the file yields no audio samples.
    """
    # Load the audio at the requested sampling rate.
    audio, sr = librosa.load(file_path, sr=sr)
    if audio.size == 0:
        # The mean-energy feature divides by the sample count.
        raise ValueError(f"No audio samples could be read from {file_path!r}")

    # Mel spectrogram followed by a log (dB) transform.
    mel_spectrogram = librosa.feature.melspectrogram(
        y=audio, sr=sr, n_mels=n_mels, hop_length=hop_length
    )
    log_mel_spectrogram = librosa.power_to_db(mel_spectrogram)

    # MFCC.
    mfcc = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=20, hop_length=hop_length)

    # Chroma.
    chroma = librosa.feature.chroma_stft(y=audio, sr=sr, hop_length=hop_length)

    # Time-domain features.
    zero_crossing_rate = librosa.feature.zero_crossing_rate(y=audio, hop_length=hop_length)
    short_term_energy = np.sum(audio ** 2) / len(audio)
    duration_feature = len(audio) / sr  # clip length in seconds

    # Frequency-domain features.
    spectral_centroid = librosa.feature.spectral_centroid(
        y=audio, sr=sr, hop_length=hop_length
    )
    spectral_rolloff = librosa.feature.spectral_rolloff(
        y=audio, sr=sr, roll_percent=0.85, hop_length=hop_length
    )
    spectral_flux = librosa.onset.onset_strength(y=audio, sr=sr, hop_length=hop_length)

    # Stack all framewise features; vstack promotes the 1-D onset envelope to one row.
    features = np.vstack([
        log_mel_spectrogram, mfcc, chroma, zero_crossing_rate,
        spectral_centroid, spectral_rolloff, spectral_flux
    ])

    # Zero-pad or crop the time axis to a common length.
    target_length = int(sr * duration / hop_length)
    missing = target_length - features.shape[1]
    if missing > 0:
        features = np.pad(features, ((0, 0), (0, missing)))
    else:
        features = features[:, :target_length]

    # Repeat the two whole-clip scalars across every time step.
    additional_features = np.array([short_term_energy, duration_feature])
    additional_features = np.repeat(
        additional_features[:, np.newaxis], features.shape[1], axis=1
    )

    # Append the constant rows below the framewise rows.
    features = np.concatenate([features, additional_features], axis=0)

    return features

In [5]:
# Extract the features and labels of every audio file
def load_data(dataset_path, labels_df):
    """Extract features for every recording listed in ``labels_df``.

    Args:
        dataset_path: Directory that contains the audio files.
        labels_df: Table with a 'filename' column (audio file name) and a
            'Story_type' column (the label of that recording).

    Returns:
        A ``(features, labels)`` pair of numpy arrays with one entry per row
        of ``labels_df``, in row order.
    """
    feature_matrices = []
    story_types = []

    # Walk the two needed columns in lockstep instead of materialising each row.
    for filename, story_type in zip(labels_df['filename'], labels_df['Story_type']):
        audio_path = os.path.join(dataset_path, filename)
        feature_matrices.append(extract_features(audio_path))
        story_types.append(story_type)

    return np.array(feature_matrices), np.array(story_types)

# Load the features and labels for every story recording
features, labels = load_data(CONFIG['story_path'], labels_df)

In [6]:
# Use LabelEncoder to convert the text labels into integer labels
label_encoder = LabelEncoder()
labels_encoded = label_encoder.fit_transform(labels)

# Inspect the encoded labels
print("Encoded labels:", labels_encoded)

Encoded labels: [1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 1 1 1 1 1 1 1 1 1 0 0 0 0 0 0 0 0 0 1 0 1
 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 1 0 0 0 1 0 1 1
 0 0 1 1 1 0 1 1 1 0 0 0 1 0 1 0 1 0 0 0 1 1 0 1 0 1]


In [7]:
# Split into training and test sets (20% held out, fixed random seed)
# NOTE(review): no `stratify` argument is passed, so the class balance of the two splits is not controlled.
X_train, X_test, y_train, y_test = train_test_split(features, labels_encoded, test_size=0.2, random_state=42)

print(f"Training set size: {X_train.shape[0]}, Test set size: {X_test.shape[0]}")

# Adapt the arrays to the neural network's input format
X_train = X_train[..., np.newaxis]  # add a trailing dimension (1 channel)
X_test = X_test[..., np.newaxis]

Training set size: 80, Test set size: 20


In [8]:
# Check the network input shape: (samples, feature rows, time steps, channels)
X_train.shape

(80, 166, 7500, 1)

In [9]:
def build_cnn_lstm_model(input_shape):
    """Build and compile a CNN + bidirectional-LSTM binary classifier.

    Three Conv2D / BatchNorm / MaxPool / Dropout stages reduce the input,
    after which the feature map is rearranged into one vector per remaining
    time step and fed to two bidirectional LSTM layers and a dense head.

    The previous version flattened the feature map and reshaped it to
    ``(-1, 128)``. That turned every (row, time) cell into its own LSTM step,
    so the sequence axis was not time and the sequence was ``rows`` times
    longer than necessary. The time axis is now moved to the front and all
    rows and channels of a time step are merged into its feature vector.

    Args:
        input_shape: ``(feature_rows, time_steps, channels)`` of one sample,
            without the batch axis.

    Returns:
        A compiled ``keras`` Sequential model with a single sigmoid output,
        trained with Adam on binary cross-entropy and reporting accuracy.
    """
    # (filters, dropout rate) for each convolutional stage.
    conv_stages = ((32, 0.2), (64, 0.3), (128, 0.4))
    last_filters = conv_stages[-1][0]
    # Every 2x2 max-pool halves (floors) the row count, so rows shrink to rows // 8.
    pooled_rows = input_shape[0] // (2 ** len(conv_stages))

    model = models.Sequential()

    # Input layer
    model.add(layers.Input(shape=input_shape))

    # Convolutional part
    for filters, drop_rate in conv_stages:
        model.add(layers.Conv2D(filters, (3, 3), activation='relu', padding='same'))
        model.add(layers.BatchNormalization())
        model.add(layers.MaxPooling2D((2, 2)))
        model.add(layers.Dropout(drop_rate))

    # (rows, time, channels) -> (time, rows, channels) -> (time, rows * channels),
    # so the LSTM iterates over time steps.
    model.add(layers.Permute((2, 1, 3)))
    model.add(layers.Reshape((-1, pooled_rows * last_filters)))

    # Recurrent part
    model.add(layers.Bidirectional(layers.LSTM(128, return_sequences=True)))
    model.add(layers.Bidirectional(layers.LSTM(64)))

    # Fully connected head
    model.add(layers.Dense(64, activation='relu'))
    model.add(layers.BatchNormalization())
    model.add(layers.Dropout(0.5))
    model.add(layers.Dense(1, activation='sigmoid'))

    # Compile the model
    model.compile(
        optimizer='adam',
        loss='binary_crossentropy',
        metrics=['accuracy']
    )

    return model

In [10]:
def show_history(history):
    """Plot training and validation curves for accuracy and loss side by side.

    Args:
        history: Object whose ``history`` mapping holds the 'accuracy',
            'val_accuracy', 'loss' and 'val_loss' series, such as the value
            returned by ``model.fit`` when validation data is supplied.
    """
    import matplotlib.pyplot as plt

    # (history key, axis label) for the left and right panels.
    panels = (('accuracy', 'Accuracy'), ('loss', 'Loss'))

    plt.figure(figsize=(12, 4))

    for position, (key, title) in enumerate(panels, start=1):
        plt.subplot(1, 2, position)
        plt.plot(history.history[key], label=f'Training {title}')
        plt.plot(history.history[f'val_{key}'], label=f'Validation {title}')
        plt.xlabel('Epoch')
        plt.ylabel(title)
        plt.legend()

    plt.tight_layout()
    plt.show()

In [13]:
def train_multiple_cnn_lstm_models(X_train, y_train, X_test, y_test, n_models=3,
                                   epochs=10, batch_size=1):
    """Train ``n_models`` independent CNN-LSTM models and plot each training run.

    For every model, the epoch with the best validation accuracy is saved to
    ``CONFIG['model_path']`` as ``best_model_<k>.keras`` (k starts at 1).

    Args:
        X_train: Training inputs shaped (samples, rows, time steps, channels).
        y_train: Training targets.
        X_test: Inputs used as validation data during fitting.
        y_test: Targets used as validation data during fitting.
        n_models: Number of models to train.
        epochs: Training epochs per model (default keeps the former fixed value).
        batch_size: Mini-batch size (default keeps the former fixed value).

    Returns:
        The list of trained models in their final-epoch state. The
        best-validation checkpoints exist only on disk.

    Note:
        ``(X_test, y_test)`` drives checkpoint selection, so a later score on
        the same data is optimistic for the saved checkpoints.
    """
    models_list = []
    for i in range(n_models):
        print(f"Training Model {i + 1}")

        # Build a fresh model for the shape of a single sample.
        model = build_cnn_lstm_model(X_train.shape[1:4])

        # Keep only the checkpoint with the best validation accuracy.
        model_checkpoint = callbacks.ModelCheckpoint(
            filepath=os.path.join(CONFIG['model_path'], f'best_model_{i + 1}.keras'),
            monitor='val_accuracy', save_best_only=True, verbose=1
        )
        callbacks_list = [model_checkpoint]

        # Train the model.
        history = model.fit(
            X_train, y_train,
            validation_data=(X_test, y_test),
            epochs=epochs,
            batch_size=batch_size,
            callbacks=callbacks_list,
            verbose=1
        )

        # Visualise the training run.
        show_history(history)

        models_list.append(model)

    return models_list

In [14]:
# Train the CNN-LSTM model(s)
models_list = train_multiple_cnn_lstm_models(X_train, y_train, X_test, y_test, n_models=1)

# Output: report each model once its training run has finished
for i, model in enumerate(models_list):
    print(f"Model {i + 1} training complete.")

Training Model 1


In [None]:
# Train a random forest model on the training tensors flattened to one row per sample
rf_model = RandomForestClassifier(n_estimators=100, random_state=42)
rf_model.fit(X_train.reshape(X_train.shape[0], -1), y_train)

In [15]:
def ensemble_predict(models_list, rf_model, X_test, weights=None):
    """Combine the neural networks and the random forest into hard 0/1 predictions.

    The networks' predicted probabilities are averaged with ``weights``
    (equal weights when omitted). That average is then averaged with the
    forest's class-1 probability, and the result is thresholded at 0.5.

    Args:
        models_list: Models whose ``predict(X_test)`` yields one probability per sample.
        rf_model: Classifier whose ``predict_proba`` is called on the
            per-sample flattened inputs; column 1 is used.
        X_test: Input array; its first axis indexes samples.
        weights: Optional per-network weights passed to ``np.average``.

    Returns:
        Integer array with one 0/1 prediction per sample.
    """
    n_networks = len(models_list)
    n_samples = X_test.shape[0]

    # One row of probabilities per neural network.
    network_probs = np.zeros((n_networks, n_samples))
    for row, network in zip(network_probs, models_list):
        row[:] = network.predict(X_test).flatten()

    # The random forest works on flattened samples; keep the class-1 column.
    forest_probs = rf_model.predict_proba(X_test.reshape(n_samples, -1))[:, 1]

    # Weighted vote across the networks, equal weights by default.
    if weights is None:
        weights = [1 / n_networks] * n_networks
    network_mean = np.average(network_probs, axis=0, weights=weights)

    # Blend both model families and threshold.
    blended = (network_mean + forest_probs) / 2
    return (blended > 0.5).astype(int)

In [None]:
# Predict with the ensemble and measure its accuracy on the test set
final_pred = ensemble_predict(models_list, rf_model, X_test)
ensemble_accuracy = accuracy_score(y_test, final_pred)
print(f"Ensemble Model Test Accuracy: {ensemble_accuracy}")

# Print each prediction next to its true label
predicted_labels = label_encoder.inverse_transform(final_pred)
true_labels = label_encoder.inverse_transform(y_test)
for true, pred in zip(true_labels, predicted_labels):
    print(f"Actual: {true}, Predicted: {pred}")

In [17]:
def load_models(models_path, n_models):
    """Load the saved checkpoints ``best_model_1.keras`` .. ``best_model_<n_models>.keras``.

    Args:
        models_path: Directory that holds the checkpoint files.
        n_models: Number of consecutive checkpoints to load, starting at 1.

    Returns:
        List of loaded models in checkpoint-number order.
    """
    return [
        models.load_model(os.path.join(models_path, f'best_model_{number}.keras'))
        for number in range(1, n_models + 1)
    ]

In [19]:
# Reload the saved checkpoint from disk; this replaces the in-memory models_list.
models_list = load_models(CONFIG['model_path'], 1)

In [23]:
from sklearn.metrics import confusion_matrix, classification_report, auc
from scipy.interpolate import interp1d
import matplotlib.pyplot as plt
import seaborn as sns

In [27]:
# Compute and plot the confusion matrix
def plot_confusion_matrix(y_true, y_pred, labels):
    """Draw the confusion matrix of ``y_pred`` against ``y_true`` as an annotated heatmap.

    Args:
        y_true: True class labels.
        y_pred: Predicted class labels.
        labels: Tick labels used on both axes (rows are actual classes,
            columns are predicted classes).
    """
    matrix = confusion_matrix(y_true, y_pred)

    fig, ax = plt.subplots(figsize=(6, 6))
    sns.heatmap(
        matrix, annot=True, fmt="d", cmap="Blues",
        xticklabels=labels, yticklabels=labels, ax=ax,
    )
    ax.set_ylabel('Actual')
    ax.set_xlabel('Predicted')
    ax.set_title('Confusion Matrix')
    plt.show()

# Compute the remaining evaluation metrics
def evaluate_model(y_true, y_pred):
    """Print a classification report plus sensitivity, specificity, precision and F1.

    Labels are expected to be the binary codes 0 (negative) and 1 (positive).
    The confusion matrix is pinned to those two labels so it is always 2x2,
    even when a class is missing from both ``y_true`` and ``y_pred``. Without
    that, the ``[1, 1]`` lookup raised ``IndexError``. A ratio whose
    denominator is zero is reported as 0 instead of NaN.

    Args:
        y_true: True binary labels (0/1).
        y_pred: Predicted binary labels (0/1).
    """
    def _ratio(numerator, denominator):
        # Avoid NaN and numpy warnings when a class is never observed or predicted.
        return numerator / denominator if denominator > 0 else 0.0

    report = classification_report(y_true, y_pred)
    print("Classification Report:\n", report)

    # Rows are actual classes and columns are predicted classes, in label order [0, 1].
    TN, FP, FN, TP = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

    sensitivity = _ratio(TP, TP + FN)  # True Positive Rate
    specificity = _ratio(TN, TN + FP)  # True Negative Rate
    precision = _ratio(TP, TP + FP)
    f1 = _ratio(2 * precision * sensitivity, precision + sensitivity)

    print(f"Sensitivity: {sensitivity:.2f}")
    print(f"Specificity: {specificity:.2f}")
    print(f"Precision: {precision:.2f}")
    print(f"F1 Score: {f1:.2f}")

# Plot the ROC curve (built from a confusion matrix at each threshold)
def plot_roc_curve(y_true, y_pred_prob, n_thresholds=1000):
    """Plot a ROC curve by sweeping decision thresholds over [0, 1].

    For each threshold, the probabilities are binarised with ``>=`` and a
    confusion matrix gives the true-positive and false-positive rates. The
    area under the resulting curve is shown in the legend.

    Args:
        y_true: True binary labels (0/1).
        y_pred_prob: Predicted probability of class 1 for each sample. Any
            array-like is accepted.
        n_thresholds: Number of evenly spaced thresholds in [0, 1]. The
            default keeps the previous fixed value of 1000.
    """
    # Lists do not support elementwise ``>=``, so normalise to an array first.
    y_pred_prob = np.asarray(y_pred_prob)

    # Evenly spaced thresholds over [0, 1].
    thresholds = np.linspace(0, 1, n_thresholds)

    # TPR (sensitivity) and FPR (1 - specificity) for each threshold.
    tpr_list = []
    fpr_list = []

    for threshold in thresholds:
        # Binarise the probabilities at this threshold.
        y_pred = (y_pred_prob >= threshold).astype(int)

        # Pin the labels so the matrix is always 2x2 and unpacks into four counts.
        tn, fp, fn, tp = confusion_matrix(y_true, y_pred, labels=[0, 1]).ravel()

        tpr = tp / (tp + fn) if (tp + fn) > 0 else 0
        fpr = fp / (fp + tn) if (fp + tn) > 0 else 0

        tpr_list.append(tpr)
        fpr_list.append(fpr)

    # Area under the curve.
    roc_auc = auc(fpr_list, tpr_list)

    # Draw the ROC curve.
    plt.figure(figsize=(8, 6))
    plt.plot(fpr_list, tpr_list, label=f'ROC curve (AUC = {roc_auc:.2f})', color='blue')

    # Baseline of a random classifier.
    plt.plot([0, 1], [0, 1], linestyle='--', color='gray', label='Random Guess')

    # Title and axis labels.
    plt.xlabel('1 - Specificity (False Positive Rate)')
    plt.ylabel('Sensitivity (True Positive Rate)')
    plt.title('Receiver Operating Characteristic (ROC) Curve with Cost Ratio')
    plt.legend(loc='lower right')
    plt.show()

# Evaluate the ensemble model
def evaluate_ensemble_model(y_true, models_list, rf_model, X_test):
    """Score the CNN-LSTM + random-forest ensemble and show its diagnostics.

    The mean of the networks' probabilities is averaged with the forest's
    class-1 probability and thresholded at 0.5. The function then prints the
    metrics and draws the confusion matrix and the ROC curve.

    Args:
        y_true: True binary labels of the samples in ``X_test``.
        models_list: Models whose ``predict(X_test)`` yields one probability per sample.
        rf_model: Classifier whose ``predict_proba`` is called on the
            per-sample flattened inputs; column 1 is used.
        X_test: Input array; its first axis indexes samples.

    Returns:
        ``(final_pred, final_pred_prob)``: the 0/1 predictions and the blended
        probabilities they were derived from.
    """
    n_samples = X_test.shape[0]

    # One row of predicted probabilities per neural network.
    network_probs = np.zeros((len(models_list), n_samples))
    for row, network in zip(network_probs, models_list):
        row[:] = network.predict(X_test).flatten()

    # Class-1 probability from the random forest on flattened samples.
    forest_probs = rf_model.predict_proba(X_test.reshape(n_samples, -1))[:, 1]

    # Average the networks, then blend with the forest.
    final_pred_prob = (np.mean(network_probs, axis=0) + forest_probs) / 2

    # Hard labels at the 0.5 threshold.
    final_pred = (final_pred_prob > 0.5).astype(int)

    # Report the results.
    print("\nEnsemble Model Evaluation:")
    evaluate_model(y_true, final_pred)
    plot_confusion_matrix(y_true, final_pred, labels=['False Story', 'True Story'])
    plot_roc_curve(y_true, final_pred_prob)

    return final_pred, final_pred_prob

In [None]:
# Evaluate the ensemble model
final_pred, final_pred_prob = evaluate_ensemble_model(y_test, models_list, rf_model, X_test)

# Print each prediction next to its true label
predicted_labels = label_encoder.inverse_transform(final_pred)
true_labels = label_encoder.inverse_transform(y_test)
for true, pred in zip(true_labels, predicted_labels):
    print(f"Actual: {true}, Predicted: {pred}")

In [None]:
# Render a diagram of the first model's layers, including their tensor shapes
plot_model(models_list[0], show_shapes=True)