In [6]:
import os
import random
import shutil
from pathlib import Path
import numpy as np
import torch
import torchaudio
import soundfile as sf
from multiprocessing import Pool, cpu_count
import multiprocessing as mp
from silero_vad import load_silero_vad
from IPython.display import Audio
from scipy.fft import rfft, fft

In [7]:
import numpy as np

def adjust_array_length(arr: np.ndarray, n: int) -> np.ndarray:
    """
    调整输入数组的长度为 n。
    - 若 arr 长度 >= n，则随机抽取 n 个元素（不放回）；
    - 若 arr 长度 < n，则补齐至 n，补充部分来自于 arr 的随机采样（放回）。

    参数:
        arr (np.ndarray): 输入的一维或二维数组（假设按第一个维度为样本维）。
        n (int): 目标长度。

    返回:
        np.ndarray: 调整后的数组，长度为 n。
    """
    arr_len = len(arr)
    if arr_len >= n:
        idx = np.random.choice(arr_len, n, replace=False)
        return arr[idx]
    else:
        deficit = n - arr_len
        idx = np.random.choice(arr_len, deficit, replace=True)
        extra_arr = arr[idx]
        return np.concatenate([arr, extra_arr], axis=0)


In [8]:
import numpy as np
from collections import Counter

# NOISE
X_NOISE = np.load('OFFICIAL/NOISE/03mfcc/X.npy')
Y_NOISE = np.load('OFFICIAL/NOISE/03mfcc/Y.npy')

# 打印样本数量
print(f'✅ X_NOISE 长度：{len(X_NOISE)}')
print(f'✅ Y_NOISE 长度：{len(Y_NOISE)}')

# 打印形状（假设是 numpy 数组）
print(f'📐 X_NOISE 形状: {X_NOISE.shape}')          # e.g. (13600, 8000)
print(f'📐 Y_NOISE 形状: {Y_NOISE.shape}')          # e.g. (13600,)

# 打印标签各类数量
label_counts = Counter(Y_NOISE)
print('📊 标签类别数量分布:')
for label, count in sorted(label_counts.items()):
    print(f'  类别 {label}: {count} 个样本')


✅ X_NOISE 长度：7000
✅ Y_NOISE 长度：7000
📐 X_NOISE 形状: (7000, 8000)
📐 Y_NOISE 形状: (7000,)
📊 标签类别数量分布:
  类别 0: 500 个样本
  类别 1: 1000 个样本
  类别 2: 500 个样本
  类别 3: 1000 个样本
  类别 4: 1000 个样本
  类别 5: 3000 个样本


In [9]:
import numpy as np
from collections import Counter
from pathlib import Path

categories = ['BOARD', 'IPAD', 'DIY', 'OTHERS', 'PC', 'PHONE']
base_path = Path('OFFICIAL')

# 数据字典：每类一个子字典，包含 X, X_mfcc, Y
data_dict = {}

for cat in categories:
    prefix = cat.upper()
    folder = base_path / prefix / '03mfcc'
    try:
        # 加载数据
        X = np.load(folder / 'X.npy')
        X_mfcc = np.load(folder / 'X_mfcc.npy')
        Y = np.load(folder / 'Y.npy')

        # 存入字典
        data_dict[prefix] = {
            'X': X,
            'X_mfcc': X_mfcc,
            'Y': Y
        }

        # 打印信息
        print(f'\n📁 加载类别：{prefix}')
        print(f'✅ X 长度：{len(X)}')
        print(f'✅ Y 长度：{len(Y)}')
        print(f'✅ X_mfcc 长度：{len(X_mfcc)}')
        print(f'📐 X 形状: {X.shape}')
        print(f'📐 X_mfcc 形状: {X_mfcc.shape}')
        print(f'📐 Y 形状: {Y.shape}')

        # 标签统计
        label_counts = Counter(Y)
        print('📊 标签类别数量分布:')
        for label, count in sorted(label_counts.items()):
            print(f'  类别 {label}: {count} 个样本')

    except FileNotFoundError as e:
        print(f'❌ 类别 {prefix} 加载失败: {e}')



