In [1]:
import pandas as pd
import numpy as np
import torch
import os
import glob
import librosa
import librosa.display
import matplotlib.pyplot as plt
import seaborn as sns
from pathlib import Path
import warnings
import torch.nn as nn
import torch.optim as optim
from torch.utils.data import Dataset, DataLoader
from sklearn.model_selection import train_test_split
from sklearn.preprocessing import LabelEncoder
import torch.nn.functional as F
from tqdm import tqdm
import time
import random
warnings.filterwarnings('ignore')

# 设置中文字体显示
plt.rcParams['font.sans-serif'] = ['SimHei', 'Arial Unicode MS']  
plt.rcParams['axes.unicode_minus'] = False

device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f"使用设备: {device}")

# 设置图表样式
plt.style.use('seaborn-v0_8')
sns.set_palette("husl")

DATA_DIR = "fma_metadata"  
AUDIO_DIR = "fma_small"   
FEATURE_PATH = 'features'


使用设备: cuda


In [3]:
def load_metadata():
    tracks = pd.read_csv(os.path.join(DATA_DIR, "tracks.csv"), header=[0, 1], index_col=0)
    genres = pd.read_csv(os.path.join(DATA_DIR, "genres.csv"), index_col=0)
    features = pd.read_csv(os.path.join(DATA_DIR, "features.csv"), header=[0, 1, 2], index_col=0)
    return tracks, genres, features

def process_tracks_data(tracks):
    # 提取关键列
    track_data = pd.DataFrame({
        'track_id': tracks.index,
        'title': tracks[('track', 'title')],
        'duration': tracks[('track', 'duration')],
        'genre_top': tracks[('track', 'genre_top')],
        'genres': tracks[('track', 'genres')],
        'listens': tracks[('track', 'listens')],
        'bit_rate': tracks[('track', 'bit_rate')],
        'interest': tracks[('track', 'interest')]
    })
    
    print(f"原始数据: {len(track_data)} 首歌曲")
    
    # 清理数据
    # 移除没有genre_top标签的数据
    track_data = track_data.dropna(subset=['genre_top'])
    track_data = track_data[track_data['genre_top'] != 0]
    
    # 转换数据类型
    track_data['duration'] = pd.to_numeric(track_data['duration'], errors='coerce')
    track_data['listens'] = pd.to_numeric(track_data['listens'], errors='coerce')
    genre_name_to_id = {}
    for gid, row in genres.iterrows():
        genre_name_to_id[row['title']] = gid
    
    # 映射流派名称到ID
    def map_genre_name_to_id(genre_name):
        if pd.isna(genre_name):
            return np.nan
        if isinstance(genre_name, str):
            return genre_name_to_id.get(genre_name, np.nan)
        else:
            return genre_name
    
    track_data['genre_top'] = track_data['genre_top'].apply(map_genre_name_to_id)
    
    # 移除无法映射的流派
    track_data = track_data.dropna(subset=['genre_top'])
    
    print(f"清理后数据: {len(track_data)} 首歌曲")
    return track_data

tracks, genres, features = load_metadata()
track_data = process_tracks_data(tracks)

原始数据: 106574 首歌曲
清理后数据: 49598 首歌曲


In [4]:
def scan_audio_files(audio_dir):
    """扫描音频目录，找到所有可用的mp3文件"""
    print(f"📂 扫描音频目录: {audio_dir}")
    
    if not os.path.exists(audio_dir):
        print(f"❌ 音频目录不存在: {audio_dir}")
        return {}
    
    audio_files = {}
    total_size = 0
    
    # 遍历所有子文件夹
    for root, dirs, files in os.walk(audio_dir):
        for file in files:
            if file.endswith('.mp3'):
                file_path = os.path.join(root, file)
                
                # 从文件名提取track_id
                try:
                    track_id = int(file.split('.')[0])
                    file_size = os.path.getsize(file_path)
                    
                    audio_files[track_id] = {
                        'path': file_path,
                        'size_bytes': file_size,
                        'size_kb': file_size / 1024,
                        'size_mb': file_size / (1024 * 1024)
                    }
                    total_size += file_size
                    
                except ValueError:
                    # 文件名不是数字，跳过
                    continue
    
    print(f"   找到 {len(audio_files)} 个音频文件")
    print(f"   总大小: {total_size / (1024**3):.2f} GB")
    
    return audio_files

