In [1]:
from pydub import AudioSegment
import os, shutil
from tqdm import tqdm
import psutil
from concurrent.futures import ThreadPoolExecutor, wait

# Source directory that holds the MP3 files to convert.
data_dir = r"E:\music\20250227"
# Destination directory that receives the converted OGG files.
dst_dir = r"E:\music\20250227_ogg"
MAX_CPU_PERCENT = 80  # CPU usage (percent) above which a conversion waits before starting

def convert_mp3_to_ogg(input_file, output_file):
    """Convert one MP3 file to a 64 kbit/s OGG file, throttled by CPU load.

    The audio is exported to ``output_file + ".part"`` first and renamed to
    ``output_file`` only after the export has finished. ``process_file`` skips
    any target that already exists, so a truncated file left behind by a failed
    or interrupted export must never appear under the final name.

    Args:
        input_file: Path of the source MP3 file.
        output_file: Path of the OGG file to create.

    Returns:
        True if the conversion succeeded, False if any error occurred (the
        error is printed, not raised).
    """
    partial_file = output_file + ".part"
    try:
        # psutil.cpu_percent(1) blocks for one second while it samples, so this
        # loop polls once per second until the load is back under the limit.
        while psutil.cpu_percent(1) > MAX_CPU_PERCENT:
            pass

        audio = AudioSegment.from_mp3(input_file)
        # export() returns the open output file object; close it explicitly so
        # the rename below also works on Windows.
        audio.export(partial_file, format="ogg", parameters=["-b:a", '64k']).close()
        os.replace(partial_file, output_file)
        return True
    except Exception as e:
        print(f"Error converting {input_file}: {str(e)}")
        # Best-effort removal of a half-written file.
        try:
            os.remove(partial_file)
        except OSError:
            pass
        return False

def process_file(file):
    """Convert a single directory entry to OGG if it is a not-yet-converted MP3.

    Args:
        file: File name (not a full path) inside ``data_dir``.

    Returns:
        None when the entry is skipped (the name does not end in ``.mp3``,
        case-insensitively, or the target already exists in ``dst_dir``);
        otherwise the boolean result of ``convert_mp3_to_ogg``.
    """
    stem, suffix = file[:-4], file[-4:]
    if suffix.lower() == ".mp3":
        target = os.path.join(dst_dir, stem + ".ogg")
        # An existing target means the file was converted in an earlier run.
        if not os.path.exists(target):
            return convert_mp3_to_ogg(os.path.join(data_dir, file), target)
    return None

if __name__ == "__main__":
    os.makedirs(dst_dir, exist_ok=True)
    flist = [f for f in os.listdir(data_dir) if f.lower().endswith(".mp3")]

    # Use roughly 80% of the logical CPUs, but always at least one worker.
    # os.cpu_count() returns None when the count cannot be determined.
    cpu_count = os.cpu_count() or 1
    max_workers = max(1, int(cpu_count * 0.8))

    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        futures = [executor.submit(process_file, file) for file in flist]

        # Advance the progress bar as each future finishes.
        with tqdm(total=len(flist), desc="Converting") as pbar:
            for future in futures:
                future.add_done_callback(lambda _: pbar.update(1))
            wait(futures)

    # Summarise failures: process_file returns False on a failed conversion,
    # and an unexpected exception is stored on the future.
    failed = 0
    for future in futures:
        if future.exception() is not None or future.result() is False:
            failed += 1
    if failed:
        print(f"{failed} file(s) failed to convert")

Converting: 100%|██████████| 4017/4017 [11:06<00:00,  6.03it/s] 


In [2]:
import os
import time
import shutil
import json


# Directory with the audio files to pack
raw_music_root_path = r"E:\music\20250227_ogg"

# Directory where audio recognised as containing vocals is stored
pos_music_save_path = r"E:\music\pos"

# Tracks that were accidentally not run through the model; they have to be
# packed and run through the model again later
remain_music_path = r"E:\music\remains"

# Upper bound for the raw audio size of one pack, in bytes (1 << 32 = 4 GiB).
# NOTE(review): the original comment said "no more than 500M per pack", which
# does not match this value — confirm which one is intended.
max_zip_pack_size = (1<<32)

# Temporary directories used while building the archives
temp_root_path = r"E:\music\temp"
temp_music_path = r"E:\music\temp\music"

# JSON file holding all metadata
all_metas_file = r"E:\music\metas\all_metas.json"