📁 加载类别：BOARD
✅ X 长度：14500
✅ Y 长度：14500
✅ X_mfcc 长度：14500
📐 X 形状: (14500, 8000)
📐 X_mfcc 形状: (14500, 31, 13)
📐 Y 形状: (14500,)
📊 标签类别数量分布:
  类别 0: 2900 个样本
  类别 1: 2900 个样本
  类别 2: 2900 个样本
  类别 3: 2900 个样本
  类别 4: 2900 个样本

📁 加载类别：IPAD
✅ X 长度：2500
✅ Y 长度：2500
✅ X_mfcc 长度：2500
📐 X 形状: (2500, 8000)
📐 X_mfcc 形状: (2500, 31, 13)
📐 Y 形状: (2500,)
📊 标签类别数量分布:
  类别 0: 500 个样本
  类别 1: 500 个样本
  类别 2: 500 个样本
  类别 3: 500 个样本
  类别 4: 500 个样本

📁 加载类别：DIY
✅ X 长度：9000
✅ Y 长度：9000
✅ X_mfcc 长度：9000
📐 X 形状: (9000, 8000)
📐 X_mfcc 形状: (9000, 31, 13)
📐 Y 形状: (9000,)
📊 标签类别数量分布:
  类别 0: 1800 个样本
  类别 1: 1800 个样本
  类别 2: 1800 个样本
  类别 3: 1800 个样本
  类别 4: 1800 个样本

📁 加载类别：OTHERS
✅ X 长度：2770
✅ Y 长度：2770
✅ X_mfcc 长度：2770
📐 X 形状: (2770, 8000)
📐 X_mfcc 形状: (2770, 31, 13)
📐 Y 形状: (2770,)
📊 标签类别数量分布:
  类别 4: 2770 个样本

📁 加载类别：PC
✅ X 长度：2500
✅ Y 长度：2500
✅ X_mfcc 长度：2500
📐 X 形状: (2500, 8000)
📐 X_mfcc 形状: (2500, 31, 13)
📐 Y 形状: (2500,)
📊 标签类别数量分布:
  类别 0: 500 个样本
  类别 1: 500 个样本
  类别 2: 500 个样本
  类别 3: 500 个样本
  类别 4: 

In [10]:
def sample_and_concat(data_dict, length_dict):
    """
    根据指定长度从 data_dict 中每个类别抽样（或补齐），返回拼接后的 X, X_mfcc, Y。

    参数:
        data_dict (dict): 类别 -> {'X': ..., 'X_mfcc': ..., 'Y': ...}
        length_dict (dict): 类别 -> 每类取样数（int）

    返回:
        X_all, X_mfcc_all, Y_all: 拼接后的 numpy 数组
    """
    X_list = []
    X_mfcc_list = []
    Y_list = []

    for cat, n in length_dict.items():
        if cat not in data_dict:
            print(f"⚠️ 类别 {cat} 不在 data_dict 中，跳过。")
            continue

        group = data_dict[cat]
        X = adjust_array_length(group['X'], n)
        X_mfcc = adjust_array_length(group['X_mfcc'], n)
        Y = adjust_array_length(group['Y'], n)

        X_list.append(X)
        X_mfcc_list.append(X_mfcc)
        Y_list.append(Y)

        print(f"✅ 类别 {cat}: 已采样 {n} 条")

    X_all = np.concatenate(X_list, axis=0)
    X_mfcc_all = np.concatenate(X_mfcc_list, axis=0)
    Y_all = np.concatenate(Y_list, axis=0)

    print(f"🎯 合并后总样本数: {len(Y_all)}")
    return X_all, X_mfcc_all, Y_all


In [7]:
length_dict = {
    'BOARD': 15000,
    'PHONE': 3000,
    'IPAD': 3000,
    'PC': 3000,
    'OTHERS': 2000,
    'DIY': 9000
}

X, X_mfcc, Y = sample_and_concat(data_dict, length_dict)

✅ 类别 BOARD: 已采样 15000 条
✅ 类别 PHONE: 已采样 3000 条
✅ 类别 IPAD: 已采样 3000 条
✅ 类别 PC: 已采样 3000 条
✅ 类别 OTHERS: 已采样 2000 条
✅ 类别 DIY: 已采样 9000 条
🎯 合并后总样本数: 35000