# 扫描音频文件
audio_files_info = scan_audio_files(AUDIO_DIR)

📂 扫描音频目录: fma_small
   找到 7999 个音频文件
   总大小: 7.43 GB


In [5]:
def filter_valid_audio_files(audio_files_info, track_data, min_size_kb=10, max_size_kb=20000):
    """过滤出可用的音频文件"""
    valid_files = {}
    filtered_tracks = []
    
    for track_id, file_info in audio_files_info.items():
        # 检查文件大小
        if not (min_size_kb <= file_info['size_kb'] <= max_size_kb):
            continue
            
        # 检查是否在track_data中存在
        if track_id not in track_data['track_id'].values:
            continue
            
        # 获取对应的track信息
        track_row = track_data[track_data['track_id'] == track_id].iloc[0]
        
        # 合并音频文件信息和元数据
        combined_info = {
            'track_id': track_id,
            'title': track_row['title'],
            'genre_top': track_row['genre_top'],
            'duration': track_row['duration'],
            'listens': track_row['listens'],
            'audio_path': file_info['path'],
            'file_size_kb': file_info['size_kb'],
            'file_size_mb': file_info['size_mb']
        }
        
        valid_files[track_id] = file_info
        filtered_tracks.append(combined_info)
    
    # 转换为DataFrame
    valid_tracks_df = pd.DataFrame(filtered_tracks)
    
    print(f"\n✅ 过滤结果:")
    print(f"   原始track_data: {len(track_data)}")
    print(f"   最终可用数据: {len(valid_tracks_df)}")
    
    return valid_tracks_df, valid_files

# 过滤可用文件
valid_tracks_df, valid_audio_files = filter_valid_audio_files(
    audio_files_info, track_data, min_size_kb=10, max_size_kb=20000
)


✅ 过滤结果:
   原始track_data: 49598
   最终可用数据: 7996


In [6]:
def analyze_genre_distribution(valid_tracks_df, genres):
    """分析可用数据的流派分布"""

    # 统计每个流派的歌曲数量
    genre_counts = valid_tracks_df['genre_top'].value_counts()
    
    # 获取流派名称
    genre_names = {}
    for genre_id in genre_counts.index:
        if genre_id in genres.index:
            genre_names[genre_id] = genres.loc[genre_id, 'title']
        else:
            genre_names[genre_id] = f"Unknown_{genre_id}"
    
    # 创建流派统计
    genre_stats_filtered = pd.DataFrame({
        'genre_id': genre_counts.index,
        'genre_name': [genre_names[gid] for gid in genre_counts.index],
        'track_count': genre_counts.values,
        'percentage': (genre_counts.values / len(valid_tracks_df) * 100).round(2)
    })
    
    print(f"   总流派数: {len(genre_stats_filtered)}")
    print(f"   总歌曲数: {len(valid_tracks_df)}")
    for i, row in genre_stats_filtered.head(10).iterrows():
        print(f"   {i+1:2d}. {row['genre_name']:15s}: {row['track_count']:4d} ({row['percentage']:5.1f}%)")
    
    return genre_stats_filtered

# 分析可用数据的流派分布
if len(valid_tracks_df) > 0:
    final_genre_stats = analyze_genre_distribution(valid_tracks_df, genres)

   总流派数: 8
   总歌曲数: 7996
    1. Hip-Hop        : 1000 ( 12.5%)
    2. Pop            : 1000 ( 12.5%)
    3. Folk           : 1000 ( 12.5%)
    4. International  : 1000 ( 12.5%)
    5. Instrumental   : 1000 ( 12.5%)
    6. Experimental   :  999 ( 12.5%)
    7. Rock           :  999 ( 12.5%)
    8. Electronic     :  998 ( 12.5%)


In [11]:
def extract_mel_spectrogram(audio_path, sr=22050, n_mels=128, hop_length=512, n_fft=2048, duration=30):
    """
    提取mel频谱图特征
    """
    try:
        # 加载音频，限制时长为30秒
        y, sr = librosa.load(audio_path, sr=sr, duration=duration, offset=0)
        
        # 如果音频太短，用零填充到30秒
        target_length = sr * duration
        if len(y) < target_length:
            y = np.pad(y, (0, target_length - len(y)), mode='constant')
        else:
            y = y[:target_length]
        
        # 提取mel频谱图
        mel_spec = librosa.feature.melspectrogram(
            y=y, sr=sr, n_mels=n_mels, hop_length=hop_length, n_fft=n_fft
        )
        
        # 转换为对数刻度
        mel_spec_db = librosa.power_to_db(mel_spec, ref=np.max)
        
        return mel_spec_db
        
    except Exception as e:
        print(f"处理音频文件时出错 {audio_path}: {e}")
        # 返回零填充的频谱图
        return np.zeros((n_mels, 1292))  # 30秒音频的默认时间帧数

