In [None]:
import json
import os
import numpy as np
import matplotlib.pyplot as plt
import seaborn as sns
from corpus import Synchronizer

def calculate_wps(origin_path: str, cover_path: str, song_dir: str, lambda_wps: float = 1.0) -> float:
    """
    Score how closely the warping path between two recordings follows a straight line.

    A ``Synchronizer`` produces the warping path; the timestamps the path visits
    are fitted with a first-degree polynomial, and the standard deviation of the
    residuals is mapped to a score with ``exp(-lambda_wps * sigma)``.

    Args:
        origin_path: Forwarded to ``Synchronizer.get_wp`` as its first argument.
        cover_path: Forwarded to ``Synchronizer.get_wp`` as its second argument.
        song_dir: Forwarded to ``Synchronizer.get_wp`` as its third argument.
        lambda_wps: Decay rate applied to the residual standard deviation.

    Returns:
        ``exp(-lambda_wps * sigma)``, or ``0.0`` if anything raises (the error
        is printed rather than propagated).
    """
    try:
        synchronizer = Synchronizer()
        warping_path = synchronizer.get_wp(origin_path, cover_path, song_dir)

        # The timestamp axes are read from the synchronizer after get_wp has run.
        # NOTE(review): t1 is treated as the cover axis and t2 as the original
        # axis — confirm against the Synchronizer implementation.
        cover_times, orig_times = synchronizer.t1, synchronizer.t2
        if cover_times is None or orig_times is None:
            raise ValueError("時間戳序列未能成功在 Synchronizer 物件中生成。")

        # Row 0 of the path indexes the cover axis, row 1 the original axis.
        path_indices = warping_path.astype(int)
        cover_on_path = cover_times[path_indices[0]]
        orig_on_path = orig_times[path_indices[1]]

        # Best straight line through the path, then the spread around that line.
        slope, intercept = np.polyfit(cover_on_path, orig_on_path, 1)
        residuals = orig_on_path - (slope * cover_on_path + intercept)
        return np.exp(-lambda_wps * np.std(residuals))
    except Exception as e:
        print(f"計算 wps 分數時發生錯誤：{e}")
        return 0.0


def analyze_and_visualize_scores():
    """
    Run the WPS evaluation over the evaluation set and plot the score distributions.

    Reads ``metadata.json`` from ``./dataset/eval``, scores every available
    ``<version>.wav`` of each song against that song's ``origin.wav`` with
    ``calculate_wps``, prints the mean score per version and saves a KDE plot
    of the per-version scores to ``wps_score_distribution.png``.

    Songs without ``origin.wav`` and versions without a wav file are skipped
    with a message; scores that are not strictly positive are dropped.
    Returns ``None``, and returns early when ``metadata.json`` is missing.
    """
    # --- 1. Configuration ---
    base_dir = os.path.join(".", "dataset", "eval")
    metadata_path = os.path.join(base_dir, "metadata.json")
    origin_filename = "origin.wav"
    versions = ["human", "etude_e", "etude_d", "picogen", "amtapc", "music2midi"]
    lambda_val = 0.5

    # One score bucket per version, in the order listed above.
    scores_by_version = dict((v, []) for v in versions)

    # --- 2. Load the metadata and score every song/version pair ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案。")
        return

    for song_data in metadata:
        dir_name = song_data.get("dir_name")
        if not dir_name:
            continue

        song_dir = os.path.join(base_dir, dir_name)
        origin_path = os.path.join(song_dir, origin_filename)
        if not os.path.exists(origin_path):
            print(f"  ↪️ 已跳過 (找不到 origin.wav), {origin_path}")
            continue

        for v in versions:
            cover_path = os.path.join(song_dir, f"{v}.wav")
            if os.path.exists(cover_path):
                score = calculate_wps(origin_path, cover_path, song_dir, lambda_wps=lambda_val)
                # calculate_wps returns 0.0 on failure, so only positive scores are kept.
                if score > 0:
                    scores_by_version[v].append(score)
            else:
                print(f"  ↪️ 已跳過版本 '{v}' (找不到 {v}.wav) {song_dir}")

    # --- 3. Print the mean score of every version ---
    print("\n\n--- 平均分數統計 ---")
    for version, scores in scores_by_version.items():
        if not scores:
            print(f"版本 {version:<12}: 無有效分數可供計算。")
            continue
        avg_score = np.mean(scores)
        print(f"版本 {version:<12}: 平均 wps 分數 = {avg_score:.4f} (基於 {len(scores)} 個樣本)")

    # --- 4. Visualization ---
    print("\n🎨 正在生成分數分佈圖...")

    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(12, 7))

    # One KDE curve per version that has at least one score.
    for version, scores in scores_by_version.items():
        if not scores:
            continue
        sns.kdeplot(scores, label=version, fill=True, alpha=0.5, ax=ax, lw=2.5)

    ax.set_title('WPS Score Distribution by Version', fontsize=16, pad=20)
    ax.set_xlabel('WPS Score', fontsize=12)
    ax.set_ylabel('Density', fontsize=12)
    ax.set_xlim(0, 1.05)
    ax.legend(title='Version', fontsize=10)

    # Save the figure next to the notebook.
    output_image_path = "wps_score_distribution.png"
    plt.savefig(output_image_path, dpi=300, bbox_inches='tight')

    print(f"✅ 分數分佈圖已成功儲存至: {output_image_path}")


# Entry point: run the WPS analysis when this cell/script is executed directly.
if __name__ == '__main__':
    analyze_and_visualize_scores()

In [None]:
import json
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# 假設您的 Synchronizer 位於此處
from corpus import Synchronizer 

