# MuVi-Sync / Vevo 数据集预处理
在 `dataset` 目录下运行此 Notebook，按顺序执行，从新增的 `.mp4` 开始提取各类特征。


## 0. 环境与路径
- 需安装 `ffmpeg`、Python 依赖（`pip install -r ../requirements.txt`）
- 将新视频放到 `dataset/vevo/<id>.mp4`，建议 `<id>` 用 3 位数字，与现有文件一致
- 运行前修改下面的 `VIDEO_ID`


In [1]:
from pathlib import Path
VIDEO_ID = '049'  # 修改为你的新视频编号（不含扩展名）
DATASET_ROOT = Path(__file__).resolve().parent if '__file__' in globals() else Path('.').resolve()
PROJECT_ROOT = DATASET_ROOT.parent
VIDEO_PATH = DATASET_ROOT / 'vevo' / f'{VIDEO_ID}.mp4'
assert VIDEO_PATH.exists(), f'未找到视频文件: {VIDEO_PATH}'
VIDEO_PATH


PosixPath('/home/jim/Video2Music/dataset/vevo/049.mp4')

In [2]:
import subprocess
# 可选：安装依赖
# subprocess.run(['pip','install','-r','../requirements.txt'], check=True)
subprocess.run(['ffmpeg','-version'], check=True, stdout=subprocess.PIPE, text=True).stdout.split('\n')[0]


'ffmpeg version 4.2.7-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers'

## 1. 抽帧
每秒抽 1 帧，保存到 `vevo_frame/<id>/`。


In [3]:
import subprocess
FRAME_DIR = DATASET_ROOT / 'vevo_frame' / VIDEO_ID
FRAME_DIR.mkdir(parents=True, exist_ok=True)
frame_pattern = FRAME_DIR / f'{VIDEO_ID}_%03d.jpg'
cmd = [
    'ffmpeg', '-i', str(VIDEO_PATH),
    '-vf', 'select=bitor(gte(t-prev_selected_t\,1)\,isnan(prev_selected_t))',
    '-vsync', '0', '-qmin', '1', '-q:v', '1', str(frame_pattern)
]
subprocess.run(cmd, check=True)
len(list(FRAME_DIR.glob('*.jpg'))), FRAME_DIR


