## 전체 음성 데이터 전처리

전체 반려동물 울음소리 음성 데이터(.wav)에 대해 전처리를 수행한다.

음성의 진폭과 주파수에 따른 특징이 잘 드러나도록 세그먼트 분할, 노이즈 제거, 등의 작업을 1~6 번 과정에서 수행하며

7~9 번 과정에서 음성 길이 통일, masking, 등의 과정을 통해 동일한 크기의 벡터 생성 및 데이터 증강을 수행한다.

본 문서는 수집된 전체 음성 데이터에 대해 수행되며 각 단계에 대한 필요성 언급 및 시각화를 하지 않는다. 이는 전체 데이터를 효과적으로 전처리하기 위함이며 각 단계의 필요성 및 시각적 자료는 trans_data_preview.ipynb 파일에 명시되어 있다.

<br>

전처리 순서는 아래와 같다.

1. sample rate를 16000으로 통일한다.

2. 로그 멜 스펙트럼을 통해 음성의 파워를 측정한다.

3. 파워의 지역 최솟값을 기준으로 음성을 분할한다.

4. 분할된 음성의 앞뒤 화이트 노이즈(특정 임계값 이하의 에너지를 가지는 시점)을 제거한다.

5. 분할된 음성이 노이즈를 제거하여 패턴을 보다 잘 드러나게 한다.

6. 음성 정규화를 수행한다.

7. Padding과 Trimming을 통해 평균 세그멘테이션 길이를 가지도록 음성의 길이를 통일한다.

8. 음성의 유사도를 측정하여 하위 10%의 데이터는 제외한다.

9. 유사도 상위 10% 데이터에 대하여 SpecAugment를 적용하여 Training 데이터에 대해 마스킹 작업을 수행한다. 각 파일 당 마스킹 파일 2개를 생성한다.


In [1]:
# Load packages
import os
import sys
import wave
import librosa
import numpy as np
import pandas as pd
import seaborn as sns
from tqdm import tqdm
import soundfile as sf
import matplotlib.pyplot as plt
from uuid import uuid4

In [2]:
target_animal = 'dog'  # 'cat', 'dog'

# Set Path
main_path = os.path.join(os.getcwd().rsplit(
    'FurEmotion-AI')[0], 'FurEmotion-AI')
data_path = os.path.join(main_path, 'data', target_animal)
temp_data_path = os.path.join(main_path, 'temp_data', target_animal)
csv_path = os.path.join(main_path, 'origin_data_info.csv')
origin_data_path = os.path.join(main_path, 'origin_data', target_animal)

sys.path.append(main_path)

if not os.path.exists(temp_data_path):
    os.makedirs(temp_data_path)
if not os.path.exists(data_path):
    os.makedirs(data_path)
if not os.path.exists(origin_data_path):
    raise ValueError(f'No such animal data path: {origin_data_path}')

In [3]:
# 하이퍼 파라미터
search_in_sec = 3   # 파워값을 측정하는 시간 간격

state_list = [dir_path for dir_path in os.listdir(origin_data_path) if os.path.isdir(
    os.path.join(origin_data_path, dir_path))]

# play는 데이터양이 너무 적어서 제외한다.
if 'play' in state_list:
    state_list.remove('play')
print("state list: ", state_list)

state list:  ['relax', 'hostile', 'whining']


In [4]:
# 시각화 함수들을 정의한다.
def show_spectrum(y, sr):
    # STFT 계산
    D = librosa.amplitude_to_db(abs(librosa.stft(y)), ref=np.max)

    # 시간과 주파수 축을 위한 값들 계산
    times = np.linspace(0, len(y) / sr, num=D.shape[1], endpoint=False)
    freqs = librosa.fft_frequencies(sr=sr, n_fft=2048)

    # 스펙트럼 시각화
    plt.figure(figsize=(8, 4))
    plt.imshow(D, aspect='auto', origin='lower', extent=[
               times.min(), times.max(), freqs.min(), freqs.max()])
    plt.colorbar(format='%+2.0f dB')
    plt.xlabel('Time (s)')
    plt.ylabel('Frequency (Hz)')
    plt.title('Spectrogram')
    plt.yscale('log')
    plt.tight_layout()
    plt.show()


def show_mel_power(power, dot_list=[], dot_color='red', dot_label=''):
    # 파워 시각화
    plt.figure(figsize=(12, 5))
    plt.plot(power)
    plt.title("Log Mel Spectrum Power")
    plt.xlabel("Time Frame")
    plt.ylabel("Power")

    if (len(dot_list) > 0):
        plt.scatter(dot_list, power[dot_list],
                    color=dot_color, label=dot_label)
        plt.legend()

    plt.grid(True)
    plt.tight_layout()
    plt.show()