# 【函式已加入 trim_seconds 參數】
def calculate_nwpd(origin_path: str, cover_path: str, song_dir: str, 
                   lambda_nwpd: float = 1.0, subsample_step: int = 1, 
                   trim_seconds: float = 0) -> float:
    """
    Compute the normalized warping-path deviation (NWPD) score.

    A ``Synchronizer`` produces the warping path between the two recordings.
    The path is optionally subsampled and trimmed, a straight line is fitted
    through the timestamps it visits, and the standard deviation of the
    residuals is mapped to a score with ``exp(-lambda_nwpd * sigma)``.

    Args:
        origin_path: Forwarded to ``Synchronizer.get_wp`` as its first argument.
        cover_path: Forwarded to ``Synchronizer.get_wp`` as its second argument.
        song_dir: Forwarded to ``Synchronizer.get_wp`` as its third argument.
        lambda_nwpd: Decay rate applied to the residual standard deviation.
        subsample_step: Keep every ``subsample_step``-th point of the warping
            path; must be an ``int`` >= 1 (1 disables subsampling).
        trim_seconds: Seconds to drop from both ends of the path, measured on
            the ``t2`` ("original") time axis. 0 disables trimming. Trimming is
            skipped when the path is not longer than ``2 * trim_seconds`` or
            when 10 or fewer points would remain.

    Returns:
        The NWPD score. ``0.0`` is returned when the (subsampled) path has
        fewer than 10 points or when any step raises; in the latter case the
        error is printed, not propagated.
    """
    try:
        if not isinstance(subsample_step, int) or subsample_step < 1:
            raise ValueError("subsample_step 必須是 >= 1 的整數。")
        if not isinstance(trim_seconds, (int, float)) or trim_seconds < 0:
            raise ValueError("trim_seconds 必須是 >= 0 的數字。")

        s = Synchronizer()
        wp = s.get_wp(origin_path, cover_path, song_dir)
        
        # NOTE(review): t1 is treated as the cover axis and t2 as the original
        # axis — confirm against the Synchronizer implementation.
        t_cover = s.t1
        t_orig = s.t2
        
        if t_cover is None or t_orig is None:
            raise ValueError("時間戳序列未能成功在 Synchronizer 物件中生成。")

        wp_int = wp.astype(int)
        
        # Subsample the path columns.
        wp_to_process = wp_int[:, ::subsample_step] if subsample_step > 1 else wp_int
        
        # Too few points for a meaningful fit.
        if wp_to_process.shape[1] < 10:
            return 0.0

        path_t_cover = t_cover[wp_to_process[0]]
        path_t_orig = t_orig[wp_to_process[1]]

        # --- Trim the head and tail of the warping path by time ---
        if trim_seconds > 0:
            # The duration is the largest timestamp on the path, not its last
            # element: a path stored end-to-start would otherwise report a
            # duration near zero and silently disable trimming. For an
            # ascending path both values are identical.
            total_duration = np.max(path_t_orig)
            # Only trim when the piece is long enough to lose both ends.
            if total_duration > (2 * trim_seconds):
                start_time = trim_seconds
                end_time = total_duration - trim_seconds
                
                mask = (path_t_orig >= start_time) & (path_t_orig <= end_time)
                
                # Apply the mask only if enough points survive; otherwise
                # fall back to the untrimmed path.
                if np.sum(mask) > 10:
                    path_t_cover = path_t_cover[mask]
                    path_t_orig = path_t_orig[mask]

        # Best straight line through the path, then the spread around it.
        coeffs = np.polyfit(path_t_cover, path_t_orig, 1)
        a, b = coeffs[0], coeffs[1]

        t_orig_predicted = a * path_t_cover + b
        deviation = path_t_orig - t_orig_predicted
        sigma_dev = np.std(deviation)
        nwpd_score = np.exp(-lambda_nwpd * sigma_dev)
        return nwpd_score
    except Exception as e:
        print(f"計算 NWPD 分數時發生錯誤：{e}")
        return 0.0


def analyze_and_visualize_scores():
    """
    Run the NWPD evaluation over the evaluation set and draw a box plot.

    Reads ``metadata.json`` from ``./dataset/eval``, scores every available
    ``<version>.wav`` of each song against ``origin.wav`` with
    ``calculate_nwpd`` (using the trim and subsample settings below), prints
    the mean score per version sorted from high to low, and saves a box plot
    whose file name encodes those settings.

    Returns ``None``; returns early when ``metadata.json`` is missing or when
    no strictly positive score was produced.
    """
    # --- 1. Configuration ---
    base_dir = os.path.join(".", "dataset", "eval")
    metadata_path = os.path.join(base_dir, "metadata.json")
    origin_filename = "origin.wav"
    versions = ["human", "etude_e", "etude_d", "picogen", "amtapc", "music2midi"]
    lambda_val = 0.5

    # Seconds trimmed from both ends of the warping path (0 = no trimming).
    TRIM_SECONDS = 10

    # Warping-path subsampling step (1 = no subsampling).
    SUBSAMPLE_STEP = 10

    # Display names used in the report and the figure.
    VERSION_DISPLAY_NAMES = {
        "human": "Human",
        "etude_e": "Etude Extractor",
        "etude_d": "Etude Decoder",
        "picogen": "PiCoGen",
        "amtapc": "AMT-APC",
        "music2midi": "Music2MIDI"
    }

    # --- 2. Load the metadata and score every song/version pair ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案。")
        return
    else:
        print(f"✅ 成功讀取 metadata.json，將分析 {len(metadata)} 首歌曲。")
        print(f"ℹ️  Warping Path 裁切秒數設定為: {TRIM_SECONDS}s")
        print(f"ℹ️  Warping Path 降採樣步長設定為: {SUBSAMPLE_STEP}")

    results_list = []
    for song_data in tqdm(metadata, desc="Analyzing Songs"):
        dir_name = song_data.get("dir_name")
        if not dir_name:
            continue

        song_dir = os.path.join(base_dir, dir_name)
        origin_path = os.path.join(song_dir, origin_filename)
        if not os.path.exists(origin_path):
            continue

        for v in versions:
            cover_path = os.path.join(song_dir, f"{v}.wav")
            if not os.path.exists(cover_path):
                continue

            score = calculate_nwpd(
                origin_path,
                cover_path,
                song_dir,
                lambda_nwpd=lambda_val,
                subsample_step=SUBSAMPLE_STEP,
                trim_seconds=TRIM_SECONDS,
            )
            # calculate_nwpd returns 0.0 on failure, so only positive scores are kept.
            if score > 0:
                display_name = VERSION_DISPLAY_NAMES.get(v, v)
                results_list.append({'Version': display_name, 'WPS Score': score})

    # --- 3. Print the mean score of every version ---
    df = pd.DataFrame(results_list)
    if df.empty:
        print("未能計算出任何有效分數。")
        return

    print("\n\n--- 平均分數統計 ---")
    # Highest mean first; this order is reused for the plot rows.
    mean_by_version = df.groupby('Version')['WPS Score'].mean()
    sorted_means = mean_by_version.sort_values(ascending=False)
    print(sorted_means)

    # --- 4. Visualization ---
    print("\n🎨 正在生成分數分佈圖...")

    order = sorted_means.index

    plt.style.use('seaborn-v0_8-whitegrid')
    fig, ax = plt.subplots(figsize=(12, 8))

    sns.boxplot(data=df, x='WPS Score', y='Version', order=order, palette='viridis', ax=ax)

    title = f'WPS Distribution (Trim: {TRIM_SECONDS}s, Subsample: {SUBSAMPLE_STEP})'
    ax.set_title(title, fontsize=16, pad=20)
    ax.set_xlabel('WPS', fontsize=12)
    ax.set_ylabel('Version', fontsize=12)
    ax.set_xlim(-0.05, 1.05)
    ax.grid(axis='x', linestyle='--', alpha=0.7)

    # Save the figure; the file name records the trim/subsample settings.
    output_image_path = f"nwpd_score_dist_trim{int(TRIM_SECONDS)}_step{SUBSAMPLE_STEP}.png"
    plt.savefig(output_image_path, dpi=300, bbox_inches='tight')

    print(f"✅ 分數分佈圖已成功儲存至: {output_image_path}")


# Entry point: run the NWPD analysis when this cell/script is executed directly.
if __name__ == '__main__':
    analyze_and_visualize_scores()

In [None]:
import json
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# 假設您的 Synchronizer 位於此處
from corpus import Synchronizer 

