In [1]:
import librosa
from tqdm import tqdm
import numpy as np
from scipy.signal import find_peaks
import os
import soundfile as sf
import librosa.display

In [2]:
def normalize_audio(wav):
    # 標準化音頻信號
    return wav / np.max(np.abs(wav))

def amplify_audio(wav, factor):
    # 放大音頻信號
    return wav * factor

def trim_audio(wav, sr, trim_duration=0.5):
    # 移除前後 trim_duration 秒
    trim_samples = int(trim_duration * sr)
    if len(wav) > 2 * trim_samples:
        return wav[trim_samples:-trim_samples]
    else:
        return wav  # 如果音頻長度不足以移除前後 trim_duration 秒，則不進行裁剪

def load_wav_files(directory, target_sr=16000, amplification_factor=80, trim_duration=1):
    wav_files = []
    for root, dirs, files in os.walk(directory):
        with tqdm(total=len(files), desc='Loading files', unit='file') as pbar:
            for file in files:
                if file.endswith(".wav") and file != 'all_channel.wav':
                    file_path = os.path.join(root, file)
                    y, sr = librosa.load(file_path, sr=None)
                    if sr != target_sr:
                        y = librosa.resample(y, orig_sr=sr, target_sr=target_sr)
                    
                    y = trim_audio(y, sr=target_sr, trim_duration=trim_duration)
                    y = amplify_audio(y, amplification_factor)
                    y = normalize_audio(y)
                    
                    # remove wav_directory from root
                    path = root.split(os.path.sep)
                    filename = f'{path[1]}_{file}'
                    pbar.set_postfix(file=filename,)
                    wav_files.append((y, filename))
                pbar.update(1)
    return wav_files

In [None]:
target_sample_rate = 44100

# 指定存放 WAV 檔案的根目錄
loaded_files = load_wav_files('one_microphone', target_sr=target_sample_rate, amplification_factor=80, trim_duration=1)
loaded_files += load_wav_files('soundcam_dataset', target_sr=target_sample_rate, amplification_factor=80, trim_duration=1)