def pack_music_zip_file():
    """Bundle every file under ``raw_music_root_path`` into gzip-compressed tar packs.

    The staging area ``temp_root_path`` is wiped and recreated. Files are copied
    one by one into ``temp_music_path``; whenever the staged bytes reach the
    balanced pack size (or the last file has been staged) the staging directory
    is archived as ``music_zip<N>`` inside ``temp_root_path`` and then emptied.
    The ``.tar.gz`` suffix is removed from each finished archive.
    """
    # Start from an empty staging area.
    if os.path.exists(temp_root_path):
        shutil.rmtree(temp_root_path)
    for fresh_dir in (temp_root_path, temp_music_path):
        os.mkdir(fresh_dir)

    file_list = os.listdir(raw_music_root_path)
    last_index = len(file_list) - 1

    # Balanced strategy: derive the size every pack should have.
    total_size = sum(
        os.stat(os.path.join(raw_music_root_path, name)).st_size
        for name in os.listdir(raw_music_root_path)
    )
    pack_num = total_size // max_zip_pack_size + 1
    # Two consumers currently run in parallel, so round up to a multiple of 2.
    pack_num = (pack_num + 1) // 2 * 2
    true_pack_size = total_size / pack_num
    print("total/pack_num/size:", len(file_list), pack_num, true_pack_size)

    staged_bytes = 0
    archive_index = 0
    for position, name in enumerate(file_list):
        source = os.path.join(raw_music_root_path, name)
        staged_bytes += os.stat(source).st_size
        shutil.copyfile(source, os.path.join(temp_music_path, name))

        # Keep staging until the pack is full or this was the last file.
        if not (staged_bytes >= true_pack_size or position == last_index):
            continue

        staged_bytes = 0
        pack_name = "music_zip" + str(archive_index)
        archive_base = os.path.join(temp_root_path, pack_name)
        print("pack...", pack_name, position + 1)
        shutil.make_archive(archive_base, 'gztar', root_dir='.', base_dir=temp_music_path)
        print("pack OK!", pack_name)
        archive_index += 1

        # Empty the staging directory for the next pack.
        shutil.rmtree(temp_music_path)
        os.mkdir(temp_music_path)
        # Rename (drop the ".tar.gz" suffix) so that Kaggle does not unpack the
        # archive and expose the data.
        shutil.move(archive_base + ".tar.gz", archive_base)

# Run the packing step with the paths configured above.
pack_music_zip_file()

total/pack_num/size: 4017 2 2705099641.0
pack... music_zip0 1834
pack OK! music_zip0
pack... music_zip1 4017
pack OK! music_zip1


In [3]:
import os
import time
import shutil
import json


# Directory with the original audio files
raw_music_root_path = r"E:\music\20250227"

# Directory where audio recognised as containing vocals is stored
pos_music_save_path = r"E:\music\pos"

# Tracks that were accidentally not run through the model; they have to be
# packed and run through the model again later
remain_music_path = r"E:\music\remains"

# Upper bound for the raw audio size of one pack, in bytes (1 << 31 = 2 GiB).
# NOTE(review): the original comment said "no more than 500M per pack", which
# does not match this value — confirm which one is intended.
max_zip_pack_size = (1<<31)

# Temporary directories used while building the archives
temp_root_path = r"E:\music\temp"
temp_music_path = r"E:\music\temp\music"

# JSON file holding all metadata
all_metas_file = r"E:\music\metas\all_metas.json"

def copy_remain_musics(res):
    """Copy every file that has no record in ``res`` to ``remain_music_path``.

    Args:
        res: Iterable of result records; only element 0 of each record (the
            file name) is used.

    Side effects:
        Copies the unmatched files from ``raw_music_root_path`` and prints how
        many were copied. ``remain_music_path`` is created on demand.
    """
    processed_names = {record[0] for record in res}

    cnt = 0
    for file in os.listdir(raw_music_root_path):
        if file in processed_names:
            continue
        if cnt == 0:
            # Create the destination lazily: copyfile() does not create missing
            # directories, and an empty directory is pointless when every file
            # already has a result.
            os.makedirs(remain_music_path, exist_ok=True)
        shutil.copyfile(os.path.join(raw_music_root_path, file), os.path.join(remain_music_path, file))
        cnt += 1
    print("find %d files not in res" % cnt)