def calculate_nwpd_trimmed(origin_path: str, cover_path: str, song_dir: str, 
                           lambda_nwpd: float = 1.0, trim_seconds: float = 10.0) -> float:
    """
    Compute the normalized warping-path deviation (NWPD) score on a trimmed path.

    A ``Synchronizer`` produces the warping path between the two recordings.
    The first and last ``trim_seconds`` of the path (measured on the ``t2``
    "original" time axis) are dropped, a straight line is fitted through the
    remaining timestamps, and the standard deviation of the residuals is
    mapped to a score with ``exp(-lambda_nwpd * sigma)``.

    Args:
        origin_path: Forwarded to ``Synchronizer.get_wp`` as its first argument.
        cover_path: Forwarded to ``Synchronizer.get_wp`` as its second argument.
        song_dir: Forwarded to ``Synchronizer.get_wp`` as its third argument.
        lambda_nwpd: Decay rate applied to the residual standard deviation.
        trim_seconds: Seconds dropped from each end of the path. Trimming is
            skipped when the path is not longer than ``2 * trim_seconds`` or
            when fewer than 10 points would remain.

    Returns:
        The NWPD score, or ``0.0`` if anything raises (the error is printed
        together with the cover file name rather than propagated).
    """
    try:
        s = Synchronizer()
        wp = s.get_wp(origin_path, cover_path, song_dir)
        
        # NOTE(review): t1 is treated as the cover axis and t2 as the original
        # axis — confirm against the Synchronizer implementation.
        t_cover = s.t1
        t_orig = s.t2
        
        if t_cover is None or t_orig is None:
            raise ValueError("時間戳序列未能成功在 Synchronizer 物件中生成。")

        wp_int = wp.astype(int)
        path_t_cover = t_cover[wp_int[0]]
        path_t_orig = t_orig[wp_int[1]]
        
        # --- Trim the head and tail of the warping path ---
        
        # 1. The duration is the largest timestamp on the path, not its last
        #    element: a path stored end-to-start would otherwise report a
        #    duration near zero and silently disable trimming. For an
        #    ascending path both values are identical.
        total_duration = np.max(path_t_orig)
        
        # Default to the untrimmed path.
        trimmed_path_t_cover = path_t_cover
        trimmed_path_t_orig = path_t_orig

        if total_duration > (2 * trim_seconds):
            # 2. Time window that is kept.
            start_time = trim_seconds
            end_time = total_duration - trim_seconds
            
            # 3. Boolean mask selecting the path points inside the window.
            mask = (path_t_orig >= start_time) & (path_t_orig <= end_time)
            
            trimmed_path_t_cover = path_t_cover[mask]
            trimmed_path_t_orig = path_t_orig[mask]
            
            # Too few points left after trimming: use the full path instead.
            if len(trimmed_path_t_orig) < 10:
                trimmed_path_t_cover = path_t_cover
                trimmed_path_t_orig = path_t_orig
        
        # --- Everything below works on the (possibly) trimmed path ---

        coeffs = np.polyfit(trimmed_path_t_cover, trimmed_path_t_orig, 1)
        a, b = coeffs[0], coeffs[1]

        t_orig_predicted = a * trimmed_path_t_cover + b
        deviation = trimmed_path_t_orig - t_orig_predicted
        sigma_dev = np.std(deviation)
        nwpd_score = np.exp(-lambda_nwpd * sigma_dev)
        
        return nwpd_score
    
    except Exception as e:
        print(f"計算 NWPD 分數時發生錯誤：{e}, 檔案: {os.path.basename(cover_path)}")
        return 0.0


def analyze_and_visualize_scores_detailed():
    """
    Run the trimmed NWPD evaluation and report detailed statistics.

    Reads ``metadata.json`` from ``./dataset/eval``, scores every available
    ``<version>.wav`` of each song against ``origin.wav`` with
    ``calculate_nwpd_trimmed``, prints per-version descriptive statistics and
    the ten lowest-scoring ``human`` entries, and saves a box plot to
    ``nwpd_score_boxplot_trimmed.png``.

    Returns ``None``; returns early when ``metadata.json`` is missing or when
    no strictly positive score was produced.
    """
    # --- 1. Configuration ---
    base_dir = os.path.join(".", "dataset", "eval")
    metadata_path = os.path.join(base_dir, "metadata.json")
    origin_filename = "origin.wav"
    versions = ["human", "etude_e", "etude_d", "picogen", "amtapc", "music2midi"]
    lambda_val = 0.5
    trim_seconds = 20.0  # seconds trimmed from each end of the warping path

    # --- 2. Load the metadata and score every song/version pair ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案。")
        return
    else:
        print(f"✅ 成功讀取 metadata.json，將分析 {len(metadata)} 首歌曲。")

    results_list = []
    for song_data in tqdm(metadata, desc="Analyzing Songs"):
        dir_name = song_data.get("dir_name")
        if not dir_name:
            continue

        song_dir = os.path.join(base_dir, dir_name)
        origin_path = os.path.join(song_dir, origin_filename)
        if not os.path.exists(origin_path):
            continue

        for v in versions:
            cover_path = os.path.join(song_dir, f"{v}.wav")
            if not os.path.exists(cover_path):
                continue

            score = calculate_nwpd_trimmed(
                origin_path,
                cover_path,
                song_dir,
                lambda_nwpd=lambda_val,
                trim_seconds=trim_seconds,
            )
            # calculate_nwpd_trimmed returns 0.0 on failure; keep positive scores only.
            if score > 0:
                results_list.append({'song': dir_name, 'version': v, 'nwpd_score': score})

    if not results_list:
        print("未能計算出任何有效分數。")
        return

    df = pd.DataFrame(results_list)

    # --- 3. Detailed statistics and the lowest-scoring human performances ---
    print("\n\n--- 各版本【裁切後】NWPD 分數詳細統計 ---")
    per_version = df.groupby('version')['nwpd_score'].describe()
    detailed_stats = per_version.sort_values('mean', ascending=False)
    print(detailed_stats)

    print("\n--- 【裁切後】分數最低的 10 首 'human' 演奏歌曲 ---")
    is_human = df['version'] == 'human'
    lowest_human_scores = df[is_human].sort_values(by='nwpd_score').head(10)
    print(lowest_human_scores)

    # --- 4. Visualization ---
    print("\n🎨 正在生成【裁切後】分數分佈圖 (Box Plot)...")
    # Rows are ordered by mean score, highest first.
    order = detailed_stats.index
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.figure(figsize=(12, 8))
    sns.boxplot(data=df, x='nwpd_score', y='version', order=order, palette='viridis')
    plt.title(f'NWPD Score Distribution (Trimmed by {trim_seconds}s)', fontsize=18, pad=20)
    plt.xlabel('NWPD Score (Higher is Better)', fontsize=14)
    plt.ylabel('Version', fontsize=14)
    plt.xlim(-0.05, 1.05)
    plt.grid(axis='x', linestyle='--', alpha=0.7)

    output_image_path = "nwpd_score_boxplot_trimmed.png"
    plt.savefig(output_image_path, dpi=300, bbox_inches='tight')

    print(f"✅ 分數分佈圖已成功儲存至: {output_image_path}")


if __name__ == '__main__':
    # Requires pandas, seaborn and tqdm to be installed:
    # pip install pandas seaborn tqdm
    analyze_and_visualize_scores_detailed()

In [None]:
import os
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm # 用於顯示進度條

