#### CMA100类模块测试, 俯仰角抑制算法
1. 测试频点分配函数， 该函数根据阵列组半径的差异，将不同的频段置0，使得对应的阵列组处理对应的频段
2. 测试盲源分离算法， 该函数遍历每个阵列组合， 遍历每一个频点，在每一个频点上计算阵列接受信号相位差，根据相位差将频点设置0， 实现俯仰角抑制的效果。
3. 测试流式音频盲源分离实践算法， 笔者在空间中引入两个信号源， 第一个信号源出于关注区域内， 第二个信号源出于俯仰角度之外， 仿真盲源分离算法用于生产中的效果

In [1]:
from Beamform.BFModel import beamforming_model
from Beamform.CMA100 import CMA100
import librosa
import numpy as np
import matplotlib.pyplot as plt
from Beamform.tools import ConvertToTargetDb, NormLength, NormAmplitude, CalRms
import IPython.display as ipd
import soundfile

In [2]:
# 导入音频
audio_data, _ = librosa.load('./test_audios/1.wav', sr=16000)
noise_data1, _ = librosa.load('./test_audios/A8_0.wav', sr=16000)
noise_data2, _ = librosa.load('./noise/noise1.wav', sr=16000)
audio_data = NormLength(audio_data, num_samples=7*16000)
noise_data1 = NormLength(noise_data1, num_samples=7*16000)
noise_data2 = NormLength(noise_data2, num_samples=7*16000)
#ipd.Audio(data= audio_data, rate=16000)

In [3]:
# 初始化cma100阵列
cma100 = CMA100(fs=16000, L=512)

# 检查阵列流形组合的正确性
'''
for i in range(len(cma100.mics_model_list)):
    print("第{}个角度, 第1组mic的x坐标为:".format(i+1))
    print(cma100.mics_model_list[i][0].mic_x)
'''

print("--------------------------------------------------")
'''
for j in range(len(cma100.mics_model_list[0])):
    print("第1个角度, 第{}组mic的x坐标为:".format(j+1))
    print(cma100.mics_model_list[0][j].mic_x)
'''
# 检查带通滤波
test_signal = np.ones((257,), dtype=complex)
cma100.add_signal(0,0,0,test_signal)
k_value = cma100.band_filter()
print(k_value)



--------------------------------------------------
([256, 128, 64, 32], 31.25)


In [4]:
# 第一组mic频段应该在[129, 256], 第二组mic频段应该在[65, 128], 第三组mic频段在[33, 64], 第四组mic频段应该在[0, 32]
# 检查带通滤波的正确性：
'''
for theta_i in range(len(cma100.mics_model_list)):
    print("第一组半径测试:")
    assert cma100.mics_model_list[theta_i][0].receive_signals_rfft[:,:129].all() == 0+0j, "置0失败"
    print(cma100.mics_model_list[theta_i][0].receive_signals_rfft[:,129:])
'''

'''
for theta_i in range(len(cma100.mics_model_list)):
    print("第二组半径测试:")
    assert cma100.mics_model_list[theta_i][1].receive_signals_rfft[:,:65].all() == 0+0j\
        and cma100.mics_model_list[theta_i][1].receive_signals_rfft[:, 129:].all() == 0+0j, "置0失败"
    print(cma100.mics_model_list[theta_i][1].receive_signals_rfft[:,65:129])
'''


'''
for theta_i in range(len(cma100.mics_model_list)):
    print("第三组半径测试:")
    assert cma100.mics_model_list[theta_i][2].receive_signals_rfft[:,:33].all() == 0+0j\
        and cma100.mics_model_list[theta_i][2].receive_signals_rfft[:, 65:].all() == 0+0j, "置0失败"
    print(cma100.mics_model_list[theta_i][2].receive_signals_rfft[:,33:65])
'''


for theta_i in range(len(cma100.mics_model_list)):
    print("第四组半径测试:")
    assert cma100.mics_model_list[theta_i][3].receive_signals_rfft[:, 33:].all() == 0+0j, "置0失败"
    print(cma100.mics_model_list[theta_i][3].receive_signals_rfft[:,0:33])