In [8]:
output_dir = Path("OFFICIAL_ALL/NO_NOISE")
output_dir.mkdir(parents=True, exist_ok=True)

np.save(output_dir / "X.npy", X)
np.save(output_dir / "X_mfcc.npy", X_mfcc)
np.save(output_dir / "Y.npy", Y)

## 加噪声

In [9]:

def mix_audio_batches(signal_batch, noise_batch, snr_list, normalize_audio_batch):
    """
    从 signal_batch (n, 8000) 和 noise_batch (m, 8000) 中随机混音，按 snr_list 中的 SNR 进行混音，直至 signal_batch 抽完。
    返回: 一个字典，键为 SNR，值为对应混音后的 numpy 数组 (n, 8000)。
    """
    n, _ = signal_batch.shape
    m, _ = noise_batch.shape

    # 随机顺序抽取 signal_batch（不放回）
    signal_indices = np.random.permutation(n)

    # 为每个 SNR 创建空列表
    mixed_audio_dict = {snr: [] for snr in snr_list}

    for idx in signal_indices:
        # 当前信号
        signal = signal_batch[idx]

        # 随机从 noise_batch（有放回）抽一个
        noise_idx = np.random.randint(0, m)
        noise = noise_batch[noise_idx]

        # 计算信号和噪声功率
        signal_power = np.mean(signal ** 2)
        noise_power = np.mean(noise ** 2)

        # 避免除0
        if noise_power == 0:
            noise_power = 1e-12

        for snr in snr_list:
            # 计算缩放因子
            target_noise_power = signal_power / (10 ** (snr / 10))
            scaling_factor = np.sqrt(target_noise_power / noise_power)

            # 混音
            mixed = signal + scaling_factor * noise
            mixed_audio_dict[snr].append(mixed)

    # 转换为 numpy 数组并进行归一化
    for snr in snr_list:
        mixed_audio = np.stack(mixed_audio_dict[snr], axis=0)
        mixed_audio = normalize_audio_batch(mixed_audio)
        mixed_audio_dict[snr] = mixed_audio

    return mixed_audio_dict


def concatenate_mixed_audio(mixed_audio_dict: dict) -> np.ndarray:
    """
    将混音字典中的所有 SNR 的 numpy 数组拼接为一个大的 numpy 数组。
    
    参数:
    mixed_audio_dict: dict，键是 SNR，值是 (n, 8000) 的 numpy 数组。
    
    返回:
    拼接后的 numpy 数组，形状是 (sum_n, 8000)。
    """
    # 取出所有值（每个 numpy 数组）并拼接
    concatenated = np.concatenate(list(mixed_audio_dict.values()), axis=0)
    return concatenated

In [10]:
# 混音
snr_list = [-3,0,10,100]
mixed_audio_dict = mix_audio_batches(X, X_NOISE, snr_list, normalize_audio_batch)
mixed_audio_numpy = concatenate_mixed_audio(mixed_audio_dict)

NameError: name 'normalize_audio_batch' is not defined

# 训练

In [12]:
root_dir = Path(f'OFFICIAL_ALL/NO_NOISE')

X = np.load(root_dir / "X.npy")
Y = np.load(root_dir / "Y.npy")
X_mfcc = np.load(root_dir / "X_mfcc.npy")

# 打印加载后的数据形状
print("加载后的 X 形状:", X.shape)  
print("加载后的 Y 形状:", Y.shape)
print("加载后的 X_mfcc 形状:", X_mfcc.shape)

# 打印标签  
unique_labels, counts = np.unique(Y, return_counts=True)
label_distribution = dict(zip(unique_labels, counts))
print("标签分布:", label_distribution)

加载后的 X 形状: (35000, 8000)
加载后的 Y 形状: (35000,)
加载后的 X_mfcc 形状: (35000, 31, 13)
标签分布: {np.int32(0): np.int64(6591), np.int32(1): np.int64(6577), np.int32(2): np.int64(6646), np.int32(3): np.int64(6602), np.int32(4): np.int64(8584)}