from evaluation import IPECalculator

def analyze_dataset_for_ipe_params():
    """
    Collect the Shannon entropy of every ``cover.json`` under ``./dataset/synced/``
    and print statistics from which the IPE parameters can be chosen.

    Each sub-directory that contains a ``cover.json`` is passed to
    ``IPECalculator.calculate_ipe``; results carrying a ``"shannon_entropy"``
    key contribute to the statistics. The mean and standard deviation are
    printed as the suggested ``mu_entropy`` / ``sigma_entropy`` and a histogram
    is saved to ``ipe_entropy_distribution.png``.

    Returns ``None``; returns early when the dataset directory is missing or
    no entropy value could be collected.
    """
    dataset_dir = "./dataset/synced/"
    if not os.path.exists(dataset_dir):
        print(f"錯誤：找不到資料集目錄 {dataset_dir}")
        return

    # Only n_gram and n_clusters are set here; per the original note, mu and
    # sigma do not matter for this entropy collection.
    ipe_calculator = IPECalculator(n_gram=5, n_clusters=16)

    # Every sub-directory of the dataset is treated as one song.
    subdirectories = [entry for entry in os.scandir(dataset_dir) if entry.is_dir()]

    print(f"正在分析 {len(subdirectories)} 首人類演奏歌曲...")

    entropy_values = []
    for entry in tqdm(subdirectories, desc="Analyzing songs"):
        json_path = os.path.join(entry.path, "cover.json")
        if not os.path.exists(json_path):
            continue

        # Only the entropy value of the result is needed.
        results = ipe_calculator.calculate_ipe(json_path)
        if "shannon_entropy" in results:
            entropy_values.append(results["shannon_entropy"])

    if not entropy_values:
        print("未能在資料集中計算出任何熵值。")
        return

    # --- Statistics ---
    entropy_series = pd.Series(entropy_values)
    stats = entropy_series.describe()

    mean_entropy, std_entropy = stats['mean'], stats['std']

    print("\n\n--- 人類演奏資料集熵值統計分析 ---")
    print(stats)

    print("\n--- 建議的 IPE 參數值 ---")
    print(f"建議的 𝜇_Hn (mu_entropy): {mean_entropy:.4f}")
    print(f"建議的 σ_c (sigma_entropy): {std_entropy:.4f}  (這是一個好的起始點，您可以根據需要調整)")

    # --- Visualization ---
    print("\n正在生成熵值分佈圖...")
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.figure(figsize=(12, 6))
    sns.histplot(entropy_series, kde=True, bins=50)
    # Mark the mean and the +/- one standard deviation band.
    plt.axvline(mean_entropy, color='r', linestyle='--', label=f'Mean: {mean_entropy:.2f}')
    plt.axvline(mean_entropy + std_entropy, color='g', linestyle=':', label=f'+1 Std Dev: {mean_entropy + std_entropy:.2f}')
    plt.axvline(mean_entropy - std_entropy, color='g', linestyle=':', label=f'-1 Std Dev: {mean_entropy - std_entropy:.2f}')
    plt.title('人類演奏資料集的香農熵 (H_n) 分佈', fontsize=16)
    plt.xlabel('Shannon Entropy', fontsize=12)
    plt.ylabel('歌曲數量 (Count)', fontsize=12)
    plt.legend()
    plt.savefig("ipe_entropy_distribution.png", dpi=300)
    print("✅ 熵值分佈圖已儲存為 ipe_entropy_distribution.png")


if __name__ == '__main__':
    # Install the required libraries first:
    # pip install pandas matplotlib seaborn tqdm
    analyze_dataset_for_ipe_params()

In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# Assumes the IPECalculator class is defined in evaluation/IPE.py
from evaluation.IPE import IPECalculator 


# Display names for the evaluated versions. The MIDI evaluation in this cell
# stores the human performance as "cover.mid", so "cover" must map to "Human"
# as well; the "human" key is kept for callers that use the audio file naming.
VERSION_DISPLAY_NAMES = {
    "human": "Human",
    "cover": "Human",
    "etude_e": "Etude Extractor",
    "etude_d": "Etude Decoder",
    "picogen": "PiCoGen",
    "amtapc": "AMT-APC",
    "music2midi": "Music2MIDI"
}

def evaluate_models_with_ipe():
    """
    Evaluate every version in the eval set with the calibrated IPE metric.

    Reads ``metadata.json`` from ``./dataset/eval``, runs
    ``IPECalculator.calculate_ipe`` on each existing ``<version>.mid``, prints
    per-version statistics of the IPE score and the mean Shannon entropy, and
    saves a box plot to ``ipe_evaluation_results.png``.

    Results that contain an ``"error"`` key are ignored. Returns ``None``;
    returns early when ``metadata.json`` is missing or no result was collected.
    """
    # --- 1. Configuration ---
    eval_dir = "./dataset/eval"
    metadata_path = os.path.join(eval_dir, "metadata.json")
    versions = ["cover", "picogen", "amtapc", "music2midi", "etude_e", "etude_d"]
    
    # Calibrated parameters (per the original note, derived from 4751 songs).
    EMPIRICAL_MU = 10.2402
    EMPIRICAL_SIGMA = 0.7174
    
    ipe_calculator = IPECalculator(
        mu_entropy=EMPIRICAL_MU, 
        sigma_entropy=EMPIRICAL_SIGMA,
        n_gram=5, 
        n_clusters=16  # must match the values used during calibration
    )

    # --- 2. Load the metadata and score every song/version pair ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        print(f"✅ 成功讀取 metadata.json，將分析 {len(metadata)} 首歌曲。")
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案。")
        return

    results_list = []
    for song_data in tqdm(metadata, desc="Evaluating Songs"):
        dir_name = song_data.get("dir_name")
        if not dir_name: continue

        song_dir = os.path.join(eval_dir, dir_name)
        
        for version in versions:
            midi_path = os.path.join(song_dir, f"{version}.mid")
            if not os.path.exists(midi_path): continue

            results = ipe_calculator.calculate_ipe(midi_path)
            if "error" not in results:
                results_list.append({
                    "song": dir_name,
                    "version": VERSION_DISPLAY_NAMES.get(version, version),
                    "ipe_score": results["ipe_score"],
                    "entropy": results["shannon_entropy"]
                })

    # --- 3. Statistics with pandas ---
    if not results_list:
        print("未計算出任何有效分數。")
        return

    df = pd.DataFrame(results_list)
    
    print("\n\n--- 各版本 IPE 分數統計摘要 ---")
    # Highest mean score first.
    summary = df.groupby('version')['ipe_score'].describe().sort_values('mean', ascending=False)
    print(summary)
    
    # The reference value is taken from EMPIRICAL_MU so the header always
    # agrees with the mu the calculator was configured with (it previously
    # showed a stale hard-coded number).
    print(f"\n--- 各版本平均熵值 (與理想值 {EMPIRICAL_MU:.4f} 比較) ---")
    mean_entropy = df.groupby('version')['entropy'].mean().sort_values(ascending=False)
    print(mean_entropy)

    # --- 4. Visualization ---
    print("\n🎨 正在生成分數分佈的箱形圖 (Box Plot)...")
    
    # Plot rows follow the mean-score ranking.
    order = summary.index 
    
    plt.style.use('seaborn-v0_8-whitegrid')
    plt.figure(figsize=(12, 8))
    
    sns.boxplot(data=df, x='ipe_score', y='version', order=order, palette='viridis')
    
    plt.title('IPE Score Distribution', fontsize=18, pad=20)
    plt.xlabel('IPE Score', fontsize=14)
    plt.ylabel('Version', fontsize=14)
    plt.xlim(-0.05, 1.05)
    plt.grid(axis='x', linestyle='--', alpha=0.7)
    
    output_image_path = "ipe_evaluation_results.png"
    plt.savefig(output_image_path, dpi=300, bbox_inches='tight')
    
    print(f"✅ 評估結果圖表已成功儲存至: {output_image_path}")


