In [None]:
import os
import cv2
import time
import librosa
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm

import warnings
warnings.filterwarnings("ignore")

In [None]:
class Config:
    """Central settings for the audio -> mel-spectrogram preprocessing run.

    All values are class attributes, so they can be read either from the
    class itself or from the shared ``config`` instance created below.
    """

    # --- Run mode ---------------------------------------------------------
    DEBUG_MODE = False
    # Upper bound on the number of rows to process; None means "no limit".
    N_MAX = None if not DEBUG_MODE else 50

    # --- Locations --------------------------------------------------------
    DATA_ROOT = '..'                    # directory holding the CSVs and train_audio/
    OUTPUT_DIR = './numpy_dataset'      # where the resulting .npy file is written

    # --- Audio ------------------------------------------------------------
    FS = 32000                          # sampling rate used when loading audio (Hz)
    TARGET_DURATION = 5.0               # length of the excerpt cut from each file (s)

    # --- Mel spectrogram parameters (passed straight to librosa) ----------
    N_FFT = 1034
    HOP_LENGTH = 64
    N_MELS = 136
    FMIN = 20
    FMAX = 16000

    # --- Output -----------------------------------------------------------
    TARGET_SHAPE = (256, 256)           # final size every spectrogram is resized to


config = Config()

In [None]:
# Report the run mode before touching any data.
debug_label = 'ON' if config.DEBUG_MODE else 'OFF'
max_samples_label = 'ALL' if config.N_MAX is None else config.N_MAX
print(f"Debug mode: {debug_label}")
print(f"Max samples to process: {max_samples_label}")

# Taxonomy table: used only to look up the class name of each species label.
print("Loading taxonomy data...")
taxonomy_df = pd.read_csv(f'{config.DATA_ROOT}/taxonomy.csv')
species_class_map = {
    label: class_name
    for label, class_name in zip(taxonomy_df['primary_label'], taxonomy_df['class_name'])
}

# Training metadata: one row per audio file.
print("Loading training metadata...")
train_df = pd.read_csv(f'{config.DATA_ROOT}/train.csv')

In [None]:
# Give every species label an integer id; ids follow the sorted label order.
label_list = sorted(train_df['primary_label'].unique())
label_id_list = list(range(len(label_list)))
label2id = {label: idx for idx, label in enumerate(label_list)}
id2label = dict(enumerate(label_list))

print(f'Found {len(label_list)} unique species')

# Per-file work table. Added columns:
#   target     - integer id of primary_label
#   filepath   - location of the audio file under DATA_ROOT/train_audio/
#   samplename - "<first path component>-<file stem up to the first dot>"
#   class      - class name from the taxonomy, 'Unknown' when the label is absent
working_df = train_df[['primary_label', 'rating', 'filename']].assign(
    target=lambda d: d['primary_label'].map(label2id),
    filepath=lambda d: config.DATA_ROOT + '/train_audio/' + d['filename'],
    samplename=lambda d: d['filename'].map(
        lambda rel: rel.split('/')[0] + '-' + rel.split('/')[-1].split('.')[0]
    ),
    # 'class' is a Python keyword, so it has to be passed via dict unpacking.
    **{'class': lambda d: d['primary_label'].map(
        lambda label: species_class_map.get(label, 'Unknown')
    )},
)

n_available = len(working_df)
total_samples = min(n_available, config.N_MAX or n_available)
print(f'Total samples to process: {total_samples} out of {n_available} available')
print('Samples by class:')
print(working_df['class'].value_counts())

