In [9]:
#!/usr/bin/env python
# coding: utf-8

"""
2_feature_extraction.py
-----------------------
用途:
1. 读取 ../data/raw/ 下的音频
2. 根据 chorus_labels.csv 标注, 提取多种音频特征(含Spectral Contrast、Tempogram等)
3. 做滑窗拼帧 (context=1 => 3帧拼接)
4. 对“副歌”帧进行过采样
5. 生成 feature_index.csv 保存特征与标签

运行:
  python 2_feature_extraction.py
生成:
  ../data/processed/feature_index.csv
以及若干 .npy
"""

import os
import numpy as np
import librosa
import pandas as pd

# ============== Global settings ================
# CSV with the chorus annotations; main() reads its columns
# "filename", "start_sec" and "end_sec".
label_csv = "chorus_labels.csv"
# Directory scanned by main() for input audio (*.mp3 / *.wav).
AUDIO_FOLDER = "../data/raw/"
# Output directory for the per-segment .npy files and feature_index.csv.
FEATURE_FOLDER = "../data/processed/"
# NOTE: executed at import time, so merely importing this module creates the folder.
os.makedirs(FEATURE_FOLDER, exist_ok=True)

# Oversampling factor for chorus (positive) samples: each positive sample is
# written oversample_factor times in total (the original plus factor-1 copies).
oversample_factor = 2

# Length of each pooled segment, in seconds.
frame_size = 2.0
# Overlap threshold between a chorus interval and a segment, expressed as a
# fraction of the segment length; a larger overlap labels the segment as chorus.
overlap_threshold = 0.5

def extract_advanced_features(y, sr, hop_length=512):
    """
    Compute a standardised frame-level feature matrix for one audio signal.

    Several librosa features are computed with the same ``hop_length`` and
    stacked along the feature axis in this order: MFCC (13 coefficients),
    MFCC delta, chroma (``chroma_stft``), spectral contrast, zero-crossing
    rate, and the tempogram averaged over its first axis (one value per frame).

    Every feature column is then z-scored over the frames of this signal;
    a small epsilon is added to the standard deviation so constant columns
    do not divide by zero.

    Args:
        y: Audio samples, passed straight through to librosa.
        sr: Sampling rate of ``y``.
        hop_length: Hop size in samples between successive analysis frames.

    Returns:
        ``float32`` array of shape (frames, feature_dim).
    """
    mfcc_coeffs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13, hop_length=hop_length)

    # The tempogram is built from the onset-strength envelope and collapsed
    # to a single row so it contributes exactly one feature per frame.
    onset_curve = librosa.onset.onset_strength(y=y, sr=sr, hop_length=hop_length)
    tempo_map = librosa.feature.tempogram(
        onset_envelope=onset_curve, sr=sr, hop_length=hop_length
    )

    # Every entry is laid out as (n_features_i, frames).
    feature_rows = [
        mfcc_coeffs,
        librosa.feature.delta(mfcc_coeffs),
        librosa.feature.chroma_stft(y=y, sr=sr, hop_length=hop_length),
        librosa.feature.spectral_contrast(y=y, sr=sr, hop_length=hop_length),
        librosa.feature.zero_crossing_rate(y, hop_length=hop_length),
        tempo_map.mean(axis=0, keepdims=True),
    ]

    # Stack along the feature axis, then flip to (frames, feature_dim).
    frame_major = np.concatenate(feature_rows, axis=0).T

    # Per-column z-score over this signal's frames.
    col_mean = frame_major.mean(axis=0)
    col_scale = frame_major.std(axis=0) + 1e-8
    return ((frame_major - col_mean) / col_scale).astype(np.float32)

def _load_chorus_intervals(csv_path):
    """Read the annotation CSV into ``{filename: [(start_sec, end_sec), ...]}``."""
    df_labels = pd.read_csv(csv_path)
    intervals = {}
    for fname, start_sec, end_sec in zip(
        df_labels["filename"], df_labels["start_sec"], df_labels["end_sec"]
    ):
        intervals.setdefault(fname, []).append((float(start_sec), float(end_sec)))
    return intervals


def _pool_and_label_segments(frame_feats, frames_per_segment, sec_per_frame,
                             intervals, min_overlap_ratio):
    """Mean-pool frame features per segment and give each segment a 0/1 chorus label.

    Returns ``(pooled, labels)``: a list of 1-D feature vectors and a list of ints.
    """
    total_frames = frame_feats.shape[0]
    pooled = []
    labels = []
    for start_frame in range(0, total_frames, frames_per_segment):
        end_frame = min(start_frame + frames_per_segment, total_frames)
        pooled.append(np.mean(frame_feats[start_frame:end_frame], axis=0))

        # Derive the segment's time span from its frame indices. Advancing a
        # clock by the nominal segment length instead would drift, because
        # frames_per_segment is truncated to an integer and a segment is
        # therefore slightly shorter than the nominal length.
        seg_start = start_frame * sec_per_frame
        seg_end = end_frame * sec_per_frame
        # Threshold relative to the real duration, so a short final segment
        # is judged by the same "fraction covered" rule as the full ones.
        min_overlap = (seg_end - seg_start) * min_overlap_ratio

        is_chorus = any(
            min(seg_end, e) - max(seg_start, s) > min_overlap
            for (s, e) in intervals
        )
        labels.append(int(is_chorus))
    return pooled, labels