def add_demus_res_to_meta(demus_res_files):
    """Merge Demucs result files into the metadata and collect the vocal tracks.

    Steps:
      1. Back up ``all_metas_file`` next to itself with a timestamp suffix.
      2. Load and concatenate the record lists stored in ``demus_res_files``;
         every record is ``[file_name, song_id, ratio]``.
      3. Rename ``.ogg`` record names back to ``.mp3``.
      4. Call ``copy_remain_musics`` for files that have no record.
      5. Register every record in the metadata. A record whose ``song_id`` is
         already stored for the same file name is dropped. New songs whose
         ratio is below the threshold are copied to ``pos_music_save_path``.
      6. Rewrite ``all_metas_file``.

    Args:
        demus_res_files: Paths of JSON files, each holding a list of records.
    """
    # Back up the metadata first.
    shutil.copyfile(all_metas_file, all_metas_file+"_"+str(time.time())+".json")
    with open(all_metas_file, "r") as f:
        all_metas = json.loads(f.read())
    # Records with a ratio at or above this threshold are not copied.
    thre = 50

    res = []
    for demus_res_file in demus_res_files:
        with open(demus_res_file, 'r') as f:
            res += json.loads(f.read())
            print("res len: %d" % len(res))

    # Map the record names from .ogg back to .mp3.
    for record in res:
        file_name = record[0]
        if file_name[-4:] == ".ogg":
            record[0] = file_name[:-4] + ".mp3"

    # Handle the tracks that have no result yet.
    copy_remain_musics(res)

    # copyfile() raises FileNotFoundError for a missing destination directory
    # too; create it up front so the handler below only reports missing sources.
    os.makedirs(pos_music_save_path, exist_ok=True)

    # Merge the records into all_metas (dropping duplicates) and copy this
    # round's positive samples to their own directory.
    for [file, song_id, ratio] in res:
        file = os.path.basename(file)
        is_new_song = True
        if file in all_metas:
            print("find duplicate song: %s" % file)
            for dup_song in all_metas[file]:
                if dup_song[0] == song_id:
                    print("find a same song id, so we drop it")
                    is_new_song = False
                    break
            else:
                # Same file name but a different song id: keep both entries.
                print("cannot find a same song id, add to meta...")
                all_metas[file].append([song_id, ratio])
        else:
            all_metas[file] = [[song_id, ratio]]
        if ratio >= thre:
            continue
        if not is_new_song:
            continue

        src_path = os.path.join(raw_music_root_path, file)
        try:
            shutil.copyfile(src_path, os.path.join(pos_music_save_path, file))
        except FileNotFoundError:
            print("cannot find: %s" % src_path)

    with open(all_metas_file, "w") as f:
        f.write(json.dumps(all_metas))


# Result JSON files to merge into the metadata.
res_files = [r"E:\music\src\vgmdb\cache\kaggle_2\res.json", r"E:\music\src\vgmdb\cache\kaggle_3\res.json"]
add_demus_res_to_meta(res_files)

res len: 1834
res len: 4017
find 0 files not in res
find duplicate song: TAMUSIC - 夏影.mp3
cannot find a same song id, add to meta...
find duplicate song: UI-70 - 亡き王女の為のセプテット.mp3
cannot find a same song id, add to meta...
find duplicate song: TAMUSIC - 朝影.mp3
cannot find a same song id, add to meta...
cannot find: E:\music\20250227\Scott Morton - Praying Warrior (1).mp3
find duplicate song: TAMUSIC - 笑顔の向こう侧に.mp3
cannot find a same song id, add to meta...
find duplicate song: 志方あきこ - ロマの娘.mp3
cannot find a same song id, add to meta...
find duplicate song: 志方あきこ - ラヂヲ予报.mp3
cannot find a same song id, add to meta...
cannot find: E:\music\20250227\坂本英城 - 千両役者 (1).mp3


In [9]:
import re

# Scratch check: replace the escaped-ampersand sequence in the sample string with a plain "&".
re.sub(r"&amp;amp;", "&", "&amp;amp;casc")

'&casc'

## 自动kaggle训练脚本

In [130]:
from auto_web import *
import pyperclip
from selenium.webdriver.common.keys import Keys
import ast

# One instance is started per Kaggle account.
# NOTE(review): machine-specific absolute path; presumably a Chrome profile directory — confirm.
cookie_dir = r"C:\Users\11527\AppData\Local\Google\Chrome\slnm_data2"



In [None]:
import soundfile as sf
from pydub import AudioSegment



