# 2025-05-06: generate the pickle (.pkl) of mel spectrograms


In [None]:
import os
import cv2
import math
import time
import librosa
import pandas as pd
import numpy as np
from tqdm.notebook import tqdm
import warnings
import gc
import pickle
import logging
from pathlib import Path
import torch
import torch.nn as nn
import torch.nn.functional as F
import timm

# Configure the root logger so that only records at ERROR level or above are emitted.
logging.basicConfig(level=logging.ERROR)
# Suppress every warning issued through the `warnings` module for the rest of the run.
warnings.filterwarnings("ignore")

In [None]:
class CFG:
    """Static configuration for the mel-spectrogram pre-computation script.

    Every setting is a plain class attribute, so it can be read either from
    the class (``CFG.FS``) or from an instance (``cfg.FS``).

    The dataset paths are derived from ``DATA_ROOT`` once, when the class body
    is executed; reassigning ``CFG.DATA_ROOT`` afterwards does not update them.
    """

    # --- Run mode -----------------------------------------------------------
    DEBUG_MODE = False
    # Not referenced in the visible part of this file; kept for compatibility.
    debug = False
    debug_count = 3
    # Maximum number of training rows to process; None means "no limit".
    N_MAX = 20 if DEBUG_MODE else None

    # --- Paths --------------------------------------------------------------
    OUTPUT_DIR = '/kaggle/working/'
    DATA_ROOT = '/kaggle/input/birdclef-2025'
    # Derived from DATA_ROOT so the dataset location is spelled out only once.
    audio_datadir = f'{DATA_ROOT}/train_audio'
    test_soundscapes = f'{DATA_ROOT}/test_soundscapes'
    submission_csv = f'{DATA_ROOT}/sample_submission.csv'
    taxonomy_csv = f'{DATA_ROOT}/taxonomy.csv'

    # --- Audio parameters (passed to librosa's mel-spectrogram call) ---------
    FS = 32000          # sampling rate used when loading audio
    N_FFT = 1024        # FFT window length
    HOP_LENGTH = 512    # hop between successive frames
    N_MELS = 128        # number of mel bands
    FMIN = 50           # lowest mel filter frequency
    FMAX = 16000        # highest mel filter frequency

    # Length of the clip cut from each recording; multiplied by FS to get a
    # sample count.
    TARGET_DURATION = 5.0
    # Spectrogram size after resizing; compared against ``ndarray.shape``.
    TARGET_SHAPE = (256, 256)

    # --- Model / inference settings -----------------------------------------
    # Only ``device`` is read in the visible part of this file (it is printed).
    in_channels = 1
    device = 'cpu'
    batch_size = 16

    use_specific_folds = False  # If False, use all found models
    folds = [0, 1, 2, 3]  # Used only if use_specific_folds is True


cfg = CFG()

In [None]:
# Announce the configured device, then read the taxonomy table.
print(f"Using device: {cfg.device}")
print("Loading taxonomy data...")
taxonomy_df = pd.read_csv(cfg.taxonomy_csv)

# Primary labels in file order; a label's position in this list is its class index.
species_ids = taxonomy_df['primary_label'].tolist()
num_classes = len(species_ids)
print(f"Number of classes: {num_classes}")

# primary_label -> integer class index
label_to_idx = dict(zip(species_ids, range(num_classes)))

# primary_label -> value of the taxonomy 'class_name' column
species_class_map = {
    primary_label: class_name
    for primary_label, class_name in zip(
        taxonomy_df['primary_label'], taxonomy_df['class_name']
    )
}


In [None]:
def audio2melspec(audio_data):
    """Convert a waveform into a min-max normalised, dB-scaled mel spectrogram.

    Two post-processing steps follow the mel transform:

    * Conversion to decibels compresses the dynamic range of the power
      values, which makes low-energy detail in the signal more visible.
    * Min-max normalisation rescales every value into the range 0..1, which
      tends to help model training and reduces the risk of vanishing or
      exploding gradients.

    Args:
        audio_data: Array of audio samples at ``cfg.FS``
            (presumably the 1-D mono output of ``librosa.load`` -- confirm
            against callers). NaN samples are replaced by the mean of the
            non-NaN samples, or by 0.0 when every sample is NaN.

    Returns:
        The normalised mel spectrogram as returned by librosa, rescaled so
        that its minimum is 0 and its maximum is (almost exactly) 1.
    """
    # NOTE(review): the original author left an open question here asking
    # whether the interpolation method could be changed.
    nan_mask = np.isnan(audio_data)
    if nan_mask.any():
        # np.nanmean yields NaN (plus a RuntimeWarning) when *all* samples are
        # NaN, which would leave the NaNs in place; fall back to silence.
        fill_value = 0.0 if nan_mask.all() else np.nanmean(audio_data)
        audio_data = np.nan_to_num(audio_data, nan=fill_value)

    mel_spec = librosa.feature.melspectrogram(
        y=audio_data,
        sr=cfg.FS,                  # sampling rate
        n_fft=cfg.N_FFT,            # number of FFT points per frame
        hop_length=cfg.HOP_LENGTH,
        n_mels=cfg.N_MELS,
        fmin=cfg.FMIN,
        fmax=cfg.FMAX,
        power=2.0,
        pad_mode="reflect",
        norm='slaney',
        htk=True,
        center=True
    )

    # Power -> dB relative to the loudest bin, then rescale to 0..1.
    # The small epsilon avoids a division by zero for a constant spectrogram.
    mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
    mel_spec_norm = (mel_spec_db - mel_spec_db.min()) / (mel_spec_db.max() - mel_spec_db.min() + 1e-8)

    return mel_spec_norm