def _add_context(seg_array, context):
    """Concatenate every row with its ``context`` neighbours on each side.

    Rows beyond either end are zero vectors, so the result has shape
    ``(N, (2 * context + 1) * feat_dim)`` for an input of shape ``(N, feat_dim)``.
    """
    n_segments = len(seg_array)
    padded = np.pad(seg_array, ((context, context), (0, 0)), mode="constant")
    # Slice k holds, for every row i, the neighbour at offset (k - context).
    return np.hstack([padded[k:k + n_segments] for k in range(2 * context + 1)])


def main():
    """Extract segment-level features for every audio file and write the index.

    Steps:
      1. Load the chorus intervals from ``label_csv``.
      2. For each ``.mp3`` / ``.wav`` file in ``AUDIO_FOLDER`` (processed in
         sorted order so the output is reproducible), compute frame-level
         features, mean-pool them into segments of about ``frame_size``
         seconds and label each segment as chorus (1) or not (0).
      3. Concatenate each segment with its previous and next segment
         (context=1, zero padded at the edges).
      4. Save one ``.npy`` per segment in ``FEATURE_FOLDER``; chorus segments
         are additionally saved ``oversample_factor - 1`` more times.
      5. Write ``feature_index.csv`` with the columns
         ``feature_path``, ``label``, ``filename``.

    Raises:
        FileNotFoundError: if ``label_csv`` does not exist.
    """
    # 1) Chorus annotations
    if not os.path.exists(label_csv):
        raise FileNotFoundError("没有找到 chorus_labels.csv, 无法进行有监督标注.")
    label_dict = _load_chorus_intervals(label_csv)

    # 2) Audio files; os.listdir order is arbitrary, so sort for a stable index.
    all_files = sorted(
        f for f in os.listdir(AUDIO_FOLDER) if f.endswith((".mp3", ".wav"))
    )
    metadata_list = []

    samples_pos = 0
    samples_neg = 0

    hop_length = 512
    context = 1  # one neighbour on each side => 3 * feature_dim per sample

    for afile in all_files:
        path = os.path.join(AUDIO_FOLDER, afile)
        y, sr = librosa.load(path, sr=None)

        # Frame-level features => (frames, feature_dim)
        all_feats = extract_advanced_features(y, sr, hop_length=hop_length)

        # Duration of one frame hop, and the number of frames per segment.
        time_per_frame = float(librosa.frames_to_time(1, sr=sr, hop_length=hop_length))
        frames_per_segment = max(1, int(frame_size / time_per_frame))

        segments_feats, segments_label = _pool_and_label_segments(
            all_feats,
            frames_per_segment,
            time_per_frame,
            label_dict.get(afile, ()),
            overlap_threshold,
        )
        if not segments_feats:
            # Nothing to stack for this file (no frames were produced).
            continue

        merged = _add_context(np.stack(segments_feats, axis=0), context)

        # Write the .npy files, duplicating positive samples for oversampling.
        for seg_idx, (feat_vec, label_val) in enumerate(zip(merged, segments_label)):
            feat_vec = feat_vec.astype(np.float32)
            frame_path = os.path.join(FEATURE_FOLDER, f"{afile}_{seg_idx}.npy")
            np.save(frame_path, feat_vec)

            if label_val == 1:
                samples_pos += 1
            else:
                samples_neg += 1
            metadata_list.append((frame_path, label_val, afile))

            if label_val == 1:
                # range() is empty when oversample_factor <= 1.
                for dup_i in range(oversample_factor - 1):
                    dup_path = os.path.join(
                        FEATURE_FOLDER, f"{afile}_{seg_idx}_dup{dup_i}.npy"
                    )
                    np.save(dup_path, feat_vec)
                    metadata_list.append((dup_path, 1, afile))

    print(f"\n原统计 => pos= {samples_pos}, neg= {samples_neg} (不含dup)")
    df_meta = pd.DataFrame(metadata_list, columns=["feature_path", "label", "filename"])
    df_meta.to_csv(os.path.join(FEATURE_FOLDER, "feature_index.csv"), index=False)
    print("已输出 feature_index.csv =>", len(df_meta), "条记录.")
    print(f"过采样后 => 正例= {sum(df_meta.label==1)}, 负例= {sum(df_meta.label==0)}")
    print("\n特征提取结束(包含高级特征 & context滑窗 & 过采样).")


if __name__ == "__main__":
    main()


原统计 => pos= 41, neg= 385 (不含dup)
已输出 feature_index.csv => 467 条记录.
过采样后 => 正例= 82, 负例= 385

特征提取结束(包含高级特征 & context滑窗 & 过采样).
