In [1]:
import os, re, math, random, warnings
from pathlib import Path
import numpy as np
import pandas as pd
import librosa, librosa.display
import soundfile as sf
import matplotlib.pyplot as plt
import seaborn as sns
from sklearn.model_selection import GroupShuffleSplit, train_test_split
from sklearn.preprocessing import StandardScaler, LabelEncoder
from sklearn.decomposition import PCA
from sklearn.metrics import classification_report, confusion_matrix
import tensorflow as tf
from tensorflow.keras import layers, models, callbacks

2025-11-28 03:24:45.928400: I tensorflow/core/util/port.cc:110] oneDNN custom operations are on. You may see slightly different numerical results due to floating-point round-off errors from different computation orders. To turn them off, set the environment variable `TF_ENABLE_ONEDNN_OPTS=0`.
2025-11-28 03:24:45.990630: I tensorflow/core/platform/cpu_feature_guard.cc:182] This TensorFlow binary is optimized to use available CPU instructions in performance-critical operations.
To enable the following instructions: AVX2 AVX512F AVX512_VNNI FMA, in other operations, rebuild TensorFlow with the appropriate compiler flags.


In [2]:
warnings.filterwarnings("ignore")
plt.rcParams["figure.figsize"] = (10, 5)
sns.set(style="whitegrid")

# ====== 修改为你的 AudioWAV 路径 ======
AUDIO_DIR = "../AudioWAV"
AUDIO_DIR = Path(AUDIO_DIR)
assert AUDIO_DIR.exists(), f"路径不存在：{AUDIO_DIR}"

# 固定随机种子，结果更可复现
SEED = 42
random.seed(SEED); np.random.seed(SEED); tf.random.set_seed(SEED)

In [3]:
# CREMA-D 文件名格式： <speaker>_<sentence>_<emotion>_<intensity>.wav
# 例：1001_DFA_ANG_XX.wav
pat = re.compile(r"(?P<spk>\d{4})_(?P<sent>[A-Z]{3})_(?P<emo>[A-Z]{3})_(?P<inten>[A-Z]{2})\.wav$", re.I)

def parse_filename(p: Path):
    m = pat.search(p.name)
    if not m:
        return None
    gd = m.groupdict()
    return {
        "path": p,
        "speaker": int(gd["spk"]),
        "sentence": gd["sent"].upper(),
        "emotion": gd["emo"].upper(),
        "intensity": gd["inten"].upper(),
    }

files = sorted(AUDIO_DIR.glob("*.wav"))
rows = [parse_filename(p) for p in files]
meta = pd.DataFrame([r for r in rows if r is not None])

# 情绪与强度的映射/可读名
EMO_MAP = {"ANG":"Anger", "DIS":"Disgust", "FEA":"Fear", "HAP":"Happy", "NEU":"Neutral", "SAD":"Sad"}
INTEN_ORDER = ["LO","MD","HI","XX"]  # XX=未知
meta["emotion_name"] = meta["emotion"].map(EMO_MAP)
meta["intensity"] = pd.Categorical(meta["intensity"], categories=INTEN_ORDER, ordered=True)

meta.head()


Unnamed: 0,path,speaker,sentence,emotion,intensity,emotion_name
0,AudioWAV/1001_DFA_ANG_XX.wav,1001,DFA,ANG,XX,Anger
1,AudioWAV/1001_DFA_DIS_XX.wav,1001,DFA,DIS,XX,Disgust
2,AudioWAV/1001_DFA_FEA_XX.wav,1001,DFA,FEA,XX,Fear
3,AudioWAV/1001_DFA_HAP_XX.wav,1001,DFA,HAP,XX,Happy
4,AudioWAV/1001_DFA_NEU_XX.wav,1001,DFA,NEU,XX,Neutral


In [4]:
# 放在 import tensorflow as tf 之后、构建模型之前
gpus = tf.config.list_physical_devices('GPU')
if gpus:
    try:
        for gpu in gpus:
            tf.config.experimental.set_memory_growth(gpu, True)
        print("启用 GPU 显存按需分配")
    except RuntimeError as e:
        print(e)


启用 GPU 显存按需分配


