In [2]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import os
import pickle
import soundfile as sf
import logging
from tqdm import tqdm
from msclap import CLAP  # 确保msclap模块已正确安装
import torch
import chardet

# 配置日志记录
logging.basicConfig(
    filename='processing.log',
    filemode='a',
    format='%(asctime)s - %(levelname)s - %(message)s',
    level=logging.INFO
)

# 设置环境变量以启用同步错误报告（用于调试）
os.environ['CUDA_LAUNCH_BLOCKING'] = '1'

# 设置设备
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')
print(f'Using device: {device}')
logging.info(f'Using device: {device}')

# 初始化 CLAP 模型（根据需要启用或禁用 CUDA）
clap_model = CLAP("D:/Melody/Models/MSClap/CLAP_weights_2023.pth", version='2023', use_cuda=device.type == 'cuda')

# 获取指定扩展名的文件列表
def get_files_by_extension(directory, extensions):
    files = []
    for root, dirs, files_in_dir in os.walk(directory):
        for file in files_in_dir:
            if file.lower().endswith(extensions):
                files.append(os.path.join(root, file))
    return files

# 解析文件名，提取歌曲名称、性别和艺术家
def parse_filename(file_path):
    base_name = os.path.basename(file_path)
    name, ext = os.path.splitext(base_name)
    # 按照 '_M_' 或 '_F_' 分隔符进行分割
    if '_M_' in name:
        parts = name.split('_M_')
        gender = 'M'
    elif '_F_' in name:
        parts = name.split('_F_')
        gender = 'F'
    else:
        # 文件名不包含 '_M_' 或 '_F_'，不符合预期格式
        return None, None, None, ext.lower()
    
    if len(parts) != 2:
        # 分割后不等于2部分，格式不正确
        return None, None, None, ext.lower()
    
    song_name = parts[0].strip()
    artist = parts[1].strip()
    
    return song_name, gender, artist, ext.lower()

# 读取文件并检测编码
def read_file_with_detected_encoding(file_path):
    with open(file_path, 'rb') as f:
        raw_data = f.read()
    result = chardet.detect(raw_data)
    encoding = result['encoding']
    if encoding is None:
        encoding = 'utf-8'  # 默认编码
    try:
        return raw_data.decode(encoding)
    except UnicodeDecodeError:
        # 如果仍然失败，可以选择忽略错误
        logging.warning(f"文件 '{file_path}' 的编码解码失败，尝试忽略错误。")
        return raw_data.decode(encoding, errors='ignore')

# 验证音频文件是否有效
def is_audio_file_valid(file_path):
    try:
        data, samplerate = sf.read(file_path)
        return True
    except Exception as e:
        logging.error(f"音频文件 '{file_path}' 无法读取。错误信息: {e}")
        return False

# 定义音乐目录路径
music_directory = r"D:/Melody/Music"

# 获取所有相关文件
extensions = ('.wav', '.txt')  # 只处理 .wav 和 .txt 文件
files = get_files_by_extension(music_directory, extensions)
logging.info(f"找到 {len(files)} 个相关文件。")

# 按歌曲名称、性别和艺术家对文件进行分组
songs = {}

for file_path in files:
    song_name, gender, artist, ext = parse_filename(file_path)
    if song_name is None or gender is None or artist is None:
        logging.warning(f"文件 '{file_path}' 不符合预期的命名格式，已跳过。")
        continue  # 跳过不符合格式的文件
    key = (song_name, gender, artist)
    if key not in songs:
        songs[key] = {'audio_file': None, 'text_file': None}
    if ext == '.wav':
        songs[key]['audio_file'] = file_path
    elif ext == '.txt':
        songs[key]['text_file'] = file_path

logging.info(f"成功分组 {len(songs)} 首歌曲。")

# 定义输出目录
output_directory = r"D:/Melody/Data"
if not os.path.exists(output_directory):
    os.makedirs(output_directory)
    logging.info(f"创建输出目录 '{output_directory}'。")

# 将歌曲列表排序，生成 song_id 映射
song_keys = list(songs.keys())
song_keys.sort()  # 按照 (song_name, gender, artist) 进行排序

# 创建 song_key 到 song_id 的映射，song_id 为序号数字
song_key_to_id = {key: idx for idx, key in enumerate(song_keys)}

# 初始化列表用于存储表格数据
data = []

# 处理每首歌曲，提取嵌入
for key in tqdm(song_keys, desc="Processing songs", unit="song"):
    song_name, gender, artist = key
    files = songs[key]
    audio_embedding = None
    text_embedding = None

    # 处理音频文件
    audio_file = files['audio_file']
    if audio_file:
        if is_audio_file_valid(audio_file):
            try:
                embedding = clap_model.get_audio_embeddings([audio_file])
                audio_embedding = embedding.cpu().numpy().flatten()  # 展平为一维数组
            except Exception as e:
                logging.error(f"无法处理歌曲 '{song_name}' ({gender}) by '{artist}' 的音频文件。错误信息: {e}")
        else:
            logging.warning(f"歌曲 '{song_name}' ({gender}) by '{artist}' 的音频文件无效。")
    else:
        logging.warning(f"歌曲 '{song_name}' ({gender}) by '{artist}' 缺少音频文件。")

    # 处理文本文件
    text_file = files['text_file']
    if text_file:
        try:
            text = read_file_with_detected_encoding(text_file)
            if text.strip():  # 确保文本非空
                embedding = clap_model.get_text_embeddings([text])
                text_embedding = embedding.cpu().numpy().flatten()
            else:
                logging.warning(f"歌曲 '{song_name}' ({gender}) by '{artist}' 的歌词文件为空。")
        except Exception as e:
            logging.error(f"无法处理歌曲 '{song_name}' ({gender}) by '{artist}' 的歌词文件。错误信息: {e}")
    else:
        logging.warning(f"歌曲 '{song_name}' ({gender}) by '{artist}' 缺少歌词文件。")

    # 仅当至少有一个嵌入存在时才添加到数据列表
    if audio_embedding is not None or text_embedding is not None:
        # 获取 song_id
        song_id = song_key_to_id[key]

        data.append({
            'song_id': song_id,  # 使用序号数字作为 song_id
            'song_name': song_name,
            'gender': gender,
            'artist': artist,
            'text_embedding': text_embedding,
            'audio_embedding': audio_embedding
        })

# 创建 DataFrame
df = pd.DataFrame(data)

# 检查是否成功添加 song_id
if 'song_id' not in df.columns:
    logging.error("song_id 列未成功添加到 DataFrame。")
    raise KeyError("song_id 列未成功添加到 DataFrame。")
else:
    logging.info("song_id 列已成功添加到 DataFrame。")

# 保存 DataFrame 为 pickle 文件
output_file = os.path.join(output_directory, "song_embeddings.pkl")
df.to_pickle(output_file)
logging.info(f"已处理 {len(df)} 首歌曲。数据已保存到 '{output_file}'。")

# 可选：打印确认信息
print(f"已处理 {len(df)} 首歌曲。数据已保存到 {output_file}")


Using device: cuda


Processing songs:  94%|█████████▍| 558/595 [01:29<00:04,  7.94song/s]