def batch_extract_features(valid_tracks_df, batch_size=100, save_interval=500):
    """
    批量提取特征并保存到磁盘
    """
    print("🎵 开始批量提取mel频谱图特征...")
 
    os.makedirs(FEATURE_PATH, exist_ok=True)
    
    features_list = []
    labels_list = []
    track_ids_list = []
    
    # 创建标签编码器
    label_encoder = LabelEncoder()
    
    # 获取所有流派ID并编码
    all_genres = valid_tracks_df['genre_top'].values
    encoded_labels = label_encoder.fit_transform(all_genres)
    
    print(f"流派映射关系:")
    for i, genre_id in enumerate(label_encoder.classes_):
        genre_name = genres.loc[genre_id, 'title'] if genre_id in genres.index else f"Unknown_{genre_id}"
        print(f"   {i}: {genre_name} (ID: {genre_id})")
    
    # 随机打乱数据
    indices = np.random.permutation(len(valid_tracks_df))
    
    processed_count = 0
    failed_count = 0
    
    for idx in tqdm(indices, desc="提取特征"):
        row = valid_tracks_df.iloc[idx]
        
        # 提取mel频谱图
        mel_spec = extract_mel_spectrogram(row['audio_path'])
        
        if mel_spec is not None:
            features_list.append(mel_spec)
            labels_list.append(encoded_labels[idx])
            track_ids_list.append(row['track_id'])
            processed_count += 1
        else:
            failed_count += 1
        
        # 定期保存和清理内存
        if len(features_list) >= save_interval:
            print(f"\n💾 保存中间结果... (已处理: {processed_count}, 失败: {failed_count})")
            
            # 转换为numpy数组并保存
            features_array = np.array(features_list)
            labels_array = np.array(labels_list)
            track_ids_array = np.array(track_ids_list)
            
            # 保存到文件
            timestamp = int(time.time())
            np.savez_compressed(os.path.join(FEATURE_PATH, f'features_batch_{timestamp}.npz'), 
                              features=features_array, 
                              labels=labels_array,
                              track_ids=track_ids_array)
            
            # 清理内存
            features_list = []
            labels_list = []
            track_ids_list = []
    
    # 保存剩余的数据
    if features_list:
        print(f"\n💾 保存最后批次...")
        features_array = np.array(features_list)
        labels_array = np.array(labels_list)
        track_ids_array = np.array(track_ids_list)
        
        timestamp = int(time.time())
        np.savez_compressed(os.path.join(FEATURE_PATH, f'features_batch_{timestamp}.npz'), 
                          features=features_array, 
                          labels=labels_array,
                          track_ids=track_ids_array)
    
    print(f"\n特征提取完成!")
    print(f"   成功处理: {processed_count} 个文件")
    print(f"   失败: {failed_count} 个文件")
    
    return label_encoder

# test_data = valid_tracks_df.head(100)  # 只处理前100个文件进行测试
# label_encoder = batch_extract_features(test_data, batch_size=50, save_interval=50)

# 已经提取过
# label_encoder = batch_extract_features(valid_tracks_df, batch_size=100, save_interval=500)

In [13]:
def load_all_features():
    """
    加载所有保存的特征文件
    """

    print("📂 加载所有特征文件...")

    feature_files = glob.glob(os.path.join(FEATURE_PATH, 'features_*.npz'))
    if not feature_files:
        print("❌ 没有找到特征文件！")
        return None, None, None
    
    all_features = []
    all_labels = []
    all_track_ids = []
    
    for file in feature_files:
        print(f"   加载: {file}")
        data = np.load(file)
        all_features.append(data['features'])
        all_labels.append(data['labels'])
        all_track_ids.append(data['track_ids'])
    
    # 合并所有数据
    features = np.concatenate(all_features, axis=0)
    labels = np.concatenate(all_labels, axis=0)
    track_ids = np.concatenate(all_track_ids, axis=0)
    
    print(f"✅ 加载完成!")
    print(f"   总样本数: {len(features)}")
    print(f"   特征维度: {features.shape}")
    print(f"   标签分布: {np.bincount(labels)}")
    
    return features, labels, track_ids