# Entry point: run the IPE evaluation when this cell/script is executed directly.
if __name__ == '__main__':
    evaluate_models_with_ipe()

In [None]:
import os
import json
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from tqdm import tqdm

# Assumes the IPECalculator class is defined in evaluation/IPE.py
from evaluation.IPE import IPECalculator 

def run_deep_dive_analysis():
    """
    Run a diagnostic deep dive on the IPE metric and report intermediate data.

    For every existing ``<version>.mid`` listed through ``metadata.json`` in
    ``./dataset/eval`` the function extracts the inter-onset intervals (IOI),
    records how the calculator's preprocessing changed them, and computes the
    Shannon entropy and IPE score. It then prints/plots three diagnostics:

    1. per-version entropy statistics and a KDE plot
       (``entropy_distribution_analysis.png``);
    2. the mean effect of the IOI preprocessing per version;
    3. a symbol-usage histogram for the song whose ``cover`` and
       ``music2midi`` IPE scores differ most (``symbol_usage_analysis.png``).

    Returns ``None``. Returns early when ``metadata.json`` is missing, when no
    data could be collected, or (for diagnostic 3 only) when no song has both
    a ``cover`` and a ``music2midi`` score.
    """
    # The IOI wrapper below needs pretty_midi. It is imported here, through the
    # same re-export the notebook entry point uses, so the function does not
    # depend on that entry point having run; an import failure is raised
    # instead of being hidden by the wrapper's blanket exception handler.
    from evaluation.IPE import pretty_midi

    # --- 1. Configuration ---
    eval_dir = "./dataset/eval"
    metadata_path = os.path.join(eval_dir, "metadata.json")
    versions = ["cover", "music2midi", "etude_d", "etude_e", "picogen", "amtapc"]
    
    # Calibrated IPE parameters.
    EMPIRICAL_MU = 10.2402
    EMPIRICAL_SIGMA = 0.7174
    
    ipe_calculator = IPECalculator(
        mu_entropy=EMPIRICAL_MU, 
        sigma_entropy=EMPIRICAL_SIGMA,
        n_gram=5, 
        n_clusters=16
    )
    
    def get_ioi_with_stats(midi_path: str):
        """
        Extract the processed IOI sequence of a MIDI file plus preprocessing stats.

        Returns ``(processed_ioi, stats)``, or ``(None, {})`` when the file has
        fewer than two distinct note onsets or cannot be processed.
        """
        try:
            midi_data = pretty_midi.PrettyMIDI(midi_path)
            onsets = []
            for instrument in midi_data.instruments:
                if not instrument.is_drum:
                    onsets.extend([note.start for note in instrument.notes])
            if len(onsets) < 2: return None, {}
            
            # np.unique already returns the values sorted.
            unique_onsets = np.unique(onsets)
            if len(unique_onsets) < 2: return None, {}
            
            raw_ioi = np.diff(unique_onsets)
            processed_ioi = ipe_calculator._process_raw_ioi(raw_ioi)
            
            stats = {
                'raw_ioi_count': len(raw_ioi),
                'processed_ioi_count': len(processed_ioi),
                'filtered_percent': (1 - len(processed_ioi) / len(raw_ioi)) * 100 if len(raw_ioi) > 0 else 0,
                'capped_count': np.sum(raw_ioi > ipe_calculator.max_ioi)
            }
            return processed_ioi, stats
        except Exception as exc:
            # Best effort: report the file and carry on with the others.
            print(f"  ↪️ 已跳過 (無法處理 MIDI): {midi_path} ({exc})")
            return None, {}

    # Replace the calculator's IOI extraction with the stats-returning wrapper.
    ipe_calculator.get_ioi_sequence = get_ioi_with_stats
    
    # --- 2. Collect the data ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案於 {metadata_path}")
        return

    results_list = []
    for song_data in tqdm(metadata, desc="Deep Dive Analysis"):
        dir_name = song_data.get("dir_name")
        if not dir_name: continue
        song_dir = os.path.join(eval_dir, dir_name)
        
        for version in versions:
            midi_path = os.path.join(song_dir, f"{version}.mid")
            if not os.path.exists(midi_path): continue
            
            # The patched extractor also returns the preprocessing statistics.
            ioi_sequence, io_stats = ipe_calculator.get_ioi_sequence(midi_path)
            
            if ioi_sequence is None or ioi_sequence.size == 0: continue

            # Entropy and IPE score, computed step by step from the IOI sequence.
            symbol_sequence = ipe_calculator.quantize_ioi_to_symbols(ioi_sequence)
            if symbol_sequence.size == 0: continue
            ngrams = ipe_calculator.get_ngrams_from_sequence(symbol_sequence, ipe_calculator.n_gram)
            entropy = ipe_calculator.get_shannon_entropy(ngrams)
            ipe_score = np.exp(-((entropy - ipe_calculator.mu_entropy)**2) / (2 * ipe_calculator.sigma_entropy**2))
            
            result_item = {
                'song': dir_name, 'version': version, 'ipe_score': ipe_score, 'entropy': entropy
            }
            result_item.update(io_stats)  # attach the IOI preprocessing statistics
            results_list.append(result_item)

    # --- 3. Analysis and presentation ---
    if not results_list:
        print("未能收集到任何有效數據。")
        return

    df = pd.DataFrame(results_list)
    
    # Diagnostic 1: entropy distribution.
    print("\n\n--- 診斷一：各版本『香農熵』詳細統計 ---")
    entropy_stats = df.groupby('version')['entropy'].describe().sort_values('mean', ascending=False)
    print(entropy_stats)

    plt.figure(figsize=(12, 7))
    sns.kdeplot(data=df, x='entropy', hue='version', fill=True, alpha=0.5, palette='viridis')
    plt.axvline(EMPIRICAL_MU, color='r', linestyle='--', label=f'Ideal μ: {EMPIRICAL_MU:.2f}')
    plt.title('H Distribution', fontsize=16)
    plt.xlabel('Shannon Entropy')
    # NOTE(review): plt.legend() rebuilds the legend from labelled artists, so
    # the per-version hue legend drawn by seaborn may be replaced by the single
    # "Ideal μ" entry — check the saved figure.
    plt.legend()
    plt.savefig('entropy_distribution_analysis.png', dpi=300)
    print("✅ 熵值分佈圖已儲存至 entropy_distribution_analysis.png")
    plt.show()

    # Diagnostic 2: effect of the IOI preprocessing.
    print("\n\n--- 診斷二：IOI 預處理影響分析 (平均值) ---")
    processing_stats = df.groupby('version')[['filtered_percent', 'capped_count']].mean().sort_values('filtered_percent', ascending=False)
    print(processing_stats)

    # Diagnostic 3: symbol usage for the song with the largest score gap.
    print("\n\n--- 診斷三：節奏符號使用頻率比較 (範例) ---")
    compared_versions = ['cover', 'music2midi']
    pivot_df = df.pivot(index='song', columns='version', values='ipe_score')
    # Both versions must have been scored at least once, and at least one song
    # must have both scores; otherwise there is nothing to compare.
    if any(v not in pivot_df.columns for v in compared_versions):
        print("缺少 'cover' 或 'music2midi' 的分數，已跳過診斷三。")
        return
    score_gap = (pivot_df['music2midi'] - pivot_df['cover']).abs().dropna()
    if score_gap.empty:
        print("缺少 'cover' 或 'music2midi' 的分數，已跳過診斷三。")
        return
    sample_song_dir = score_gap.idxmax()
    print(f"以分數差異最大的歌曲 '{sample_song_dir}' 為例進行分析:")
    
    fig, axs = plt.subplots(1, 2, figsize=(15, 5), sharey=True)
    
    for i, version in enumerate(compared_versions):
        midi_path = os.path.join(eval_dir, sample_song_dir, f"{version}.mid")
        ioi_sequence, _ = ipe_calculator.get_ioi_sequence(midi_path)
        # The extractor returns None when the file could not be processed.
        if ioi_sequence is None or ioi_sequence.size == 0:
            continue
        symbol_sequence = ipe_calculator.quantize_ioi_to_symbols(ioi_sequence)
        
        if symbol_sequence.size > 0:
            sns.histplot(symbol_sequence, ax=axs[i], bins=ipe_calculator.n_clusters, kde=False)
            axs[i].set_title(f"Symbol Usage - {version} @ {sample_song_dir}")
            axs[i].set_xticks(range(ipe_calculator.n_clusters))

    plt.suptitle("Freq", fontsize=16)
    plt.savefig('symbol_usage_analysis.png', dpi=300)
    print("✅ 節奏符號使用頻率比較圖已儲存至 symbol_usage_analysis.png")
    plt.show()