def show_masked_mel(db_masked_mel):
    plt.figure(figsize=(10, 4))
    plt.imshow(db_masked_mel, origin='lower', aspect='auto',
               extent=[0, db_masked_mel.shape[1], 0, db_masked_mel.shape[0]])
    plt.colorbar(format='%+2.0f dB')
    plt.title('Realistically Masked Mel spectrogram (using matplotlib)')
    plt.xlabel('Time frames')
    plt.ylabel('Mel frequency bins')
    plt.tight_layout()
    plt.show()

In [5]:
# 데이터 경로를 불러오는 함수를 정의한다.
def get_sound_files_by_state(target_path, state_list):
    sound_files = {}
    for state in state_list:
        sound_files[state] = []

    for (root, dir, files) in os.walk(target_path):
        folder_state = root.split('/')[-1]
        if folder_state not in state_list:
            continue
        for file in files:
            if file.endswith('.wav'):
                sound_files[root.split(
                    '/')[-1]].append(os.path.join(root, file))
    return sound_files


origin_sound_files = get_sound_files_by_state(origin_data_path, state_list)
for state in state_list:
    print(
        f'{state}: {len(origin_sound_files[state])}', origin_sound_files[state][:2])

relax: 71 ['/Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/relax/Dog190.wav', '/Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/relax/Dog184.wav']
hostile: 69 ['/Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/hostile/dog59.wav', '/Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/hostile/dog65.wav']
whining: 181 ['/Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining122.wav', '/Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining136.wav']


In [6]:
# 2. sampling reate를 16000으로 통일한다.
import sox


def resampling(file_path_list: list[str], output_path: str, target_sample_rate: int = 16000):
    """
    Change sample rate

    Before start:

        sox를 설치하여야 아래 함수를 수행할 수 있다.

        OS X: brew install sox

        linux: apt-get install sox

        windows: exe 파일을 다운받아 실행: https://sourceforge.net/projects/sox/files/sox/14.4.1/

    Parameters:
        file_path_list: 변환하고자 하는 wav 파일 리스트

        output_path: 변환된 결과물을 저장하고자 하는 폴더 경로.

        target_sample_rate: 변환하고자 하는 sample rate

    Returns: None
    """
    tfm = sox.Transformer()
    tfm.convert(samplerate=target_sample_rate)

    for i in tqdm(range(len(file_path_list))):
        if not os.path.exists(file_path_list[i]):
            raise OSError(f'File {file_path_list[i]} not exist')

        file = file_path_list[i].rsplit('/', 1)[1]
        output_file_path = os.path.join(output_path, file)

        try:
            tfm.build(file_path_list[i], output_file_path)
        except:
            print(f'Error in {file_path_list[i]}')
            continue


for state in origin_sound_files:
    output_path = os.path.join(temp_data_path, state)
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    resampling(
        file_path_list=origin_sound_files[state],
        output_path=output_path,
        target_sample_rate=16000
    )

temp_sound_files = get_sound_files_by_state(temp_data_path, state_list)
for state in state_list:
    print(
        f'{state}: {len(temp_sound_files[state])}', temp_sound_files[state][:2])

100%|██████████| 71/71 [00:00<00:00, 86.42it/s]
100%|██████████| 69/69 [00:00<00:00, 82.69it/s]
 43%|████▎     | 78/181 [00:00<00:00, 108.93it/s]

Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining182.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining183.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining181.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining180.wav


 56%|█████▋    | 102/181 [00:00<00:00, 110.08it/s]

Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining184.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining178.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining179.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining174.wav


 70%|██████▉   | 126/181 [00:01<00:00, 111.91it/s]

Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining175.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining177.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining176.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining172.wav
Error in /Users/jaewone/Downloads/FurEmotion-AI/origin_data/dog/whining/whining173.wav


100%|██████████| 181/181 [00:01<00:00, 106.92it/s]

relax: 71 ['/Users/jaewone/Downloads/FurEmotion-AI/temp_data/dog/relax/Dog190.wav', '/Users/jaewone/Downloads/FurEmotion-AI/temp_data/dog/relax/Dog184.wav']
hostile: 69 ['/Users/jaewone/Downloads/FurEmotion-AI/temp_data/dog/hostile/dog59.wav', '/Users/jaewone/Downloads/FurEmotion-AI/temp_data/dog/hostile/dog65.wav']
whining: 168 ['/Users/jaewone/Downloads/FurEmotion-AI/temp_data/dog/whining/whining122.wav', '/Users/jaewone/Downloads/FurEmotion-AI/temp_data/dog/whining/whining136.wav']





In [7]:
# 3번 과정의 함수를 정의한다.
from scipy.signal import stft


