In [28]:
# 필요한 라이브러리 설치
!pip install asteroid torch torchaudio soundfile tqdm matplotlib thop



In [29]:
!pip install torchinfo ptflops



In [30]:
import os
import torch
import soundfile as sf
# 기존: from asteroid.models import DPTNet
from asteroid.models import DPRNNTasNet
# from asteroid.losses import SI_SNR
from ptflops import get_model_complexity_info

In [31]:
# SI-SNR 계산 함수 (직접 구현)
def calculate_si_snr(estimation, reference):
    """
    Args:
        estimation (torch.Tensor): 추정된 오디오 (B, T)
        reference (torch.Tensor): 참조 오디오 (B, T)

    Returns:
        si_snr (torch.Tensor): SI-SNR 값 (B,)
    """
    reference = reference - reference.mean(dim=-1, keepdim=True)
    estimation = estimation - estimation.mean(dim=-1, keepdim=True)
    # Inner product
    s_target = torch.sum(estimation * reference, dim=-1, keepdim=True) * reference / torch.sum(reference**2, dim=-1, keepdim=True)
    e_noise = estimation - s_target
    si_snr = 10 * torch.log10(torch.sum(s_target**2, dim=-1) / torch.sum(e_noise**2, dim=-1))
    return si_snr


In [32]:
# 데이터셋 로드 함수
def load_dataset(dataset_path):
    """
    Args:
        dataset_path (str): 데이터셋의 최상위 폴더 경로 (/test)

    Returns:
        file_list (list): 데이터 파일 리스트 [(mixed_path, [s1_path, s2_path]), ...]
    """
    mixed_path = os.path.join(dataset_path, "mixed_data")
    source_paths = [os.path.join(dataset_path, "s1"), os.path.join(dataset_path, "s2")]

    file_list = []
    for filename in os.listdir(mixed_path):
        if filename.endswith(".wav"):
            mixed_file = os.path.join(mixed_path, filename)
            sources = [os.path.join(source_path, filename) for source_path in source_paths]
            file_list.append((mixed_file, sources))
    return file_list

In [33]:
# 모델 초기화
def initialize_model():
    model = DPRNNTasNet(n_src=2)  # 2개의 소스 분리
    model.eval()
    return model

In [34]:
from asteroid.metrics import get_metricsdef calculate_sdr(estimation, reference):
    """
    SDR (Signal-to-Distortion Ratio) 계산
    Args:
        estimation (Tensor): 추정된 신호 (B, T)
        reference (Tensor): 참조 신호 (B, T)

    Returns:
        sdr (Tensor): SDR 값 (B,)
    """
    reference_energy = torch.sum(reference**2, dim=-1)
    projection = torch.sum(estimation * reference, dim=-1) * reference / reference_energy
    noise = estimation - projection
    sdr = 10 * torch.log10(torch.sum(projection**2, dim=-1) / torch.sum(noise**2, dim=-1))
    return sdr

def calculate_sdri(estimates, references, mixture):
    """
    SDRi (Signal-to-Distortion Ratio improvement) 계산
    Args:
        estimates (Tensor): 추정된 신호 (N, T)
        references (Tensor): 참조 신호 (N, T)
        mixture (Tensor): 혼합 신호 (1, T)

    Returns:
        sdri (float): SDRi 값
    """
    # SDR 계산
    sdr_mixture = calculate_sdr(mixture.repeat(references.size(0), 1), references).mean().item()
    sdr_estimates = calculate_sdr(estimates, references).mean().item()

    # SDRi 계산
    sdri = sdr_estimates - sdr_mixture
    return sdri