In [7]:
# =========================
# 一些全局配置
# =========================
SR = 16000                # 重采样采样率
DURATION = 3.0            # 统一到 3 秒，可按需改成 4.0
N_MELS = 64               # 梅尔频带数
N_FFT = 1024
HOP_LENGTH = 320          # 约 20ms 帧移
FMIN, FMAX = 50, 8000

MAX_LEN_SAMPLES = int(SR * DURATION)   # 固定长度的波形点数

print("样本数:", len(meta))
print(meta["emotion_name"].value_counts())

# =========================
# Label 编码（情绪 6 类）
# =========================
le = LabelEncoder()
meta["emo_idx"] = le.fit_transform(meta["emotion"])  # 直接用三字母编码
num_classes = len(le.classes_)
print("情绪标签映射:", dict(zip(le.classes_, range(num_classes))))

# =========================
# 按 speaker 分组切 train/val/test
# =========================
gss = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)
train_val_idx, test_idx = next(gss.split(meta, groups=meta["speaker"]))

meta_train_val = meta.iloc[train_val_idx].reset_index(drop=True)
meta_test      = meta.iloc[test_idx].reset_index(drop=True)

# 再从 train_val 里切出 val
gss2 = GroupShuffleSplit(n_splits=1, test_size=0.2, random_state=SEED)
train_idx, val_idx = next(gss2.split(meta_train_val, groups=meta_train_val["speaker"]))

meta_train = meta_train_val.iloc[train_idx].reset_index(drop=True)
meta_val   = meta_train_val.iloc[val_idx].reset_index(drop=True)

print("Train:", len(meta_train), "Val:", len(meta_val), "Test:", len(meta_test))

# =========================
# 音频 -> 固定长度波形 -> log-mel 特征
# =========================
def load_and_fix_length(path, sr=SR, max_len=MAX_LEN_SAMPLES):
    y, _ = librosa.load(path, sr=sr)
    # 实例级 Z-score 标准化，避免响度差异
    y = y - np.mean(y)
    y_std = np.std(y) + 1e-9
    y = y / y_std

    if len(y) < max_len:
        # 居中 padding
        pad_width = max_len - len(y)
        left = pad_width // 2
        right = pad_width - left
        y = np.pad(y, (left, right), mode="constant")
    elif len(y) > max_len:
        # 居中裁剪
        start = (len(y) - max_len) // 2
        y = y[start:start+max_len]
    return y

def waveform_to_logmel(y, sr=SR, n_mels=N_MELS, n_fft=N_FFT, hop_length=HOP_LENGTH,
                       fmin=FMIN, fmax=FMAX):
    S = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=n_fft, hop_length=hop_length,
        n_mels=n_mels, fmin=fmin, fmax=fmax, power=2.0
    )
    logS = librosa.power_to_db(S, ref=np.max)
    # 实例级标准化
    m = logS.mean()
    s = logS.std() + 1e-9
    logS = (logS - m) / s
    return logS.astype(np.float32)   # shape: (n_mels, time)

def extract_features(df):
    X = []
    y = []
    for i, row in df.iterrows():
        path = row["path"]
        emo_idx = row["emo_idx"]
        y_wav = load_and_fix_length(path)
        logmel = waveform_to_logmel(y_wav)  # (n_mels, time)
        X.append(logmel)
        y.append(emo_idx)
        if (i+1) % 500 == 0:
            print(f"  已处理 {i+1}/{len(df)} 条音频")
    X = np.stack(X, axis=0)   # (N, n_mels, time)
    y = np.array(y, dtype=np.int32)
    # 增加一个通道维度 -> (N, n_mels, time, 1)
    X = X[..., np.newaxis]
    return X, y

print("提取 Train 特征...")
X_train, y_train = extract_features(meta_train)

print("提取 Val 特征...")
X_val, y_val = extract_features(meta_val)

print("提取 Test 特征...")
X_test, y_test = extract_features(meta_test)

print("X_train:", X_train.shape, "y_train:", y_train.shape)
print("X_val:", X_val.shape, "X_test:", X_test.shape)

# =========================
# tf.data 数据管线（可选简单增强）
# 这里先不上 SpecAugment，后面你可以在此基础上加
# =========================

def make_dataset(X, y, training=False):
    ds = tf.data.Dataset.from_tensor_slices((X, y))
    if training:
        ds = ds.shuffle(buffer_size=len(X), seed=SEED)
    ds = ds.batch(BATCH_SIZE).prefetch(tf.data.AUTOTUNE)
    return ds

