In [1]:
# 必要モジュールのimport
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
import torch.utils.data as data

import os
import sys
import glob
import time
import numpy as np
import scipy
import scipy.signal as signal
import librosa

from tqdm import tqdm
from natsort import natsorted

from models import FCMaskEstimator, BLSTMMaskEstimator, UnetMaskEstimator_kernel3
from beamformer import estimate_covariance_matrix, condition_covariance, estimate_steering_vector, sparse, ds_beamformer, mvdr_beamformer, gev_beamformer, mwf
from utils import AudioProcess, standardize

sys.path.append('..')
from MyLibrary.MyFunc import load_audio_file, save_audio_file, wave_plot, audio_eval

os.environ["CUDA_VISIBLE_DEVICES"]="2"

In [None]:
if __name__ == "__main__":
    
    # 各パラメータを設定
    sample_rate = 16000 # 作成するオーディオファイルのサンプリング周波数を指定
    audio_length = 3 # 単位は秒(second) → fft_size=1024,hop_length=768のとき、audio_length=6が最適かも？
    fft_size = 512 # 高速フーリエ変換のフレームサイズ
    hop_length = 160 # 高速フーリエ変換におけるフレームのスライド幅
    spec_frame_num = 64 # スペクトログラムのフレーム数 spec_freq_dim=512のとき、音声の長さが5秒の場合は128, 3秒の場合は64
    # マスクのチャンネルを指定（いずれはconfigまたはargsで指定）TODO
    target_aware_channel = 0
    noise_aware_channel = 4
    
    # 評価する音声ファイルを格納したディレクトリを指定
    test_data_dir = "../data/NoisySpeechDataset_for_unet_fft_512_multi_wav_1209/test/"
    azimuth_list = natsorted(os.listdir(test_data_dir)) # 0, 15, 30,・・・,90