In [None]:
!python3 -m pip install -U git+https://github.com/facebookresearch/demucs#egg=demucs

# 移动到外层解压
!rm -rf /kaggle/working/music*
!rm -rf /kaggle/working/res*json*
!cp /kaggle/input/music123/music_zip0 /kaggle/working/music.tar.gz && tar -zxvf /kaggle/working/music.tar.gz 2>&1 >/dev/null
!ls -A1 /kaggle/working/music/temp/music | wc -l


import demucs.separate
import librosa
import os
import scipy.io.wavfile as wavfile
import numpy as np
import shutil
import tqdm
import soundfile as sf
from pydub import AudioSegment
import json,time
import psutil

# Check the current memory usage
def check_mem_out():
    """Report whether system memory usage has reached 90 percent.

    Returns:
        True (after printing a restart hint) when ``psutil`` reports a memory
        usage of 90% or more, False otherwise.
    """
    if psutil.virtual_memory().percent < 90:
        return False
    print("memory out, need restart kernal!")
    return True

# output_dir = ".\\output"

def split_music_file_name(file_name):
    """Split an audio file name into ``(base_name, ext_name)``.

    Recognised extensions, matched case-insensitively: ``.wav``, ``.mp3``,
    ``.ogg``, ``.m4a``, ``.aac``, ``.wave`` and ``.flac``. The returned
    extension keeps its original casing and the leading dot.

    Args:
        file_name: File name (or path) to split.

    Returns:
        Tuple ``(base_name, ext_name)`` with ``base_name + ext_name == file_name``.

    Raises:
        AssertionError: If the name does not end in a recognised extension
            (a message is printed first).
    """
    known_extensions = (".wav", ".mp3", ".ogg", ".m4a", ".aac", ".wave", ".flac")
    for ext in known_extensions:
        split_at = len(file_name) - len(ext)
        if split_at >= 0 and file_name[split_at:].lower() == ext:
            return file_name[:split_at], file_name[split_at:]

    print(file_name, " is not a valid file")
    # Raised explicitly instead of `assert 0` so the check survives `python -O`.
    raise AssertionError(str(file_name) + " is not a valid file")

def sf_read(f):
    """Read the samples of an audio file, going through a temporary MP3 if needed.

    ``sf.read`` is tried first. If it fails and ``f`` is not already an MP3,
    the file is decoded with pydub, exported to a temporary MP3, read from
    there, and the temporary file is removed again.

    Args:
        f: Path of the audio file. Its last three characters are passed to
            pydub as the format name.

    Returns:
        The sample data returned by ``sf.read`` (the sample rate is discarded).

    Raises:
        AssertionError: If ``f`` ends in ``mp3`` and cannot be read, since no
            fallback exists for that case.
    """
    import tempfile

    try:
        data, _ = sf.read(f)
        return data
    except Exception as err:
        if f[-3:] == 'mp3':
            raise AssertionError("cannot read " + f) from err

    audio = AudioSegment.from_file(f, format=f[-3:])
    # Use a unique temporary file: writing to "<name>.mp3" next to `f` could
    # overwrite, and afterwards delete, an unrelated file with the same stem.
    fd, out_file = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)
    try:
        # export() returns the open output file object; close it before reading.
        audio.export(out_f=out_file, format="mp3").close()
        data, _ = sf.read(out_file)
    finally:
        os.remove(out_file)
    return data
        

# Working directory
work_dir = "/kaggle/working/"
os.makedirs(work_dir, exist_ok=True)

# Directory with the original audio files
raw_music_root_path = os.path.join(work_dir, "music/temp/music")
os.makedirs(raw_music_root_path, exist_ok=True)

# Directory for audio recognised as containing vocals
pos_music_save_path = os.path.join(work_dir, "demucs/pos")
os.makedirs(pos_music_save_path, exist_ok=True)

# Directory for the separated vocals and the recognition results
res_path = os.path.join(work_dir, "demucs/res")
os.makedirs(res_path, exist_ok=True)

# Directory for temporary files
temp_music_path = os.path.join(work_dir, "demucs/temp")
os.makedirs(temp_music_path, exist_ok=True)

# Result file
res_file = os.path.join(work_dir, "res.json")

# Audio formats that would need to be converted first
need_trans_formats = ["m4a", "M4A", "aac", "AAC"]