In [15]:
class MusicDataset(Dataset):
    def __init__(self, features, labels, transform=None):
        self.features = features
        self.labels = labels
        self.transform = transform
    
    def __len__(self):
        return len(self.features)
    
    def __getitem__(self, idx):
        feature = self.features[idx]
        label = self.labels[idx]
        
        # 转换为张量
        feature = torch.FloatTensor(feature).unsqueeze(0)  # 添加通道维度
        label = torch.LongTensor([label])[0]
        
        if self.transform:
            feature = self.transform(feature)
        
        return feature, label

def create_data_splits(features, labels, test_size=0.2, val_size=0.1, random_state=42):
    """
    创建训练、验证和测试集
    """
    print("🔄 划分数据集...")
    
    # 首先分离出测试集
    X_temp, X_test, y_temp, y_test = train_test_split(
        features, labels, test_size=test_size, 
        random_state=random_state, stratify=labels
    )
    
    # 从剩余数据中分离出验证集
    val_size_adjusted = val_size / (1 - test_size)  # 调整验证集比例
    X_train, X_val, y_train, y_val = train_test_split(
        X_temp, y_temp, test_size=val_size_adjusted, 
        random_state=random_state, stratify=y_temp
    )
    
    print(f"✅ 数据集划分完成:")
    print(f"   训练集: {len(X_train)} 样本 ({len(X_train)/len(features)*100:.1f}%)")
    print(f"   验证集: {len(X_val)} 样本 ({len(X_val)/len(features)*100:.1f}%)")
    print(f"   测试集: {len(X_test)} 样本 ({len(X_test)/len(features)*100:.1f}%)")
    
    # 检查标签分布
    print(f"\n📊 各集合的标签分布:")
    print(f"   训练集: {np.bincount(y_train)}")
    print(f"   验证集: {np.bincount(y_val)}")
    print(f"   测试集: {np.bincount(y_test)}")
    
    return X_train, X_val, X_test, y_train, y_val, y_test

In [17]:
# 数据增强
class AudioTransform:
    def __init__(self, noise_factor=0.005, time_shift_factor=0.1):
        self.noise_factor = noise_factor
        self.time_shift_factor = time_shift_factor
    
    def __call__(self, x):
        # 添加噪声
        if random.random() > 0.5:
            noise = torch.randn_like(x) * self.noise_factor
            x = x + noise
        
        # 时间偏移
        if random.random() > 0.5:
            shift = int(x.shape[-1] * self.time_shift_factor * (random.random() - 0.5))
            if shift != 0:
                if shift > 0:
                    x = torch.cat([x[..., shift:], torch.zeros_like(x[..., :shift])], dim=-1)
                else:
                    x = torch.cat([torch.zeros_like(x[..., :abs(shift)]), x[..., :shift]], dim=-1)
        
        return x

# 加载特征数据
features, labels, track_ids = load_all_features()

if features is not None:
    # 划分数据集
    X_train, X_val, X_test, y_train, y_val, y_test = create_data_splits(features, labels)
    
    # 创建数据集
    train_transform = AudioTransform()  # 训练时使用数据增强
    
    train_dataset = MusicDataset(X_train, y_train, transform=train_transform)
    val_dataset = MusicDataset(X_val, y_val, transform=None)
    test_dataset = MusicDataset(X_test, y_test, transform=None)
    
    print("✅ 数据集创建完成！")

📂 加载所有特征文件...
   加载: features\features_batch_1754283560.npz
   加载: features\features_batch_1754283609.npz
   加载: features\features_batch_1754283656.npz
   加载: features\features_batch_1754283721.npz
   加载: features\features_batch_1754283766.npz
   加载: features\features_batch_1754283828.npz
   加载: features\features_batch_1754283986.npz
   加载: features\features_batch_1754284160.npz
   加载: features\features_batch_1754284344.npz
   加载: features\features_batch_1754284508.npz
   加载: features\features_batch_1754284602.npz
   加载: features\features_batch_1754284667.npz
   加载: features\features_batch_1754284712.npz
   加载: features\features_batch_1754284758.npz
   加载: features\features_batch_1754284804.npz
   加载: features\features_batch_1754284850.npz
   加载: features\features_batch_1754285711.npz