Loading files: 0file [00:00, ?file/s]
Loading files: 100%|██████████| 20/20 [00:11<00:00,  1.78file/s, file=anomaly-0615-02-09_02-09-20.wav]
Loading files: 100%|██████████| 20/20 [00:08<00:00,  2.33file/s, file=anomaly-0616-01-XX_01-XX-20.wav]
Loading files: 100%|██████████| 20/20 [00:08<00:00,  2.46file/s, file=anomaly-0616-02-24_02-24-20.wav]
Loading files: 100%|██████████| 20/20 [00:07<00:00,  2.66file/s, file=anomaly-0616-03-XX_03-XX-20.wav]
Loading files: 100%|██████████| 9/9 [00:03<00:00,  2.73file/s, file=anomaly-08-15_08-15-09.wav]
Loading files: 100%|██████████| 10/10 [00:03<00:00,  2.63file/s, file=anomaly-09-16_09-16-10.wav]
Loading files: 100%|██████████| 10/10 [00:02<00:00,  3.85file/s, file=anomaly-wang-kong-08_9.wav]
Loading files: 100%|██████████| 10/10 [00:02<00:00,  3.84file/s, file=anomaly-wang-kong-09_9.wav]
Loading files: 100%|██████████| 21/21 [00:08<00:00,  2.46file/s, file=normal-0615-XX-18_XX-18-21.wav]
Loading files: 100%|██████████| 20/20 [00:08<00:00,  2.49f

In [ ]:
def segment(wav_file, duration):
    

In [8]:
def peak_detection(wav, n_fft=None, hop_length=None, height=None, distance=None, prominence=None, width=None, verbose=False):
    window = np.hamming(len(wav))
    wav = wav * window
    
    # 計算短時傅立葉變換 (STFT)
    stft = librosa.stft(wav, n_fft=n_fft, hop_length=hop_length)
    spectrogram = librosa.amplitude_to_db(np.abs(stft), ref=np.max)
    
    # 計算頻譜圖的列均值
    mean_spectrogram = spectrogram.mean(axis=0)
    
    # 使用 scipy.signal.find_peaks 檢測峰值
    peaks, properties = find_peaks(mean_spectrogram, height=height, distance=distance, prominence=prominence, width=width)
    if verbose:
        print(f'Peaks detected: {len(peaks)}')
    return peaks, mean_spectrogram

In [ ]:
def segment_audio_in_time(wav_file, sr=None, segment_length=2, verbose=False):
    """
    Segment audio into clips of a specified length.

    :param wav_file: Tuple containing the audio time series and file name
    :param sr: Sample rate
    :param segment_length: The desired length of each audio segment in seconds (default: 2)
    :param verbose: Print additional information (default: False)
    :return: A list of audio segments
    """
    # 載入音訊數據
    y, name = wav_file
    if sr is None:
        sr = librosa.get_samplerate(name)
        y, _ = librosa.load(name, sr=sr)
    
    # 計算每個片段的樣本數
    segment_samples = int(segment_length * sr)
    
    # 切割音訊，每個片段長度固定為 segment_length 秒
    segments = []
    for start in range(0, len(y), segment_samples):
        end = start + segment_samples
        segment = y[start:end]
        
        # 如果片段長度不足 segment_length 秒，則補零
        if len(segment) < segment_samples:
            segment = np.pad(segment, (0, segment_samples - len(segment)), mode='constant')
        
        segments.append(segment)
    
    if verbose:
        print(f"Segmented audio into {len(segments)} clips of {segment_length} seconds each.")
    
    return segments

In [9]:
def segment_audio(wav_file, sr=None, segment_length=None, hop_length=None, n_fft=None, height=None, distance=None, prominence=None, width=None, verbose=False):
    """
    Segment audio into clips based on onset detection.

    :param wav_file: Audio time series and file name tuple
    :param sr: Sample rate
    :param segment_length: The desired length of each audio segment in seconds (default: 2)
    :param hop_length: Number of samples between successive frames (default: 512)
    :param n_fft: Length of the FFT window (default: 2048)
    :param height: Required height of peaks (default: None)
    :param distance: Required minimal horizontal distance (in samples) between neighbouring peaks (default: None)
    :param prominence: Required prominence of peaks (default: None)
    :param width: Required width of peaks (default: None)
    :param verbose: Print additional information (default: False)
    :return: A tuple containing a list of audio segments and the sample rate (segments, sr)
    """
    # 載入音訊數據
    y, name = wav_file
    
    peaks, _ = peak_detection(y, n_fft=n_fft, hop_length=hop_length, height=height, distance=distance, prominence=prominence, width=width, verbose=verbose)
    
    # 將峰值位置轉換為時間戳
    peak_times = librosa.frames_to_time(peaks, sr=sr, hop_length=hop_length, n_fft=n_fft)
    
    # 切割音訊,每個片段長度固定為 segment_length 秒
    segments = []
    for peak_time in peak_times:
        # 根據 peak_time 和 segment_length 計算起始和結束時間
        start_time = max(0, peak_time - segment_length / 2)
        end_time = min(peak_time + segment_length / 2, len(y) / sr)
        
        start_sample = int(start_time * sr)
        end_sample = int(end_time * sr)
        segment = y[start_sample:end_sample]
        
        # 如果片段長度不足 segment_length 秒,則補零
        if len(segment) < segment_length * sr:
            padding_length = int(segment_length * sr) - len(segment)
            segment = np.pad(segment, (0, padding_length), mode='constant')
        
        segments.append(segment)
    
    return segments

In [6]:
output_dir = "segmented_audio"
output_anomaly_dir = "segmented_audio_anomaly"
def segment_files_and_save(files, sr, segment_length=None, hop_length=None, n_fft=None, height=None, distance=None, prominence=None, width=None):
    """
    :param files: List of audio time series and file name tuples
    :param sr: Sample rate
    :param segment_length: The desired length of each audio segment in seconds (default: 2)
    :param hop_length: Number of samples between successive frames (default: 512)
    :param n_fft: Length of the FFT window (default: 2048)
    :param height: Required height of peaks (default: None)
    :param distance: Required minimal horizontal distance (in samples) between neighbouring peaks (default: None)
    :param prominence: Required prominence of peaks (default: None)
    :param width: Required width of peaks (default: None)
    :return: A tuple containing a list of audio segments and the sample rate (segments, sr)
    """
    if not os.path.exists(output_dir):
        os.makedirs(output_dir)
    
    total_files = len(files)
    with tqdm(total=total_files, desc='Processing files', unit='file') as pbar:
        for i, wav in enumerate(files):
            segments = segment_audio(wav, sr=sr, segment_length=segment_length, hop_length=hop_length, n_fft=n_fft, height=height, distance=distance, prominence=prominence, width=width, verbose=False)
            main_file_name = wav[1].split('.')[0]
            pbar.set_postfix(file=f'{wav[1]}/{total_files}', segments=len(segments))
            for j, segment in enumerate(segments):
                file_name = f'{main_file_name}_segment_{j}.wav'
                if 'anomaly' in wav[1]:
                    sf.write(f'{output_anomaly_dir}/{file_name}', segment, target_sample_rate)
                else:
                    sf.write(f'{output_dir}/{file_name}', segment, target_sample_rate)
            pbar.update(1)

In [10]:
n_fft=2048
hop_length=512
height=-64
distance=30
prominence=1
width=5
segment_length=2
segment_files_and_save(files=loaded_files, sr=target_sample_rate, segment_length=segment_length, hop_length=hop_length, n_fft=n_fft, height=height, distance=distance, prominence=prominence, width=width)

Processing files:  17%|█▋        | 127/752 [00:14<01:10,  8.90file/s, file=34A-12-10-2_Mic046.wav/752, segments=19]   


ParameterError: Audio buffer is not finite everywhere