if __name__ == '__main__':
    # Make `pretty_midi` (re-exported by evaluation.IPE) available at module
    # level before running the analysis.
    from evaluation.IPE import pretty_midi
    run_deep_dive_analysis()

In [None]:
import os
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

# 假設您的 RGCCalculator 類別位於此處
from evaluation import RGCCalculator

def get_genre_from_dirname(dir_name: str) -> str:
    """
    Infer the music genre from a directory name.

    The name is upper-cased and searched for the genre tags ``CPOP``, ``JPOP``,
    ``KPOP`` and ``WESTERN`` in that order; the first tag found is returned,
    and ``"UNKNOWN"`` when none occurs.
    """
    upper_name = dir_name.upper()
    # Order matters: it decides which tag wins when several occur in the name.
    for genre_tag in ("CPOP", "JPOP", "KPOP", "WESTERN"):
        if genre_tag in upper_name:
            return genre_tag
    return "UNKNOWN"

def analyze_rgc_by_genre_and_overall():
    """
    Compute RGC scores for every version of every song, then report them per
    genre and overall.

    Reads ``metadata.json`` from ``./dataset/eval``, runs
    ``RGCCalculator.calculate_rgc`` on each existing ``<version>.mid`` and tags
    every result with its song, version and genre (inferred from the directory
    name). For each genre in ``genres_to_analyze`` and for the whole set it
    prints statistics of ``rgc_score`` and ``inferred_tau`` and saves a box
    plot (``rgc_evaluation_<GENRE>.png`` / ``rgc_evaluation_OVERALL.png``).

    Results that contain an ``"error"`` key are ignored. Returns ``None``;
    returns early when ``metadata.json`` is missing or no result was collected.
    """
    # --- 1. Configuration ---
    eval_dir = "./dataset/eval"
    metadata_path = os.path.join(eval_dir, "metadata.json")
    versions = ["cover", "picogen", "etude_d", "music2midi", "amtapc", "etude_e"]
    genres_to_analyze = ["CPOP", "JPOP", "KPOP", "WESTERN"]
    
    VERSION_DISPLAY_NAMES = {
        "cover": "Human",
        "picogen": "PiCoGen",
        "etude_d": "Etude Decoder",
        "music2midi": "Music2MIDI",
        "amtapc": "AMT-APC",
        "etude_e": "Etude Extractor"
    }
    
    # Tuned calculator parameters.
    rgc_calc = RGCCalculator(
        reasonable_bpm_range=(60, 240),
        tau_falloff_sigma=0.03,
        lambda_grid_fit=10.0
    )

    # --- 2. Score every song/version pair in a single pass ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        print(f"✅ 成功讀取 metadata.json，將分析 {len(metadata)} 首歌曲。")
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案於 {metadata_path}")
        return
        
    results_list = []
    for song_data in tqdm(metadata, desc="Calculating all RGC scores"):
        dir_name = song_data.get("dir_name")
        if not dir_name: continue
        
        genre = get_genre_from_dirname(dir_name)
        song_dir = os.path.join(eval_dir, dir_name)
        
        for version in versions:
            midi_path = os.path.join(song_dir, f"{version}.mid")
            if not os.path.exists(midi_path): continue
            
            results = rgc_calc.calculate_rgc(midi_path)
            if "error" not in results:
                # Build a new record instead of writing into the dict the
                # calculator returned.
                results_list.append({**results, 'song': dir_name, 'version': version, 'genre': genre})

    if not results_list:
        print("未能計算出任何有效分數，無法生成報告。")
        return

    df_all = pd.DataFrame(results_list)
    df_all['display_name'] = df_all['version'].map(VERSION_DISPLAY_NAMES)
    
    print("\n✅ 所有歌曲分數計算完畢，開始按類型進行分析...")

    # The plot style is set before the first figure is created so the
    # per-genre figures and the overall figure share it even on a fresh kernel
    # (it used to be applied only after the per-genre figures were drawn).
    plt.style.use('seaborn-v0_8-whitegrid')

    # --- 3. Per-genre analysis and plots ---
    for genre in genres_to_analyze:
        print(f"\n\n{'='*25} 分析報告: {genre} {'='*25}")
        
        df_genre = df_all[df_all['genre'] == genre].copy()
        
        if df_genre.empty:
            print(f"在資料集中找不到類型為 '{genre}' 的歌曲，已跳過。")
            continue
            
        print(f"\n--- {genre} 類型 RGC 分數統計 ---")
        summary = df_genre.groupby('display_name')['rgc_score'].describe().sort_values('mean', ascending=False)
        print(summary)
        
        print(f"\n--- {genre} 類型『基本節拍單位 τ』統計 (秒) ---")
        tau_summary = df_genre.groupby('display_name')['inferred_tau'].describe()
        print(tau_summary[['mean', 'std']])
        
        # Plot rows follow the mean-score ranking.
        order = summary.index 
        fig, ax = plt.subplots(figsize=(12, 8))
        sns.boxplot(data=df_genre, x='rgc_score', y='display_name', order=order, palette='plasma', ax=ax)
        ax.set_title(f'RGC Score Distribution for {genre}', fontsize=18, pad=20)
        ax.set_xlabel('RGC Score', fontsize=14)
        ax.set_ylabel('Version', fontsize=14)
        # df_genre is known to be non-empty here, so the upper limit can be
        # computed directly; it never drops below 0.8.
        ax.set_xlim(-0.05, max(df_genre['rgc_score'].max() * 1.1, 0.8))
        ax.grid(axis='x', linestyle='--', alpha=0.7)
        output_image_path = f"rgc_evaluation_{genre}.png"
        plt.savefig(output_image_path, dpi=300, bbox_inches='tight')
        plt.close(fig)
        print(f"✅ {genre} 類型評估圖表已成功儲存至: {output_image_path}")

    # --- 4. Overall report across all genres ---
    print(f"\n\n{'='*25} 總體分析報告: ALL GENRES ({len(metadata)}首歌曲) {'='*25}")

    print("\n--- 總體 RGC 分數統計 ---")
    summary_all = df_all.groupby('display_name')['rgc_score'].describe().sort_values('mean', ascending=False)
    print(summary_all)

    print("\n--- 總體『基本節拍單位 τ』統計 (秒) ---")
    tau_summary_all = df_all.groupby('display_name')['inferred_tau'].describe()
    print(tau_summary_all[['mean', 'std']])
    
    # --- Overall figure ---
    print("\n🎨 正在生成總體分數分佈圖...")
    
    order_all = summary_all.index 
    fig, ax = plt.subplots(figsize=(12, 8))
    
    sns.boxplot(data=df_all, x='rgc_score', y='display_name', order=order_all, palette='viridis', ax=ax)
    
    ax.set_title('Overall RGC Score Distribution (All Genres)', fontsize=18, pad=20)
    ax.set_xlabel('RGC Score', fontsize=14)
    ax.set_ylabel('Version', fontsize=14)
    ax.set_xlim(-0.05, max(df_all['rgc_score'].max() * 1.1, 0.8))
    ax.grid(axis='x', linestyle='--', alpha=0.7)
    
    output_image_path_overall = "rgc_evaluation_OVERALL.png"
    plt.savefig(output_image_path_overall, dpi=300, bbox_inches='tight')
    plt.close(fig)
    
    print(f"✅ 總體評估圖表已成功儲存至: {output_image_path_overall}")
    
    print("\n\n🎉 所有類型的分析與視覺化均已完成！")