✅ 加载完成!
   总样本数: 8496
   特征维度: (8496, 128, 1292)
   标签分布: [1065 1064 1066 1059 1061 1059 1062 1060]
🔄 划分数据集...
✅ 数据集划分完成:
   训练集: 5946 样本 (70.0%)
   验证集: 850 样本 (10.0%)
   测试集: 1700 样本 (20.0%)

📊 各集合的标签分布

In [None]:
class CNN(nn.Module):
    def __init__(self, num_classes=8):
        super(CNN, self).__init__()
        
        self.features = nn.Sequential(
            # 第一层
            nn.Conv2d(1, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.Conv2d(32, 32, kernel_size=3, padding=1),
            nn.BatchNorm2d(32),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.25),
            
            # 第二层
            nn.Conv2d(32, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.Conv2d(64, 64, kernel_size=3, padding=1),
            nn.BatchNorm2d(64),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.25),
            
            # 第三层
            nn.Conv2d(64, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.Conv2d(128, 128, kernel_size=3, padding=1),
            nn.BatchNorm2d(128),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.25),
            
            # 第四层
            nn.Conv2d(128, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.Conv2d(256, 256, kernel_size=3, padding=1),
            nn.BatchNorm2d(256),
            nn.ReLU(),
            nn.MaxPool2d(2, 2),
            nn.Dropout2d(0.25),
        )
        
        # 自适应池化
        self.adaptive_pool = nn.AdaptiveAvgPool2d((4, 4))
        
        # 分类器
        self.classifier = nn.Sequential(
            nn.Dropout(0.5),
            nn.Linear(256 * 4 * 4, 512),
            nn.ReLU(),
            nn.Dropout(0.5),
            nn.Linear(512, 256),
            nn.ReLU(),
            nn.Dropout(0.3),
            nn.Linear(256, num_classes)
        )
    
    def forward(self, x):
        x = self.features(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

# 创建模型实例
print("🏗️  创建CNN模型...")
model = CNN(num_classes=8)
print(f"模型参数量: {sum(p.numel() for p in model.parameters()):,}")
model = model.to(device)

print("\n📋 模型结构:")
print(model)

In [None]:
def train_epoch(model, dataloader, criterion, optimizer, device):
    """训练一个epoch"""
    model.train()
    running_loss = 0.0
    correct = 0
    total = 0

    for batch_idx, (data, target) in enumerate(dataloader):
        data, target = data.to(device), target.to(device)
        
        optimizer.zero_grad()
        output = model(data)
        loss = criterion(output, target)
        loss.backward()
        optimizer.step()
        
        running_loss += loss.item()
        with torch.no_grad():  
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()

    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

def validate_epoch(model, dataloader, criterion, device):
    """验证一个epoch"""
    model.eval()
    running_loss = 0.0
    correct = 0
    total = 0
    
    with torch.no_grad():
        for data, target in dataloader:
            data, target = data.to(device), target.to(device)
            output = model(data)
            loss = criterion(output, target)
            
            running_loss += loss.item()
            _, predicted = torch.max(output.data, 1)
            total += target.size(0)
            correct += (predicted == target).sum().item()
    
    epoch_loss = running_loss / len(dataloader)
    epoch_acc = 100. * correct / total
    return epoch_loss, epoch_acc

def plot_training_history(train_losses, val_losses, train_accs, val_accs):
    """绘制训练历史"""
    fig, (ax1, ax2) = plt.subplots(1, 2, figsize=(15, 5))
    
    # 损失曲线
    ax1.plot(train_losses, label='Training Loss', color='blue')
    ax1.plot(val_losses, label='Validation Loss', color='red')
    ax1.set_title('Loss vs Epoch')
    ax1.set_xlabel('Epoch')
    ax1.set_ylabel('Loss')
    ax1.legend()
    ax1.grid(True)
    
    # 准确率曲线
    ax2.plot(train_accs, label='Training Accuracy', color='blue')
    ax2.plot(val_accs, label='Validation Accuracy', color='red')
    ax2.set_title('Accuracy vs Epoch')
    ax2.set_xlabel('Epoch')
    ax2.set_ylabel('Accuracy (%)')
    ax2.legend()
    ax2.grid(True)
    
    plt.tight_layout()
    plt.show()


class EarlyStopping:
    """早停机制"""
    def __init__(self, patience=7, min_delta=0, restore_best_weights=True):
        self.patience = patience
        self.min_delta = min_delta
        self.restore_best_weights = restore_best_weights
        self.best_loss = None
        self.counter = 0
        self.best_weights = None

    def __call__(self, val_loss, model):
        if self.best_loss is None:
            self.best_loss = val_loss
            self.save_checkpoint(model)
        elif val_loss < self.best_loss - self.min_delta:
            self.best_loss = val_loss
            self.counter = 0
            self.save_checkpoint(model)
        else:
            self.counter += 1

        if self.counter >= self.patience:
            if self.restore_best_weights:
                model.load_state_dict(self.best_weights)
            return True
        return False

    def save_checkpoint(self, model):
        self.best_weights = model.state_dict().copy()

In [None]:
def train_model(model, train_loader, val_loader, criterion, optimizer, scheduler, 
                num_epochs=50, device='cuda', patience=10):
    """完整的训练流程"""
    
    print(f"🚀 开始训练，共 {num_epochs} 个epoch...")
    
    # 初始化记录
    train_losses = []
    val_losses = []
    train_accs = []
    val_accs = []
    
    # 早停机制
    early_stopping = EarlyStopping(patience=patience, min_delta=0.001)
    
    best_val_acc = 0.0
    start_time = time.time()
    
    for epoch in range(num_epochs):
        print(f"\nEpoch {epoch+1}/{num_epochs}")
        print("-" * 30)
        
        # 训练
        train_loss, train_acc = train_epoch(model, train_loader, criterion, optimizer, device)
        train_losses.append(train_loss)
        train_accs.append(train_acc)
        
        # 验证
        val_loss, val_acc = validate_epoch(model, val_loader, criterion, device)
        val_losses.append(val_loss)
        val_accs.append(val_acc)
        
        # 调整学习率
        if scheduler:
            scheduler.step(val_loss)
        
        # 打印结果
        print(f"Train Loss: {train_loss:.4f}, Train Acc: {train_acc:.2f}%")
        print(f"Val Loss: {val_loss:.4f}, Val Acc: {val_acc:.2f}%")
        
        # 保存最佳模型
        if val_acc > best_val_acc:
            best_val_acc = val_acc
            torch.save(model.state_dict(), 'best_music_model.pth')
            print(f"🎯 新的最佳验证准确率: {best_val_acc:.2f}%")
        
        # 早停检查
        if early_stopping(val_loss, model):
            print(f"⏹️  早停触发，在第 {epoch+1} 个epoch停止训练")
            break
    
    training_time = time.time() - start_time
    print(f"\n✅ 训练完成！")
    print(f"⏱️  训练时间: {training_time/60:.2f} 分钟")
    print(f"🏆 最佳验证准确率: {best_val_acc:.2f}%")
    
    return train_losses, val_losses, train_accs, val_accs


In [None]:
# baseline
criterion = nn.CrossEntropyLoss()

# 优化器 - 使用经典配置
optimizer = optim.Adam(
    model.parameters(), 
    lr=0.001,           # 标准学习率
    weight_decay=1e-4   # L2正则化
)

# 学习率调度器
scheduler = optim.lr_scheduler.LambdaLR(optimizer, lr_lambda=lambda epoch: 1.0)

batch_size = 32

num_workers = 2

train_loader = DataLoader(
    train_dataset, 
    batch_size=batch_size, 
    shuffle=True,           
    num_workers=num_workers,
    pin_memory=True         # 加速GPU传输
)

val_loader = DataLoader(
    val_dataset, 
    batch_size=batch_size, 
    shuffle=False,          
    num_workers=num_workers,
    pin_memory=True
)

print("准备基线模型")

train_losses, val_losses, train_accs, val_accs = train_model(model, train_loader, val_loader, criterion, optimizer, scheduler)

In [None]:
import optuna
from optuna.integration import PyTorchLightningPruningCallback
from sklearn.metrics import accuracy_score, classification_report


class ConfigurableCNN(nn.Module):
    """可配置的CNN模型，用于超参数调优"""
    
    def __init__(self, trial, num_classes=8):
        super(ConfigurableCNN, self).__init__()
        
        # 通过trial对象获取超参数
        self.n_layers = trial.suggest_int('n_layers', 2, 5)
        
        layers = []
        in_channels = 1
        
        for i in range(self.n_layers):
            # 每层的通道数
            out_channels = trial.suggest_categorical(f'n_units_l{i}', [16, 32, 64, 128, 256])
            
            # 卷积核大小
            kernel_size = trial.suggest_categorical(f'kernel_size_l{i}', [3, 5])
            
            # Dropout概率
            dropout_rate = trial.suggest_float(f'dropout_l{i}', 0.1, 0.5)
            
            # 构建卷积块
            layers.extend([
                nn.Conv2d(in_channels, out_channels, kernel_size, padding=kernel_size//2),
                nn.BatchNorm2d(out_channels),
                nn.ReLU(),
                nn.MaxPool2d(2, 2),
                nn.Dropout2d(dropout_rate)
            ])
            
            in_channels = out_channels
        
        self.features = nn.Sequential(*layers)
        
        # 自适应池化
        pool_size = trial.suggest_categorical('pool_size', [2, 4, 8])
        self.adaptive_pool = nn.AdaptiveAvgPool2d((pool_size, pool_size))
        
        # 分类器
        fc_input_size = in_channels * pool_size * pool_size
        hidden_size = trial.suggest_categorical('fc_hidden_size', [128, 256, 512])
        final_dropout = trial.suggest_float('final_dropout', 0.2, 0.7)
        
        self.classifier = nn.Sequential(
            nn.Dropout(final_dropout),
            nn.Linear(fc_input_size, hidden_size),
            nn.ReLU(),
            nn.Dropout(final_dropout * 0.5),
            nn.Linear(hidden_size, num_classes)
        )
    
    def forward(self, x):
        x = self.features(x)
        x = self.adaptive_pool(x)
        x = x.view(x.size(0), -1)
        x = self.classifier(x)
        return x

In [None]:
def objective(trial):
    """Optuna的目标函数"""
    
    # 1. 模型超参数
    model = ConfigurableCNN(trial, num_classes=8).to(device)
    
    # 2. 训练超参数
    lr = trial.suggest_float('lr', 1e-5, 1e-2, log=True)
    batch_size = trial.suggest_categorical('batch_size', [16, 32, 64])
    weight_decay = trial.suggest_float('weight_decay', 1e-6, 1e-2, log=True)
    
    # 3. 优化器选择
    optimizer_name = trial.suggest_categorical('optimizer', ['Adam', 'RMSprop', 'SGD'])
    
    if optimizer_name == 'Adam':
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
    elif optimizer_name == 'RMSprop':
        optimizer = optim.RMSprop(model.parameters(), lr=lr, weight_decay=weight_decay)
    else:  # SGD
        momentum = trial.suggest_float('momentum', 0.8, 0.99)
        optimizer = optim.SGD(model.parameters(), lr=lr, weight_decay=weight_decay, momentum=momentum)
    
    # 4. 创建数据加载器（使用新的batch_size）
    if 'train_dataset' in globals():
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
    else:
        # 如果数据集不存在，返回一个虚拟值用于演示
        return 0.5
    
    # 5. 训练配置
    criterion = nn.CrossEntropyLoss()
    n_epochs = 10  # 为了快速调优，减少epoch数
    
    # 6. 训练循环
    best_val_acc = 0.0
    
    for epoch in range(n_epochs):
        # 训练
        model.train()
        for batch_idx, (data, target) in enumerate(train_loader):
            if batch_idx > 50:  # 限制每个epoch的批次数，加速调优
                break
                
            data, target = data.to(device), target.to(device)
            optimizer.zero_grad()
            output = model(data)
            loss = criterion(output, target)
            loss.backward()
            optimizer.step()
        
        # 验证
        model.eval()
        correct = 0
        total = 0
        with torch.no_grad():
            for batch_idx, (data, target) in enumerate(val_loader):
                if batch_idx > 20:  # 限制验证批次数
                    break
                    
                data, target = data.to(device), target.to(device)
                output = model(data)
                _, predicted = torch.max(output.data, 1)
                total += target.size(0)
                correct += (predicted == target).sum().item()
        
        val_acc = correct / total
        best_val_acc = max(best_val_acc, val_acc)
        
        # 报告中间结果给Optuna
        trial.report(val_acc, epoch)
        
        # 如果效果不好，提前剪枝
        if trial.should_prune():
            raise optuna.exceptions.TrialPruned()
    
    return best_val_acc

def run_hyperparameter_tuning(n_trials=50):
    """运行超参数调优"""
    
    print("🔍 开始超参数自动调优...")
    print(f"将尝试 {n_trials} 种不同的超参数组合")
    
    # 创建研究对象
    study = optuna.create_study(
        direction='maximize',  # 最大化验证准确率
        pruner=optuna.pruners.MedianPruner(n_startup_trials=5, n_warmup_steps=3)
    )
    
    # 开始优化
    study.optimize(objective, n_trials=n_trials)
    
    # 输出结果
    print("🎯 超参数调优完成！")
    print(f"最佳验证准确率: {study.best_value:.4f}")
    print("最佳超参数组合:")
    for key, value in study.best_params.items():
        print(f"  {key}: {value}")
    
    return study

In [None]:
def manual_grid_search():
    """手动网格搜索（更简单的调参方法）"""
    
    if 'train_dataset' not in globals():
        print("⚠️  请先运行数据加载步骤！")
        return None
    
    print("🔍 开始手动网格搜索...")
    
    # 定义搜索空间
    param_grid = {
        'lr': [0.001, 0.0005, 0.0001],
        'batch_size': [16, 32, 64],
        'weight_decay': [1e-4, 1e-5, 1e-6],
        'dropout_rate': [0.3, 0.4, 0.5]
    }
    
    best_params = None
    best_acc = 0.0
    results = []
    
    # 遍历所有参数组合
    from itertools import product
    
    param_combinations = list(product(*param_grid.values()))
    total_combinations = len(param_combinations)
    
    print(f"总共需要测试 {total_combinations} 种参数组合")
    
    for i, params in enumerate(param_combinations):
        lr, batch_size, weight_decay, dropout_rate = params
        
        print(f"\n测试组合 {i+1}/{total_combinations}:")
        print(f"  lr={lr}, batch_size={batch_size}, weight_decay={weight_decay}, dropout={dropout_rate}")
        
        # 创建模型
        model = DeepMusicCNN(num_classes=8).to(device)
        
        # 修改dropout率（这里简化处理）
        for module in model.modules():
            if isinstance(module, nn.Dropout) or isinstance(module, nn.Dropout2d):
                module.p = dropout_rate
        
        # 创建优化器
        optimizer = optim.Adam(model.parameters(), lr=lr, weight_decay=weight_decay)
        criterion = nn.CrossEntropyLoss()
        
        # 创建数据加载器
        train_loader = DataLoader(train_dataset, batch_size=batch_size, shuffle=True, num_workers=2)
        val_loader = DataLoader(val_dataset, batch_size=batch_size, shuffle=False, num_workers=2)
        
        # 快速训练（只训练几个epoch）
        best_val_acc = 0.0
        for epoch in range(5):  # 只训练5个epoch
            # 训练
            model.train()
            for batch_idx, (data, target) in enumerate(train_loader):
                if batch_idx > 30:  # 限制批次数
                    break
                data, target = data.to(device), target.to(device)
                optimizer.zero_grad()
                output = model(data)
                loss = criterion(output, target)
                loss.backward()
                optimizer.step()
            
            # 验证
            model.eval()
            correct = 0
            total = 0
            with torch.no_grad():
                for batch_idx, (data, target) in enumerate(val_loader):
                    if batch_idx > 15:  # 限制验证批次数
                        break
                    data, target = data.to(device), target.to(device)
                    output = model(data)
                    _, predicted = torch.max(output.data, 1)
                    total += target.size(0)
                    correct += (predicted == target).sum().item()
            
            val_acc = correct / total
            best_val_acc = max(best_val_acc, val_acc)
        
        results.append({
            'params': {'lr': lr, 'batch_size': batch_size, 'weight_decay': weight_decay, 'dropout': dropout_rate},
            'accuracy': best_val_acc
        })
        
        print(f"  最佳验证准确率: {best_val_acc:.4f}")
        
        if best_val_acc > best_acc:
            best_acc = best_val_acc
            best_params = {'lr': lr, 'batch_size': batch_size, 'weight_decay': weight_decay, 'dropout': dropout_rate}
            print(f"  🎯 发现更好的参数组合！")
    
    print(f"\n✅ 网格搜索完成！")
    print(f"最佳准确率: {best_acc:.4f}")
    print("最佳参数:")
    for key, value in best_params.items():
        print(f"  {key}: {value}")
    
    return best_params, results


In [None]:
run_hyperparameter_tuning(n_trials=20)

manual_grid_search()