def calculate_metrics(estimates, references, mixture):
    """
    Args:
        estimates (Tensor): 추정된 소스 신호 (N, T)
        references (list of torch.Tensor): 참조 신호 리스트
        mixture (Tensor): 혼합 신호

    Returns:
        si_snri (float): SI-SNRi 값
        sdri (float): SDRi 값
    """
    # estimates 텐서 변환: (B, N, T) -> (N, T)
    estimates = estimates.squeeze(0)  # (N, T)
    print(f"Adjusted estimates shape: {estimates.shape}")

    # 모델 출력 길이에 참조 신호 길이 맞추기
    ref_len = estimates.size(1)
    references = [
        torch.tensor(ref[:ref_len], dtype=torch.float32) if len(ref) > ref_len
        else torch.nn.functional.pad(torch.tensor(ref, dtype=torch.float32), (0, ref_len - len(ref)))
        for ref in references
    ]
    references = torch.stack(references)  # (N, T)

    # 혼합 신호 길이 맞추기
    mixture = mixture[:, :ref_len]  # (1, T)

    # SI-SNRi 계산
    si_snr_mixture = calculate_si_snr(mixture.repeat(len(references), 1), references).mean().item()
    si_snr_estimates = calculate_si_snr(estimates, references).mean().item()
    si_snri = si_snr_estimates - si_snr_mixture

    # SDRi 계산
    sdri = calculate_sdri(estimates, references, mixture)

    return si_snri, sdri

SyntaxError: invalid syntax (<ipython-input-34-17003b66c94e>, line 1)

In [35]:
from asteroid.metrics import get_metrics
def calculate_metrics(estimates, references, mixture):
    """
    Args:
        estimates (Tensor): 추정된 소스 신호 (N, T)
        references (list of torch.Tensor): 정답 소스 신호 리스트
        mixture (torch.Tensor): 혼합 신호

    Returns:
        si_snri (float): SI-SNRi 값
        sdri (float): SDRi 값
    """
    # estimates 텐서 변환: (B, N, T) -> (N, T)
    estimates = estimates.squeeze(0)  # (N, T)
    print(f"Adjusted estimates shape: {estimates.shape}")

    # 모델 출력 길이에 참조 신호 길이 맞추기
    ref_len = estimates.size(1)
    references = [
        torch.tensor(ref[:ref_len], dtype=torch.float32) if len(ref) > ref_len
        else torch.nn.functional.pad(torch.tensor(ref, dtype=torch.float32), (0, ref_len - len(ref)))
        for ref in references
    ]
    references = torch.stack(references)  # (N, T)

    # 혼합 신호 길이 맞추기
    mixture = mixture[:, :ref_len]  # (1, T)

    # SI-SNR 계산
    si_snr_mixture = calculate_si_snr(mixture.repeat(len(references), 1), references).mean().item()
    si_snr_estimates = calculate_si_snr(estimates, references).mean().item()

    # SI-SNRi 및 SDRi 계산
    si_snri = si_snr_estimates - si_snr_mixture
    # SDR 계산 (Asteroid의 get_metrics 사용)
    metrics_mixture = get_metrics(
        references.numpy(),         # 정답 신호
        mixture.numpy(),            # 혼합 신호
        references.numpy(),         # 기준은 항상 정답 신호
        sample_rate=8000,
        metrics_list=["sdr"]
    )
    metrics_estimates = get_metrics(
        references.numpy(),         # 정답 신호
        estimates.numpy(),          # 추정 신호
        references.numpy(),         # 기준은 항상 정답 신호
        sample_rate=8000,
        metrics_list=["sdr"]
    )

    # SDRi 계산
    sdr_mixture = metrics_mixture["sdr"]
    sdr_estimates = metrics_estimates["sdr"]
    sdri = sdr_estimates - sdr_mixture
    return si_snri, sdri