def read_wav(file_path):
    with wave.open(file_path, 'r') as wav_file:
        n_channels, sampwidth, framerate, n_frames, comptype, compname = wav_file.getparams()
        audio_data = wav_file.readframes(n_frames)
        audio_data = np.frombuffer(audio_data, dtype=np.int16)

    # Stereo 파일의 경우 mono로 변환
    if n_channels == 2:
        audio_data = (audio_data[::2] + audio_data[1::2]) / 2

    return [audio_data, {'n_channels': n_channels,
                         'sampwidth': sampwidth,
                         'framerate': framerate,
                         'n_frames': n_frames,
                         'comptype': comptype,
                         'compname': compname}]


def mel_filter_bank(num_filters, fft_size, sample_rate):
    """
    멜 필터뱅크 생성
    """
    # 멜 스케일과 헤르츠 스케일 간의 변환 함수
    def hz_to_mel(hz): return 2595 * np.log10(1 + hz / 700)
    def mel_to_hz(mel): return 700 * (10**(mel / 2595) - 1)

    # 멜 스케일로 끝점 설정
    mel_end = hz_to_mel(sample_rate / 2)
    mel_points = np.linspace(hz_to_mel(0), mel_end, num_filters + 2)
    hz_points = mel_to_hz(mel_points)

    # FFT 주파수 인덱스로 변환
    bin_points = np.floor((fft_size + 1) * hz_points / sample_rate).astype(int)

    # 필터뱅크 생성
    filters = np.zeros((num_filters, fft_size // 2 + 1))
    for i in range(1, num_filters + 1):
        filters[i - 1, bin_points[i - 1]:bin_points[i]] = \
            (np.arange(bin_points[i - 1], bin_points[i]) -
             bin_points[i - 1]) / (bin_points[i] - bin_points[i - 1])
        filters[i - 1, bin_points[i]:bin_points[i + 1]] = 1 - \
            (np.arange(bin_points[i], bin_points[i + 1]) -
             bin_points[i]) / (bin_points[i + 1] - bin_points[i])
    return filters


def get_log_mel_spectogram(audio_data, framerate, num_filters=40, nperseg=2048, noverlap=1024, nfft=2048):
    # STFT 계산
    _, _, Zxx = stft(audio_data, fs=framerate, nperseg=nperseg,
                     noverlap=noverlap, nfft=nfft)
    magnitude = np.abs(Zxx)

    # 로그 멜 스펙트럼 추출
    num_filters = num_filters
    filters = mel_filter_bank(num_filters, 2048, framerate)
    mel_spectrum = np.dot(filters, magnitude)
    log_mel_spectrum = np.log(mel_spectrum + 1e-9)  # log 0을 피하기 위한 작은 값 추가
    return log_mel_spectrum


def get_mel_power(log_mel_spectrum):
    # 로그 멜 스펙트럼의 파워 계산
    return np.sum(log_mel_spectrum**2, axis=0)

In [8]:
# 4번 과정의 함수를 정의한다.
def get_interval_min_sec(power, sec: int = 1):
    interval_sec = int(10 * sec)
    min_sec_list = []
    for i in range(0, len(power) + 1, interval_sec):
        argsort = np.argsort(power[i:i + interval_sec])
        if len(argsort) < 1:
            break
        min_sec = i + argsort[0]
        min_sec_list.append(min_sec)
    return min_sec_list


def split_audio_on_indices(audio_data, power, indices):
    """
    주어진 인덱스를 기준으로 오디오 데이터를 여러 부분으로 나누는 함수.

    Parameters:
    - audio_data: 오디오 데이터 배열
    - indices: 분할할 시점의 리스트

    Returns:
    - audio_segments: 나눈 오디오 데이터의 리스트
    """
    # STFT를 통해 구한 시점을 오디오 샘플의 시점으로 변환
    nperseg = 2048
    noverlap = 1024
    samples_per_frame = (len(audio_data) - nperseg) / (len(power) - 1)
    sample_indices = [int(index * samples_per_frame) for index in indices]

    audio_segments = []
    prev_index = 0
    for index in sample_indices:
        segment = audio_data[prev_index:index]
        if len(segment) != 0:
            audio_segments.append(segment)
        prev_index = index
    audio_segments.append(audio_data[prev_index:])  # 마지막 세그먼트 추가

    return audio_segments


def save_audio_segments_as_wav(audio_segments, output_dir, prefix, sampwidth, framerate):
    """
    주어진 오디오 세그먼트들을 WAV 파일로 저장하는 함수.

    Parameters:
    - audio_segments: 나눈 오디오 데이터의 리스트
    - output_dir: 출력 디렉토리 경로
    - prefix: 저장할 파일의 접두사

    Returns:
    - filepaths: 저장된 파일의 경로 리스트
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)

    filepaths = []
    for i, segment in enumerate(audio_segments):
        filename = f"{prefix}_{uuid4()}.wav"
        filepath = os.path.join(output_dir, filename)
        with wave.open(filepath, 'wb') as wav_file:
            wav_file.setnchannels(1)
            wav_file.setsampwidth(sampwidth)
            wav_file.setframerate(framerate)
            wav_file.writeframes(segment.tobytes())
        filepaths.append(filepath)

    return filepaths

In [9]:
# 5,6,7번 과정의 함수를 정의하거나 불러온다.
from scipy.io import wavfile
from typing import Optional
import wave
import numpy as np
import noisereduce as nr

from utils.os import *
from utils.sound import *


def reduced_base_noise(file_path: str,
                       output_path: Optional[str] = None,
                       inplace: bool = False):
    """
    noisereduce 라이브러리를 이용하여 기본적인 노이즈를 감소시킨다.

    Parameters:
        * file_path : 처리하고자 하는 파일의 경로
        * output_path : 처리한 결과를 저장하고자 하는 파일 경로. 없을 경우 file_path의 파일을 덮어쓴다.
        * inplace : 원본 데이터(file_path)를 처리한 파일로 덮어쓴다.

    Returns: None
    """

    if inplace == False and output_path == None:
        raise ValueError(f'output path must be defined if inplace is False.')

    if output_path == None or inplace == True:
        output_path = file_path

    # load data
    rate, data = wavfile.read(file_path)

    # perform noise reduction
    reduced_noise = nr.reduce_noise(y=data, sr=rate)
    wavfile.write(output_path, rate, reduced_noise)


def detect_non_silence(audio_data: np.ndarray, threshold: float, frame_size: int) -> list[int]:
    """
    오디오 신호에서 무음이 아닌 섹션의 시작과 끝을 감지한다.

    Parameters:
        - audio_data (numpy.ndarray): 묵음을 감지해야 하는 오디오 데이터.
        - threshold (float): 오디오가 무음으로 간주되는 에너지 임계값.
        - frame_size (int): 오디오 에너지의 이동 평균을 계산하기 위해 고려할 샘플 수.

    Returns:
        - start (int): 비침묵 섹션의 시작 샘플.
        - end (int): non-silence 섹션의 엔딩 샘플.
    """

    moving_avg = np.convolve(audio_data, np.ones(
        (frame_size,)) / frame_size, mode='valid')
    non_silence = np.where(moving_avg > threshold)[0]

    start = non_silence[0]
    # compensate for the 'valid' mode in convolution
    end = non_silence[-1] + frame_size

    return start, end


def trim_audio(file_path: str, output_path: Optional[str] = None, inplace: bool = False, frame_size=5000):
    """
    오디오의 앞뒤에 존재하는 화이트 노이즈를 제거한다.
    화이트 노이즈는 음성의 전체 에너지의 하위 10%에 해당하는 시점으로 정의한다.

    Parameters:
        * file_path : 처리하고자 하는 파일의 경로
        * output_path : 처리한 결과를 저장하고자 하는 파일 경로. 없을 경우 file_path의 파일을 덮어쓴다.
        * inplace : 원본 데이터(file_path)를 처리한 파일로 덮어쓴다.

    Returns: None
    """

    if inplace == False and output_path == None:
        raise ValueError(f'output path must be defined if inplace is False.')

    # wav 파일을 읽어온다.
    with wave.open(file_path, "r") as file:
        params = file.getparams()
        n_frames = params[3]
        audio_data = file.readframes(n_frames)
        wave_data = np.frombuffer(audio_data, dtype=np.int16)

    # 오디오 시그널을 통한 에너지 계산
    energy = np.abs(wave_data)

    # 백색 잡음을 분류하기 위한 임계값을 설정.
    # 전체 에너지의 하위 10%에 10을 곱하여 임계값을 설정하였으나 추가적은 고민이 필요하다.
    threshold = np.percentile(energy, 10) * 10

    # Use a larger frame size to get a moving average of the audio energy
    frame_size = frame_size
    error_files = []
    try:
        start, end = detect_non_silence(energy, threshold, frame_size)
        trimmed_wave_data = wave_data[start:end]
    except:
        trimmed_wave_data = wave_data
        error_files.append(file_path)
    # print(start, end)

    # trim된 numpy array를 wav 파일로 저장한다.
    if inplace or output_path == None:
        remove_file(file_path)
        output_path = file_path
    with wave.open(output_path, "w") as out_file:
        out_file.setparams(params)
        out_file.writeframes(trimmed_wave_data.tobytes())

    return error_files


def normalize_and_save_audio(input_path, output_path=None, inplace=True):
    """
    Load an audio file from the given input path, normalize it, and save the normalized audio to the given output path.

    Parameters:
    - input_path: Path to the input audio file.
    - output_path: Path to save the normalized audio file.
    - inplace: 이미 존재하는 파일에 덮어쓴다.
    """
    if inplace == False and output_path == None:
        raise ValueError(f'output path must be defined if inplace is False.')

    # Load the audio file
    y, sr = librosa.load(input_path, sr=None)

    # Normalize the audio data
    y_normalized = y / np.max(np.abs(y))

    # Convert float to int16 (since wavfile.write requires int values)
    y_normalized_int16 = (y_normalized * 32767).astype(np.int16)

    if inplace:
        os.remove(input_path)
        output_path = input_path

    # Save the normalized audio data to the specified output path
    wavfile.write(output_path, sr, y_normalized_int16)

  from .autonotebook import tqdm as notebook_tqdm


In [10]:
# 3 ~ 7 번 과정을 수행한다.
for state in temp_sound_files:
    file_list = temp_sound_files[state]
    save_data_path = os.path.join(data_path, state)
    for i in tqdm(range(len(file_list))):
        file_state = file_list[i].rsplit('/', 2)[1]

        """   3. 로그 멜 스펙트럼을 통해 음성의 파워를 측정한다.   """

        # 파일을 읽어온다.
        try:
            audio_data, audio_status = read_wav(file_list[i])
        except Exception as e:
            print(f'Error in {file_list[i].rsplit("/", 1)[1]}', f'Error: {e}')
            continue

        # 로그 멜 스펙트럼을 추출한 뒤
        log_mel_spectogram = get_log_mel_spectogram(
            audio_data, audio_status['framerate'])

        # 파워를 측정한다.
        power = get_mel_power(log_mel_spectogram)

        """   4. 파워의 지역 최솟값을 기준으로 음성을 분할한다.   """

        # 파워의 지역 최솟값의 시점들을 구한다.
        min_sec_list = get_interval_min_sec(power, sec=search_in_sec)

        # 지역 최솟값을 기준으로 오디오(numpy)값을 분할한다.
        audio_segments_sec = split_audio_on_indices(
            audio_data, power, min_sec_list)

        # 분할된 값을 data_path에 저장한다.
        saved_filepaths = save_audio_segments_as_wav(
            audio_segments=audio_segments_sec,
            output_dir=save_data_path,
            prefix=file_state,
            sampwidth=audio_status['sampwidth'],
            framerate=audio_status['framerate'],
        )

        # # 분할된 음성 중 search_in_sec의 절반보다 긴 음성들에 대해 5,6 과정을 진행한다.
        total_error_files = []
        for file_path in saved_filepaths:
            # WAV 파일 로드
            y, sr = librosa.load(file_path, sr=None)

            # 파일의 길이(초) 출력
            origin_duration = librosa.get_duration(y=y, sr=sr)
            if origin_duration > 1:
                # 5. 분할된 음성의 앞뒤 화이트 노이즈(특정 임계값 이하의 에너지를 가지는 시점)을 제거한다.
                error_files = trim_audio(
                    file_path, inplace=True, frame_size=1000)
                if (len(error_files) > 0):
                    total_error_files += error_files

                # 6. 분할된 음성이 노이즈를 제거하여 패턴을 보다 잘 드러나게 한다.
                reduced_base_noise(file_path, inplace=True)

                # 7. 음성 정규화를 수행한다.
                normalize_and_save_audio(file_path, inplace=True)
            # else:
            #     os.remove(file_path)

data_sound_files = get_sound_files_by_state(data_path, state_list)
for state in state_list:
    print(
        f'{state}: {len(data_sound_files[state])}', data_sound_files[state][:2])

100%|██████████| 71/71 [00:03<00:00, 19.68it/s]
 26%|██▌       | 18/69 [00:00<00:01, 38.17it/s]

Error in dog12.wav Error: unknown format: 65534
Error in dog13.wav Error: unknown format: 65534


 38%|███▊      | 26/69 [00:00<00:01, 28.57it/s]

Error in dog14.wav Error: unknown format: 65534


100%|██████████| 69/69 [00:02<00:00, 27.68it/s]
100%|██████████| 168/168 [00:03<00:00, 42.30it/s]

relax: 137 ['/Users/jaewone/Downloads/FurEmotion-AI/data/dog/relax/relax_acafd77b-1241-4c54-b89b-1b8766667cf8.wav', '/Users/jaewone/Downloads/FurEmotion-AI/data/dog/relax/relax_6d4884ad-3223-4e54-aac9-c48dcecf526a.wav']
hostile: 140 ['/Users/jaewone/Downloads/FurEmotion-AI/data/dog/hostile/hostile_518e376a-54e8-4db7-a042-dfcc6bc20620.wav', '/Users/jaewone/Downloads/FurEmotion-AI/data/dog/hostile/hostile_889fdebb-566b-4381-99a1-a2af29b5fd21.wav']
whining: 300 ['/Users/jaewone/Downloads/FurEmotion-AI/data/dog/whining/whining_60de4a67-0b80-4694-83f3-1f255575bb99.wav', '/Users/jaewone/Downloads/FurEmotion-AI/data/dog/whining/whining_a6ce5c62-0f32-4fb3-a664-227f493d3e70.wav']





### 음성 길이 규정

위 과정을 통해 음성의 특징을 가지는 각각의 전처리된 세그먼트가 wav 파일 형태로 저장되었다.

모델의 입력 벡터로 구성하기 위해서는 모든 음성의 길이가 동일해야 함으로 모든 세그먼트의 평균 길이를 구한 뒤 Padding과 Trimming 과정을 통해 평균 길이로 음성 길이를 통일하고자 한다.


#### 평균 세그먼트 길이 측정

아래 과정을 통해 평균 세그먼트의 (음성)길이는 2.05 임을 알았다. 이에 음성을 반올림값인 2초로 통일하고자 한다.


In [11]:
data_sound_files = get_sound_files_by_state(data_path, state_list)
file_list = [
    file for state in data_sound_files for file in data_sound_files[state]]
duration_list = []
for file_path in file_list:
    y, sr = librosa.load(file_path, sr=None)
    duration = librosa.get_duration(y=y, sr=sr)
    duration_list.append(duration)
    # if duration < 1:
    #     os.remove(file_path)
    # else:
    #     duration_list.append(duration)
sum = np.array(duration_list).sum()
avg = sum / len(duration_list)
avg_duration = round(avg)
print(f'Average: {avg}')
print(f'사용할 음성의 길이: {avg_duration}')

Average: 2.0532673310225307
사용할 음성의 길이: 2


In [12]:
# 8번 과정을 수행한다.

def pad_audio_to_length(input_path, target_duration, output_path=None, inplace=False):
    """
    wav 파일을 target_duration 길이로 맞춘다. 
    음성의 중심을 기준으로 target_duration보다 짧을 경우 앞뒤에 Padding을 추가하며 
    음성의 중심을 기준으로 target_duration보다 길 경우 앞뒤를 자른다.
    """

    if inplace == False and output_path == None:
        raise ValueError(f'output path must be defined if inplace is False.')

    # WAV 파일 읽기
    y, sr = librosa.load(input_path, sr=None)

    # 현재 오디오 파일의 길이 확인
    current_duration = len(y) / sr

    # target_duration과 비교하여 패딩 또는 trimming 필요 여부 확인
    total_samples = int(target_duration * sr)

    if len(y) < total_samples:
        # 패딩 필요
        padding_samples = total_samples - len(y)
        left_padding = padding_samples // 2
        right_padding = padding_samples - left_padding
        processed_audio = np.pad(
            y, (left_padding, right_padding), mode='constant')
    else:
        # trimming 필요
        excess_samples = len(y) - total_samples
        left_trim = excess_samples // 2
        right_trim = excess_samples - left_trim
        processed_audio = y[left_trim:-right_trim]

    if inplace:
        os.remove(input_path)
        output_path = input_path

    # 변환된 데이터를 WAV 파일로 저장
    sf.write(output_path, processed_audio, sr)

    return output_path


data_sound_files = get_sound_files_by_state(data_path, state_list)
file_list = [
    file for state in data_sound_files for file in data_sound_files[state]]

for file_path in file_list:
    pad_audio_to_length(file_path, avg_duration, inplace=True)

In [13]:
# 9번 과정을 수행한다 : 음성의 유사도를 측정하여 하위 10%의 데이터는 제외한다.

min_state = ''
min_count = 100000    # 초기값: 불가능한 아주 큰 수
for state in os.listdir(data_path):
    count = len(os.listdir(os.path.join(data_path, state)))
    print(f'State {state} with file counts: {count}')
    if count < min_count:
        min_count = count
        min_state = state

print(f'\n가장 적게 존재하는 State는 {state} 이며 개수는 {min_count} 이다.')

State relax with file counts: 137
State hostile with file counts: 140
State whining with file counts: 300

가장 적게 존재하는 State는 whining 이며 개수는 137 이다.


In [14]:
# 9번 과정을 수행한다 : 유사도를 측정한 뒤 전체 파일의 개수를 맞춘다.
from sklearn.metrics.pairwise import cosine_similarity
from typing import Optional
from tqdm import tqdm
import librosa
import numpy as np

from utils.os import remove_file


def compute_melspectrogram(file_path: str) -> np.ndarray:
    """
    wav 파일의 멜 스펙토그램을 계산한다.

    Parameters:
        * file_path : wav 파일의 경로

    Returns:
        * 계산된 멜 스펙트럼값(numpy array)
    """
    y, sr = librosa.load(file_path)
    mels = librosa.feature.melspectrogram(y=y, sr=sr)
    return mels


def get_similarities(file_list: list[str], top_n: Optional[int] = None) -> list[str]:
    """
    파일 리스트에서 가장 유사한 n개의 파일을 반환한다.

    Parameters:
    * file_list: 분석하고자 하는 파일 경로 리스트
    * top_n: 유사한 순으로 나열했을 때 반환 할 상위 n개의 파일 리스트. 만약 top_n=None 일 경우 전체 리스트를 반환한다.

    Returns: 유사도 순으로 나열한 n개의 파일 리스트
    """

    # 멜 스펙트럼을 분석한다.
    melspectrograms = {file_path: compute_melspectrogram(
        file_path) for file_path in file_list}

    # 코사인 유사도를 측정한다.
    n = len(file_list)
    similarity_matrix = np.zeros((n, n))
    with tqdm(total=n, desc='Processing', position=0) as pbar:
        for i in range(n):
            for j in range(n):
                mels1 = melspectrograms[file_list[i]].flatten()
                mels2 = melspectrograms[file_list[j]].flatten()

                # 두 멜 스펙트럼의 크기가 다를 경우 작은 크기에 맞춘다.
                min_size = min(mels1.shape[0], mels2.shape[0])
                mels1 = mels1[:min_size]
                mels2 = mels2[:min_size]

                similarity_matrix[i, j] = cosine_similarity(
                    mels1.reshape(1, -1), mels2.reshape(1, -1))
            pbar.update(1)

    # 유사도 순으로 나열한다.
    np.fill_diagonal(similarity_matrix, 0)
    indices = np.flip(np.argsort(np.sum(similarity_matrix, axis=1)))

    return [file_list[i] for i in (indices if top_n == None else indices[:top_n])]


def delete_dissimilar_wavs(data_path: str, n_limit: int, save_dir=None):
    mask_files = {}
    for state in state_list:
        mask_files[state] = []
        state_path = os.path.join(data_path, state)
        state_file_list = [os.path.join(state_path, file)
                           for file in os.listdir(state_path) if file.endswith('.wav')]
        print("state file length: ", len(state_file_list))
        mask_file_length = round(len(state_file_list) * 0.1)

        # get similarities
        sim_file_list = get_similarities(state_file_list)

        # 파일의 개수를 n_limit으로 맞춘다.
        del_file_list = sim_file_list[n_limit:]
        for file in del_file_list:
            remove_file(file)

        # 유사도 정도를 저장한다.
        sim_file_list = sim_file_list[:n_limit]
        if (save_dir != None):
            np.savetxt(f'{os.path.join(save_dir, state)}_similarity.txt',
                       sim_file_list, delimiter=',', fmt='%s')

        mask_files[state] = sim_file_list[:mask_file_length]
    return mask_files


save_dir = os.path.join(main_path, 'save_similarity')
if not os.path.exists(save_dir):
    os.makedirs(save_dir, exist_ok=True)

mask_files = delete_dissimilar_wavs(data_path, min_count, save_dir)
print("\nBest similar files: ")
for state in mask_files:
    print(f'{state}({len(mask_files[state])}):', end=' ')
    for file in mask_files[state]:
        print(file.rsplit('/', 1)[1], end=', ')
    print()

state file length:  137


Processing: 100%|██████████| 137/137 [00:03<00:00, 38.58it/s]


state file length:  140


Processing: 100%|██████████| 140/140 [00:03<00:00, 39.99it/s]


state file length:  300


Processing: 100%|██████████| 300/300 [00:16<00:00, 18.44it/s]


Best similar files: 
relax(14): relax_fb938076-df2c-4cec-8041-6fe70e69b619.wav, relax_ca4ac321-5dc9-482e-a0cb-c1685a644213.wav, relax_af40514e-adcb-4fd2-bd84-04b54b5e92d9.wav, relax_0a977d15-b25e-4e73-beb1-c46f361be830.wav, relax_edd2a521-dabc-4486-93f7-061ac491e31f.wav, relax_ad1f89ef-fe50-4788-b232-01435c05d66b.wav, relax_cfcb0d25-3ea6-465f-9862-e8f06ae8a638.wav, relax_f20900a0-7dc1-4720-969d-eec0f15b4cc5.wav, relax_bff9ff3d-c0a2-44aa-b424-80ac795e6217.wav, relax_1a0fa57a-3154-432b-b0d3-07632364979a.wav, relax_c7a34bf6-da89-4b12-93f1-56e0683d046e.wav, relax_446b3d4a-e1d6-4be2-b657-dec43d522bdb.wav, relax_a11c66b0-2aa5-4f3d-a026-32d8a0f41248.wav, relax_b2227d53-88e1-4ca8-8177-7871f8bd0968.wav, 
hostile(14): hostile_60815d15-6f45-45b5-b7f7-18604b3104c9.wav, hostile_771bff27-6c81-43fd-8ab2-fca8707c31c9.wav, hostile_da5d65c6-646d-4a4a-b47e-eb2de883400e.wav, hostile_6ba041c4-ee90-4fe3-bfc7-bd62818a41bb.wav, hostile_be723a29-f78f-4889-a305-9f64a42b1f5d.wav, hostile_ffa87c62-ab7f-4546-ba66




In [15]:
# 10번 과정을 수행한다.
# 유사도 상위 10% 데이터에 대하여 SpecAugment를 적용하여 Training 데이터에 대해 마스킹 작업을 수행한다.

def mask_melspectrogram(file_path,
                        freq_mask_num=2,
                        time_mask_num=2,
                        freq_masking_max_percentage=0.15,
                        time_masking_max_percentage=0.3):

    # 1. Load the audio file
    y, sr = librosa.load(file_path, sr=None)

    # 2. Extract mel spectrogram
    mel_spectrogram = librosa.feature.melspectrogram(y=y, sr=sr)

    num_freq_bins = mel_spectrogram.shape[0]
    num_time_bins = mel_spectrogram.shape[1]

    # 3. Apply masking
    # Frequency masking
    for _ in range(freq_mask_num):
        freq_start = np.random.randint(0, num_freq_bins)
        freq_length = np.random.randint(
            0, int(num_freq_bins * freq_masking_max_percentage))
        freq_end = min(mel_spectrogram.shape[0], freq_start + freq_length)
        mel_spectrogram[freq_start:freq_end, :] = 0

    # Time masking
    for _ in range(time_mask_num):
        time_start = np.random.randint(0, num_time_bins)
        time_length = np.random.randint(
            0, int(num_time_bins * time_masking_max_percentage))
        time_end = min(mel_spectrogram.shape[1], time_start + time_length)
        mel_spectrogram[:, time_start:time_end] = 0

    return mel_spectrogram


def show_masked_mel(db_masked_mel):
    plt.figure(figsize=(10, 4))
    plt.imshow(db_masked_mel, origin='lower', aspect='auto',
               extent=[0, db_masked_mel.shape[1], 0, db_masked_mel.shape[0]])
    plt.colorbar(format='%+2.0f dB')
    plt.title('Realistically Masked Mel spectrogram (using matplotlib)')
    plt.xlabel('Time frames')
    plt.ylabel('Mel frequency bins')
    plt.tight_layout()
    plt.show()


def save_masked_spectrogram_to_wav(mel_spectrogram, sr, save_path):
    """ Convert masked mel spectrogram back to waveform and save as wav """
    y_inv = librosa.feature.inverse.mel_to_audio(mel_spectrogram, sr=sr)
    sf.write(save_path, y_inv, sr)


def create_mask(wav_file, output_dir=None, n_create=2):
    sr = 16000
    wav_output_dir, wav_file_name = wav_file.rsplit('/', 1)
    base_filename = wav_file_name.split('.')[0]

    if output_dir == None:
        output_dir = wav_output_dir

    for i in range(1, n_create + 1, 1):
        odd = i % 2 == 0
        masked_mel = mask_melspectrogram(
            wav_file,
            freq_mask_num=1,
            time_mask_num=1,
            freq_masking_max_percentage=0.12 if odd else 0.07,
            time_masking_max_percentage=0.07 if odd else 0.12
        )
        save_masked_spectrogram_to_wav(masked_mel, sr, os.path.join(
            output_dir, base_filename + f"_mask{i}.wav"))
        # show_masked_mel(librosa.power_to_db(masked_mel, ref=np.max))
        # print(f'Save: {os.path.join(output_dir, base_filename + f"_mask{i}.wav")}')


top_10 = int(len(os.listdir(os.path.join(data_path, state_list[0]))) * 0.1)

for state in state_list:
    state_data_path = os.path.join(data_path, state)
    file_list = mask_files[state]
    for i in tqdm(range(len(file_list))):
        create_mask(
            wav_file=file_list[i],
            output_dir=state_data_path,
            n_create=1
        )
    print(f'State {state} masking done.')

100%|██████████| 14/14 [00:03<00:00,  4.66it/s]


State relax masking done.


100%|██████████| 14/14 [00:02<00:00,  4.68it/s]


State hostile masking done.


100%|██████████| 30/30 [00:06<00:00,  4.61it/s]

State whining masking done.





In [16]:
# 파일 이름을 relax_1.wav, relax_2.wav, ... 형태로 변경하여 보기 쉽게 한다.
def rename_files(prefix: str, dir_path: str):
    file_list = [file for file in os.listdir(
        dir_path) if file.endswith('.wav')]
    for i in range(len(file_list)):
        os.rename(os.path.join(dir_path, file_list[i]),
                  os.path.join(dir_path, f'{prefix}_{i+1}.wav'))


for state in data_sound_files:
    rename_files(state, os.path.join(data_path, state))

In [None]:
# temp_dir를 삭제한다.
from utils.os import remove_path_with_files

remove_path_with_files(temp_data_path)
print('All process done.')