In [5]:
import os

from pydub import AudioSegment
import numpy as np
import matplotlib.pyplot as plt
import librosa
from scipy.stats import skew, kurtosis
from mpl_toolkits.axes_grid1 import host_subplot
import matplotlib.ticker as mtick



In [8]:
current_path = os.getcwd()
input_file = os.path.join(current_path, "test.amr")
audio = AudioSegment.from_file(input_file, format="amr")
audio.export("output_preprocess_raw.wav", format="wav")
wave_file = os.path.join(current_path, "output_preprocess_raw.wav")
wave_file_vad = wave_file.split(".")[0] + "_vad.wav"

In [None]:
def _print(bl=True, s=None):
    if bl:
        print(s)
    else:
        pass

In [None]:
class Spectrogram:
    """声谱图（语谱图）特征"""

    def __init__(
        self,
        input_file,
        sr=None,
        frame_len=512,
        n_fft=None,
        win_step=2 / 3,
        window="hamming",
        preemph=0.97,
    ):
        """
        初始化
        :param input_file: 输入音频文件
        :param sr: 所输入音频文件的采样率，默认为None
        :param frame_len: 帧长，默认512个采样点(32ms,16kHz),与窗长相同
        :param n_fft: FFT窗口的长度，默认与窗长相同
        :param win_step: 窗移，默认移动2/3，512*2/3=341个采样点(21ms,16kHz)
        :param window: 窗类型，默认汉明窗
        :param preemph: 预加重系数,默认0.97
        """
        self.input_file = input_file
        self.wave_data, self.sr = librosa.load(
            self.input_file, sr=sr
        )  # 音频全部采样点的归一化数组形式数据
        self.wave_data = librosa.effects.preemphasis(
            self.wave_data, coef=preemph
        )  # 预加重，系数0.97
        self.window_len = frame_len  # 窗长512
        if n_fft is None:
            self.fft_num = self.window_len  # 设置NFFT点数与窗长相等
        else:
            self.fft_num = n_fft
        self.hop_length = round(
            self.window_len * win_step
        )  # 重叠部分采样点数设置为窗长的1/3（1/3~1/2）,即帧移(窗移)2/3
        self.window = window

    def get_magnitude_spectrogram(self):
        """
        获取幅值谱:fft后取绝对值
        :return: np.ndarray[shape=(1 + n_fft/2, n_frames), dtype=float32]，（257，全部采样点数/(512*2/3)+1）
        """
        # 频谱矩阵：行数=1 + n_fft/2=257，列数=帧数n_frames=全部采样点数/(512*2/3)+1（向上取整）
        # 快速傅里叶变化+汉明窗
        mag_spec = np.abs(
            librosa.stft(
                self.wave_data,
                n_fft=self.fft_num,
                hop_length=self.hop_length,
                win_length=self.window_len,
                window=self.window,
            )
        )
        return mag_spec

    def get_power_spectrogram(self):
        """
        获取功率谱（能量谱）：幅值谱平方
        :return: np.ndarray[shape=(1 + n_fft/2, n_frames), dtype=float32]，（257，全部采样点数/(512*2/3)+1）
        """
        pow_spec = np.square(self.get_magnitude_spectrogram())
        return pow_spec

    def get_log_power_spectrogram(self):
        """
        获取log尺度功率谱（能量谱）：幅值谱平方S(也就是功率谱),10 * log10(S / ref),其中ref指定为S的最大值
        :return: np.ndarray[shape=(1 + n_fft/2, n_frames), dtype=float32]，（257，全部采样点数/(512*2/3)+1）
        """
        log_pow_spec = librosa.amplitude_to_db(
            self.get_magnitude_spectrogram(), ref=np.max
        )  # 转换为log尺度
        return log_pow_spec

    def get_mel_spectrogram(self, n_mels=26):
        """
        获取Mel谱:
        :param n_mels: Mel滤波器组的滤波器数量，默认26
        :return: np.ndarray[shape=(n_mels, n_frames), dtype=float32]，（26，全部采样点数/(512*2/3)+1）
        """
        # 频谱矩阵：行数=n_mels=26，列数=帧数n_frames=全部采样点数/(512*2/3)+1（向上取整）
        # 快速傅里叶变化+汉明窗,Mel滤波器组的滤波器数量 = 26
        mel_spec = librosa.feature.melspectrogram(
            self.wave_data,
            self.sr,
            n_fft=self.fft_num,
            hop_length=self.hop_length,
            win_length=self.window_len,
            window=self.window,
            n_mels=n_mels,
        )
        log_mel_spec = librosa.power_to_db(mel_spec)  # 转换为log尺度
        return log_mel_spec

    def plot(self, fig=None, show=True, **kwargs):
        """
        绘制声谱图
        :param fig: 指定绘制何种声谱图，mag/pow/log_pow/mel,默认都绘制
        :param show: 默认最后调用plt.show()，显示图形
        :return: None
        """
        if fig == "mag":
            mag_spec = self.get_magnitude_spectrogram()
            librosa.display.specshow(
                mag_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="linear",
            )
            plt.title("Magnitude Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7)
        elif fig == "pow":
            pow_spec = self.get_power_spectrogram()
            librosa.display.specshow(
                pow_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="linear",
            )
            plt.title("Power Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7)
        elif fig == "log_pow":
            log_pow_spec = self.get_log_power_spectrogram()
            librosa.display.specshow(
                log_pow_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="log",
            )
            plt.title("Log-Power Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7, format="%+02.0f dB")
        elif fig == "mel":
            mel_spec = self.get_mel_spectrogram(**kwargs)
            librosa.display.specshow(
                mel_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="mel",
            )
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.title("Log-Mel Spectrogram")
            plt.colorbar(shrink=0.7, format="%+02.0f dB")
        else:
            plt.figure(figsize=(16, 8))
            plt.subplot(2, 2, 1)
            mag_spec = self.get_magnitude_spectrogram()
            librosa.display.specshow(
                mag_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="linear",
            )
            plt.title("Magnitude Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7)

            plt.subplot(2, 2, 2)
            pow_spec = self.get_power_spectrogram()
            librosa.display.specshow(
                pow_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="linear",
            )
            plt.title("Power Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7)

            plt.subplot(2, 2, 3)
            log_pow_spec = self.get_log_power_spectrogram()
            librosa.display.specshow(
                log_pow_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="linear",
            )
            plt.title("Log-Power Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            # plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7, format="%+02.0f dB")

            plt.subplot(2, 2, 4)
            mel_spec = self.get_mel_spectrogram(**kwargs)
            librosa.display.specshow(
                mel_spec,
                sr=self.sr,
                hop_length=self.hop_length,
                x_axis="s",
                y_axis="mel",
            )
            plt.title("Log-Mel Spectrogram")
            plt.xlabel("Time/ms")
            plt.ylabel("Frequency/Hz")
            plt.gca().xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
            plt.colorbar(shrink=0.7, format="%+02.0f dB")

        plt.tight_layout()
        if show:
            plt.show()

In [None]:
class RhythmFeatures:
    """韵律学特征"""

    def __init__(
        self,
        input_file,
        sr=None,
        frame_len=512,
        n_fft=None,
        win_step=2 / 3,
        window="hamming",
    ):
        """
        初始化
        :param input_file: 输入音频文件
        :param sr: 所输入音频文件的采样率，默认为None
        :param frame_len: 帧长，默认512个采样点(32ms,16kHz),与窗长相同
        :param n_fft: FFT窗口的长度，默认与窗长相同
        :param win_step: 窗移，默认移动2/3，512*2/3=341个采样点(21ms,16kHz)
        :param window: 窗类型，默认汉明窗
        """
        self.input_file = input_file
        self.frame_len = frame_len  # 帧长，单位采样点数
        self.wave_data, self.sr = librosa.load(self.input_file, sr=sr)
        self.window_len = frame_len  # 窗长512
        if n_fft is None:
            self.fft_num = self.window_len  # 设置NFFT点数与窗长相等
        else:
            self.fft_num = n_fft
        self.win_step = win_step
        self.hop_length = round(
            self.window_len * win_step
        )  # 重叠部分采样点数设置为窗长的1/3（1/3~1/2）,即帧移(窗移)2/3
        self.window = window

    def lld(self, **kwargs):
        """
        LLDs（low level descriptors）指的是手工设计的一些低水平特征。
        LLDs一般是在一帧frame语音上进行的计算，是用来表示一帧语音的特征。
        :param kwargs: activity_detect参数
        :return: 浊音(1，n)、轻音段(1，2*n)、有效语音段持续时间(1，n)，单位ms,numpy.uint32
                基频F0，单位Hz、一阶、二阶差分(1，按列拉直提取非0元素后个数，>=n_frames),numpy.float32
                对数能量值、一阶、二阶差分(1，n_frames),numpy.float32
                短时能量、一阶、二阶差分(1，无加窗n_frames),numpy.float64
                过零率，单位次,uint32、一阶、二阶差分(1，无加窗n_frames),numpy.float64
                声压级，单位dB、一阶、二阶差分(1，无加窗n_frames),numpy.float64
        """
        duration_voiced, duration_unvoiced, duration_all = self.duration(**kwargs)
        f0, mag = self.pitch()
        f0 = f0.T[np.nonzero(f0.T)]  # 按列提取非0元素，组成一维数组
        f0_de = librosa.feature.delta(f0, width=3)
        f0_de2 = librosa.feature.delta(f0, width=3, order=2)
        energy = np.log(self.energy())
        energy_de = librosa.feature.delta(energy, width=3)
        energy_de2 = librosa.feature.delta(energy, width=3, order=2)
        ste = self.short_time_energy()
        ste_de = librosa.feature.delta(ste, width=3)
        ste_de2 = librosa.feature.delta(ste, width=3, order=2)
        zcr = self.zero_crossing_rate()
        zcr_de = librosa.feature.delta(zcr, width=3)
        zcr_de2 = librosa.feature.delta(zcr, width=3, order=2)
        spl = self.intensity()
        spl_de = librosa.feature.delta(spl, width=3)
        spl_de2 = librosa.feature.delta(spl, width=3, order=2)
        return (
            duration_voiced,
            duration_unvoiced,
            duration_all,
            f0,
            f0_de,
            f0_de2,
            energy,
            energy_de,
            energy_de2,
            ste,
            ste_de,
            ste_de2,
            zcr,
            zcr_de,
            zcr_de2,
            spl,
            spl_de,
            spl_de2,
        )

    def hsf(self, **kwargs):
        """
        HSFs（high level statistics functions）是在LLDs的基础上做一些统计而得到的特征，比如均值，最值等。
        HSFs是对一段语音utterance上的多帧语音做统计，是用来表示一个utterance的特征。
        :param kwargs: lld参数:activity_detect参数
        :return: 1*120维HSFs特征,numpy.float64: 浊音/轻音/有效语音段duration的最小值/最大值/极差/均值/标准差（第0-14维）；
                 F0/F0_de/F0_de2的最小值/最大值/极差/均值/标准差/偏度/峰度（第15-35维）；
                 energy/energy_de/energy_de2的最小值/最大值/极差/均值/标准差/偏度/峰度（第36-56维）；
                 ste/ste_de/ste_de2的最小值/最大值/极差/均值/标准差/偏度/峰度（第57-77维）；
                 zcr/zcr_de/zcr_de2的最小值/最大值/极差/均值/标准差/偏度/峰度（第78-98维）；
                 spl/spl_de/spl_de2的最小值/最大值/极差/均值/标准差/偏度/峰度（第99-119维）
        """
        llds = self.lld(**kwargs)
        hsfs = []
        for i in range(len(llds)):
            hsfs = np.append(
                hsfs,
                [
                    np.min(llds[i]),
                    np.max(llds[i]),
                    np.ptp(llds[i]),
                    np.mean(llds[i]),
                    np.std(llds[i]),
                ],
            )
            if i > 2:  # 前3个为duration，不计算其偏度和峰度
                hsfs = np.append(hsfs, [skew(llds[i]), kurtosis(llds[i])])
        return hsfs

    def short_time_energy(self):
        """
        计算语音短时能量：每一帧中所有语音信号的平方和
        :return: 语音短时能量列表(值范围0-每帧归一化后能量平方和，这里帧长512，则最大值为512)，
        np.ndarray[shape=(1，无加窗，帧移为0的n_frames), dtype=float64]
        """
        energy = []  # 语音短时能量列表
        energy_sum_per_frame = 0  # 每一帧短时能量累加和
        for i in range(len(self.wave_data)):  # 遍历每一个采样点数据
            energy_sum_per_frame += self.wave_data[i] ** 2  # 求语音信号能量的平方和
            if (i + 1) % self.frame_len == 0:  # 一帧所有采样点遍历结束
                energy.append(energy_sum_per_frame)  # 加入短时能量列表
                energy_sum_per_frame = 0  # 清空和
            elif i == len(self.wave_data) - 1:  # 不满一帧，最后一个采样点
                energy.append(energy_sum_per_frame)  # 将最后一帧短时能量加入列表
        energy = np.array(energy)
        energy = np.where(
            energy == 0, np.finfo(np.float64).eps, energy
        )  # 避免能量值为0，防止后续取log出错(eps是取非负的最小值)
        return energy

    def zero_crossing_rate(self):
        """
        计算语音短时过零率：单位时间(每帧)穿过横轴（过零）的次数
        :return: 每帧过零率次数列表，np.ndarray[shape=(1，无加窗，帧移为0的n_frames), dtype=uint32]
        """
        zcr = []  # 语音短时过零率列表
        counting_sum_per_frame = 0  # 每一帧过零次数累加和，即过零率
        for i in range(len(self.wave_data)):  # 遍历每一个采样点数据
            if i % self.frame_len == 0:  # 开头采样点无过零，因此每一帧的第一个采样点跳过
                continue
            if self.wave_data[i] * self.wave_data[i - 1] < 0:  # 相邻两个采样点乘积小于0，则说明穿过横轴
                counting_sum_per_frame += 1  # 过零次数加一
            if (i + 1) % self.frame_len == 0:  # 一帧所有采样点遍历结束
                zcr.append(counting_sum_per_frame)  # 加入短时过零率列表
                counting_sum_per_frame = 0  # 清空和
            elif i == len(self.wave_data) - 1:  # 不满一帧，最后一个采样点
                zcr.append(counting_sum_per_frame)  # 将最后一帧短时过零率加入列表
        return np.array(zcr, dtype=np.uint32)

    def energy(self):
        """
        每帧内所有采样点的幅值平方和作为能量值
        :return: 每帧能量值，np.ndarray[shape=(1，n_frames), dtype=float64]
        """
        mag_spec = np.abs(
            librosa.stft(
                self.wave_data,
                n_fft=self.fft_num,
                hop_length=self.hop_length,
                win_length=self.frame_len,
                window=self.window,
            )
        )
        pow_spec = np.square(mag_spec)
        energy = np.sum(pow_spec, axis=0)
        energy = np.where(
            energy == 0, np.finfo(np.float64).eps, energy
        )  # 避免能量值为0，防止后续取log出错(eps是取非负的最小值)
        return energy

    def intensity(self):
        """
        计算声音强度，用声压级表示：每帧语音在空气中的声压级Sound Pressure Level(SPL)，单位dB
        公式：20*lg(P/Pref)，P为声压（Pa），Pref为参考压力(听力阈值压力)，一般为2.0*10-5 Pa
        这里P认定为声音的幅值：求得每帧所有幅值平方和均值，除以Pref平方，再取10倍lg
        :return: 每帧声压级，dB，np.ndarray[shape=(1，无加窗，帧移为0的n_frames), dtype=float64]
        """
        p0 = 2.0e-5  # 听觉阈限压力auditory threshold pressure: 2.0*10-5 Pa
        e = self.short_time_energy()
        spl = 10 * np.log10(1 / (np.power(p0, 2) * self.frame_len) * e)
        return spl

    def duration(self, **kwargs):
        """
        持续时间：浊音、轻音段持续时间，有效语音段持续时间,一段有效语音段由浊音段+浊音段两边的轻音段组成
        :param kwargs: activity_detect参数
        :return: np.ndarray[dtype=uint32],浊音shape=(1，n)、轻音段shape=(1，2*n)、有效语音段持续时间列表shape=(1，n)，单位ms
        """
        wav_dat_split_f, wav_dat_split, voiced_f, unvoiced_f = self.activity_detect(
            **kwargs
        )  # 端点检测
        duration_voiced = []  # 浊音段持续时间
        duration_unvoiced = []  # 轻音段持续时间
        duration_all = []  # 有效语音段持续时间
        if np.array(voiced_f).size > 1:  # 避免语音过短，只有一帧浊音段
            for voiced in voiced_f:  # 根据帧分割计算浊音段持续时间，两端闭区间
                duration_voiced.append(
                    round((voiced[1] - voiced[0] + 1) * self.frame_len / self.sr * 1000)
                )
        else:  # 只有一帧时
            duration_voiced.append(round(self.frame_len / self.sr * 1000))
        for unvoiced in unvoiced_f:  # 根据帧分割计算清音段持续时间，浊音段左侧左闭右开，浊音段右侧左开右闭
            duration_unvoiced.append(
                round((unvoiced[1] - unvoiced[0]) * self.frame_len / self.sr * 1000)
            )
        if len(duration_unvoiced) <= 1:  # 避免语音过短，只有一帧浊音段
            duration_unvoiced.append(0)
        for i in range(len(duration_voiced)):  # 浊音段+浊音段两边的轻音段组成一段有效语音段
            duration_all.append(
                duration_unvoiced[i * 2]
                + duration_voiced[i]
                + duration_unvoiced[i * 2 + 1]
            )
        return (
            np.array(duration_voiced, dtype=np.uint32),
            np.array(duration_unvoiced, dtype=np.uint32),
            np.array(duration_all, dtype=np.uint32),
        )

    def pitch(self, ts_mag=0.25):
        """
        获取每帧音高，即基频，这里应该包括基频和各次谐波，最小的为基频（一次谐波），其他的依次为二次、三次...谐波
        各次谐波等于基频的对应倍数，因此基频也等于各次谐波除以对应的次数，精确些等于所有谐波之和除以谐波次数之和
        :param ts_mag: 幅值倍乘因子阈值，>0，大于np.average(np.nonzero(magnitudes)) * ts_mag则认为对应的音高有效,默认0.25
        :return: 每帧基频及其对应峰的幅值(>0)，
                 np.ndarray[shape=(1 + n_fft/2，n_frames), dtype=float32]，（257，全部采样点数/(512*2/3)+1）
        """
        mag_spec = np.abs(
            librosa.stft(
                self.wave_data,
                n_fft=self.fft_num,
                hop_length=self.hop_length,
                win_length=self.frame_len,
                window=self.window,
            )
        )
        # pitches:shape=(d,t)  magnitudes:shape=(d.t), Where d is the subset of FFT bins within fmin and fmax.
        # pitches[f,t] contains instantaneous frequency at bin f, time t
        # magnitudes[f,t] contains the corresponding magnitudes.
        # pitches和magnitudes大于maximal magnitude时认为是一个pitch，否则取0，maximal默认取threshold*ref(S)=1*mean(S, axis=0)
        pitches, magnitudes = librosa.piptrack(
            S=mag_spec, sr=self.sr, threshold=1.0, ref=np.mean, fmin=50, fmax=500
        )  # 人类正常说话基频最大可能范围50-500Hz
        ts = np.average(magnitudes[np.nonzero(magnitudes)]) * ts_mag
        pit_likely = pitches
        mag_likely = magnitudes
        pit_likely[magnitudes < ts] = 0
        mag_likely[magnitudes < ts] = 0
        return pit_likely, mag_likely

    def activity_detect(
        self, min_interval=15, e_low_multifactor=1.0, zcr_multifactor=1.0, pt=False
    ):
        """
        利用短时能量，短时过零率，使用双门限法进行端点检测
        :param min_interval: 最小浊音间隔，默认15帧
        :param e_low_multifactor: 能量低阈值倍乘因子，默认1.0
        :param zcr_multifactor: 过零率阈值倍乘因子，默认1.0
        :param pt: 输出打印标志位，默认为False
        :return: 全部有效语音段:按帧分割后(list,n*2)、按全部采样点的幅值分割(np.ndarray[shape=(n, 采样值数), dtype=float32])、
                浊音段(list,n*2)、轻音段(list,n*2)
        """
        ste = self.short_time_energy()
        zcr = self.zero_crossing_rate()
        energy_average = sum(ste) / len(ste)  # 求全部帧的短时能量均值
        energy_high = energy_average / 4  # 能量均值的4分之一作为能量高阈值
        energy_low = (
            sum(ste[:5]) / 5 + energy_high / 5
        ) * e_low_multifactor  # 前5帧能量均值+能量高阈值的5分之一作为能量低阈值
        zcr_threshold = (
            sum(zcr) / len(zcr) * zcr_multifactor
        )  # 过零率均值*zcr_multfactor作为过零率阈值
        voiced_sound = []  # 语音段的浊音部分
        voiced_sound_added = []  # 浊音扩充后列表
        wave_detected = []  # 轻音扩充后的最终列表
        # 首先利用能量高阈值energy_high进行初步检测，得到语音段的浊音部分
        add_flag = True  # 加入voiced_sound列表标志位
        for i in range(len(ste)):  # 遍历短时能量数据
            if len(voiced_sound) == 0 and add_flag and ste[i] >= energy_high:  # 第一次达到阈值
                voiced_sound.append(i)  # 加入列表
                add_flag = False  # 接下来禁止加入
            if (not add_flag) and ste[i] < energy_high:  # 直到未达到阈值，此时该阶段为一段浊音语音
                if i - voiced_sound[-1] <= 2:  # 检测帧索引间隔，去掉间隔小于2的索引，判断该段为噪音
                    voiced_sound = voiced_sound[:-1]  # 该段不加入列表
                else:  # 否则加入列表
                    voiced_sound.append(i)
                add_flag = True  # 继续寻找下一段浊音（下一个阈值）
            # 再次达到阈值，判断两个浊音间隔是否大于最小浊音间隔
            elif (
                add_flag
                and ste[i] >= energy_high
                and i - voiced_sound[-1] > min_interval
            ):
                voiced_sound.append(i)  # 大于，则分段，加入列表
                add_flag = False  # 接下来禁止加入
            elif (
                add_flag
                and ste[i] >= energy_high
                and i - voiced_sound[-1] <= min_interval
            ):
                voiced_sound = voiced_sound[:-1]  # 小于，则不分段，该段不加入列表
                add_flag = False  # 接下来禁止加入
            if (i == len(ste) - 1) and (
                len(voiced_sound) % 2 == 1
            ):  # 当到达最后一帧，发现浊音段为奇数，则此时到最后一帧为浊音段
                if i - voiced_sound[-1] <= 2:  # 检测帧索引间隔，去掉间隔小于2的索引，判断该段为噪音
                    voiced_sound = voiced_sound[:-1]  # 该段不加入列表
                else:  # 否则加入列表
                    voiced_sound.append(i)
        _print(pt, "能量高阈值:{}，浊音段:{}".format(energy_high, voiced_sound))
        # 再通过能量低阈值energy_low在浊音段向两端进行搜索，超过energy_low便视为有效语音
        for j in range(len(voiced_sound)):  # 遍历浊音列表
            i_minus_flag = False  # i值减一标志位
            i = voiced_sound[j]  # 浊音部分帧索引
            if j % 2 == 1:  # 每段浊音部分的右边帧索引
                while i < len(ste) and ste[i] >= energy_low:  # 搜索超过能量低阈值的帧索引
                    i += 1  # 向右搜索
                voiced_sound_added.append(i)  # 搜索到则加入扩充列表，右闭
            else:  # 每段浊音部分的左边帧索引
                while i > 0 and ste[i] >= energy_low:  # 搜索超过能量低阈值的帧索引
                    i -= 1  # 向左搜索
                    i_minus_flag = True  # i值减一标志位置位
                if i_minus_flag:  # 搜索到则加入扩充列表，左闭
                    voiced_sound_added.append(i + 1)
                else:
                    voiced_sound_added.append(i)
        _print(pt, "能量低阈值:{}，浊音再次扩展后:{}".format(energy_low, voiced_sound_added))
        # 最后通过过零率对浊音扩充后列表向两端再次进行搜索，获取轻音部分
        for j in range(len(voiced_sound_added)):  # 遍历浊音扩充后列表
            i_minus_flag = False  # i值减一标志位
            i = voiced_sound_added[j]  # 浊音扩充后部分帧索引
            if j % 2 == 1:  # 每段浊音扩充部分的右边帧索引
                while i < len(zcr) and zcr[i] >= zcr_threshold:  # 搜索超过过零率阈值的帧索引
                    i += 1  # 向右搜索
                wave_detected.append(i)  # 搜索到则加入扩充列表，右开
            else:  # 每段浊音扩充部分的左边帧索引
                while i > 0 and zcr[i] >= zcr_threshold:  # 搜索超过过零率阈值的帧索引
                    i -= 1  # 向左搜索
                    i_minus_flag = True  # i值减一标志位置位
                if i_minus_flag:  # 搜索到则加入扩充列表，左闭
                    wave_detected.append(i + 1)
                else:
                    wave_detected.append(i)
        _print(pt, "过零率阈值:{}，轻音段增加后:{}".format(zcr_threshold, wave_detected))
        wave_data_detected_frame = []  # 端点检测后，以帧为单位的有效语音列表
        for index in range(len(wave_detected)):
            if index % 2 == 0:  # 按段分割成列表
                wave_data_detected_frame.append(wave_detected[index : index + 2])
            else:
                continue
        _print(
            pt,
            "分割后共{}段语音，按帧分割为{}".format(
                len(wave_data_detected_frame), wave_data_detected_frame
            ),
        )
        wave_data_detected = []  # 端点检测后，对应全部采样点的幅值列表，其中列表代表每个有效语音段
        for index in wave_data_detected_frame:
            try:
                wave_data_detected.append(
                    self.wave_data[
                        index[0] * int(self.frame_len) : index[1] * int(self.frame_len)
                    ]
                )
            except IndexError:
                wave_data_detected.append(
                    self.wave_data[index[0] * int(self.frame_len) : -1]
                )
        _print(
            pt,
            "分割后共{}段语音，按全部采样点的幅值分割为{}".format(
                len(wave_data_detected), wave_data_detected
            ),
        )
        if np.array(voiced_sound_added).size > 1:  # 避免语音过短，只有一帧浊音段
            voiced_frame = (
                np.array(voiced_sound_added).reshape((-1, 2)).tolist()
            )  # 按帧分割的浊音段
        else:  # 只有一帧时
            voiced_frame = np.array(voiced_sound_added).tolist()
        unvoiced_frame = []  # 按帧分割的轻音段
        for i in range(len(wave_detected)):  # 根据最终的扩充后列表和浊音段列表求得轻音段
            if wave_detected[i] < voiced_sound_added[i]:
                unvoiced_frame.append([wave_detected[i], voiced_sound_added[i]])
            elif wave_detected[i] > voiced_sound_added[i]:
                unvoiced_frame.append([voiced_sound_added[i], wave_detected[i]])
            else:
                unvoiced_frame.append([0, 0])
        return (
            wave_data_detected_frame,  # 端点检测后，以帧为单位的有效语音列表
            wave_data_detected,  # 端点检测后，对应全部采样点的幅值列表，其中列表代表每个有效语音段
            voiced_frame,  # 按帧分割的浊音段
            unvoiced_frame,  # 按帧分割的轻音段
        )

    def plot(self, energy="ste", show=True):
        """
        绘制语音波形曲线和（短时）能量、过零率曲线叠加，log功率谱和基频、声压级曲线叠加图
        :param energy: "ste"短时能量，"energy"能量，默认"ste"
        :param show: 默认最后调用plt.show()，显示图形
        :return: None
        """
        plt.figure(figsize=(8, 6))
        # 以下绘制波形图
        wave_ax = host_subplot(211, axes_class=AA.Axes)  # type:AA.Axes
        wave_ax.set_title("Wave Form")
        aa = wave_ax.axis["left"]  # type:AA.axis_artist.AxisArtist
        aa.line.set_color("b")
        aa.major_ticks.set_color("b")
        aa.major_ticklabels.set_color("b")
        wave_ax.set_xticks([])
        audio_total_time = int(len(self.wave_data) / self.sr * 1000)  # 音频总时间ms
        wave_ax.set_xlim(0, audio_total_time)
        wave_ax.set_ylabel("Normalized Amplitude", c="b")
        wave_ax.set_ylim(-1, 1)
        x = np.linspace(0, audio_total_time, len(self.wave_data))  # 从0ms开始，到总时长结束，共采样点数
        wave_ax.plot(x, self.wave_data, c="b", lw=1, label="wave curve")  # 语音波形曲线
        wave_ax.axhline(y=0, c="pink", ls=":", lw=1)  # Y轴0线
        # 以下在波形曲线上叠加绘制(短时)能量曲线
        if energy == "ste":
            e = self.short_time_energy()
        elif energy == "energy":
            e = self.energy()
        else:
            raise ValueError(
                "Incorrect energy type parameter input, choose from 'ste' or 'energy'."
            )
        e_ax = wave_ax.twinx()  # type:AA.Axes  # 共享X轴
        aa = e_ax.axis["right"]  # type:AA.axis_artist.AxisArtist
        aa.toggle(all=True)
        aa.line.set_color("r")
        aa.major_ticks.set_color("r")
        aa.major_ticklabels.set_color("r")
        e_ax.set_ylabel("Energy", c="r")
        e_ax.set_ylim(0, np.max(e))
        x = np.linspace(
            self.frame_len / self.sr * 1000, audio_total_time, len(e)
        )  # 从第1帧结束对应时间开始，到总时长结束
        x = np.append(0, x)  # 第1帧时间之前补一个点：（0,0）
        e = np.append(0, e)
        if energy == "ste":
            e_ax.plot(x, e, c="r", lw=1.5, label="short time energy")
        else:
            e_ax.plot(x, e, c="r", lw=1.5, label="energy")
        # 以下在波形曲线上再次叠加绘制过零率曲线
        zcr = self.zero_crossing_rate()
        zcr_ax = wave_ax.twinx()  # type:AA.Axes
        zcr_ax.axis["right"] = zcr_ax.get_grid_helper().new_fixed_axis(
            loc="right", axes=zcr_ax, offset=(45, 0)
        )
        aa = zcr_ax.axis["right"]  # type:AA.axis_artist.AxisArtist
        aa.toggle(all=True)
        aa.line.set_color("g")
        aa.major_ticks.set_color("g")
        aa.major_ticklabels.set_color("g")
        zcr_ax.set_ylabel("Times of Zero Crossing", c="g")
        zcr_ax.set_ylim(0, np.max(zcr))
        x = np.linspace(
            self.frame_len / self.sr * 1000, audio_total_time, len(zcr)
        )  # 从第1帧结束对应时间开始，到总时长结束
        x = np.append(0, x)  # 第1帧时间之前补一个点：（0,0）
        zcr = np.append(0, zcr)
        zcr_ax.plot(x, zcr, c="g", lw=1.5, label="zero crossing rate")
        wave_ax.legend(
            prop={"family": "Times New Roman", "size": 10},
            loc="upper right",
            framealpha=0.5,
            ncol=3,
            handletextpad=0.2,
            columnspacing=0.7,
        )
        # 以下绘制灰度对数功率谱图
        spec = Spectrogram(
            self.input_file,
            self.sr,
            self.frame_len,
            self.fft_num,
            self.win_step,
            self.window,
            0,
        )
        log_power_spec = librosa.amplitude_to_db(
            spec.get_magnitude_spectrogram(), ref=np.max
        )
        log_power_spec_ax = host_subplot(212, axes_class=AA.Axes)  # type:AA.Axes
        log_power_spec_ax.set_title("Pitches on Log-Power Spectrogram")
        librosa.display.specshow(
            log_power_spec[:, 1:],
            cmap="gray_r",
            sr=self.sr,
            hop_length=self.hop_length,
            x_axis="s",
            y_axis="linear",
        )
        log_power_spec_ax.set_xlabel("Time/ms")
        log_power_spec_ax.set_ylabel("Frequency/Hz")
        log_power_spec_ax.xaxis.set_major_formatter(mtick.FuncFormatter(func_format))
        # 以下在灰度对数功率谱图上叠加绘制pitches中可能的基频F0点图
        pitches, mags = self.pitch()  # 获取每帧基频
        f0_likely = []  # 可能的基频F0
        for i in range(pitches.shape[1]):  # 按列遍历非0最小值，作为每帧可能的F0
            try:
                f0_likely.append(np.min(pitches[np.nonzero(pitches[:, i]), i]))
            except ValueError:
                f0_likely.append(np.nan)  # 当一列，即一帧全为0时，赋值最小值为nan
        f0_all = np.array(f0_likely)
        x = np.linspace(
            0.5 * self.hop_length / self.sr, audio_total_time / 1000, f0_all.size
        )
        y = f0_all
        f0_all_ax = log_power_spec_ax.twinx()  # type:AA.Axes  # 共享X轴
        aa = f0_all_ax.axis["right"]  # type:AA.axis_artist.AxisArtist
        aa.toggle(all=True)
        aa.line.set_color("r")
        aa.major_ticks.set_color("r")
        aa.major_ticklabels.set_color("r")
        f0_all_ax.set_ylabel("Pitches/Hz", c="r")
        f0_all_ax.set_ylim(50, 500)
        f0_all_ax.scatter(x, y, s=10, c="r", label="F0")
        # 以下在灰度对数功率谱图上叠加绘制声压级曲线
        spl = self.intensity()
        x = np.linspace(
            0.5 * self.frame_len / self.sr, audio_total_time / 1000, spl.size
        )
        y = spl
        spl_ax = log_power_spec_ax.twinx()  # type:AA.Axes  # 共享X轴
        spl_ax.axis["right"] = spl_ax.get_grid_helper().new_fixed_axis(
            loc="right", axes=spl_ax, offset=(45, 0)
        )
        aa = spl_ax.axis["right"]  # type:AA.axis_artist.AxisArtist
        aa.toggle(all=True)
        aa.line.set_color("g")
        aa.major_ticks.set_color("g")
        aa.major_ticklabels.set_color("g")
        spl_ax.set_ylabel("Intensity(SPL)/dB", c="g")
        spl_ax.set_ylim(30, 100)
        spl_ax.plot(x, y, "g", lw=1.5, label="SPL")
        plt.legend(
            prop={"family": "Times New Roman", "size": 10},
            loc="upper right",
            framealpha=0.5,
            ncol=3,
            handletextpad=0.2,
            columnspacing=0.7,
        )

        plt.tight_layout()
        if show:
            plt.show()

In [9]:
class VAD:
    """语音端点检测"""

    def __init__(
        self,
        wav_file,
        frame_len=400,
        min_interval=15,
        e_low_multifactor=1.0,
        zcr_multifactor=1.0,
        pt=True,
    ):
        """
        初始化函数
        语音信号是非平稳信号，但是可以认为10~30ms的时间范围内，语音信号是平稳信号,比如这里我取25ms作为一帧
        此时一帧包含25ms*采样率(16kHz)*通道数（1）=400个采样点
        :param wav_file: 输入.wav音频文件
        :param frame_len: 帧长，默认400个采样点
        :param min_interval: 最小浊音间隔，默认15帧
        :param e_low_multifactor: 能量低阈值倍乘因子，默认1.0
        :param zcr_multifactor: 过零率阈值倍乘因子，默认1.0
        :param pt: 输出打印标志位，默认为True
        """
        rf = RhythmFeatures(wav_file, None, frame_len)
        self.wave_data = rf.wave_data  # 获取音频全部采样点的数组形式数据,每个采样点类型为np.float32
        self.sampling_rate = rf.sr
        self.frame_len_samples = frame_len  # 帧长，单位采样点数
        self.frame_len_time = round(
            self.frame_len_samples * 1000 / self.sampling_rate
        )  # 帧长，单位ms
        self.energy = rf.short_time_energy()  # 获取短时能量
        self.zcr = rf.zero_crossing_rate()  # 获取过零率
        # 获取端点检测后的有效语音段
        (
            self.wav_dat_split_f,
            self.wav_dat_split,
            self.voiced_f,
            self.unvoiced_f,
        ) = rf.activity_detect(min_interval, e_low_multifactor, zcr_multifactor, pt)
        # 语音首尾端点检测，中间不检测
        if len(self.wav_dat_split_f[-1]) > 1:  # 避免语音过短，只有一帧
            self.wav_dat_utterance = self.wave_data[
                self.wav_dat_split_f[0][0]
                * int(self.frame_len_samples) : self.wav_dat_split_f[-1][1]
                * int(self.frame_len_samples)
            ]
        else:  # 只有一帧时
            self.wav_dat_utterance = self.wave_data[
                self.wav_dat_split_f[0][0] * int(self.frame_len_samples) :
            ]

    def plot(self):
        """
        绘制音频波形、短时能量和过零率曲线
        :return: None
        """
        audio_total_time = int(len(self.wave_data) / self.sampling_rate * 1000)  # 音频总时间
        plt.figure(figsize=(16, 6))
        # 以下绘制短时能量曲线
        plt.subplot(1, 3, 2)
        frames = [i for i in range(0, len(self.energy))]  # 横轴为帧数轴
        plt.title("Short Time Energy")
        plt.xlabel("Frames")
        plt.ylabel("Energy")
        plt.plot(frames, self.energy, c="g", lw=1)
        plt.grid()
        # 以下绘制过零率曲线
        plt.subplot(1, 3, 3)
        frames = [i for i in range(0, len(self.zcr))]  # 横轴为帧数轴
        plt.title("Zero Crossing Rate")
        plt.xlabel("Frames")
        plt.ylabel("Times of Zero Crossing")
        plt.plot(frames, self.zcr, c="r", lw=1)
        plt.grid()
        # 以下绘制语音波形曲线+端点检测分割线
        plt.subplot(1, 3, 1)
        plt.gca().yaxis.set_major_formatter(mtick.FormatStrFormatter("%.1f"))
        time = [
            int(i * (audio_total_time / len(self.wave_data)))
            for i in range(0, len(self.wave_data))
        ]
        plt.title("Wave Form")
        plt.xlabel("Time/ms")
        plt.ylabel("Normalized Amplitude")
        plt.ylim(-1, 1)
        plt.plot(time, self.wave_data, c="b", lw=1)  # 语音波形曲线
        c = "g"
        for i in range(len(self.wav_dat_split_f)):  # 端点检测分割线
            for j in range(len(self.wav_dat_split_f[i])):
                if (i == 0 and j == 0) or (
                    i == len(self.wav_dat_split_f) - 1 and j == 1
                ):
                    plt.axvline(
                        x=self.wav_dat_split_f[i][j] * self.frame_len_time,
                        c=c,
                        ls="-",
                        lw=2,
                    )
                else:
                    plt.axvline(
                        x=self.wav_dat_split_f[i][j] * self.frame_len_time,
                        c=c,
                        ls="--",
                        lw=1.5,
                    )
            if c == "r":
                c = "g"
            else:
                c = "r"
        plt.grid()

        plt.tight_layout()
        plt.show()

'd:\\PycharmProjects\\Speech_Depression\\output_preprocess_raw_vad.wav'

In [3]:
import librosa
import librosa.display
import numpy as np
import matplotlib.pyplot as plt

# 读取音频文件
audio_file = "output_preprocess.wav"
y, sr = librosa.load(audio_file, sr=None)

# 计算音频的短时能量
energy = librosa.feature.rms(y=y)

# 设置能量阈值
energy_threshold = np.percentile(energy, 80)

# 端点检测
start_idx = 0
end_idx = len(y) - 1
for i in range(len(energy[0])):
    if energy[0][i] > energy_threshold:
        start_idx = i
        break

for i in range(len(energy[0]) - 1, 0, -1):
    if energy[0][i] > energy_threshold:
        end_idx = i
        break

# 去掉静音部分和主试噪音
cleaned_audio = y[start_idx:end_idx]

# 可视化展示前后的变化
plt.figure(figsize=(12, 8))

plt.subplot(2, 1, 1)
librosa.display.waveplot(y, sr=sr)
plt.title('Original Audio')

plt.subplot(2, 1, 2)
librosa.display.waveplot(cleaned_audio, sr=sr)
plt.title('Cleaned Audio')

plt.tight_layout()
plt.show()

# 输出端点位置
print("Start point:", start_idx)
print("End point:", end_idx)


OSError: cannot load library 'libsndfile.dll': error 0x7e