In [13]:
print("去掉第一维前形状:", X_mfcc.shape)
X_mfcc_no_first = X_mfcc[:, :, 1:]
print("去掉第一维后形状:", X_mfcc_no_first.shape)
X_mfcc_no_first = X_mfcc_no_first.reshape(X_mfcc_no_first.shape[0], 31, 12, 1)  
print("X_mfcc 形状:", X_mfcc_no_first.shape)  

去掉第一维前形状: (35000, 31, 13)
去掉第一维后形状: (35000, 31, 12)
X_mfcc 形状: (35000, 31, 12, 1)


In [None]:
import tensorflow as tf
import numpy as np
from sklearn.model_selection import train_test_split
# 说话人识别模型（修正：使用 padding='same'）
def create_speaker_model():
    # 定义输入
    inputs = tf.keras.Input(shape=(31, 12, 1))
    
    # 逐层传递x
    # 第一步：Conv2D卷积操作（不包含激活函数）
    x = tf.keras.layers.Conv2D(1024, kernel_size=(5, 12), padding='valid')(inputs)
    # 第二步：单独应用ReLU激活函数
    x = tf.keras.layers.ReLU()(x)
    x = tf.keras.layers.AveragePooling2D(pool_size=(27, 1), padding='valid')(x)
    x = tf.keras.layers.Flatten()(x)
    outputs = tf.keras.layers.Dense(5, activation='softmax')(x)
    
    # 创建模型
    model = tf.keras.Model(inputs=inputs, outputs=outputs)
    return model
# 量化转换函数
def quantize_model(model, X_sample):
    converter = tf.lite.TFLiteConverter.from_keras_model(model)
    converter.optimizations = [tf.lite.Optimize.DEFAULT]
    converter.target_spec.supported_ops = [tf.lite.OpsSet.TFLITE_BUILTINS_INT8]
    converter.inference_input_type = tf.int8
    converter.inference_output_type = tf.int8

    def representative_dataset():
        for i in range(min(100, len(X_sample))):
            data = X_sample[i:i+1]
            yield [tf.cast(data, tf.float32)]
    
    converter.representative_dataset = representative_dataset
    tflite_quant_model = converter.convert()
    return tflite_quant_model


# 数据划分
spk_X_train, spk_X_val, spk_Y_train, spk_Y_val = train_test_split(X_mfcc_no_first, Y, test_size=0.1, random_state=42) 

# 标签 one-hot

spk_Y_train_cat = tf.keras.utils.to_categorical(spk_Y_train, num_classes=5)
spk_Y_val_cat = tf.keras.utils.to_categorical(spk_Y_val, num_classes=5)

# 创建模型
spk_model = create_speaker_model()

# 编译
spk_model.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

# 训练
print("\n开始训练 说话人识别模型")
# class_weight = {0: 1.0, 1: 2.0, 2: 2.0}  # 类别 1、2 权重更高
from tensorflow.keras.callbacks import EarlyStopping
# 配置 EarlyStopping 回调
earlystop = EarlyStopping(
    monitor='val_loss',       # 监控的指标，常用 'val_loss' 或 'val_accuracy'
    patience=20,               # 容忍多少个 epoch 没有提升
    restore_best_weights=True, # 回到验证集表现最好的模型
    verbose=1                 # 显示早停信息
)
spk_history = spk_model.fit(
    spk_X_train, spk_Y_train_cat,
    validation_data=(spk_X_val, spk_Y_val_cat),
    epochs=100,
    batch_size=32,
    callbacks=[earlystop],     # 添加回调函数
    # class_weight=class_weight
)

# 量化转换
print("量化 说话人识别模型")
spk_quant_tflite = quantize_model(spk_model, spk_X_train[:1000])  # 添加一个维度以匹配输入形状 (n, 31, 13, 1)
file_path = "spk-OFFICIAL_ALL-NO_NOISE_hid1024_5X12_epoch100.tflite"
with open(file_path, "wb") as f:
    f.write(spk_quant_tflite)

print(f"\n✅ 两个模型已训练并量化完成！文件: {file_path}")


开始训练 说话人识别模型
Epoch 1/100


[1m805/985[0m [32m━━━━━━━━━━━━━━━━[0m[37m━━━━[0m [1m1s[0m 6ms/step - accuracy: 0.4877 - loss: 1.2688

KeyboardInterrupt: 