第四组半径测试:
[[ 0.2       +0.j         -0.19357801+0.05027477j  0.17472448-0.0973209j
  -0.14465016+0.13811709j  0.10528643-0.17004343j -0.05916122+0.1910496j
   0.00923669-0.19978659j  0.04128102+0.19569332j -0.08914767-0.17903266j
   0.13128927+0.15087454j -0.16499949-0.11302728j  0.18811347+0.06792143j
  -0.19914684-0.01845367j  0.19739102-0.03219918j -0.18295877+0.0807842j
   0.15677695-0.12418127j -0.12052693+0.15960345j  0.07653669-0.18477591j
  -0.02763127+0.19808209j -0.02304862-0.19866746j  0.07224833+0.18649445j
  -0.11680827-0.16234478j  0.15386679+0.12776936j -0.18104402-0.08498861j
   0.19659462+0.0367499j  -0.19951995+0.01384888j  0.18963213-0.06355828j
  -0.16756617+0.10918599j  0.13473913-0.14780178j -0.09325916+0.17692577j
   0.04579011-0.19468761j  0.00461958+0.19994664j -0.0547326 -0.19236513j]
 [ 0.1999345 +0.j         -0.19356207+0.05007521j  0.17485101-0.09695837j
  -0.14499406+0.13766091j  0.10589444-0.16958824j -0.06004457+0.19070515j
   0.01036715-0.19966553j  0.03

In [5]:
# 下面检验简单信号的俯仰角抑制算法, 先搞个在范围内的:
# 先重置CMA100阵列模型
'''
cma100.reset()
audio_data, _ = librosa.load('./test_audios/1.wav', sr=16000)
signal_rfft = librosa.stft(audio_data, n_fft=512, win_length=512, hop_length=128)[:,100]
cma100.add_signal(1,3,0,signal_rfft)
cma100.angleFilter_Beamform()
enhanced_signal_rfft = cma100.build_signal_rfft()


signal = np.fft.irfft(signal_rfft)
signal_enhanced = np.fft.irfft(enhanced_signal_rfft)
print(np.sum(signal**2))
print(np.sum(signal_enhanced**2))
'''
cma100.reset()
audio_data, _ = librosa.load('./test_audios/1.wav', sr=16000)
signal_rfft = librosa.stft(audio_data, n_fft=512, win_length=512, hop_length=128)[:,100]
cma100.add_signal(0,4,0,signal_rfft)
cma100.angle_filter()
enhanced_signal_rfft = cma100.build_signal_rfft()


signal = np.fft.irfft(signal_rfft)
signal_enhanced = np.fft.irfft(enhanced_signal_rfft)

In [4]:
# 检验简单信号的的俯仰角抑制算法， 在收音范围之外:
cma100.reset()
audio_data, _ = librosa.load('./test_audios/1.wav', sr=16000)
signal_rfft = librosa.stft(audio_data, n_fft=512, win_length=512, hop_length=128)[:,100]
cma100.add_signal(10,0,0,signal_rfft)
cma100.angle_filter()
print(cma100.k_value)
enhanced_signal_rfft = cma100.build_signal_rfft()


signal = np.fft.irfft(signal_rfft)
signal_enhanced = np.fft.irfft(enhanced_signal_rfft)
#print(np.sum(signal**2))
#print(np.sum(signal_enhanced**2))



[256, 128, 64, 32]


In [7]:
# ----------------------------------分割线---------------------------------------
# 短语音demo
# 以下实现流式盲源分离算法。

# 读取音频数据 + STFT
audio_data0, _ = librosa.load('./test_audios/1.wav', sr=16000)
audio_data1, _ = librosa.load("./test_audios/A8_0.wav", sr=16000)
audio_data0 = audio_data0[:5*16000]
audio_data1 = audio_data1[:5*16000]
audio0_rfft = librosa.stft(audio_data0, n_fft=512, win_length=512, hop_length=128)
audio1_rfft = librosa.stft(audio_data1, n_fft=512, win_length=512, hop_length=128)
# 设置声源位置
audio0_x = 11.0
audio0_y = 0.0
audio0_z = 0.0

audio1_x = 2.0
audio1_y = 0.0
audio1_z = 0.0
# 重置阵列
cma100.reset()
# 模拟阵列流式接受信号， 并处理的过程
frameLength = 512
N = audio0_rfft.shape[1]
enhancedAudio = np.zeros_like(audio0_rfft)
micAudio = np.zeros_like(audio0_rfft)


# 流式处理：
for i in range(N):
    cma100.add_signal(S_x = audio0_x, S_y = audio0_y, S_z = audio0_z, signal_rfft = audio0_rfft[:, i])
    cma100.add_signal(S_x = audio1_x, S_y = audio1_y, S_z = audio1_z, signal_rfft = audio1_rfft[:, i])
    micAudio[:,i] = cma100.build_signal_rfft(method=0) # 把处理前的信号提取出来
    cma100.band_filter() # 频点分配
    cma100.angle_filter() # 角度抑制
    #cma100.angleFilter_Beamform()
    enhancedAudio[:,i] = cma100.build_signal_rfft(method=0) # 将处理后的信号提取出来
    cma100.reset() # 重置阵列，用于处理下一帧


enhancedAudio = librosa.istft(enhancedAudio, n_fft=512, win_length=512, hop_length=128)
micAudio = librosa.istft(micAudio, n_fft=512, win_length=512, hop_length=128)
enhancedAudio = NormAmplitude(enhancedAudio)
micAudio = NormAmplitude(micAudio)
import soundfile
soundfile.write("./angle_filter_input.wav", micAudio, samplerate=16000)
soundfile.write("./angle_filter_output.wav", enhancedAudio, samplerate=16000)






In [4]:
# ----------------------------------分割线---------------------------------------
# 长语音demo
# 以下实现流式盲源分离算法。

# 读取音频数据 + STFT
audio_data0, _ = librosa.load('./test_audios/M1-mono.wav', sr=16000)
audio_data1, _ = librosa.load("./test_audios/F1-mono.wav", sr=16000)
audio_data0 = audio_data0[:179*16000]
audio_data1 = audio_data1[:179*16000]
audio0_rfft = librosa.stft(audio_data0, n_fft=512, win_length=512, hop_length=128)
audio1_rfft = librosa.stft(audio_data1, n_fft=512, win_length=512, hop_length=128)
# 设置声源位置
audio0_x = 2.0
audio0_y = 0.0
audio0_z = 0.0

audio1_x = 11.0
audio1_y = 0.0
audio1_z = 0.0
# 重置阵列
cma100.reset()
# 模拟阵列流式接受信号， 并处理的过程
frameLength = 512
N = audio0_rfft.shape[1]
enhancedAudio = np.zeros_like(audio0_rfft)
micAudio = np.zeros_like(audio0_rfft)


# 流式处理：
for i in range(N):
    cma100.add_signal(S_x = audio0_x, S_y = audio0_y, S_z = audio0_z, signal_rfft = audio0_rfft[:, i])
    cma100.add_signal(S_x = audio1_x, S_y = audio1_y, S_z = audio1_z, signal_rfft = audio1_rfft[:, i])
    micAudio[:,i] = cma100.build_signal_rfft(method=0) # 把处理前的信号提取出来
    cma100.band_filter() # 频点分配
    cma100.angle_filter() # 角度抑制
    #cma100.angleFilter_Beamform()
    enhancedAudio[:,i] = cma100.build_signal_rfft(method=0) # 将处理后的信号提取出来
    cma100.reset() # 重置阵列，用于处理下一帧


enhancedAudio = librosa.istft(enhancedAudio, n_fft=512, win_length=512, hop_length=128)
micAudio = librosa.istft(micAudio, n_fft=512, win_length=512, hop_length=128)
enhancedAudio = NormAmplitude(enhancedAudio)
micAudio = NormAmplitude(micAudio)
import soundfile
soundfile.write("./angle_filter_long_input.wav", micAudio, samplerate=16000)
soundfile.write("./angle_filter_long_output.wav", enhancedAudio, samplerate=16000)



In [6]:
# 短语音demo + bf。 看下是否能解决多mic混响的问题
# 读取音频数据 + STFT
cma100.reset()
audio_data0, _ = librosa.load('./example_audio/angle_filter_input.wav', sr=16000)

# 读取音频数据 + STFT
audio_data0, _ = librosa.load('./test_audios/1.wav', sr=16000)
audio_data1, _ = librosa.load("./test_audios/A8_0.wav", sr=16000)
audio_data0 = audio_data0[:5*16000]
audio_data1 = audio_data1[:5*16000]
audio0_rfft = librosa.stft(audio_data0, n_fft=512, win_length=512, hop_length=128)
audio1_rfft = librosa.stft(audio_data1, n_fft=512, win_length=512, hop_length=128)
# 设置声源位置
audio0_x = 11.0
audio0_y = 0.0
audio0_z = 0.0

audio1_x = 2.0
audio1_y = 0.0
audio1_z = 0.0
# 重置阵列
cma100.reset()
# 模拟阵列流式接受信号， 并处理的过程
frameLength = 512
N = audio0_rfft.shape[1]
enhancedAudio = np.zeros_like(audio0_rfft)
micAudio = np.zeros_like(audio0_rfft)


# 流式处理：
for i in range(N):
    cma100.add_signal(S_x = audio0_x, S_y = audio0_y, S_z = audio0_z, signal_rfft = audio0_rfft[:, i])
    cma100.add_signal(S_x = audio1_x, S_y = audio1_y, S_z = audio1_z, signal_rfft = audio1_rfft[:, i])
    micAudio[:,i] = cma100.build_signal_rfft(method=0) # 把处理前的信号提取出来
    cma100.band_filter() # 频点分配
    cma100.angle_filter() # 角度抑制
    cma100.beamforming(audio1_x, audio1_y, audio1_z)
    enhancedAudio[:,i] = cma100.build_signal_rfft(method=0) # 将处理后的信号提取出来
    cma100.reset() # 重置阵列，用于处理下一帧


enhancedAudio = librosa.istft(enhancedAudio, n_fft=512, win_length=512, hop_length=128)
micAudio = librosa.istft(micAudio, n_fft=512, win_length=512, hop_length=128)
enhancedAudio = NormAmplitude(enhancedAudio)
micAudio = NormAmplitude(micAudio)
import soundfile
soundfile.write("./angle_filter_input.wav", micAudio, samplerate=16000)
soundfile.write("./angle_filter+bf_output.wav", enhancedAudio, samplerate=16000)


KeyboardInterrupt: 