In [None]:
def audio2melspec(audio_data):
    """Turn a waveform into a min-max normalised log-mel spectrogram.

    Args:
        audio_data: waveform samples as a NumPy array. NaN samples are
            replaced by the mean of the non-NaN samples before analysis.

    Returns:
        The dB-scaled mel spectrogram rescaled so its minimum maps to 0 and
        its maximum to (just under) 1.
    """
    # Patch NaN samples with the mean of the valid ones.
    if np.isnan(audio_data).any():
        fill_value = np.nanmean(audio_data)
        audio_data = np.nan_to_num(audio_data, nan=fill_value)

    mel_kwargs = dict(
        sr=config.FS,
        n_fft=config.N_FFT,
        hop_length=config.HOP_LENGTH,
        n_mels=config.N_MELS,
        fmin=config.FMIN,
        fmax=config.FMAX,
        power=2.0,
        pad_mode="reflect",
        norm='slaney',
        htk=True,
        center=True,
    )
    power_spec = librosa.feature.melspectrogram(y=audio_data, **mel_kwargs)

    # Power -> dB relative to the loudest bin, then min-max scale.
    db_spec = librosa.power_to_db(power_spec, ref=np.max)
    db_low = db_spec.min()
    db_high = db_spec.max()
    # The small epsilon keeps the division defined for a constant spectrogram.
    return (db_spec - db_low) / (db_high - db_low + 1e-8)

In [None]:
def slice_audio(y, sr, filename):
    """Cut one ``config.TARGET_DURATION``-second excerpt out of a waveform.

    Selection strategy, in order:

    1. Clips shorter than the target are tiled end-to-end and truncated.
    2. Files whose name starts with "CSA" (case-insensitive) use the window
       that starts at 2 s.
    3. Otherwise a mel spectrogram is computed and every frame whose loudest
       mel band at or above 2 kHz exceeds -27.5 dB (relative to the
       spectrogram peak) is marked active. Runs of consecutive active frames
       are merged into intervals, candidate windows are tried at 1 s steps,
       and the window that fully contains the most intervals wins (earliest
       window on ties). If no frame is active, the 2 s window is used.

    Whenever the 2 s window would run past the end of the clip, its start is
    moved earlier so the excerpt always has the full target length.

    Args:
        y: waveform samples (expected to be the mono 1-D array returned by
            ``librosa.load``).
        sr: sampling rate of ``y`` in Hz.
        filename: base name of the source file; only used for the "CSA" check.

    Returns:
        An array of exactly ``int(config.TARGET_DURATION * sr)`` samples.

    Raises:
        ValueError: if ``y`` contains no samples.
    """
    high_band_hz = 2000       # lowest mel-band frequency considered
    active_db = -27.5         # a frame is active when its high-band peak exceeds this
    default_start_sec = 2.0   # window start used by the CSA and fallback branches
    step_sec = 1.0            # stride of the candidate windows

    n_samples = len(y)
    if n_samples == 0:
        # Without this guard the duration below is 0 and the tiling step
        # fails with an uninformative ZeroDivisionError.
        raise ValueError(f"Cannot slice empty audio: {filename}")

    td = config.TARGET_DURATION
    # Every sample index is derived from `sr`, the rate `y` actually has.
    n_target = int(td * sr)
    duration = n_samples / sr

    # 1. Clip shorter than the target: repeat it until it is long enough.
    if duration < td:
        n_repeat = int(np.ceil(td / duration))
        return np.tile(y, n_repeat)[:n_target]

    # From here on the clip holds at least n_target samples.
    last_valid_start = n_samples - n_target
    # Clamp so that clips between 5 s and 7 s still yield a full-length excerpt.
    default_start = min(int(default_start_sec * sr), last_valid_start)

    # 2. File names starting with "CSA" always use the default window.
    if filename.upper().startswith("CSA"):
        return y[default_start:default_start + n_target]

    # 3. Locate frames with energy in the high mel bands.
    S = librosa.feature.melspectrogram(
        y=y, sr=sr,
        n_fft=config.N_FFT,
        hop_length=config.HOP_LENGTH,
        n_mels=config.N_MELS,
        fmin=config.FMIN,
        fmax=config.FMAX
    )
    S_db = librosa.power_to_db(S, ref=np.max)

    mel_f = librosa.mel_frequencies(
        n_mels=config.N_MELS,
        fmin=config.FMIN,
        fmax=config.FMAX
    )
    high_idx = np.where(mel_f >= high_band_hz)[0]
    high_max = S_db[high_idx, :].max(axis=0)
    mask_frames = np.where(high_max > active_db)[0]

    # No active frame at all: fall back to the default window.
    if len(mask_frames) == 0:
        return y[default_start:default_start + n_target]

    # Merge consecutive active frames into intervals. The gap test is done on
    # integer frame indices; comparing float time differences against
    # hop/sr can split a contiguous run because of rounding.
    gap_positions = np.where(np.diff(mask_frames) > 1)[0]
    run_first = np.concatenate(([mask_frames[0]], mask_frames[gap_positions + 1]))
    run_last = np.concatenate((mask_frames[gap_positions], [mask_frames[-1]]))
    interval_start_sec = run_first * config.HOP_LENGTH / sr
    interval_end_sec = run_last * config.HOP_LENGTH / sr

    # Slide a target-length window and keep the one that fully contains the
    # largest number of intervals (strict '>' keeps the earliest on ties).
    best_count = -1
    best_start = 0.0
    for window_start in np.arange(0, duration - td + 1e-6, step_sec):
        window_end = window_start + td
        contained = (interval_start_sec >= window_start) & (interval_end_sec <= window_end)
        count = int(np.count_nonzero(contained))
        if count > best_count:
            best_count = count
            best_start = window_start

    start_sample = min(int(best_start * sr), last_valid_start)
    return y[start_sample:start_sample + n_target]