train_ds = make_dataset(X_train, y_train, training=True)
val_ds   = make_dataset(X_val, y_val, training=False)
test_ds  = make_dataset(X_test, y_test, training=False)

# =========================
# Transformer 相关模块
# =========================
class PositionalEmbedding(layers.Layer):
    """简单可训练位置编码：对时间轴做 embedding。"""
    def __init__(self, maxlen, d_model, **kwargs):
        super().__init__(**kwargs)
        self.maxlen = maxlen
        self.d_model = d_model
        self.pos_emb = layers.Embedding(input_dim=maxlen, output_dim=d_model)

    def call(self, x):
        # x: (batch, time, d_model)
        length = tf.shape(x)[1]
        positions = tf.range(start=0, limit=length, delta=1)
        positions = self.pos_emb(positions)  # (time, d_model)
        return x + positions

def transformer_encoder(x, head_size, num_heads, ff_dim, dropout=0.1):
    """标准 Transformer Encoder Block。"""
    # Multi-Head Self Attention
    attn = layers.MultiHeadAttention(num_heads=num_heads, key_dim=head_size)(x, x)
    attn = layers.Dropout(dropout)(attn)
    x = layers.LayerNormalization(epsilon=1e-6)(x + attn)

    # Feed-Forward
    ff = layers.Dense(ff_dim, activation="relu")(x)
    ff = layers.Dropout(dropout)(ff)
    ff = layers.Dense(x.shape[-1])(ff)
    x = layers.LayerNormalization(epsilon=1e-6)(x + ff)
    return x

# =========================
# 构建 CNN 前端 + Transformer 编码器 + 判别头 模型
# =========================
def build_cnn_transformer_model(
    input_shape,
    num_classes,
    cnn_channels=(32, 64, 128),
    d_model=256,
    num_heads=4,
    num_transformer_blocks=4,
    ff_dim=512,
    dropout_cnn=0.2,
    dropout_transformer=0.2,
    dropout_head=0.4
):
    """
    input_shape: (n_mels, time, 1)
    """
    inp = layers.Input(shape=input_shape)

    x = inp
    # ---- CNN 声学前端：多层 Conv2D + BN + ReLU + MaxPool ----
    for ch in cnn_channels:
        x = layers.Conv2D(ch, (3, 3), padding="same", use_bias=False)(x)
        x = layers.BatchNormalization()(x)
        x = layers.Activation("relu")(x)
        # 对频率轴做 pool，尽量保留时间分辨率
        x = layers.MaxPooling2D(pool_size=(2, 2))(x)
        x = layers.Dropout(dropout_cnn)(x)

    # x: (batch, mel', time', ch_last)
    # 我们想要以“时间”为序列维度，把 (mel', ch) 合并成特征维度
    # 先把维度变成 (batch, time', mel', ch)
    x = layers.Permute((2, 1, 3))(x)  # (batch, time', mel', ch)
    time_steps = tf.shape(x)[1]
    feat_dim = x.shape[2] * x.shape[3]  # 静态维度

    x = layers.Reshape((-1, feat_dim))(x)  # (batch, time', feat_dim)

    # 线性投到 Transformer 的 d_model 维度
    x = layers.Dense(d_model)(x)

    # 加位置编码
    # 这里 maxlen 用 input_shape 的时间轴近似，也可以用 x.shape[1]
    maxlen = x.shape[1]
    if maxlen is None:
        # fallback：给个较大的 maxlen
        maxlen = 500
    pos_emb = PositionalEmbedding(maxlen=maxlen, d_model=d_model)
    x = pos_emb(x)

    # ---- 多层 Transformer Encoder ----
    for _ in range(num_transformer_blocks):
        x = transformer_encoder(
            x,
            head_size=d_model // num_heads,
            num_heads=num_heads,
            ff_dim=ff_dim,
            dropout=dropout_transformer
        )

    # ---- 判别性分类头：Attention Pooling + Dense ----
    # Attention Pooling：让模型自动关注情感强的时刻
    attn_scores = layers.Dense(1)(x)               # (batch, time, 1)
    attn_scores = tf.nn.softmax(attn_scores, axis=1)
    x_pooled = tf.reduce_sum(x * attn_scores, axis=1)  # (batch, d_model)

    x = layers.Dense(256, activation="relu")(x_pooled)
    x = layers.Dropout(dropout_head)(x)
    x = layers.BatchNormalization()(x)
    x = layers.Dense(128, activation="relu")(x)
    x = layers.Dropout(dropout_head)(x)

    out = layers.Dense(num_classes, activation="softmax")(x)

    model = models.Model(inputs=inp, outputs=out)
    return model