# Absolute paths of the audio files to analyse
music_file_list = [
    os.path.abspath(os.path.join(raw_music_root_path, file)) 
    for file in os.listdir(raw_music_root_path)
]


print("total song num: ", len(music_file_list))


# `res` collects the [file_name, song_id, ratio] records. Keep whatever an
# earlier run of this cell left in the kernel; on a fresh kernel start with an
# empty list instead of failing with a NameError.
try:
    res
except NameError:
    res = []

# Remove the inputs that already have a record so they are not analysed again.
for [f,_,_2] in res:
    f = os.path.basename(f)
    try:
        os.remove(os.path.join(raw_music_root_path, f))
    except FileNotFoundError:
        pass  # already removed
    except OSError as err:
        # Keep going (best effort), but do not hide the problem.
        print("cannot remove: ", f, err)
print("cur ok num: ", len(res))



# Absolute paths of the audio files that still have to be analysed
music_file_list = [
    os.path.abspath(os.path.join(raw_music_root_path, file))
    for file in os.listdir(raw_music_root_path)
]
print(len(music_file_list))



for file in tqdm.tqdm(music_file_list):
    # Files that are skipped explicitly.
    # NOTE(review): presumably inputs that caused trouble in an earlier run — confirm.
    if file in [
        "/kaggle/working/music/temp/music/TAMUSIC - 亡き王女の为のセプテット (1).ogg",
        "/kaggle/working/music/temp/music/TAMUSIC - 冻土高原.ogg",
    ]:
        continue

    # Stop when memory is almost exhausted.
    if check_mem_out():
        break

    file_name = os.path.basename(file)

    # Formats listed in need_trans_formats are not handled here. Check this
    # before split_music_file_name(), which raises on extensions it does not know.
    if file[-3:] in need_trans_formats:
        continue

    # Base name and extension of the audio file
    base_name, ext_name = split_music_file_name(file_name)

    # Output paths inside the "mdx_extra" result directory
    out_put_dir = os.path.join(os.path.join(res_path, "mdx_extra"), base_name)
    out_put_vocal_file = os.path.join(out_put_dir, "vocals.wav")
    out_put_novocal_file = os.path.join(out_put_dir, "no_vocals.wav")
    ratio_res = os.path.join(out_put_dir, "res.txt")

    has_cached_ratio = os.path.exists(ratio_res)
    if not has_cached_ratio:
        demucs.separate.main(["--two-stems", "vocals", "-o", res_path, "-n", "mdx_extra", file])

    # The song id is needed on both paths; computing it only next to the
    # separation left it undefined (or stale) when the ratio came from res.txt.
    data_org = sf_read(file)
    if data_org.ndim == 1:
        # Mono audio is returned as a 1-D array; add a channel axis so that
        # the per-frame sum over axis 1 works.
        data_org = data_org[:, np.newaxis]
    energy_org = np.sum(data_org**2, axis=1)
    # Identifier of the original track: its total energy, rounded
    org_song_id_val = round(np.sum(energy_org), 3)

    if not has_cached_ratio:
        data_vocal = sf_read(out_put_vocal_file)
        if data_vocal.ndim == 1:
            data_vocal = data_vocal[:, np.newaxis]
        energy_vocal = np.sum(data_vocal**2, axis=1)
        # Mean energy of the original divided by mean energy of the vocal stem
        ratio = np.mean(energy_org)/np.mean(energy_vocal)
        with open(ratio_res, "w") as f:
            f.write(str(ratio))
    else:
        with open(ratio_res, "r") as f:
            ratio = float(f.read())

    res.append([file_name,org_song_id_val,ratio])

    if ratio < 100:
        print("[positove] ", ratio, "\t", file)
    else:
        print("[negative] ", ratio, "\t", file)

    print("res:")
    print(res)
    print("\n")

    # Persist the results after every file.
    with open(res_file, "w") as f:
        f.write(json.dumps(res))

    # os.remove(file)
    shutil.rmtree(out_put_dir)  # remove the no-longer-needed directory to shorten session recovery




In [None]:

!python3 -m pip install -U git+https://github.com/facebookresearch/demucs#egg=demucs

# 移动到外层解压
!rm -rf /kaggle/working/music*
!rm -rf /kaggle/working/res*json*
!cp /kaggle/input/music123/music_zip* /kaggle/working/music.tar.gz && tar -zxvf /kaggle/working/music.tar.gz 2>&1 >/dev/null
!ls -A1 /kaggle/working/music/temp/music | wc -l


