## 随机抽取测试集计算WER与RTF (无GPU加速)

In [None]:
import time
import os
import pathlib
import pickle
import random
import json
from functools import reduce

# 禁用GPU
os.environ["CUDA_VISIBLE_DEVICES"] = "-1"
os.environ["TF_CPP_MIN_LOG_LEVEL"] = "3"

import librosa
import numpy as np
import pandas as pd
from tqdm import tqdm
from scipy.fftpack import dct

import tensorflow as tf

# Short module-level aliases for the Keras helpers used by the cells below.
keras = tf.keras
ctc_decode = keras.backend.ctc_decode
get_value = keras.backend.get_value
load_model = keras.models.load_model

# Directory holding the audio (*.wav) and transcript (*.trn) files
DS_PATH = "data/"
# Directory holding the model files and the pickled dataset artefacts
FILES_PATH = "output/"

# Number of MFCC coefficients
MFCC_VALUE = 32
# Number of FFT points
FFT = 512
# Window length
WINLEN = 0.032

# Sorted paths of all audio files
data_path = sorted(map(str, pathlib.Path(DS_PATH).glob("*.wav")))
# Sorted paths of all transcript files
label_path = sorted(map(str, pathlib.Path(DS_PATH).glob("*.trn")))

# Durations of the loaded audio clips (appended to by loadMfcc)
SOUND_TIMES = []

# Load the MFCC mean and standard deviation; the first pickled element is
# not needed here and is released right away.
with open(FILES_PATH + "dataset/data_mfcc.pkl", "rb") as stats_file:
    _, mfcc_mean, mfcc_std = pickle.load(stats_file)
del _

# Load the vocabulary lookup tables
with open(FILES_PATH + "dataset/words_vec.pkl", "rb") as vocab_file:
    char2id, id2char = pickle.load(vocab_file)


def loadMfcc(filePath, mfccValue):
    """Load an audio file and return its standardised MFCC feature matrix.

    Side effect: the duration of the loaded clip (before silence removal) is
    appended to the module-level ``SOUND_TIMES`` list.

    NOTE(review): ``n_fft`` and the window factor are hard-coded below; the
    module-level ``FFT`` / ``WINLEN`` constants are not used, and ``WINLEN``
    (0.032) differs from the 0.025 factor used here -- confirm which one
    matches the features the models were trained on.

    :param filePath: path of the audio file to read
    :param mfccValue: number of MFCC coefficients to extract (``n_mfcc``)
    :return: the transposed MFCC matrix, standardised with the module-level
        ``mfcc_mean`` and ``mfcc_std``
    """
    # Read the file at its native sampling rate
    data, sr = librosa.load(path=filePath, sr=None)
    SOUND_TIMES.append(librosa.get_duration(y=data, sr=sr))

    # Remove silence: keep only the intervals librosa reports as non-silent
    intervals = librosa.effects.split(data, top_db=23)
    if len(intervals) > 0:
        # A single concatenate call avoids the quadratic copying of a pairwise
        # reduce and the per-sample Python list round-trip.
        y_split = np.concatenate([data[start:end] for start, end in intervals])
    else:
        # No non-silent interval reported: keep the whole signal instead of failing
        y_split = data

    # Pre-emphasis
    y_split = librosa.effects.preemphasis(y_split, coef=0.97)

    # Extract MFCC features (hop of 0.01 * sr samples, window of 0.025 * sr samples)
    y_mfcc = librosa.feature.mfcc(
        y=y_split,
        sr=sr,
        n_mfcc=mfccValue,
        n_fft=512,
        lifter=22,
        hop_length=int(sr * 0.01),
        win_length=int(sr * 0.025),
    )
    # Transpose the feature matrix
    y_mfcc = y_mfcc.transpose()
    # Standardise; the small epsilon guards against division by zero
    y_mfcc = (y_mfcc - mfcc_mean) / (mfcc_std + 1e-14)

    return y_mfcc