In [None]:
print("Starting audio processing...")
print(f"{'DEBUG MODE - Processing only 50 samples' if config.DEBUG_MODE else 'FULL MODE - Processing all samples'}")
start_time = time.time()

all_bird_data = {}   # samplename -> float32 mel spectrogram
errors = []          # (filepath, error message) for every file that failed

# cv2.resize expects dsize as (width, height), i.e. (cols, rows), whereas
# TARGET_SHAPE is compared against ndarray.shape, which is (rows, cols).
target_shape = tuple(config.TARGET_SHAPE)
resize_dsize = (target_shape[1], target_shape[0])

# Apply the debug cap by position, so it does not depend on the DataFrame
# index being the default 0..n-1 range.
if config.N_MAX is None:
    rows_to_process = working_df
else:
    rows_to_process = working_df.iloc[:max(config.N_MAX, 0)]

for _idx, row in tqdm(rows_to_process.iterrows(), total=len(rows_to_process)):
    try:
        # 1) Load the audio, resampled to the configured rate.
        audio_data, _sr = librosa.load(row.filepath, sr=config.FS)

        # 2) Cut the target-length excerpt (slice_audio also handles clips
        #    shorter than the target by repeating them).
        audio5 = slice_audio(audio_data, config.FS, os.path.basename(row.filepath))

        # 3) Convert the excerpt to a normalised mel spectrogram.
        mel_spec = audio2melspec(audio5)

        # 4) Resize when the spectrogram does not already have the target shape.
        if mel_spec.shape != target_shape:
            mel_spec = cv2.resize(
                mel_spec, resize_dsize, interpolation=cv2.INTER_LINEAR
            )

        # 5) Store the result; a repeated samplename overwrites the earlier entry.
        all_bird_data[row.samplename] = mel_spec.astype(np.float32)

    except Exception as e:
        # Best effort: log the failure, remember it, and continue with the next file.
        print(f"Error processing {row.filepath}: {e}")
        errors.append((row.filepath, str(e)))

end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
print(f"Successfully processed {len(all_bird_data)} files out of {total_samples} total")
print(f"Failed to process {len(errors)} files")

# Make sure the output directory exists and build the destination path.
os.makedirs(config.OUTPUT_DIR, exist_ok=True)
save_path = os.path.join(config.OUTPUT_DIR, 'train.npy')

# Write the whole {samplename: spectrogram} dictionary as one file.
# np.save stores a dict as a pickled object array, so it has to be read back
# with np.load(save_path, allow_pickle=True).item().
np.save(save_path, all_bird_data)

print(f"Saved mel spectrograms to {save_path} ({len(all_bird_data)} items)")