import demucs.separate
from demucs.api import *
import librosa
import os
import scipy.io.wavfile as wavfile
import numpy as np
import shutil
import tqdm
import soundfile as sf
from pydub import AudioSegment
import json,time
import psutil

# Check the current memory usage
def check_mem_out():
    """Report whether system memory usage has reached 90 percent.

    Returns:
        True (after printing a restart hint) when ``psutil`` reports a memory
        usage of 90% or more, False otherwise.
    """
    usage_percent = psutil.virtual_memory().percent
    exhausted = usage_percent >= 90
    if exhausted:
        print("memory out, need restart kernal!")
    return exhausted

# output_dir = ".\\output"

def split_music_file_name(file_name):
    """Split an audio file name into ``(base_name, ext_name)``.

    Recognised extensions, matched case-insensitively: ``.wav``, ``.mp3``,
    ``.ogg``, ``.m4a``, ``.aac``, ``.wave`` and ``.flac``. The returned
    extension keeps its original casing and the leading dot.

    Args:
        file_name: File name (or path) to split.

    Returns:
        Tuple ``(base_name, ext_name)`` with ``base_name + ext_name == file_name``.

    Raises:
        AssertionError: If the name does not end in a recognised extension
            (a message is printed first).
    """
    known_extensions = (".wav", ".mp3", ".ogg", ".m4a", ".aac", ".wave", ".flac")
    for ext in known_extensions:
        split_at = len(file_name) - len(ext)
        if split_at >= 0 and file_name[split_at:].lower() == ext:
            return file_name[:split_at], file_name[split_at:]

    print(file_name, " is not a valid file")
    # Raised explicitly instead of `assert 0` so the check survives `python -O`.
    raise AssertionError(str(file_name) + " is not a valid file")

def sf_read(f):
    """Read the samples of an audio file, going through a temporary MP3 if needed.

    ``sf.read`` is tried first. If it fails and ``f`` is not already an MP3,
    the file is decoded with pydub, exported to a temporary MP3, read from
    there, and the temporary file is removed again.

    Args:
        f: Path of the audio file. Its last three characters are passed to
            pydub as the format name.

    Returns:
        The sample data returned by ``sf.read`` (the sample rate is discarded).

    Raises:
        AssertionError: If ``f`` ends in ``mp3`` and cannot be read, since no
            fallback exists for that case.
    """
    import tempfile

    try:
        data, _ = sf.read(f)
        return data
    except Exception as err:
        if f[-3:] == 'mp3':
            raise AssertionError("cannot read " + f) from err

    audio = AudioSegment.from_file(f, format=f[-3:])
    # Use a unique temporary file: writing to "<name>.mp3" next to `f` could
    # overwrite, and afterwards delete, an unrelated file with the same stem.
    fd, out_file = tempfile.mkstemp(suffix=".mp3")
    os.close(fd)
    try:
        # export() returns the open output file object; close it before reading.
        audio.export(out_f=out_file, format="mp3").close()
        data, _ = sf.read(out_file)
    finally:
        os.remove(out_file)
    return data
        

# Working directory
work_dir = "/kaggle/working/"
os.makedirs(work_dir, exist_ok=True)

# Directory with the original audio files
raw_music_root_path = os.path.join(work_dir, "music/temp/music")
os.makedirs(raw_music_root_path, exist_ok=True)

# Directory for audio recognised as containing vocals
pos_music_save_path = os.path.join(work_dir, "demucs/pos")
os.makedirs(pos_music_save_path, exist_ok=True)

# Directory for the separated vocals and the recognition results
res_path = os.path.join(work_dir, "demucs/res")
os.makedirs(res_path, exist_ok=True)

# Directory for temporary files
temp_music_path = os.path.join(work_dir, "demucs/temp")
os.makedirs(temp_music_path, exist_ok=True)

# Result file
res_file = os.path.join(work_dir, "res.json")

# Audio formats that would need to be converted first
need_trans_formats = ["m4a", "M4A", "aac", "AAC"]

# Absolute paths of the audio files to analyse
music_file_list = [
    os.path.abspath(os.path.join(raw_music_root_path, file)) 
    for file in os.listdir(raw_music_root_path)
]


print("total song num: ", len(music_file_list))


# `res` collects the [file_name, song_id, ratio] records. Keep whatever an
# earlier run of this cell left in the kernel; on a fresh kernel start with an
# empty list instead of failing with a NameError.
try:
    res