In [36]:
# 메인 함수
def main(dataset_path):
    # 데이터 로드
    file_list = load_dataset(dataset_path)

    # 모델 초기화
    model = initialize_model()

    total_si_snri = 0
    total_sdri = 0
    total_files = len(file_list)

    print(f"총 {total_files}개의 파일을 처리합니다...")

    # 각 파일에 대해 처리
    for idx, (mixed_file, source_files) in enumerate(file_list):
        print(f"[{idx + 1}/{total_files}] Processing: {mixed_file}")

        # 혼합 오디오 및 소스 로드
        mixed_wave, _ = sf.read(mixed_file)
        target_waves = [sf.read(src)[0] for src in source_files]

        # 입력 데이터 준비
        mixture = torch.tensor(mixed_wave, dtype=torch.float32).unsqueeze(0)  # (1, T)
        references = [torch.tensor(tgt, dtype=torch.float32) for tgt in target_waves]


        # 음성 분리 수행
        with torch.no_grad():
            estimates = model(mixture)  # 모델 추정 결과 (B, T, N)

        # 디버깅: estimates 내용 확인
        print(f"Model output shape: {estimates.shape}")

        # SI-SNRi 및 SDRi 계산
        si_snri, sdri = calculate_metrics(estimates, references, mixture)
        total_si_snri += si_snri
        total_sdri += sdri

        print(f"  SI-SNRi: {si_snri:.2f} dB, SDRi: {sdri:.2f} dB")

    # 최종 결과 출력
    avg_si_snri = total_si_snri / total_files
    avg_sdri = total_sdri / total_files
    print(f"\n평균 SI-SNRi: {avg_si_snri:.2f} dB")
    print(f"평균 SDRi: {avg_sdri:.2f} dB")


In [37]:
DATASET_PATH = "/content/drive/MyDrive/Dataset/mk_1800_dataset/test"
main(DATASET_PATH)

총 180개의 파일을 처리합니다...
[1/180] Processing: /content/drive/MyDrive/Dataset/mk_1800_dataset/test/mixed_data/Animal_Animal_401.wav
Model output shape: torch.Size([1, 2, 55125])
Adjusted estimates shape: torch.Size([2, 55125])


  else torch.nn.functional.pad(torch.tensor(ref, dtype=torch.float32), (0, ref_len - len(ref)))


  SI-SNRi: -18.33 dB, SDRi: -9.87 dB
[2/180] Processing: /content/drive/MyDrive/Dataset/mk_1800_dataset/test/mixed_data/Animal_Animal_402.wav
Model output shape: torch.Size([1, 2, 220500])
Adjusted estimates shape: torch.Size([2, 220500])
  SI-SNRi: -19.69 dB, SDRi: -12.18 dB
[3/180] Processing: /content/drive/MyDrive/Dataset/mk_1800_dataset/test/mixed_data/Animal_Animal_403.wav
Model output shape: torch.Size([1, 2, 220500])
Adjusted estimates shape: torch.Size([2, 220500])
  SI-SNRi: -27.53 dB, SDRi: 7.63 dB
[4/180] Processing: /content/drive/MyDrive/Dataset/mk_1800_dataset/test/mixed_data/Animal_Animal_404.wav
Model output shape: torch.Size([1, 2, 220500])
Adjusted estimates shape: torch.Size([2, 220500])
  SI-SNRi: -7.12 dB, SDRi: 7.97 dB
[5/180] Processing: /content/drive/MyDrive/Dataset/mk_1800_dataset/test/mixed_data/Animal_Animal_405.wav
Model output shape: torch.Size([1, 2, 220500])
Adjusted estimates shape: torch.Size([2, 220500])
  SI-SNRi: -21.36 dB, SDRi: -20.82 dB
[6/180] 

In [38]:
from torchinfo import summary
from ptflops import get_model_complexity_info

def calculate_model_metrics(model):
    """
    Args:
        model: PyTorch 모델

    Returns:
        params (str): 모델 파라미터 수
        macs (str): 모델 MACs 수
    """
    # 입력 데이터 크기 (예: 1초 길이의 신호, 8000 Hz 샘플링 레이트 가정)
    input_size = (1, 32000)  # (Batch, Time)

    # 모델 파라미터 수 계산
    print("모델 요약:")
    summary(model, input_size=input_size)

    # MACs 수 계산
    macs, params = get_model_complexity_info(
        model,
        input_size,
        as_strings=True,
        print_per_layer_stat=False,
        verbose=False
    )
    return params, macs

# DualPathRNN 또는 DPTNet 모델 초기화
from asteroid.models import DPRNNTasNet

# 모델 생성
model = DPRNNTasNet(n_src=2)

# 파라미터와 MACs 계산
params, macs = calculate_model_metrics(model)
print(f"모델 파라미터 수: {params}")
print(f"모델 MACs 수: {macs}")

모델 요약:
모델 파라미터 수: 3.65 M
모델 MACs 수: 30.12 GMac