if __name__ == '__main__':
    # Make sure the RGCCalculator class is defined in evaluation.py, or adjust
    # the import path below.
    from evaluation import RGCCalculator
    analyze_rgc_by_genre_and_overall()

In [None]:
import os
import json
import numpy as np
import pandas as pd
from tqdm import tqdm
import matplotlib.pyplot as plt
import seaborn as sns

# 假設您的 RGCCalculator 類別位於此處
from evaluation import RGCCalculator
from evaluation import RCCalculator


def get_genre_from_dirname(dir_name: str) -> str:
    """
    Infer the music genre from a dataset directory name.

    The match is a case-insensitive substring test. Tags are tried in the
    fixed order CPOP, JPOP, KPOP, WESTERN and the first one found wins.

    Args:
        dir_name: Name of the song directory.

    Returns:
        The matching genre tag in upper case, or "UNKNOWN" when none matches.
    """
    normalized = dir_name.upper()
    # Tuple order is the match priority; keep it stable.
    for genre_tag in ("CPOP", "JPOP", "KPOP", "WESTERN"):
        if genre_tag in normalized:
            return genre_tag
    return "UNKNOWN"

def analyze_rgc_by_genre():
    """
    Score every song version with ``RCCalculator`` and report the results per genre.

    Reads ``./dataset/eval/metadata.json``, looks for ``<version>.mid`` inside
    each song directory and collects the dicts returned by
    ``RCCalculator.calculate_rc`` (results carrying an ``"error"`` key are
    skipped). For each genre in CPOP / JPOP / KPOP / WESTERN it prints the
    ``describe()`` statistics of ``rc_score`` grouped by version and saves a
    box plot to ``rc_evaluation_<genre>.png`` in the working directory.

    The figures are labelled "RC Score" and saved under an ``rc_`` prefix so
    they name the metric actually plotted and do not overwrite the
    ``rgc_evaluation_<genre>.png`` files written by the RGC analysis in this
    notebook.

    Returns:
        None. Returns early, after printing a message, when ``metadata.json``
        is missing or when no score could be computed.
    """
    # --- 1. Configuration ---
    eval_dir = "./dataset/eval"
    metadata_path = os.path.join(eval_dir, "metadata.json")
    versions = ["cover", "picogen", "etude_d", "etude_d_d", "music2midi", "amtapc", "etude_e"]
    genres_to_analyze = ["CPOP", "JPOP", "KPOP", "WESTERN"]

    # Hand-tuned value that was found to work best; forwarded to RCCalculator.
    LAMBDA_GRID_FIT = 10.0

    # --- 2. Load the metadata, then score every song/version in a single pass ---
    try:
        with open(metadata_path, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
        print(f"✅ 成功讀取 metadata.json，將分析 {len(metadata)} 首歌曲。")
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案於 {metadata_path}")
        return

    # Built only once the metadata is known to exist, so a missing dataset fails fast.
    rc_calc = RCCalculator(
        top_k=8,
        lambda_grid_fit=LAMBDA_GRID_FIT
    )

    results_list = []
    for song_data in tqdm(metadata, desc="Calculating all RC scores"):
        dir_name = song_data.get("dir_name")
        if not dir_name:
            continue

        genre = get_genre_from_dirname(dir_name)
        song_dir = os.path.join(eval_dir, dir_name)

        for version in versions:
            midi_path = os.path.join(song_dir, f"{version}.mid")
            if not os.path.exists(midi_path):
                continue

            results = rc_calc.calculate_rc(midi_path)
            if "error" not in results:
                # Tag the result so it can be grouped by song, version and genre later.
                results['song'] = dir_name
                results['version'] = version
                results['genre'] = genre
                results_list.append(results)

    if not results_list:
        print("未能計算出任何有效分數，無法生成報告。")
        return

    # Master DataFrame holding every valid result.
    df_all = pd.DataFrame(results_list)
    print("\n✅ 所有歌曲分數計算完畢，開始按類型進行分析...")

    # --- 3. Analyse and plot each genre separately ---
    for genre in genres_to_analyze:
        print(f"\n\n{'='*25} 分析報告: {genre} {'='*25}")

        # Rows belonging to the current genre only.
        df_genre = df_all[df_all['genre'] == genre].copy()

        if df_genre.empty:
            print(f"在資料集中找不到類型為 '{genre}' 的歌曲，已跳過。")
            continue

        print(f"\n--- {genre} 類型 RC 分數統計 ---")
        summary = df_genre.groupby('version')['rc_score'].describe().sort_values('mean', ascending=False)
        print(summary)

        # --- Box plot for this genre ---
        print(f"\n🎨 正在生成 {genre} 類型的分數分佈圖...")

        # Versions are drawn from highest to lowest mean score.
        order = summary.index
        plt.style.use('seaborn-v0_8-whitegrid')
        fig, ax = plt.subplots(figsize=(12, 8))  # fresh figure per genre

        # Draw on the explicit Axes instead of relying on pyplot's current-axes state.
        sns.boxplot(data=df_genre, x='rc_score', y='version', order=order, palette='plasma', ax=ax)

        ax.set_title(f'RC Score Distribution for {genre}', fontsize=18, pad=20)
        ax.set_xlabel('RC Score', fontsize=14)
        ax.set_ylabel('Version', fontsize=14)
        # Keep the axis at least 0.8 wide, with 10% headroom above the best score.
        ax.set_xlim(-0.05, max(df_genre['rc_score'].max() * 1.1, 0.8))
        ax.grid(axis='x', linestyle='--', alpha=0.7)

        output_image_path = f"rc_evaluation_{genre}.png"
        fig.savefig(output_image_path, dpi=300, bbox_inches='tight')
        plt.close(fig)  # release the figure so the next iteration starts clean

        print(f"✅ {genre} 類型評估圖表已成功儲存至: {output_image_path}")

    print("\n\n🎉 所有類型的分析與視覺化均已完成！")


if __name__ == '__main__':
    # Entry point: run the per-genre RC analysis when this cell/script is executed directly.
    analyze_rgc_by_genre()

In [8]:
import os
import json
import pandas as pd
from tqdm import tqdm

from evaluation import WPDCalculator

def test_density_penalty_weights():
    """
    Report how ``density_penalty_weight`` changes the WPD ranking of each version.

    For every weight in ``weights_to_test`` a fresh ``WPDCalculator`` is built
    and run over every song listed in ``./dataset/eval/metadata.json``. A
    version is scored only when the song directory has ``origin.wav``, a
    ``<version>.mid`` (or, failing that, ``<version>.json``) and a
    ``<version>.wav``. Results carrying an ``"error"`` key are discarded.

    Two tables are printed: the mean ``wpd_score`` per version and weight, and
    the per-weight rank derived from those means (1 = lowest mean).

    Returns:
        None. Returns early, after printing a message, when ``metadata.json``
        is missing or when no score could be computed.
    """
    # --- 1. Configuration ---
    EVAL_DIR = "./dataset/eval"
    METADATA_PATH = os.path.join(EVAL_DIR, "metadata.json")
    VERSIONS = ["cover", "etude_e", "etude_d", "etude_d_d", "picogen", "amtapc", "music2midi"]
    VERSION_DISPLAY_NAMES = {
        "cover": "Human", "etude_e": "Etude Extractor", "etude_d_d": "Etude Decoder - Default",
        "etude_d": "Etude Decoder - Prompted", "picogen": "PiCoGen", "amtapc": "AMT-APC", "music2midi": "Music2MIDI"
    }

    # The penalty weights to sweep over.
    weights_to_test = [0, 0.05, 0.1, 0.5]

    all_results = []

    try:
        with open(METADATA_PATH, 'r', encoding='utf-8') as f:
            metadata = json.load(f)
    except FileNotFoundError:
        print(f"❌ 錯誤：找不到 metadata.json 檔案於 {METADATA_PATH}")
        return

    # --- 2. Sweep every weight over every song ---
    for weight in tqdm(weights_to_test, desc="Testing Penalty Weights"):
        # A new calculator per weight, since the weight is a constructor argument.
        wpd_calc = WPDCalculator(
            subsample_step=1,
            trim_seconds=10,
            density_penalty_weight=weight
        )

        for song_data in metadata:
            dir_name = song_data.get("dir_name")
            if not dir_name:
                continue

            song_dir = os.path.join(EVAL_DIR, dir_name)
            origin_wav_path = os.path.join(song_dir, "origin.wav")
            if not os.path.exists(origin_wav_path):
                continue

            for version in VERSIONS:
                # WPD-N needs a .mid or .json file to compute note density;
                # the file is assumed to share its base name with the .wav.
                file_path = os.path.join(song_dir, f"{version}.mid")
                if not os.path.exists(file_path):
                    file_path = os.path.join(song_dir, f"{version}.json")
                if not os.path.exists(file_path):
                    continue

                # NOTE(review): the .wav is only checked for existence here; calculate_wpd
                # receives the .mid/.json path. Presumably the calculator locates the
                # rendered audio itself -- confirm against WPDCalculator.
                wav_path = os.path.join(song_dir, f"{version}.wav")
                if not os.path.exists(wav_path):
                    continue

                results = wpd_calc.calculate_wpd(origin_wav_path, file_path, song_dir)
                if "error" not in results:
                    results['version'] = VERSION_DISPLAY_NAMES.get(version, version)
                    results['penalty_weight'] = weight
                    all_results.append(results)

    # --- 3. Analyse and report with pandas ---
    if not all_results:
        print("未能計算出任何有效分數。")
        return

    df = pd.DataFrame(all_results)

    print("\n\n" + "="*20 + " WPD-N 權重敏感度分析報告 " + "="*20)

    # Report 1: mean error per version for each weight.
    print("\n--- 報告一：各版本在不同權重下的平均誤差值 (越低越好) ---")
    pivot_scores = df.pivot_table(
        index='version',
        columns='penalty_weight',
        values='wpd_score',
        aggfunc='mean'
    )
    print(pivot_scores)

    # Report 2: rank of each version within every weight column.
    print("\n\n--- 報告二：各版本在不同權重下的排名變化 (1=最好) ---")
    # rank() leaves NaN wherever a version has no score for a weight; the nullable
    # Int64 dtype keeps such gaps as <NA> instead of raising the way astype(int) does.
    pivot_ranks = pivot_scores.rank(axis=0, method='min').astype('Int64')
    print(pivot_ranks)

# Run the weight sweep when this cell executes; both report tables are printed to the cell output.
test_density_penalty_weights()

Testing Penalty Weights: 100%|██████████| 4/4 [01:28<00:00, 22.08s/it]




--- 報告一：各版本在不同權重下的平均誤差值 (越低越好) ---
penalty_weight                0.00      0.05      0.10      0.50
version                                                         
AMT-APC                   0.086916  1.062312  2.037707  9.840872
Etude Decoder - Default   0.211236  0.701990  1.192744  5.118775
Etude Decoder - Prompted  0.233434  0.829003  1.424573  6.189125
Etude Extractor           0.118497  0.911008  1.703518  8.043605
Human                     0.489353  0.924388  1.359422  4.839699
Music2MIDI                0.183201  0.487121  0.791041  3.222401
PiCoGen                   1.001302  1.386048  1.770795  4.848767


--- 報告二：各版本在不同權重下的排名變化 (1=最好) ---
penalty_weight            0.00  0.05  0.10  0.50
version                                         
AMT-APC                      1     6     7     7
Etude Decoder - Default      4     2     2     4
Etude Decoder - Prompted     5     3     4     5
Etude Extractor              2     4     5     6
Human                        6     5     3     