except NameError:
    res = []

# Remove the inputs that already have a record so they are not analysed again.
for [f,_,_2] in res:
    f = os.path.basename(f)
    try:
        os.remove(os.path.join(raw_music_root_path, f))
    except FileNotFoundError:
        pass  # already removed
    except OSError as err:
        # Keep going (best effort), but do not hide the problem.
        print("cannot remove: ", f, err)
print("cur ok num: ", len(res))



# Absolute paths of the audio files that still have to be analysed
music_file_list = [
    os.path.abspath(os.path.join(raw_music_root_path, file))
    for file in os.listdir(raw_music_root_path)
]
print(len(music_file_list))



for file in tqdm.tqdm(music_file_list):
    # Stop when memory is almost exhausted.
    if check_mem_out():
        break

    file_name = os.path.basename(file)

    # Formats listed in need_trans_formats are not handled here. Check this
    # before split_music_file_name(), which raises on extensions it does not know.
    if file[-3:] in need_trans_formats:
        continue

    # Base name and extension of the audio file
    base_name, ext_name = split_music_file_name(file_name)

    # Output paths inside the "mdx_extra" result directory
    out_put_dir = os.path.join(os.path.join(res_path, "mdx_extra"), base_name)
    out_put_vocal_file = os.path.join(out_put_dir, "vocals.wav")
    out_put_novocal_file = os.path.join(out_put_dir, "no_vocals.wav")
    # ratio_res = os.path.join(out_put_dir, "res.txt")

    demucs_ok = False
    try:
        demucs.separate.main(["--two-stems", "vocals", "-o", res_path, "-n", "mdx_extra", file])
        demucs_ok = True
    # except LoadAudioError as e:
    except (Exception, SystemExit) as err:
        # KeyboardInterrupt is deliberately not caught so the run can be stopped.
        # NOTE(review): SystemExit is kept because the demucs command-line entry
        # point may exit the interpreter on files it cannot load — confirm.
        print("demucs failed: ", file, repr(err))
        demucs_ok = False

    if demucs_ok:
        data_org = sf_read(file)
        if data_org.ndim == 1:
            # Mono audio is returned as a 1-D array; add a channel axis so that
            # the per-frame sum over axis 1 works.
            data_org = data_org[:, np.newaxis]
        energy_org = np.sum(data_org**2, axis=1)
        # Identifier of the original track: its total energy, rounded
        org_song_id_val = round(np.sum(energy_org), 3)
        data_vocal = sf_read(out_put_vocal_file)
        if data_vocal.ndim == 1:
            data_vocal = data_vocal[:, np.newaxis]
        energy_vocal = np.sum(data_vocal**2, axis=1)
        # Mean energy of the original divided by mean energy of the vocal stem
        ratio = np.mean(energy_org)/np.mean(energy_vocal)
        shutil.rmtree(out_put_dir)  # remove the no-longer-needed directory to shorten session recovery
    else:
        # Placeholder record for a failed separation. With this ratio the file
        # is reported as positive below.
        org_song_id_val = 0
        ratio = 0.001

    res.append([file_name,org_song_id_val,ratio])

    if ratio < 100:
        print("[positove] ", ratio, "\t", file)
    else:
        print("[negative] ", ratio, "\t", file)

    print("res:")
    print(res)
    print("\n")

    # Persist the results after every file.
    with open(res_file, "w") as f:
        f.write(json.dumps(res))

    # os.remove(file)
    


# res0 =[] 
# import json
# with open('/kaggle/working/res.json', "w") as f:
#     f.write(json.dumps(res0))


In [31]:
import os,json


# for file in os.listdir("./album/ne")[-2000:]:
#     with open(os.path.join("./album/ne", file)) as f:
#         album_data = json.loads(f.read())
#     if "Baroque In the Future" == album_data.get("name", ""):
#         print(file)
#         break

# Gather the songs of the album files 7502.json .. 8999.json under ./album/ne.
# A song is kept when its element 2 is falsy and its element 1 has not been
# seen before; element 1 is used as the de-duplication key.
song_list = []
url_set = set()

for i in range(7502, 9000):
    file_path = os.path.join("./album/ne", f"{i}.json")
    if os.path.exists(file_path):
        with open(file_path, "r") as f:
            album_data = json.load(f)

        for song in album_data.get("s", []):
            if song[2] or song[1] in url_set:
                continue
            url_set.add(song[1])
            song_list.append(song)