input_shape = X_train.shape[1:]  # (n_mels, time, 1)
model = build_cnn_transformer_model(
    input_shape=input_shape,
    num_classes=num_classes,
    cnn_channels=(32, 64, 128, 256),   # 显存够，可以再深一点
    d_model=256,
    num_heads=4,
    num_transformer_blocks=4,
    ff_dim=512
)

model.summary()


样本数: 7442
emotion_name
Anger      1271
Disgust    1271
Fear       1271
Happy      1271
Sad        1271
Neutral    1087
Name: count, dtype: int64
情绪标签映射: {'ANG': 0, 'DIS': 1, 'FEA': 2, 'HAP': 3, 'NEU': 4, 'SAD': 5}
Train: 4667 Val: 1223 Test: 1552
提取 Train 特征...
  已处理 500/4667 条音频
  已处理 1000/4667 条音频
  已处理 1500/4667 条音频
  已处理 2000/4667 条音频
  已处理 2500/4667 条音频
  已处理 3000/4667 条音频
  已处理 3500/4667 条音频
  已处理 4000/4667 条音频
  已处理 4500/4667 条音频
提取 Val 特征...
  已处理 500/1223 条音频
  已处理 1000/1223 条音频
提取 Test 特征...
  已处理 500/1552 条音频
  已处理 1000/1552 条音频
  已处理 1500/1552 条音频
X_train: (4667, 64, 151, 1) y_train: (4667,)
X_val: (1223, 64, 151, 1) X_test: (1552, 64, 151, 1)
Model: "model_1"
__________________________________________________________________________________________________
 Layer (type)                Output Shape                 Param #   Connected to                  
 input_2 (InputLayer)        [(None, 64, 151, 1)]         0         []                            
                      

In [9]:
# =========================
# 优化器 & 学习率调度
# =========================
EPOCHS = 80
BATCH_SIZE = 64

# 初始学习率
base_lr = 1e-3

# Cosine Decay + Warmup 的简单实现（Keras 回调方式）
class CosineAnnealingWithWarmup(callbacks.Callback):
    def __init__(self, base_lr, total_steps, warmup_steps=0):
        super().__init__()
        self.base_lr = base_lr
        self.total_steps = total_steps
        self.warmup_steps = warmup_steps
        self.global_step = 0

    def on_train_batch_begin(self, batch, logs=None):
        self.global_step += 1
        if self.global_step < self.warmup_steps:
            lr = self.base_lr * self.global_step / float(self.warmup_steps)
        else:
            progress = (self.global_step - self.warmup_steps) / float(
                max(1, self.total_steps - self.warmup_steps)
            )
            lr = 0.5 * self.base_lr * (1 + math.cos(math.pi * progress))
        tf.keras.backend.set_value(self.model.optimizer.lr, lr)

    def on_epoch_end(self, epoch, logs=None):
        lr = tf.keras.backend.get_value(self.model.optimizer.lr)
        print(f"\n[Epoch {epoch+1}] current lr: {lr:.6e}")

steps_per_epoch = math.ceil(len(X_train) / BATCH_SIZE)
total_steps = steps_per_epoch * EPOCHS
warmup_steps = int(0.1 * total_steps)  # 前 10% step 线性 warmup

cosine_cb = CosineAnnealingWithWarmup(
    base_lr=base_lr,
    total_steps=total_steps,
    warmup_steps=warmup_steps
)

early_stop_cb = callbacks.EarlyStopping(
    monitor="val_loss",
    patience=10,
    restore_best_weights=True
)

checkpoint_cb = callbacks.ModelCheckpoint(
    "../best_cnn_transformer_ser.h5",
    monitor="val_macro_f1",
    mode="max",
    save_best_only=True,
    save_weights_only=False
)