def ctc_decode_model(pred_audio, dict_list, model):
    """Run ``model`` on one feature matrix and greedily CTC-decode the output.

    :param pred_audio: feature matrix of a single utterance; a leading batch
        axis is added before it is passed to ``model.predict``
    :param dict_list: lookup from predicted label index to its text piece
    :param model: model exposing ``predict``
    :return: ``(pred_text, pred_index)`` -- the decoded text and the list of
        decoded label indices
    """
    # Predict with a batch of one (expand_dims adds the batch axis)
    pred = model.predict(np.expand_dims(pred_audio, axis=0), verbose=0)

    # ctc_decode needs the number of time steps of the prediction itself.
    # The previous pred_audio.shape[1] was the feature dimension of the
    # un-batched input, which limited decoding to that many steps.
    input_length = np.array((pred.shape[1],))

    # Greedy decoding
    decode_res = ctc_decode(pred, input_length, greedy=True)
    # Take the decoded index array and keep only valid entries (> -1)
    pred_index = get_value(decode_res[0][0])
    pred_index = [item for item in pred_index[0] if item > -1]

    # Map the label indices back to text
    pred_text = "".join(dict_list[index] for index in pred_index)

    return pred_text, pred_index


In [None]:
def levenshtein_distance(hypothesis: list, reference: list) -> dict:
    """Compute the Levenshtein (edit) distance between two sequences.

    The counts can be used to compute WER/CER.
    References:
        https://www.cuelogic.com/blog/the-levenshtein-algorithm
        https://martin-thoma.com/word-error-rate-calculation/

    Keys of the returned dict:
        N: length of the reference
        C: correct (matching) elements
        W: wrong, i.e. the total edit distance
        I: insertions
        D: deletions
        S: substitutions

    :param hypothesis: predicted sequence
    :param reference: ground-truth sequence
    :return: dict with the plain ``int`` counts N, C, W, I, D and S
    """
    len_hyp = len(hypothesis)
    len_ref = len(reference)

    # int32 so the distance cannot overflow for sequences longer than 32767
    cost_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int32)
    # Operation taken at each cell: 0-equal, 1-substitution, 2-insertion, 3-deletion
    ops_matrix = np.zeros((len_hyp + 1, len_ref + 1), dtype=np.int8)

    # Aligning against an empty sequence costs one edit per element
    cost_matrix[:, 0] = np.arange(len_hyp + 1)
    cost_matrix[0, :] = np.arange(len_ref + 1)

    # Fill the cost and operation matrices; i walks hyp (outer), j walks ref (inner)
    for i in range(1, len_hyp + 1):
        for j in range(1, len_ref + 1):
            if hypothesis[i - 1] == reference[j - 1]:
                cost_matrix[i][j] = cost_matrix[i - 1][j - 1]
                continue

            # The order defines the tie-break priority and the operation codes 1..3
            candidates = [
                cost_matrix[i - 1][j - 1] + 1,  # substitution
                cost_matrix[i - 1][j] + 1,  # insertion
                cost_matrix[i][j - 1] + 1,  # deletion
            ]
            best = min(candidates)
            cost_matrix[i][j] = best
            ops_matrix[i][j] = candidates.index(best) + 1

    # Walk back from the bottom-right corner and count each operation
    nb_map = {"N": len_ref, "C": 0, "W": 0, "I": 0, "D": 0, "S": 0}
    i, j = len_hyp, len_ref
    while i > 0 and j > 0:
        operation = ops_matrix[i][j]
        if operation == 0:  # correct
            nb_map["C"] += 1
            i -= 1
            j -= 1
        elif operation == 1:  # substitute
            nb_map["S"] += 1
            i -= 1
            j -= 1
        elif operation == 2:  # insert
            nb_map["I"] += 1
            i -= 1
        else:  # delete
            nb_map["D"] += 1
            j -= 1

    # Once one sequence is exhausted, whatever is left of the hypothesis is
    # inserted and whatever is left of the reference is deleted.
    nb_map["I"] += i
    nb_map["D"] += j

    # Plain int rather than a NumPy scalar
    nb_map["W"] = int(cost_matrix[len_hyp][len_ref])

    return nb_map