print(len(song_list))


4998


In [None]:
import requests
import threading
from queue import Queue

# Core function that downloads one MP3
def download_mp3(song_info, save_dir=r"E:\music\20250227", timeout=30):
    """Download one track from the music.163.com outer-URL endpoint as an MP3 file.

    Args:
        song_info: Sequence whose element 0 is used as the output file name
            (without extension) and whose element 1 is a string ending in
            ``=<numeric song id>``.
        save_dir: Directory that receives the file.
        timeout: Timeout in seconds passed to ``requests.get`` so that a
            stalled connection cannot block the calling thread forever.

    Returns:
        None. Failures are printed, never raised, so one bad record cannot
        kill the worker thread that calls this function.
    """
    song_id = None
    partial_name = None
    try:
        # Parsed inside the try block: a malformed record must be reported
        # like any other failure instead of raising out of the function.
        song_id = int(song_info[1].split("=")[-1])
        url = f"http://music.163.com/song/media/outer/url?id={song_id}.mp3"

        # Replace characters that are not allowed in Windows file names
        # (":" would otherwise silently create an alternate data stream).
        illegal_chars = str.maketrans({ch: "_" for ch in '<>:"/\\|?*'})
        safe_name = str(song_info[0]).translate(illegal_chars)

        with requests.get(url, stream=True, timeout=timeout) as response:
            if response.status_code == 200:
                # The file is named after song_info[0], not after the song id.
                file_name = os.path.join(save_dir, f"{safe_name}.mp3")
                # Download to a ".part" file first so an aborted transfer never
                # leaves a truncated file under the final name.
                partial_name = file_name + ".part"
                with open(partial_name, 'wb') as f:
                    for chunk in response.iter_content(chunk_size=1024):
                        if chunk:
                            f.write(chunk)
                os.replace(partial_name, file_name)
                partial_name = None
                print(f"[成功] 文件 {song_id}.mp3 下载完成")
            else:
                print(f"[失败] 无效响应码：{response.status_code}（SongID: {song_id}）")
    except Exception as e:
        print(f"[异常] SongID {song_id} 下载失败：{str(e)}")
        print(song_info)
        # Best-effort removal of a half-written download.
        if partial_name is not None:
            try:
                os.remove(partial_name)
            except OSError:
                pass

# Task dispatch for the download threads
def worker(queue):
    """Download the song records taken from ``queue`` until it is empty.

    Args:
        queue: ``queue.Queue`` of song records accepted by ``download_mp3``.

    Every record taken is acknowledged with ``task_done()``, even when the
    download raises, so that ``queue.join()`` in the caller cannot hang.
    """
    # Imported locally because only Queue is imported at file level.
    from queue import Empty

    while True:
        try:
            # Non-blocking get: checking empty() and then calling a blocking
            # get() is racy, because another thread can take the last item in
            # between and leave this one blocked forever.
            song_info = queue.get_nowait()
        except Empty:
            return
        try:
            download_mp3(song_info)
        except Exception as e:
            # Keep the thread alive; report the record that failed.
            print(f"[worker] unexpected error for {song_info!r}: {e}")
        finally:
            queue.task_done()

def dl_song_list_3threads(song_list, thread_count=3):
    """Download all songs in ``song_list`` with a small pool of worker threads.

    Args:
        song_list: Iterable of song records accepted by ``download_mp3``.
        thread_count: Number of concurrent worker threads (default 3).

    Raises:
        ValueError: If ``thread_count`` is smaller than 1.
    """
    if thread_count < 1:
        raise ValueError("thread_count must be at least 1")

    # Fill the task queue before any thread starts.
    task_queue = Queue()
    for song_info in song_list:
        task_queue.put(song_info)

    # Start the worker threads.
    threads = []
    for _ in range(thread_count):
        thread = threading.Thread(target=worker, args=(task_queue,))
        thread.start()
        threads.append(thread)

    # Wait for the workers to drain the queue. Joining the threads is enough
    # because each worker returns once the queue is empty. Waiting on
    # task_queue.join() as well would block forever if a worker died before
    # acknowledging its item.
    for thread in threads:
        thread.join()

    print("所有任务处理完毕")

In [None]:
## Among the albums of this batch, add those whose download failed to a new playlist again

new_vip_songlist_url = ""

start_album_id = 7502
end_album_id = 8934




OK