In [None]:
def make_work_df():
    """Build the working DataFrame used by the spectrogram generation loop.

    Reads ``train.csv`` from ``cfg.DATA_ROOT``, prints how many distinct
    primary labels it contains, and returns a copy of the
    ``primary_label``, ``secondary_labels`` and ``filename`` columns extended
    with:

    * ``filepath``   -- ``cfg.audio_datadir`` joined with ``filename``
    * ``class_name`` -- looked up in ``species_class_map`` (None if missing)
    * ``samplename`` -- ``<first path component>-<file stem>`` of ``filename``
    """
    def _sample_name(filename):
        # 'a/b/c.ogg' -> 'a-c': first path component plus the base name up to
        # its first dot.
        parts = filename.split('/')
        return parts[0] + '-' + parts[-1].split('.')[0]

    train_df = pd.read_csv(f'{cfg.DATA_ROOT}/train.csv')

    n_species = len(sorted(train_df['primary_label'].unique()))
    print(f'found {n_species} unique species')

    working_df = train_df[['primary_label', 'secondary_labels', 'filename']].copy()
    working_df['filepath'] = cfg.audio_datadir + '/' + working_df.filename
    working_df['class_name'] = train_df['primary_label'].apply(species_class_map.get)
    working_df['samplename'] = working_df['filename'].map(_sample_name)
    return working_df


working_df = make_work_df()

# Bare expression: shows the frame as the cell output when run in a notebook.
working_df

In [None]:
print("Starting audio processing...")
start_time = time.time()

all_bird_data = {}   # samplename -> float32 mel spectrogram
errors = []          # (filepath, error message) for every file that failed


def _center_clip(audio_data, target_samples):
    """Return exactly ``target_samples`` samples taken from the middle of the clip.

    Recordings shorter than the target are repeated end-to-end until they are
    long enough before the central window is cut out.

    Raises:
        ValueError: if ``audio_data`` contains no samples.
    """
    if len(audio_data) == 0:
        # Without this guard the repeat count below divides by zero.
        raise ValueError("audio file contains no samples")

    if len(audio_data) < target_samples:
        n_copy = math.ceil(target_samples / len(audio_data))
        audio_data = np.concatenate([audio_data] * n_copy)

    # Take the window centred on the middle of the recording.
    start_idx = max(0, int(len(audio_data) / 2 - target_samples / 2))
    end_idx = min(len(audio_data), start_idx + target_samples)
    center_audio = audio_data[start_idx:end_idx]

    # Defensive: after tiling the window should already be full length.
    if len(center_audio) < target_samples:
        center_audio = np.pad(center_audio,
                              (0, target_samples - len(center_audio)),
                              mode='constant')
    return center_audio


# Loop invariants, computed once.
target_samples = int(cfg.TARGET_DURATION * cfg.FS)
# TARGET_SHAPE is compared against ndarray.shape, i.e. (rows, cols), whereas
# cv2.resize expects dsize as (width, height) = (cols, rows).
resize_dsize = (cfg.TARGET_SHAPE[1], cfg.TARGET_SHAPE[0])

# Apply the optional N_MAX cap by position, so it does not depend on the
# DataFrame's index labels and the progress bar shows the real total.
rows_to_process = working_df if cfg.N_MAX is None else working_df.iloc[:cfg.N_MAX]

for _idx, row in tqdm(rows_to_process.iterrows(), total=len(rows_to_process)):
    # Best-effort batch job: a failing file is reported and recorded, and the
    # loop moves on to the next one.
    try:
        audio_data, _sr = librosa.load(row.filepath, sr=cfg.FS)
        center_audio = _center_clip(audio_data, target_samples)

        mel_spec = audio2melspec(center_audio)

        if mel_spec.shape != cfg.TARGET_SHAPE:
            # Rescale to the target size with bicubic interpolation.
            mel_spec = cv2.resize(mel_spec, resize_dsize, interpolation=cv2.INTER_CUBIC)

        all_bird_data[row.samplename] = mel_spec.astype(np.float32)

    except Exception as e:
        print(f"Error processing {row.filepath}: {e}")
        errors.append((row.filepath, str(e)))

end_time = time.time()
print(f"Processing completed in {end_time - start_time:.2f} seconds")
print(f"Successfully processed {len(all_bird_data)}")
print(f"Failed to process {len(errors)} files")

In [None]:
pkl_name = "likely_best_audio.pkl"

# Write into the configured output directory instead of repeating the path
# as a second hard-coded literal.
path = os.path.join(cfg.OUTPUT_DIR, pkl_name)
with open(path, "wb") as f:
    pickle.dump(all_bird_data, f)
# Report success only after the file has been closed.
print("Save %s." % path)

In [None]:
# new_audio_data

In [None]:
   # if not cfg.LOAD_DATA:
   #      print("run_training：Will generate spectrograms on-the-fly during training.")
   #      if 'filepath' not in df.columns:
   #          df['filepath'] = cfg.train_datadir + '/' + df.filename
   #      if 'samplename' not in df.columns:
   #          df['samplename'] = df.filename.map(lambda x: x.split('/')[0] + '-' + x.split('/')[-1].split('.')[0])


   # self.primary_to_class = dict(zip(taxonomy_df['primary_label'],taxonomy_df['class_name']))




   #      if 'samplename' not in self.df.columns:
   #          self.df['samplename'] = self.df.filename.map(lambda x: x.split('/')[0] + '-' + x.split('/')[-1].split('.')[0])

   #      sample_names = set(self.df['samplename'])