# 自定义 Macro-F1 指标（tf 里以 batch 内近似，最终评估还是用 sklearn 更准）
def macro_f1(y_true, y_pred):
    y_true = tf.cast(y_true, tf.int32)
    y_pred_labels = tf.argmax(y_pred, axis=-1, output_type=tf.int32)

    # 混淆矩阵: (num_classes, num_classes)
    cm = tf.math.confusion_matrix(
        y_true,
        y_pred_labels,
        num_classes=num_classes,
        dtype=tf.float32
    )
    # per-class precision, recall
    tp = tf.linalg.diag_part(cm)
    precision = tp / (tf.reduce_sum(cm, axis=0) + 1e-9)
    recall = tp / (tf.reduce_sum(cm, axis=1) + 1e-9)
    f1 = 2 * precision * recall / (precision + recall + 1e-9)
    return tf.reduce_mean(f1)

# 编译模型
model.compile(
    optimizer=tf.keras.optimizers.Adam(learning_rate=base_lr),
    loss="sparse_categorical_crossentropy",
    metrics=["accuracy", macro_f1]
)

# 训练
history = model.fit(
    train_ds,
    validation_data=val_ds,
    epochs=EPOCHS,
    callbacks=[cosine_cb, early_stop_cb, checkpoint_cb]
)


Epoch 1/80


2025-11-28 03:37:32.176467: E tensorflow/core/grappler/optimizers/meta_optimizer.cc:954] layout failed: INVALID_ARGUMENT: Size of values 0 does not match size of permutation 4 @ fanin shape inmodel_1/dropout_14/dropout/SelectV2-2-TransposeNHWCToNCHW-LayoutOptimizer


[Epoch 1] current lr: 6.335616e-05
Epoch 2/80
[Epoch 2] current lr: 1.267123e-04
Epoch 3/80
[Epoch 3] current lr: 1.900685e-04
Epoch 4/80
[Epoch 4] current lr: 2.534247e-04
Epoch 5/80
[Epoch 5] current lr: 3.167808e-04
Epoch 6/80
[Epoch 6] current lr: 3.801370e-04
Epoch 7/80
[Epoch 7] current lr: 4.434932e-04
Epoch 8/80
[Epoch 8] current lr: 5.068493e-04
Epoch 9/80
[Epoch 9] current lr: 5.702055e-04
Epoch 10/80
[Epoch 10] current lr: 6.335616e-04
Epoch 11/80
[Epoch 11] current lr: 6.969178e-04
Epoch 12/80
[Epoch 12] current lr: 7.602739e-04
Epoch 13/80
[Epoch 13] current lr: 8.236301e-04
Epoch 14/80
[Epoch 14] current lr: 8.869863e-04
Epoch 15/80
[Epoch 15] current lr: 9.503425e-04
Epoch 16/80
[Epoch 16] current lr: 9.999942e-04
Epoch 17/80
[Epoch 17] current lr: 9.998191e-04
Epoch 18/80
[Epoch 18] current lr: 9.993996e-04


In [None]:
# =========================
# 在 Test 集上做最终评估
# =========================
model.load_weights("best_cnn_transformer_ser.h5")  # 载入最优权重

test_loss, test_acc, test_macro_f1 = model.evaluate(test_ds)
print(f"Test Loss: {test_loss:.4f}")
print(f"Test Acc : {test_acc:.4f}")
print(f"Test Macro-F1: {test_macro_f1:.4f}")

# 取出预测概率和标签
y_prob = model.predict(test_ds)
y_pred = np.argmax(y_prob, axis=-1)

# 注意：test_ds 是 batch 的，需要把 y_test 对齐一下（这里我们是直接用 X_test/y_test 组成的 test_ds，顺序一致）
print("classification_report:")
print(classification_report(
    y_test,
    y_pred,
    target_names=[EMO_MAP[c] for c in le.classes_]
))

# 混淆矩阵
cm = confusion_matrix(y_test, y_pred)
cm_norm = cm.astype("float") / cm.sum(axis=1, keepdims=True)

plt.figure(figsize=(8, 6))
sns.heatmap(
    cm_norm,
    annot=True,
    fmt=".2f",
    cmap="Blues",
    xticklabels=[EMO_MAP[c] for c in le.classes_],
    yticklabels=[EMO_MAP[c] for c in le.classes_]
)
plt.xlabel("Predicted")
plt.ylabel("True")
plt.title("Normalized Confusion Matrix (Test)")
plt.tight_layout()
plt.show()