ffmpeg version 4.2.7-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 9 (Ubuntu 9.4.0-1ubuntu1~20.04.1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-avresample --disable-filter=resample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librsvg --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --e

(234, PosixPath('/home/jim/Video2Music/dataset/vevo_frame/049'))

In [4]:
import cv2
MOTION_DIR = DATASET_ROOT / 'vevo_motion' / 'all'
MOTION_DIR.mkdir(parents=True, exist_ok=True)
cap = cv2.VideoCapture(str(VIDEO_PATH))
motiondict = {0: '0.0000'}
prev_frame = None
prev_time = 0.0
fps = cap.get(cv2.CAP_PROP_FPS) or 30.0
frame_interval = 1.0  # 每秒采样一次
frame_count = 0

while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    
    curr_time = frame_count / fps
    
    if prev_frame is not None and curr_time - prev_time >= frame_interval:
        diff = cv2.absdiff(frame, prev_frame)
        diff_rgb = cv2.cvtColor(diff, cv2.COLOR_BGR2RGB)
        motion_value = format(diff_rgb.mean(), '.4f')
        sec_idx = int(curr_time)
        motiondict[sec_idx] = str(motion_value)
        prev_time = curr_time
    
    prev_frame = frame.copy()
    frame_count += 1

cap.release()
cv2.destroyAllWindows()

# 确保所有秒都有值（填充缺失的秒）
max_sec = max(motiondict.keys()) if motiondict else 0
for i in range(max_sec + 1):
    if i not in motiondict:
        motiondict[i] = '0.0000'

motion_path = MOTION_DIR / f'{VIDEO_ID}.lab'
with open(motion_path, 'w', encoding='utf-8') as f:
    for i in sorted(motiondict.keys()):
        f.write(f'{i} {motiondict[i]}\n')
motion_path


PosixPath('/home/jim/Video2Music/dataset/vevo_motion/all/049.lab')

## 2. 语义特征 (CLIP)
输出到 `vevo_semantic/all/2d/clip_l14p/<id>.npy`。


In [5]:
import torch, clip, numpy as np
from PIL import Image
SEM_DIR = DATASET_ROOT / 'vevo_semantic' / 'all' / '2d' / 'clip_l14p'
SEM_DIR.mkdir(parents=True, exist_ok=True)
device = 'cuda' if torch.cuda.is_available() else 'cpu'
model, preprocess = clip.load('ViT-L/14@336px', device=device)
frame_files = sorted(FRAME_DIR.glob('*.jpg'))
features = torch.zeros((len(frame_files), 768), device=device)
for idx, fpath in enumerate(frame_files):
    image = preprocess(Image.open(fpath)).unsqueeze(0).to(device)
    with torch.no_grad():
        features[idx] = model.encode_image(image)[0]
np.save(SEM_DIR / f'{VIDEO_ID}.npy', features.cpu().numpy())
SEM_DIR / f'{VIDEO_ID}.npy'


PosixPath('/home/jim/Video2Music/dataset/vevo_semantic/all/2d/clip_l14p/049.npy')

## 3. 情感特征 (6 类)
输出到 `vevo_emotion/6c_l14p/all/<id>.lab`。


In [6]:
EMO_DIR = DATASET_ROOT / 'vevo_emotion' / '6c_l14p' / 'all'
EMO_DIR.mkdir(parents=True, exist_ok=True)
text = clip.tokenize(['exciting', 'fearful', 'tense', 'sad', 'relaxing', 'neutral']).to(device)
emolist = []
for fpath in frame_files:
    image = preprocess(Image.open(fpath)).unsqueeze(0).to(device)
    with torch.no_grad():
        logits_per_image, _ = model(image, text)
        probs = logits_per_image.softmax(dim=-1).cpu().numpy()[0]
    emolist.append(' '.join([format(p, '.4f') for p in probs]))
out_path = EMO_DIR / f'{VIDEO_ID}.lab'
with open(out_path, 'w', encoding='utf-8') as f:
    f.write('time exciting_prob fearful_prob tense_prob sad_prob relaxing_prob neutral_prob\n')
    for i, line in enumerate(emolist):
        f.write(f'{i} {line}\n')
out_path


PosixPath('/home/jim/Video2Music/dataset/vevo_emotion/6c_l14p/all/049.lab')

## 4. 分镜 + Scene Offset
生成 `vevo_scene/all/<id>.lab`，再转换为 `vevo_scene_offset/all/<id>.lab`。


In [7]:
import math
from scenedetect import open_video, SceneManager
from scenedetect.detectors import AdaptiveDetector
SCENE_DIR = DATASET_ROOT / 'vevo_scene' / 'all'
SCENE_DIR.mkdir(parents=True, exist_ok=True)
SCENE_OFFSET_DIR = DATASET_ROOT / 'vevo_scene_offset' / 'all'
SCENE_OFFSET_DIR.mkdir(parents=True, exist_ok=True)
video_stream = open_video(str(VIDEO_PATH))
scene_manager = SceneManager()
scene_manager.add_detector(AdaptiveDetector())
scene_manager.detect_scenes(video_stream, show_progress=False)
scene_list = scene_manager.get_scene_list()
scenedict = {}
sec = 0
for idx, scene in enumerate(scene_list):
    end_int = math.ceil(scene[1].get_seconds())
    for s in range(sec, end_int):
        scenedict[s] = str(idx)
        sec += 1
scene_path = SCENE_DIR / f'{VIDEO_ID}.lab'
with open(scene_path, 'w', encoding='utf-8') as f:
    for i in range(len(scenedict)):
        f.write(f'{i} {scenedict[i]}\n')
ids = [int(v) for v in scenedict.values()]
offset_list = []
if ids:
    current = ids[0]
    offset = 0
    for vid in ids:
        if vid != current:
            current = vid
            offset = 0
        offset_list.append(offset)
        offset += 1
scene_offset_path = SCENE_OFFSET_DIR / f'{VIDEO_ID}.lab'
with open(scene_offset_path, 'w', encoding='utf-8') as f:
    for i, v in enumerate(offset_list):
        f.write(f'{i} {v}\n')
scene_path, scene_offset_path


(PosixPath('/home/jim/Video2Music/dataset/vevo_scene/all/049.lab'),
 PosixPath('/home/jim/Video2Music/dataset/vevo_scene_offset/all/049.lab'))

## 5. 运动特征
输出到 `vevo_motion/all/<id>.lab`。


In [8]:
import cv2
MOTION_DIR = DATASET_ROOT / 'vevo_motion' / 'all'
MOTION_DIR.mkdir(parents=True, exist_ok=True)
cap = cv2.VideoCapture(str(VIDEO_PATH))
motiondict = {0: '0.0000'}
prev_frame = None
prev_time = 0
while cap.isOpened():
    ret, frame = cap.read()
    if not ret:
        break
    curr_time = cap.get(cv2.CAP_PROP_POS_MSEC) / 1000.0
    if prev_frame is not None and curr_time - prev_time >= 1:
        diff = cv2.absdiff(frame, prev_frame)
        diff_rgb = cv2.cvtColor(diff, cv2.COLOR_BGR2RGB)
        motion_value = format(diff_rgb.mean(), '.4f')
        motiondict[int(curr_time)] = str(motion_value)
        prev_time = int(curr_time)
    prev_frame = frame.copy()
cap.release()
cv2.destroyAllWindows()
motion_path = MOTION_DIR / f'{VIDEO_ID}.lab'
with open(motion_path, 'w', encoding='utf-8') as f:
    for i in range(len(motiondict)):
        f.write(f'{i} {motiondict[i]}\n')
motion_path


PosixPath('/home/jim/Video2Music/dataset/vevo_motion/all/049.lab')

## 6. 可选：音频特征 & 元数据
- 先从 mp4 抽取 wav：输出到 `vevo_audio/wav/<id>.wav`
- 有 wav 后可算响度：写入 `vevo_loudness/all/<id>.lab`
- 有 MIDI 和和弦标注：写 note density 到 `vevo_note_density/all/<id>.lab`
- 最后把 `<id>` 加入 `vevo_meta/idlist.txt` 与 `vevo_meta/split/v1/*.txt`


In [9]:
# 6.1 从 mp4 抽取 wav（44100Hz 单声道）
AUDIO_DIR = DATASET_ROOT / 'vevo_audio' / 'wav'
AUDIO_DIR.mkdir(parents=True, exist_ok=True)
WAV_PATH = AUDIO_DIR / f'{VIDEO_ID}.wav'
# 缺省添加 -y，自动覆盖已有 wav
cmd = [
    'ffmpeg', '-y', '-i', str(VIDEO_PATH),
    '-vn', '-ac', '1', '-ar', '44100', str(WAV_PATH)
]
subprocess.run(cmd, check=True)
WAV_PATH, WAV_PATH.exists()


ffmpeg version 4.2.7-0ubuntu0.1 Copyright (c) 2000-2022 the FFmpeg developers
  built with gcc 9 (Ubuntu 9.4.0-1ubuntu1~20.04.1)
  configuration: --prefix=/usr --extra-version=0ubuntu0.1 --toolchain=hardened --libdir=/usr/lib/x86_64-linux-gnu --incdir=/usr/include/x86_64-linux-gnu --arch=amd64 --enable-gpl --disable-stripping --enable-avresample --disable-filter=resample --enable-avisynth --enable-gnutls --enable-ladspa --enable-libaom --enable-libass --enable-libbluray --enable-libbs2b --enable-libcaca --enable-libcdio --enable-libcodec2 --enable-libflite --enable-libfontconfig --enable-libfreetype --enable-libfribidi --enable-libgme --enable-libgsm --enable-libjack --enable-libmp3lame --enable-libmysofa --enable-libopenjpeg --enable-libopenmpt --enable-libopus --enable-libpulse --enable-librsvg --enable-librubberband --enable-libshine --enable-libsnappy --enable-libsoxr --enable-libspeex --enable-libssh --enable-libtheora --enable-libtwolame --enable-libvidstab --enable-libvorbis --e

(PosixPath('/home/jim/Video2Music/dataset/vevo_audio/wav/049.wav'), True)

In [10]:
# 6.2 响度特征（与 script/loudness_feature.py 一致）
import audioop, numpy as np
from pydub import AudioSegment
from pydub.utils import make_chunks
LOUDNESS_DIR = DATASET_ROOT / 'vevo_loudness' / 'all'
LOUDNESS_DIR.mkdir(parents=True, exist_ok=True)
audio_data = AudioSegment.from_file(WAV_PATH)
audio_data = audio_data.set_channels(1).set_frame_rate(44100)
chunks = make_chunks(audio_data, 1000)  # 每秒
loudness_per_second = []
for chunk in chunks:
    data = chunk.raw_data
    rms = audioop.rms(data, 2)
    loudness = 20 * np.log10(rms / 32767)
    normalized = 10 ** (loudness / 20)
    loudness_per_second.append(format(normalized, '.4f'))
loudness_path = LOUDNESS_DIR / f'{VIDEO_ID}.lab'
with open(loudness_path, 'w', encoding='utf-8') as f:
    for i, v in enumerate(loudness_per_second):
        f.write(f'{i} {v}\n')
loudness_path


  loudness = 20 * np.log10(rms / 32767)


PosixPath('/home/jim/Video2Music/dataset/vevo_loudness/all/049.lab')

## 6.3 准备 MIDI（可选自动合成或外部转录）
- 若已有 MIDI，跳过本步。将文件放到 `vevo_midi/all/<id>.mid`。
- 若无 MIDI，可用外部转录工具（如 basic-pitch/Onsets & Frames 等）对 `vevo_audio/wav/<id>.wav` 转录并保存到上述路径。
- 或者勾选下方开关，依据和弦标注 `vevo_chord/lab_v2_norm/all/<id>.lab` 合成简单和弦垫底 MIDI（每秒一个和弦）。


## 6.2b 无和弦标注时生成占位和弦文件（可选）
如果没有人工和弦标注，可用本单元基于音频时长生成占位文件 `vevo_chord/lab_v2_norm/all/<id>.lab`，内容为固定和弦或 N，仅为后续步骤跑通。


## 6.2c 使用 Omnizart 自动和弦识别
需要先安装 `omnizart` 并下载 checkpoint：
- 安装：`pip install omnizart`
- 下载模型：`omnizart download-checkpoints`
无法联网时，可在有网环境下载后拷贝 `~/.omnizart/` 到当前机器。


In [11]:
import os
import subprocess
import tempfile
from pathlib import Path

# omni 环境的 Python 路径
OMNI_PYTHON = '/home/jim/anaconda3/envs/omni/bin/python'
assert os.path.exists(OMNI_PYTHON), f'omni 环境不存在: {OMNI_PYTHON}'

CHORD_DIR = DATASET_ROOT / 'vevo_chord' / 'lab_v2_norm' / 'all'
CHORD_DIR.mkdir(parents=True, exist_ok=True)
CHORD_PATH = CHORD_DIR / f'{VIDEO_ID}.lab'

# 设置环境变量（包括 LD_PRELOAD）
ffi_path = '/lib/x86_64-linux-gnu/libffi.so.7'
env = os.environ.copy()
if os.path.exists(ffi_path):
    if 'LD_PRELOAD' in env:
        env['LD_PRELOAD'] = ffi_path + ':' + env['LD_PRELOAD']
    else:
        env['LD_PRELOAD'] = ffi_path

# 创建临时 Python 脚本的代码行列表
script_lines = [
    'import os',
    'import sys',
    'import math',
    'import csv',
    'from pathlib import Path',
    '',
    "# 设置 LD_PRELOAD 以解决 libffi 问题",
    "ffi_path = '/lib/x86_64-linux-gnu/libffi.so.7'",
    'if os.path.exists(ffi_path):',
    "    if 'LD_PRELOAD' in os.environ:",
    "        os.environ['LD_PRELOAD'] = ffi_path + ':' + os.environ['LD_PRELOAD']",
    '    else:',
    "        os.environ['LD_PRELOAD'] = ffi_path",
    '',
    'try:',
    '    from omnizart.chord import app as chord_app',
    '',
    '    wav_path = sys.argv[1]',
    '    output_dir = sys.argv[2]',
    '    video_id = sys.argv[3]',
    '',
    '# 运行 Omnizart Chord 模型',
    '    midi_result = chord_app.transcribe(wav_path, output=output_dir)',
    '',
    '# 尝试读取生成的 CSV 文件',
    '    csv_path = Path(output_dir) / f\'{video_id}\'',
    '    chord_path = Path(output_dir) / f\'{video_id}.lab\'',
    '',
    "    lines = ['key C major']",
    '',
    '    if csv_path.exists():',
    '        changes = []',
    '        with open(csv_path, \'r\', encoding=\'utf-8\') as f:',
    '            reader = csv.reader(f)',
    '            next(reader)  # 跳过标题行',
    '            for row in reader:',
    '                if len(row) >= 3:',
    '                    chord_label = row[0] if len(row) > 0 else \'N\'',
    '                    start_time = float(row[1]) if len(row) > 1 else 0.0',
    '                    end_time = float(row[2]) if len(row) > 2 else 0.0',
    '                    changes.append((start_time, end_time, chord_label))',
    '        ',
    '        if changes:',
    '            total_dur = int(math.ceil(changes[-1][1]))',
    '            for sec in range(total_dur):',
    '                label = \'N\'',
    '                for start, end, chord in changes:',
    '                    if start <= sec < end:',
    '                        label = chord or \'N\'',
    '                        break',
    '                lines.append(f\'{sec} {label}\')',
    '        else:',
    '            lines.append(\'0 N\')',
    '    else:',
    '        print(f\'警告: 未找到 CSV 文件 {csv_path}，使用占位和弦\')',
    '        lines.append(\'0 N\')',
    '',
    '# 写入结果',
    '    with open(chord_path, \'w\', encoding=\'utf-8\') as f:',
    '        f.write(\'\\n\'.join(lines) + \'\\n\')',
    '',
    '    print(f\'成功生成和弦标注: {chord_path}\')',
    '    sys.exit(0)',
    'except Exception as e:',
    '    print(f\'Omnizart 失败: {e}\')',
    '    print(\'将使用占位和弦\')',
    '    sys.exit(1)',
]

# 将脚本写入临时文件
with tempfile.NamedTemporaryFile(mode='w', suffix='.py', delete=False, encoding='utf-8') as f:
    f.write('\n'.join(script_lines))
    temp_script = f.name

try:
    # 在 omni 环境中运行脚本，传递环境变量
    result = subprocess.run(
        [OMNI_PYTHON, temp_script, str(WAV_PATH), str(CHORD_DIR), VIDEO_ID],
        env=env,  # 传递包含 LD_PRELOAD 的环境变量
        capture_output=True,
        text=True,
        check=False  # 保留错误处理，失败时使用占位和弦
    )
    
    if result.returncode == 0:
        print(result.stdout)
        if result.stderr:
            print('警告:', result.stderr)
    else:
        print(f'Omnizart 执行失败 (退出码: {result.returncode})')
        print('stdout:', result.stdout)
        print('stderr:', result.stderr)
        print('\n使用占位和弦...')
        # 生成占位和弦文件
        from pydub import AudioSegment
        from math import ceil
        audio = AudioSegment.from_file(WAV_PATH)
        dur_sec = ceil(len(audio) / 1000)
        with open(CHORD_PATH, 'w', encoding='utf-8') as f:
            f.write('key C major\n')
            for t in range(dur_sec):
                f.write(f'{t} N\n')
        print(f'已生成占位和弦标注: {CHORD_PATH}')
finally:
    # 清理临时文件
    if os.path.exists(temp_script):
        os.unlink(temp_script)

print('写出和弦标注:', CHORD_PATH)
CHORD_PATH



成功生成和弦标注: /home/jim/Video2Music/dataset/vevo_chord/lab_v2_norm/all/049.lab

警告: 2025-12-04 01:10:28 Extracting feature
2025-12-04 01:10:29 Loading model
2025-12-04 01:10:29 Using built-in model /home/jim/anaconda3/envs/omni/lib/python3.8/site-packages/omnizart/checkpoints/chord/chord_v1 for transcription.
2025-12-04 01:10:34 Preparing feature for model prediction
INFO:Chord Application:Preparing feature for model prediction
2025-12-04 01:10:34 Predicting...
INFO:Chord Application:Predicting...
2025-12-04 01:10:35 Infering chords...
INFO:Chord Application:Infering chords...
2025-12-04 01:10:35 MIDI file has been written to /home/jim/Video2Music/dataset/vevo_chord/lab_v2_norm/all/049.mid.
INFO:Base Class:MIDI file has been written to /home/jim/Video2Music/dataset/vevo_chord/lab_v2_norm/all/049.mid.
2025-12-04 01:10:35 MIDI and CSV file have been written to /home/jim/Video2Music/dataset/vevo_chord/lab_v2_norm/all
INFO:Chord Application:MIDI and CSV file have been written to /home/jim/Vid

PosixPath('/home/jim/Video2Music/dataset/vevo_chord/lab_v2_norm/all/049.lab')

In [12]:
import os, sys, subprocess, types, re
ffi_path = '/usr/lib/x86_64-linux-gnu/libffi.so.7'
if os.path.exists(ffi_path):
    os.environ['LD_PRELOAD'] = ffi_path + (':' + os.environ['LD_PRELOAD'] if 'LD_PRELOAD' in os.environ else '')
# 卸载 fluidsynth，避免 pretty_midi 导入系统库冲突
subprocess.run([sys.executable, '-m', 'pip', 'uninstall', '-y', 'fluidsynth'], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
os.environ.setdefault('PRETTY_MIDI_USE_FLUIDSYNTH', '0')
def safe_import_pretty_midi():
    try:
        import pretty_midi as pm
        return pm
    except OSError:
        sys.modules['fluidsynth'] = types.ModuleType('fluidsynth')
        return __import__('pretty_midi')
pretty_midi = safe_import_pretty_midi()
GENERATE_MIDI_FROM_CHORDS = True  # 若已有 MIDI，请设为 False
from typing import List
MIDI_DIR = DATASET_ROOT / 'vevo_midi' / 'all'
MIDI_DIR.mkdir(parents=True, exist_ok=True)
CHORD_DIR = DATASET_ROOT / 'vevo_chord' / 'lab_v2_norm' / 'all'
MID_PATH = MIDI_DIR / f'{VIDEO_ID}.mid'
CHORD_PATH = CHORD_DIR / f'{VIDEO_ID}.lab'

if MID_PATH.exists():
    print('已有 MIDI，跳过生成:', MID_PATH)
else:
    assert CHORD_PATH.exists(), f'缺少和弦标注: {CHORD_PATH}'
    if not GENERATE_MIDI_FROM_CHORDS:
        raise SystemExit('请先提供 vevo_midi/all/<id>.mid 后再继续')

    def chord_to_intervals(chord_text: str) -> List[int]:
        text = chord_text.lower()
        if 'dim' in text or 'o' in text:
            return [0,3,6]
        if 'aug' in text or '+' in text:
            return [0,4,8]
        if 'sus2' in text:
            return [0,2,7]
        if 'sus' in text:
            return [0,5,7]
        if 'min6' in text:
            return [0,3,7,9]
        if 'min7' in text or (':7' in text and 'min' in text):
            return [0,3,7,10]
        if '7' in text and 'maj' in text:
            return [0,4,7,11]
        if '7' in text:
            return [0,4,7,10]
        if 'min' in text or 'm:' in text or text.endswith('m'):
            return [0,3,7]
        return [0,4,7]

    def parse_root(chord_text: str) -> str:
        m = re.match(r'([A-Ga-g][b#]?)', chord_text)
        return m.group(1).upper() if m else 'C'

    pm = pretty_midi.PrettyMIDI()
    instrument = pretty_midi.Instrument(program=0)
    with open(CHORD_PATH, encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) < 2 or parts[0] == 'key':
                continue
            try:
                t = int(parts[0])
            except ValueError:
                continue
            chord = parts[1]
            if chord == 'N':
                continue
            root = parse_root(chord)
            intervals = chord_to_intervals(chord)
            try:
                root_pitch = pretty_midi.note_name_to_number(root + '4')
            except Exception:
                root_pitch = pretty_midi.note_name_to_number('C4')
            velocity = 80
            duration = 1.0
            for iv in intervals:
                n = pretty_midi.Note(velocity=velocity, pitch=root_pitch + iv, start=t, end=t+duration)
                instrument.notes.append(n)
    pm.instruments.append(instrument)
    pm.write(str(MID_PATH))
    print('生成 MIDI:', MID_PATH)
MID_PATH


生成 MIDI: /home/jim/Video2Music/dataset/vevo_midi/all/049.mid


PosixPath('/home/jim/Video2Music/dataset/vevo_midi/all/049.mid')

In [13]:
import os, sys, subprocess, types
ffi_path = '/usr/lib/x86_64-linux-gnu/libffi.so.7'
if os.path.exists(ffi_path):
    os.environ['LD_PRELOAD'] = ffi_path + (':' + os.environ['LD_PRELOAD'] if 'LD_PRELOAD' in os.environ else '')
# 卸载 fluidsynth，避免 pretty_midi 导入系统库冲突
subprocess.run([sys.executable, '-m', 'pip', 'uninstall', '-y', 'fluidsynth'], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
os.environ.setdefault('PRETTY_MIDI_USE_FLUIDSYNTH', '0')
def safe_import_pretty_midi():
    try:
        import pretty_midi as pm
        return pm
    except OSError:
        # stub fluidsynth to bypass optional dependency
        sys.modules['fluidsynth'] = types.ModuleType('fluidsynth')
        return __import__('pretty_midi')
pretty_midi = safe_import_pretty_midi()
import os, sys, subprocess
if os.path.exists(ffi_path):
    os.environ['LD_PRELOAD'] = ffi_path + (':' + os.environ['LD_PRELOAD'] if 'LD_PRELOAD' in os.environ else '')
subprocess.run([sys.executable, '-m', 'pip', 'uninstall', '-y', 'fluidsynth'], check=False, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
# 关闭 pretty_midi 的 fluidsynth 可选依赖
if os.path.exists(ffi_path):
    # 强制预加载系统 libffi，避免 fluidsynth 依赖报错
    os.environ['LD_PRELOAD'] = ffi_path + (':' + os.environ['LD_PRELOAD'] if 'LD_PRELOAD' in os.environ else '')
# 关闭 pretty_midi 的 fluidsynth 可选依赖，避免额外库加载
if os.path.exists(ffi_path):
    os.environ.setdefault('LD_PRELOAD', ffi_path)
# 6.4 Note Density（需 vevo_midi/all/<id>.mid + vevo_chord/lab_v2_norm/all/<id>.lab）
MIDI_DIR = DATASET_ROOT / 'vevo_midi' / 'all'
CHORD_DIR = DATASET_ROOT / 'vevo_chord' / 'lab_v2_norm' / 'all'
NOTE_DENSITY_DIR = DATASET_ROOT / 'vevo_note_density' / 'all'
NOTE_DENSITY_DIR.mkdir(parents=True, exist_ok=True)
MID_PATH = MIDI_DIR / f'{VIDEO_ID}.mid'
CHORD_PATH = CHORD_DIR / f'{VIDEO_ID}.lab'
assert MID_PATH.exists(), f'MIDI 不存在: {MID_PATH}'
assert CHORD_PATH.exists(), f'和弦标注不存在: {CHORD_PATH}'
# 以和弦标注长度截断，和 script/note_density_feature.py 保持一致
ct = 0
with open(CHORD_PATH, encoding='utf-8') as f:
    for line in f:
        line_arr = line.strip().split(' ')
        if len(line_arr) > 1:
            ct += 1
midi_data = pretty_midi.PrettyMIDI(str(MID_PATH))
total_time = int(midi_data.get_end_time())
note_density_list = []
for i in range(total_time + 1):
    start_time, end_time = i, i + 1
    total_notes = 0
    for instrument in midi_data.instruments:
        for note in instrument.notes:
            if note.start < end_time and note.end > start_time:
                total_notes += 1
    note_density_list.append(total_notes / float(end_time - start_time))
note_density_path = NOTE_DENSITY_DIR / f'{VIDEO_ID}.lab'
with open(note_density_path, 'w', encoding='utf-8') as f:
    for i in range(ct - 1):
        if i < len(note_density_list):
            f.write(f'{i} {note_density_list[i]}\n')
        else:
            f.write(f'{i} 0\n')
note_density_path


PosixPath('/home/jim/Video2Music/dataset/vevo_note_density/all/049.lab')

## 6.5 更新元数据（idlist + split）
执行下方代码将 `<id>` 写入 `vevo_meta/idlist.txt` 以及指定 split 文件（train/val/test）。


In [14]:
from pathlib import Path
meta_root = DATASET_ROOT / 'vevo_meta'
split_root = meta_root / 'split' / 'v1'
meta_root.mkdir(parents=True, exist_ok=True)
split_root.mkdir(parents=True, exist_ok=True)
# 确保元数据文件存在
for fname in ['idlist.txt']:
    fp = meta_root / fname
    fp.touch(exist_ok=True)
for fname in ['train.txt','val.txt','test.txt']:
    fp = split_root / fname
    fp.touch(exist_ok=True)
sorted(list(split_root.glob('*.txt')))


[PosixPath('/home/jim/Video2Music/dataset/vevo_meta/split/v1/test.txt'),
 PosixPath('/home/jim/Video2Music/dataset/vevo_meta/split/v1/train.txt'),
 PosixPath('/home/jim/Video2Music/dataset/vevo_meta/split/v1/val.txt')]

### 6.5.1 构建本地字典与 split（不依赖官方文件）
从已有和弦标注扫描和弦集合，生成 chord/chord_inv/chord_root/chord_attr 词典，并按 8:1:1 随机划分 train/val/test。
如需自定义属性集合，可在代码中的 attrs 列表里增删。


In [15]:
from pathlib import Path
import json, random
# 路径
meta_root = DATASET_ROOT / 'vevo_meta'
meta_root.mkdir(parents=True, exist_ok=True)
split_root = meta_root / 'split' / 'v1'
split_root.mkdir(parents=True, exist_ok=True)
chord_root = DATASET_ROOT / 'vevo_chord' / 'lab_v2_norm' / 'all'
labs = list(chord_root.glob('*.lab'))

# 1) 扫描和弦集合
chords = set()
ids = []
for lab in labs:
    ids.append(lab.stem)
    with open(lab, encoding='utf-8') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) >= 2 and parts[0] != 'key':
                chords.add(parts[1])
ids = sorted(ids)
chords = sorted(chords)
# 2) 生成 chord 词典
chord2id = {c:i for i,c in enumerate(chords)}
(meta_root/'chord.json').write_text(json.dumps(chord2id, ensure_ascii=False, indent=2), encoding='utf-8')
(meta_root/'chord_inv.json').write_text(json.dumps({v:k for k,v in chord2id.items()}, ensure_ascii=False, indent=2), encoding='utf-8')
# 3) 根/属性词典（可按需调整）
roots = ['C','C#','D','D#','E','F','F#','G','G#','A','A#','B','N']
attrs = ['maj','min','dim','aug','sus','sus2','7','maj7','min7','min6','N']
root2id = {r:i for i,r in enumerate(roots)}
attr2id = {a:i for i,a in enumerate(attrs)}
(meta_root/'chord_root.json').write_text(json.dumps(root2id, ensure_ascii=False, indent=2), encoding='utf-8')
(meta_root/'chord_root_inv.json').write_text(json.dumps({v:k for k,v in root2id.items()}, ensure_ascii=False, indent=2), encoding='utf-8')
(meta_root/'chord_attr.json').write_text(json.dumps(attr2id, ensure_ascii=False, indent=2), encoding='utf-8')
(meta_root/'chord_attr_inv.json').write_text(json.dumps({v:k for k,v in attr2id.items()}, ensure_ascii=False, indent=2), encoding='utf-8')
# 保存 top_chord 列表
(meta_root/'top_chord.txt').write_text('\n'.join(chords)+'\n', encoding='utf-8')
# 写 idlist
(meta_root/'idlist.txt').write_text('\n'.join(ids)+'\n', encoding='utf-8')
# 4) 随机划分 8:1:1
ids_shuf = ids.copy()
random.shuffle(ids_shuf)
n = len(ids_shuf)
train, val, test = ids_shuf[:int(0.8*n)], ids_shuf[int(0.8*n):int(0.9*n)], ids_shuf[int(0.9*n):]
for name, split in [('train', train), ('val', val), ('test', test)]:
    (split_root/f'{name}.txt').write_text('\n'.join(split)+'\n', encoding='utf-8')
print('chords:', len(chords), 'ids:', len(ids))
split_root


chords: 8 ids: 1


PosixPath('/home/jim/Video2Music/dataset/vevo_meta/split/v1')

In [16]:
from pathlib import Path
SPLIT_TARGET = 'train'  # 可改为 'val' 或 'test'
meta_root = DATASET_ROOT / 'vevo_meta'
idlist_path = meta_root / 'idlist.txt'
split_path = meta_root / 'split' / 'v1' / f'{SPLIT_TARGET}.txt'

def append_unique(path: Path, value: str):
    existing = []
    if path.exists():
        with open(path, 'r', encoding='utf-8') as f:
            existing = [ln.strip() for ln in f if ln.strip()]
    status = 'exists'
    if value not in existing:
        existing.append(value)
        with open(path, 'w', encoding='utf-8') as f:
            f.write('\n'.join(existing) + '\n')
        status = 'appended'
    return {'path': str(path), 'status': status, 'count': len(existing)}

result = {
    'idlist': append_unique(idlist_path, VIDEO_ID),
    'split': append_unique(split_path, VIDEO_ID),
}
result


{'idlist': {'path': '/home/jim/Video2Music/dataset/vevo_meta/idlist.txt',
  'status': 'exists',
  'count': 1},
 'split': {'path': '/home/jim/Video2Music/dataset/vevo_meta/split/v1/train.txt',
  'status': 'appended',
  'count': 1}}