In [None]:
# Randomly draw the evaluation clips.
# random.sample raises ValueError when fewer than TEST_SIZE audio files exist.
TEST_SIZE = 2496

index_list = random.sample(range(0, len(data_path)), TEST_SIZE)
sound_mfcc = []
text_list = []

# loadMfcc appends one duration per call to SOUND_TIMES. Reset it here so that
# re-running this cell keeps SOUND_TIMES aligned index-for-index with
# sound_mfcc; stale entries would otherwise pair clips with wrong durations.
SOUND_TIMES.clear()

for i in tqdm(index_list):
    mfcc = loadMfcc(data_path[i], MFCC_VALUE)
    sound_mfcc.append(mfcc)

    # Read the transcript: first line of the label file, spaces removed
    with open(label_path[i], "r", encoding="utf-8") as file:
        sound_text = file.readline().strip().replace(" ", "")
        text_list.append(sound_text)

In [None]:
# Find every saved model (*.h5) below the models directory and load each one,
# keeping `models` in the same order as `models_path`.
models_path = list(map(str, pathlib.Path(FILES_PATH + "models/").glob("**/*.h5")))
models = [load_model(model_file) for model_file in models_path]

In [None]:
# Per-model average WER / RTF, plus a detailed record for every utterance
wer_per_model = []
rtf_per_model = []
histories = {}

# zip() has no length, so pass the total to get a meaningful progress bar
for pre_model, path in tqdm(zip(models, models_path), total=len(models)):
    wer_list = []
    rtf_list = []
    histories[path] = {"data": [], "wer": -1, "rtf": -1}

    for sound, times, text in zip(sound_mfcc, SOUND_TIMES, text_list):
        # Predict; perf_counter is a monotonic high-resolution clock, unlike
        # time.time(), so short inference times are measured reliably
        start = time.perf_counter()
        pred_text, _ = ctc_decode_model(sound, id2char, pre_model)
        elapsed = time.perf_counter() - start

        # WER in percent: (S + D + I) / N.
        # Raises ZeroDivisionError when the reference text is empty.
        nb_map = levenshtein_distance(hypothesis=list(pred_text), reference=list(text))
        wer = ((nb_map["S"] + nb_map["D"] + nb_map["I"]) / nb_map["N"]) * 100
        wer_list.append(wer)

        # RTF: processing time divided by the clip duration
        rtf = elapsed / times
        rtf_list.append(rtf)

        # Keep the per-utterance record
        histories[path]["data"].append(
            {"reference": text, "hypothesis": pred_text, "wer": wer, "rtf": rtf}
        )

    # Average WER over all utterances
    mean_wer = float(np.mean(wer_list))
    wer_per_model.append(mean_wer)
    histories[path]["wer"] = mean_wer
    # Average RTF over all utterances
    mean_rtf = float(np.mean(rtf_list))
    rtf_per_model.append(mean_rtf)
    histories[path]["rtf"] = mean_rtf

In [None]:
# Print one line per model: path, average WER (percent) and average RTF
for summary_row in zip(models_path, wer_per_model, rtf_per_model):
    model_name, mean_wer, mean_rtf = summary_row
    wer_column = "\t\t%.1f%%" % mean_wer
    rtf_column = "\t\t%.4f" % mean_rtf
    print(model_name, wer_column, rtf_column)

In [None]:
# Persist the evaluation records as JSON, writing non-ASCII text unescaped
with open("output/wer_rtf.json", "w", encoding="utf-8") as out_file:
    json.dump(obj=histories, fp=out_file, ensure_ascii=False)