#     azimuth_list.pop(1)
    print("azimuth_list:", azimuth_list)
    
    # マスク推定モデルの種類を指定
    model_type = 'Unet' # 'FC' or 'BLSTM' or 'Unet'
    # ビームフォーマの種類を指定
    beamformer_type = 'MVDR' # 'DS' or 'MVDR' or 'GEV', or 'MWF' or 'Sparse'
    
    # モデルの設定
    # 学習済みのパラメータを保存したチェックポイントファイルのパスを指定
    checkpoint_path = "./ckpt/ckpt_NoisySpeechDataset_for_unet_fft_512_multi_wav_Unet_aware_1208/ckpt_epoch110.pt"
    # ネットワークモデルを定義
    if model_type == 'BLSTM':
        model = BLSTMMaskEstimator()
    elif model_type == 'FC':
        model = FCMaskEstimator()
    elif model_type == 'Unet':
        model = UnetMaskEstimator_kernel3()
        pass
    # 前処理クラスのインスタンスを作成
    transform = AudioProcess(audio_length, sample_rate, fft_size, hop_length, model_type)
    # GPUが使える場合はGPUを使用、使えない場合はCPUを使用
    device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
    print("使用デバイス：" , device)
    # 学習済みのパラメータをロード
    model_params = torch.load(checkpoint_path, map_location=device)
    model.load_state_dict(model_params['model_state_dict'])
    # print("モデルのパラメータ数：", count_parameters(model))
    # MaskEstimatorを使って推論
    # ネットワークを推論モードへ
    model.eval()
    
    # 干渉音の到来方向ごとに評価
    for interference_azimuth in azimuth_list:
        ######################雑音除去＋音声評価#########################
        # 音声評価結果の合計値を格納するリストを用意
        sdr_mix_list = []
        sir_mix_list = []
        sar_mix_list = []
        sdr_est_list = []
        sir_est_list = []
        sar_est_list = []
        # 合計処理時間を測るための変数を用意
        processing_duration_sum = 0

        mixed_audio_path_list = natsorted(glob.glob(os.path.join(test_data_dir, interference_azimuth, "*_mixed.wav"))) # （例）p232_016_mixed.wav
        for mixed_audio_path in tqdm(mixed_audio_path_list):            
            # 処理の開始時間
            iter_start_time = time.perf_counter()
            # マルチチャンネル音声データを複素スペクトログラムと振幅スペクトログラムに変換
            mixed_complex_spec, mixed_amp_spec = transform(mixed_audio_path)
            """mixed_complex_spec: (num_channels, freq_bins, time_steps), mixed_amp_spec: (num_channels, freq_bins, time_steps)"""
            # 振幅スペクトログラムを標準化
            mixed_amp_spec = standardize(mixed_amp_spec)
            # numpy形式のデータをpytorchのテンソルに変換
            mixed_amp_spec = torch.from_numpy(mixed_amp_spec.astype(np.float32)).clone()
            # モデルに入力できるようにバッチサイズの次元を追加
            mixed_amp_spec = mixed_amp_spec.unsqueeze(0)
            """mixed_amp_spec: (batch_size, num_channels, freq_bins, time_steps)"""
            # 音源方向推定情報を含むマスクを推定
            target_mask_output, noise_mask_output = model(mixed_amp_spec)
            if model_type == 'FC' or 'Unet':
                # マスクのチャンネルを指定（目的音に近いチャンネルと雑音に近いチャンネル）
                estimated_target_mask = target_mask_output[:, target_aware_channel, :, :]
                """estimated_target_mask: (batch_size, freq_bins, time_steps)"""
                estimated_noise_mask = noise_mask_output[:, noise_aware_channel, :, :]
                """estimated_noise_mask: (batch_size, freq_bins, time_steps)"""
            elif model_type == 'BLSTM':
                # 複数チャンネル間のマスク値の中央値をとる（median pooling）
                (estimated_target_mask, _) = torch.median(target_mask_output, dim=1)
                """estimated_target_mask: (batch_size, freq_bins, time_steps)"""
                (estimated_noise_mask, _) = torch.median(noise_mask_output, dim=1)
                """estimated_noise_mask: (batch_size, freq_bins, time_steps)"""
            else:
                print("Please specify the correct model type")
            # バッチサイズの次元を削除
            estimated_target_mask = estimated_target_mask.squeeze(0)
            """estimated_target_mask: (freq_bins, time_steps)"""
            estimated_noise_mask = estimated_noise_mask.squeeze(0)
            """estimated_noise_mask: (freq_bins, time_steps)"""
            # U-Netの場合paddingされた分を削除する
            if model_type == 'Unet':
                # とりあえずハードコーディング TODO
                mixed_complex_spec = mixed_complex_spec[:, :, :301]
                estimated_target_mask = estimated_target_mask[:, :301] 
                estimated_noise_mask = estimated_noise_mask[:, :301]

            # pytorchのテンソルをnumpy形式のデータに変換
            estimated_target_mask = estimated_target_mask.detach().numpy().copy() # CPU
            estimated_noise_mask = estimated_noise_mask.detach().numpy().copy() # CPU
            # 目的音のマスクと雑音のマスクからそれぞれの空間共分散行列を推定
            target_covariance_matrix = estimate_covariance_matrix(mixed_complex_spec, estimated_target_mask)
            noise_covariance_matrix = estimate_covariance_matrix(mixed_complex_spec, estimated_noise_mask)
            noise_covariance_matrix = condition_covariance(noise_covariance_matrix, 1e-6) # これがないと性能が大きく落ちる（雑音の共分散行列のみで良い）
            # noise_covariance_matrix /= np.trace(noise_covariance_matrix, axis1=-2, axis2=-1)[..., None, None]
            # ビームフォーマによる雑音除去を実行
            if beamformer_type == 'MVDR':
                # target_steering_vectors = estimate_steering_vector(target_covariance_matrix)
                # estimated_spec = mvdr_beamformer(mixed_complex_spec, target_steering_vectors, noise_covariance_matrix)
                estimated_spec = mvdr_beamformer(mixed_complex_spec, target_covariance_matrix, noise_covariance_matrix)
            elif beamformer_type == 'GEV':
                estimated_spec = gev_beamformer(mixed_complex_spec, target_covariance_matrix, noise_covariance_matrix)
            elif beamformer_type == "DS":
                target_steering_vectors = estimate_steering_vector(target_covariance_matrix)
                estimated_spec = ds_beamformer(mixed_complex_spec, target_steering_vectors)
            elif beamformer_type == "MWF":
                estimated_spec = mwf(mixed_complex_spec, target_covariance_matrix, noise_covariance_matrix)
            elif beamformer_type == 'Sparse':
                estimated_spec = sparse(mixed_complex_spec, estimated_target_mask) # マスクが正常に推定できているかどうかをテストする用
            else:
                print("Please specify the correct beamformer type")
            """estimated_spec: (num_channels, freq_bins, time_frames)"""

            # マルチチャンネルスペクトログラムを音声波形に変換
            mixed_audio_data = load_audio_file(mixed_audio_path, audio_length, sample_rate)
            """mixed_audio_data: (num_samples, num_channels)"""
            multichannel_estimated_voice_data= np.zeros(mixed_audio_data.shape, dtype='float64') # マルチチャンネル音声波形を格納する配列
            # 1chごとスペクトログラムを音声波形に変換
            for i in range(estimated_spec.shape[0]):
                estimated_voice_data = librosa.core.istft(estimated_spec[i, :, :], hop_length=hop_length)
                multichannel_estimated_voice_data[:, i] = estimated_voice_data
            """multichannel_estimated_voice_data: (num_samples, num_channels)"""
            # 処理の終了時間
            iter_finish_time = time.perf_counter()
            # 1ループ当たりの処理時間（音声波形→STFT→雑音除去→iSTFT→音声波形）
            iter_processing_duration = iter_finish_time - iter_start_time
            processing_duration_sum += iter_processing_duration
            
            # オーディオデータを保存
            estimated_voice_path = "./estimated_voice.wav"
            save_audio_file(estimated_voice_path, multichannel_estimated_voice_data, sample_rate)
            # ファイル名を取得
            file_num = os.path.basename(mixed_audio_path).split('.')[0].rsplit('_', maxsplit=1)[0] # （例） p232_016
            # 干渉雑音の方位角を取得
            target_voice_path = os.path.join(test_data_dir, interference_azimuth, file_num + "_target.wav")
            interference_audio_path = os.path.join(test_data_dir, interference_azimuth, file_num + "_interference.wav") # （例）p232_016_interference.wav
            # 音声評価
            sdr_mix, sir_mix, sar_mix, sdr_est, sir_est, sar_est = audio_eval(audio_length, sample_rate, \
            target_voice_path, interference_audio_path, mixed_audio_path, estimated_voice_path)
            # 音声評価結果を記録
            sdr_mix_list.append(sdr_mix)
            sir_mix_list.append(sir_mix)
            sar_mix_list.append(sar_mix)
            sdr_est_list.append(sdr_est)
            sir_est_list.append(sir_est)
            sar_est_list.append(sar_est)
            # 推定音声が蓄積されないように削除
            os.remove(estimated_voice_path)

        # データの数を取得
        num_file = len(mixed_audio_path_list)
        print("#" * 50)
        print("使用デバイス：" , device)
        print("干渉音の方向:", interference_azimuth + 'deg')
        print("合計処理時間：", str(processing_duration_sum) + 'sec')
        print("平均処理時間：", str(processing_duration_sum/num_file) + 'sec')
        print("平均 | SDR_mix: {:.3f}, SIR_mix: {:.3f}, SAR_mix: {:.3f}".format(np.mean(sdr_mix_list), np.mean(sir_mix_list), np.mean(sar_mix_list)))
        print("平均 | SDR_est: {:.3f}, SIR_est: {:.3f}, SAR_est: {:.3f}".format(np.mean(sdr_est_list), np.mean(sir_est_list), np.mean(sar_est_list)))
        print("標準偏差 | SDR_mix: {:.3f}, SIR_mix: {:.3f}, SAR_mix: {:.3f}".format(np.std(sdr_mix_list), np.std(sir_mix_list), np.std(sar_mix_list)))
        print("標準偏差 | SDR_est: {:.3f}, SIR_est: {:.3f}, SAR_est: {:.3f}".format(np.std(sdr_est_list), np.std(sir_est_list), np.std(sar_est_list)))

azimuth_list: ['0', '15', '30', '45', '60', '75', '90']
使用デバイス： cuda:0


 20%|██        | 6/30 [1:02:26<3:57:26, 593.59s/it]