<a href="https://colab.research.google.com/github/gocgodman/M2M/blob/main/M2M(mp3_to_midi).ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# ============================================================
# Cell 1: Install the required packages (run only once)
# ============================================================

# Install the Transkun v2 transcriber
!pip install transkun

# PyTorch (on a GPU runtime, use the build that matches the CUDA version)
!pip install -q torch torchvision torchaudio --index-url https://download.pytorch.org/whl/cu118

# Other utilities
!pip install -q yt-dlp gdown gradio pretty_midi librosa numpy soundfile pyfluidsynth scipy
!apt-get update -qq
!apt-get install -y -qq ffmpeg fluidsynth p7zip-full || true

# Pin the librosa version (if needed)
!pip install -q librosa==0.9.2 --upgrade

Collecting transkun
  Downloading transkun-2.0.1-py3-none-any.whl.metadata (10 kB)
Collecting ncls (from transkun)
  Downloading ncls-0.0.70-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (4.9 kB)
Collecting pretty-midi (from transkun)
  Downloading pretty_midi-0.2.11.tar.gz (5.6 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m5.6/5.6 MB[0m [31m38.7 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting mir-eval (from transkun)
  Downloading mir_eval-0.8.2-py3-none-any.whl.metadata (3.0 kB)
Collecting torch-optimizer (from transkun)
  Downloading torch_optimizer-0.3.0-py3-none-any.whl.metadata (55 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m55.9/55.9 kB[0m [31m2.9 MB/s[0m eta [36m0:00:00[0m
[?25hCollecting sox (from transkun)
  Downloading sox-1.5.0.tar.gz (63 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m63.9/63.9 kB[0m [31m2.3 MB/s[0m eta [36

In [None]:
# ============================================================
# 셀 2: Google Drive 마운트 및 경로 설정
# ============================================================

from google.colab import drive
drive.mount('/content/drive', force_remount=False)

import os, glob, shutil, zipfile, tarfile, subprocess
import gdown

# Base path configuration
WORK_ROOT = "/content/ytd_pipeline_work"  # local scratch space for per-item temp folders
DRIVE_SF2_DIR = "/content/drive/MyDrive/sf2_library"  # SoundFont library kept on Drive
DRIVE_RESULTS_DIR = "/content/drive/MyDrive/ytd_pipeline_results"  # pipeline outputs on Drive
STATE_FILE = os.path.join(DRIVE_RESULTS_DIR, "pipeline_state.json")  # JSON file read/written by load_state/save_state

os.makedirs(WORK_ROOT, exist_ok=True)
os.makedirs(DRIVE_SF2_DIR, exist_ok=True)
os.makedirs(DRIVE_RESULTS_DIR, exist_ok=True)

print("WORK_ROOT:", WORK_ROOT)
print("DRIVE_SF2_DIR:", DRIVE_SF2_DIR)
print("DRIVE_RESULTS_DIR:", DRIVE_RESULTS_DIR)

# Download SF2 files from a shared Drive folder (optional)
SHARED_FOLDER_ID = "1JkTMvPwM_XURqG2114n4Qj0rR83WEucL"
OUT_DIR = "/content/sf2_from_shared"
os.makedirs(OUT_DIR, exist_ok=True)

# Best effort: a failed download is only reported, the rest of the cell still runs.
if SHARED_FOLDER_ID:
    try:
        gdown.download_folder(id=SHARED_FOLDER_ID, output=OUT_DIR, quiet=False)
        print("✓ 공유 폴더 다운로드 완료")
    except Exception as e:
        print("공유 폴더 다운로드 실패:", e)
# Archive extraction helpers
def _is_within_directory(base_dir, target_path):
    """Return True when ``target_path`` resolves to a location inside ``base_dir``."""
    base_real = os.path.realpath(base_dir)
    target_real = os.path.realpath(target_path)
    return os.path.commonpath([base_real, target_real]) == base_real


def try_extract_archive(path, dest):
    """Extract a .zip, .tar/.tar.gz/.tgz or .7z archive into ``dest``.

    The archives handled here come from a publicly shared folder, so tar
    members are never allowed to land outside ``dest``.

    Args:
        path: Path of the archive; the format is chosen from its file
            extension (case-insensitive).
        dest: Destination directory; created if it does not exist.

    Returns:
        True when the archive was extracted, False when the extension is not
        supported or extraction failed (the error is printed, not raised).
    """
    path_lower = path.lower()
    os.makedirs(dest, exist_ok=True)
    try:
        if path_lower.endswith(".zip"):
            # zipfile.extractall() already sanitizes absolute paths and "..".
            with zipfile.ZipFile(path, 'r') as zf:
                zf.extractall(dest)
            return True
        if path_lower.endswith((".tar.gz", ".tgz", ".tar")):
            with tarfile.open(path, 'r:*') as tf:
                if hasattr(tarfile, "data_filter"):
                    # Extraction filters reject absolute paths, "..", and
                    # links pointing outside the destination.
                    tf.extractall(dest, filter="data")
                else:
                    # Older interpreters without extraction filters:
                    # validate every member manually before extracting.
                    for member in tf.getmembers():
                        if member.issym() or member.islnk():
                            raise ValueError(f"link member refused: {member.name}")
                        if not _is_within_directory(dest, os.path.join(dest, member.name)):
                            raise ValueError(f"member escapes destination: {member.name}")
                    tf.extractall(dest)
            return True
        if path_lower.endswith(".7z"):
            cmd = ['7z', 'x', '-y', '-o' + dest, path]
            subprocess.run(cmd, check=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE)
            return True
    except Exception as e:
        print("압축 해제 실패:", e)
    return False

# Extract every archive found under the Drive SF2 library and then under the
# shared-folder download directory, each into a sibling "<name>_extracted" folder.
print("\n[SF2 압축 파일 해제 중...]")
for scan_base in (DRIVE_SF2_DIR, OUT_DIR):
    for root, dirs, files in os.walk(scan_base):
        for fn in files:
            if not fn.lower().endswith((".zip", ".tar.gz", ".tgz", ".tar", ".7z")):
                continue
            extract_dest = os.path.join(root, fn + "_extracted")
            if try_extract_archive(os.path.join(root, fn), extract_dest):
                print(f" ✓ {fn}")

# Collect SF2 files from both locations and copy the new ones to Drive
local_sf2_files = glob.glob(os.path.join(DRIVE_SF2_DIR, "**/*.sf2"), recursive=True)
# Basenames already present anywhere in the Drive library (used for de-duplication)
local_names = {os.path.basename(f) for f in local_sf2_files}
shared_sf2_files = glob.glob(os.path.join(OUT_DIR, "**/*.sf2"), recursive=True)

print(f"\n내 드라이브 sf2 수: {len(local_sf2_files)}")
print(f"공유 드라이브 sf2 수: {len(shared_sf2_files)}")

# Copy SF2 files that exist only in the shared download into the Drive library root
copied = []
for p in shared_sf2_files:
    fname = os.path.basename(p)
    if fname in local_names:
        continue
    dest = os.path.join(DRIVE_SF2_DIR, fname)
    try:
        # Also skips a second shared file with the same basename copied earlier in this loop
        if not os.path.exists(dest):
            shutil.copy(p, dest)
            copied.append(dest)
            print(f" 복사: {fname}")
    except Exception as e:
        print(f" 복사 실패: {fname}, {e}")

print(f"새로 복사된 sf2 수: {len(copied)}")

# Clean up SF2 files that ended up in the results folder by mistake
misplaced = glob.glob(os.path.join(DRIVE_RESULTS_DIR, "**/*.sf2"), recursive=True)
if misplaced:
    print(f"\n결과 폴더에 잘못 들어간 SF2 수: {len(misplaced)}")
    for p in misplaced:
        fname = os.path.basename(p)
        dst = os.path.join(DRIVE_SF2_DIR, fname)
        try:
            if not os.path.exists(dst):
                # Not in the library yet: move it there
                shutil.move(p, dst)
                print(f" 이동: {fname}")
            else:
                # A file with the same name is already in the library: delete the stray copy
                os.remove(p)
                print(f" 중복 제거: {fname}")
        except Exception as e:
            print(f" 이동/삭제 실패: {fname}, {e}")
else:
    print("\n결과 폴더에 잘못된 SF2 없음")

# Final SF2 listing: print the total and at most the first 20 file names
final_sf2_files = glob.glob(os.path.join(DRIVE_SF2_DIR, "**/*.sf2"), recursive=True)
print(f"\n✓ 최종 SF2 라이브러리 파일 수: {len(final_sf2_files)}")
for f in final_sf2_files[:20]:
    print(f" * {os.path.basename(f)}")
if len(final_sf2_files) > 20:
    print(f" ... 외 {len(final_sf2_files) - 20}개")

Mounted at /content/drive
WORK_ROOT: /content/ytd_pipeline_work
DRIVE_SF2_DIR: /content/drive/MyDrive/sf2_library
DRIVE_RESULTS_DIR: /content/drive/MyDrive/ytd_pipeline_results


Retrieving folder contents


Retrieving folder 11BHbbZws4leslR81OSluH0d7VhSYC4N2 Black Midi Soundfonts
Processing file 1kJoVKGS3ovON0OzNP3bodhlsTSo_J42m Brilliant CFX Concert Grand V2.3.7z
Processing file 17Iei7jUjFgJPrwdgWm22NhE_Ap6mRYN1 Brilliant CFX II Concert Grand V.1.4.7z
Processing file 1RaZkbNjG0UwWofmAHIVSsYUAl-iqCgmM LSP Mixable Concert Grand 1.2.9.7z
Processing file 1I_VeR7G0dHXASFFOLTfz00bhjBi7Ii8m Mustafa Concert Grand V1.0.7z
Processing file 1jL4FDCM7YHtCWh4WdlY_45eYm0Bhjy1O Mustafa F308XP Concert Grand V1.0.7z
Processing file 1gucI6eeNxEPBMdsMpJnXHOxKR-gXKSr- Ordinary D274 Concert Grand V1.0.7z
Processing file 1Q0Ljo55-Nezfupc-UBCPygI7kWk7Ep6H Regal S275 Concert Grand V1.2.7z
Processing file 1zCaOTXDqE_AAEuxcF0FaIKHiNJ43WktR Retroid 8SQ Digital Grand V2.sf2
Processing file 16-o3ntN3CF3K8He3LFUgMMQkGsa2wAcl Retroid D274SQ Digital Grand V1.1.7z
Processing file 1K4-14qdkrA3A0sIFQ50M5-DYk_qRM9X1 Supernova Concert Grand V1.4.rar
Processing file 1M26dnhABD45JHrs_zrY73_x7LQu-gxS0 Zayyan Concert Grand V1.2.

Retrieving folder contents completed
Building directory structure
Building directory structure completed
Downloading...
From: https://drive.google.com/uc?id=1kJoVKGS3ovON0OzNP3bodhlsTSo_J42m
To: /content/sf2_from_shared/Black Midi Soundfonts/Brilliant CFX Concert Grand V2.3.7z
100%|██████████| 16.5M/16.5M [00:00<00:00, 36.0MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=17Iei7jUjFgJPrwdgWm22NhE_Ap6mRYN1
From (redirected): https://drive.google.com/uc?id=17Iei7jUjFgJPrwdgWm22NhE_Ap6mRYN1&confirm=t&uuid=f04ca89c-3fe4-45c6-abf1-7b9bdc2714a1
To: /content/sf2_from_shared/Black Midi Soundfonts/Brilliant CFX II Concert Grand V.1.4.7z
100%|██████████| 152M/152M [00:02<00:00, 71.0MB/s]
Downloading...
From (original): https://drive.google.com/uc?id=1RaZkbNjG0UwWofmAHIVSsYUAl-iqCgmM
From (redirected): https://drive.google.com/uc?id=1RaZkbNjG0UwWofmAHIVSsYUAl-iqCgmM&confirm=t&uuid=1706992a-6029-4b5b-8cf7-b6ca63f8c099
To: /content/sf2_from_shared/Black Midi Soundfonts/LSP Mixabl

공유 폴더 다운로드 실패: Failed to retrieve file url:

	Cannot retrieve the public link of the file. You may need to change
	the permission to 'Anyone with the link', or have had many accesses.
	Check FAQ in https://github.com/wkentaro/gdown?tab=readme-ov-file#faq.

You may still be able to access the file from the browser:

	https://drive.google.com/uc?id=19-TMtyZA_ukqyRSgPN2_Db34zxsnFQn7

but Gdown can't. Please check connections and permissions.

[SF2 압축 파일 해제 중...]
 ✓ Retroid D274SQ Digital Grand V1.1.7z
 ✓ Brilliant CFX Concert Grand V2.3.7z
 ✓ Brilliant CFX II Concert Grand V.1.4.7z
 ✓ Mustafa Concert Grand V1.0.7z
 ✓ LSP Mixable Concert Grand 1.2.9.7z
 ✓ Mustafa F308XP Concert Grand V1.0.7z
 ✓ Ordinary D274 Concert Grand V1.0.7z
 ✓ Zayyan II Concert Grand V1.0.7z
 ✓ Regal S275 Concert Grand V1.2.7z
 ✓ Zayyan Concert Grand V1.2.zip
 ✓ الْبِطِّيْخْ CSII Concert Grand (BETA 0.8).7z
 ✓ LSPModel 290 Bosendorfer [Pro] (1.3).7z
 ✓ LSPModel CFX Yamaha [Pro] v1.8.sf2.7z
 ✓ Kawai MP11SE.7z

내 드라이브 

In [None]:
# ============================================================
# 셀 3: Import 및 Transkun 설정
# ============================================================

from pathlib import Path
import torch
import torch.nn as nn
import sys
import numpy as np
import librosa
import pretty_midi
from scipy.ndimage import uniform_filter1d, binary_closing
import soundfile as sf
from torch.nn import functional as F
import json
import time
import uuid
import traceback
import subprocess

# Transkun V2 테스트
!python3 -m transkun.transcribe -h

# External tool check
def check_tool(name):
    """Return True when an executable called ``name`` can be found on PATH.

    Uses ``shutil.which`` instead of spawning the ``which`` command, so the
    check needs neither a subprocess nor the ``run_cmd`` helper.

    Args:
        name: Command name to look up, e.g. ``"fluidsynth"``.

    Returns:
        bool: True if the command is available, False otherwise.
    """
    import shutil  # local import keeps this cell independent of earlier cells

    return shutil.which(name) is not None

# Command execution helper
def run_cmd(cmd, check=False, timeout=None):
    """Run ``cmd`` and hand back ``(returncode, stdout, stderr)``.

    Output is captured as text. With ``check=True`` a non-zero exit status
    raises ``RuntimeError`` carrying the command line and both streams.
    ``timeout`` is passed straight to ``subprocess.run``.
    """
    proc = subprocess.run(cmd, capture_output=True, text=True, timeout=timeout)
    command_failed = proc.returncode != 0
    if command_failed and check:
        raise RuntimeError(f"Command failed: {' '.join(cmd)}\nSTDOUT:{proc.stdout}\nSTDERR:{proc.stderr}")
    return proc.returncode, proc.stdout, proc.stderr

# Warn (without aborting) when an external command-line tool is missing
yt_dlp_available = check_tool("yt-dlp")
fluidsynth_available = check_tool("fluidsynth")

if not yt_dlp_available:
    print("Warning: yt-dlp not found.")
if not fluidsynth_available:
    print("Warning: fluidsynth not found.")

print("✓ Import 및 툴 체크 완료")

  import pkg_resources
usage: transcribe.py [-h] [--weight WEIGHT] [--conf CONF] [--device [DEVICE]]
                     [--segmentHopSize SEGMENTHOPSIZE]
                     [--segmentSize SEGMENTSIZE]
                     audioPath outPath

positional arguments:
  audioPath             path to the input audio file
  outPath               path to the output MIDI file

options:
  -h, --help            show this help message and exit
  --weight WEIGHT       path to the pretrained weight
  --conf CONF           path to the model conf
  --device [DEVICE]     The device used to perform the most computations
                        (optional), DEFAULT: cpu
  --segmentHopSize SEGMENTHOPSIZE
                        The segment hopsize for processing the entire audio
                        file (s), DEFAULT: the value defined in model conf
  --segmentSize SEGMENTSIZE
                        The segment size for processing the entire audio file
                        (s), DEFAULT: the value

In [None]:
# ============================================================
# 셀 4: Transkun V2 전사 함수
# ============================================================

def remove_extension(path):
    """Return the last component of ``path`` with its final extension dropped."""
    filename = os.path.basename(path)
    stem, _suffix = os.path.splitext(filename)
    return stem

def transcribe_file(
    input_path,
    outfolder=".",
    device='cuda' if torch.cuda.is_available() else 'cpu'
):
    """Transcribe an audio file to MIDI with the Transkun V2 command-line tool.

    The pretrained ``2.0.conf`` / ``2.0.pt`` files are looked up inside the
    installed ``transkun`` package, so the function keeps working when the
    interpreter is not Python 3.12; the original hard-coded path is only a
    fallback.

    Args:
        input_path: Audio file to transcribe.
        outfolder: Directory for the resulting MIDI file; created if missing.
        device: Value passed to the CLI's ``--device`` option.

    Returns:
        Path of the written MIDI file (``<outfolder>/<input stem>.mid``).

    Raises:
        RuntimeError: If the transcriber exits with a non-zero status or does
            not produce the MIDI file.
    """
    import importlib.util  # local import: only needed to locate the package

    # Fallback location used by the original notebook (Colab, Python 3.12).
    pretrained_dir = "/usr/local/lib/python3.12/dist-packages/transkun/pretrained"
    try:
        spec = importlib.util.find_spec("transkun")
    except (ImportError, ValueError):
        spec = None
    if spec is not None:
        locations = list(spec.submodule_search_locations or [])
        if not locations and spec.origin:
            locations = [os.path.dirname(spec.origin)]
        for location in locations:
            candidate = os.path.join(location, "pretrained")
            if os.path.isdir(candidate):
                pretrained_dir = candidate
                break

    CONF = os.path.join(pretrained_dir, "2.0.conf")
    WEIGHT = os.path.join(pretrained_dir, "2.0.pt")
    input_path = os.path.abspath(input_path)
    outfolder = os.path.abspath(outfolder)
    os.makedirs(outfolder, exist_ok=True)

    base = os.path.splitext(os.path.basename(input_path))[0]
    out_mid = os.path.join(outfolder, base + ".mid")

    cmd = [
        sys.executable,
        "-m", "transkun.transcribe",
        "--conf", CONF,
        "--weight", WEIGHT,
        "--device", device,
        input_path,
        out_mid
    ]

    proc = subprocess.run(cmd, stdout=subprocess.PIPE, stderr=subprocess.PIPE, text=True)

    if proc.returncode != 0:
        raise RuntimeError(f"Transcription failed:\nSTDOUT:\n{proc.stdout}\nSTDERR:\n{proc.stderr}")

    # A zero exit status alone does not prove that the MIDI file was written.
    if not os.path.exists(out_mid):
        raise RuntimeError(
            f"Transcription failed: output MIDI not created: {out_mid}\n"
            f"STDOUT:\n{proc.stdout}\nSTDERR:\n{proc.stderr}"
        )

    return out_mid

print("✓ Transkun 전사 함수 로드 완료")

✓ Transkun 전사 함수 로드 완료


In [None]:
# ============================================================
# 셀 5: 페달 검출 함수
# ============================================================

def compute_spectral_flux(y, sr, hop_length, n_fft=2048):
    """Compute a per-frame spectral flux curve for the signal ``y``.

    Each STFT magnitude frame is normalized to unit sum, then the flux is the
    L2 norm of the positive frame-to-frame differences. A leading 0.0 is
    prepended so the result has one value per STFT frame. ``sr`` is accepted
    but not used by the computation.
    """
    magnitude = np.abs(librosa.stft(y, n_fft=n_fft, hop_length=hop_length))
    frame_totals = np.sum(magnitude, axis=0, keepdims=True) + 1e-8
    normalized = magnitude / frame_totals
    rising = np.diff(normalized, axis=1).clip(min=0)
    flux = np.sqrt(np.sum(rising**2, axis=0))
    return np.concatenate(([0.0], flux))

def detect_pedal_rule(
    audio_path,
    sr=22050,
    hop_length=512,
    low_freq_cut=500,
    energy_smooth=0.5,
    on_z=1.0,
    off_z=0.7,
    min_event_len=0.06,
    merge_gap=0.06,
    closing_size=3
):
    """Rule-based sustain pedal detection from an audio file.

    A smoothed mix of low-frequency spectral energy and RMS energy is
    thresholded with hysteresis; the resulting frame mask is converted to
    time intervals.

    Args:
        audio_path: Audio file loaded with librosa (mono, resampled to ``sr``).
        sr: Sample rate used for loading and for frame/time conversion.
        hop_length: Hop size in samples for the RMS and STFT frames.
        low_freq_cut: Upper frequency bound (compared against
            ``librosa.fft_frequencies``) of the bins summed as low energy.
        energy_smooth: Smoothing length; multiplied by ``sr / hop_length`` to
            obtain the moving-average window in frames.
        on_z: Switch-on threshold, as ``mean + on_z * std`` of the smoothed curve.
        off_z: Switch-off threshold, as ``mean + off_z * std``.
        min_event_len: Events shorter than this (in the units returned by
            ``librosa.frames_to_time``) are dropped.
        merge_gap: A kept event starting within this gap of the previous kept
            event is merged into it.
        closing_size: Length of the structuring element for the binary
            closing; values <= 1 skip the closing step.

    Returns:
        List of ``(start, end)`` tuples taken from the frame time axis.
    """
    y, _ = librosa.load(audio_path, sr=sr, mono=True)

    # Frame-wise RMS energy
    frame_energy = librosa.feature.rms(y=y, frame_length=2048, hop_length=hop_length)[0]

    # Low-frequency energy: sum of STFT magnitudes at or below low_freq_cut
    S = np.abs(librosa.stft(y, n_fft=2048, hop_length=hop_length))
    freqs = librosa.fft_frequencies(sr=sr, n_fft=2048)
    low_idx = np.where(freqs <= low_freq_cut)[0]
    low_energy = S[low_idx, :].sum(axis=0) if len(low_idx) > 0 else np.zeros(S.shape[1])

    # Normalize both curves by their maximum
    e = frame_energy / (frame_energy.max() + 1e-8)
    le = low_energy / (low_energy.max() + 1e-8) if low_energy.max() > 0 else low_energy

    # Weighted combination of the two curves
    combined = 0.6 * le + 0.4 * e

    # Moving-average smoothing
    window = int(max(1, energy_smooth * (sr / hop_length)))
    combined_smooth = uniform_filter1d(combined, size=window)

    # Thresholds relative to the mean/std of the smoothed curve
    mu = combined_smooth.mean()
    sigma = combined_smooth.std() + 1e-8
    on_thr = mu + on_z * sigma
    off_thr = mu + off_z * sigma

    # Build the mask with hysteresis: switch on at on_thr, off below off_thr
    mask = np.zeros_like(combined_smooth, dtype=bool)
    state = False
    for i, v in enumerate(combined_smooth):
        if not state and v >= on_thr:
            state = True
            mask[i] = True
        elif state:
            # The frame that drops below off_thr is still marked as active
            mask[i] = True
            if v < off_thr:
                state = False

    # Morphological closing of the mask
    if closing_size > 1:
        mask = binary_closing(mask, structure=np.ones(closing_size))

    # Frame index -> time
    times = librosa.frames_to_time(np.arange(len(mask)), sr=sr, hop_length=hop_length)

    # Extract (start, end) events from the rising/falling edges of the mask
    events = []
    prev = False
    start = None
    for t, m in zip(times, mask):
        if m and not prev:
            start = t
        if (not m) and prev and start is not None:
            events.append((start, t))
            start = None
        prev = m
    # Close an event that is still open at the last frame
    if prev and start is not None:
        events.append((start, times[-1]))

    # Filter and merge: short events are discarded before the gap check,
    # so only events that pass min_event_len can be merged.
    # NOTE: the loop variable `e` shadows the normalized energy array above.
    filtered = []
    for s, e in events:
        if (e - s) >= min_event_len:
            if filtered and s - filtered[-1][1] <= merge_gap:
                filtered[-1] = (filtered[-1][0], e)
            else:
                filtered.append((s, e))

    return filtered

# Lightweight PedalCNN (optional auxiliary model)
class PedalCNN(nn.Module):
    """Small 1-D CNN that maps a (batch, in_ch, time) input to one sigmoid score per item.

    Three Conv1d + BatchNorm1d stages (16, 32, 64 channels) with max-pooling
    after the first two, a mean over the time axis, and a linear layer.
    """

    def __init__(self, in_ch=1):
        super().__init__()
        # Attribute names and creation order define the state_dict layout.
        self.conv1 = nn.Conv1d(in_ch, 16, kernel_size=5, padding=2)
        self.bn1 = nn.BatchNorm1d(16)
        self.conv2 = nn.Conv1d(16, 32, kernel_size=5, padding=2)
        self.bn2 = nn.BatchNorm1d(32)
        self.conv3 = nn.Conv1d(32, 64, kernel_size=3, padding=1)
        self.bn3 = nn.BatchNorm1d(64)
        self.fc = nn.Linear(64, 1)

    def forward(self, x):
        """Return sigmoid scores with the trailing singleton dimension removed."""
        stage1 = F.max_pool1d(F.relu(self.bn1(self.conv1(x))), 2)
        stage2 = F.max_pool1d(F.relu(self.bn2(self.conv2(stage1))), 2)
        stage3 = F.relu(self.bn3(self.conv3(stage2)))
        pooled = stage3.mean(dim=2)  # average over the time axis
        logits = self.fc(pooled)
        return torch.sigmoid(logits).squeeze(-1)

def extract_frame_features(y, sr=22050, hop_length=512, n_mels=40):
    """Return the mel spectrogram of ``y`` converted to dB relative to its maximum."""
    mel_power = librosa.feature.melspectrogram(
        y=y, sr=sr, n_fft=2048, hop_length=hop_length, n_mels=n_mels
    )
    return librosa.power_to_db(mel_power, ref=np.max)

def predict_pedal_with_model(
    audio_path,
    model,
    device='cuda' if torch.cuda.is_available() else 'cpu',
    sr=22050,
    hop_length=512,
    threshold=0.5
):
    """Gate the rule-based pedal detector with a clip-level model score.

    The log-mel features are averaged over the mel axis into a single-channel
    sequence, the model produces one probability for the whole clip, and the
    rule-based detector is run only when that probability reaches
    ``threshold``.

    Args:
        audio_path: Audio file to analyse.
        model: A ``PedalCNN``-style module returning one score per batch item.
        device: Torch device used for inference.
        sr: Sample rate for loading and feature extraction.
        hop_length: Hop size in samples.
        threshold: Minimum probability required to run the rule-based detector.

    Returns:
        The events from ``detect_pedal_rule`` (default rule parameters), or an
        empty list when the score is below ``threshold``.
    """
    y, _ = librosa.load(audio_path, sr=sr, mono=True)
    feats = extract_frame_features(y, sr=sr, hop_length=hop_length)
    X = torch.tensor(feats[np.newaxis, :, :], dtype=torch.float32)
    X = X.mean(dim=1, keepdim=True)  # collapse the mel axis into one channel

    model.to(device)
    model.eval()
    with torch.no_grad():
        # .item() extracts the single score directly; float() on a 1-element
        # ndarray is deprecated in recent NumPy releases.
        prob = model(X.to(device)).item()

    if prob < threshold:
        return []

    return detect_pedal_rule(audio_path, sr=sr, hop_length=hop_length)

def detect_pedal_lightweight(
    audio_path,
    use_model=False,
    model_path=None,
    device='cuda' if torch.cuda.is_available() else 'cpu',
    sr=22050,
    hop_length=512,
    rule_params=None,
    model_threshold=0.5
):
    """Detect pedal events with the rule-based detector, optionally combined with a model.

    Args:
        audio_path: Audio file to analyse.
        use_model: When True and ``model_path`` is given, model-gated events
            are merged with the rule-based events.
        model_path: Checkpoint for ``PedalCNN`` (a state dict, or a dict with
            a ``'state_dict'`` key).
        device: Torch device for loading and inference.
        sr: Sample rate.
        hop_length: Hop size in samples.
        rule_params: Extra keyword arguments for ``detect_pedal_rule``.
        model_threshold: Probability threshold for the model gate.

    Returns:
        List of ``(start, end)`` tuples. If anything in the model path fails,
        a warning is printed and the rule-based events are returned.
    """
    if rule_params is None:
        rule_params = {}

    rule_events = detect_pedal_rule(audio_path, sr=sr, hop_length=hop_length, **rule_params)

    if not use_model or model_path is None:
        return rule_events

    try:
        model = PedalCNN(in_ch=1)
        # NOTE(review): torch.load unpickles the checkpoint; only load files
        # from a trusted source.
        ckpt = torch.load(model_path, map_location=device)
        if isinstance(ckpt, dict) and 'state_dict' in ckpt:
            model.load_state_dict(ckpt['state_dict'])
        else:
            model.load_state_dict(ckpt)

        model_events = predict_pedal_with_model(
            audio_path, model, device=device, sr=sr,
            hop_length=hop_length, threshold=model_threshold
        )

        # Add model events that do not match a rule event within 50 ms on both ends
        combined = rule_events.copy()
        for me in model_events:
            if not any((abs(me[0]-re[0])<0.05 and abs(me[1]-re[1])<0.05) for re in rule_events):
                combined.append(me)

        combined = sorted(combined, key=lambda x: x[0])

        # Merge events that overlap or are at most 50 ms apart
        merged = []
        for s, e in combined:
            if merged and s <= merged[-1][1] + 0.05:
                merged[-1][1] = max(merged[-1][1], e)
            else:
                merged.append([s, e])

        return [(s, e) for s, e in merged]

    except Exception as exc:
        # Best effort: keep the rule-based result, but report why the model
        # path was skipped instead of hiding the failure.
        print(f"Warning: pedal model unavailable, using rule-based events only: {exc}")
        return rule_events

def insert_pedal_cc_into_midi(midi_in_path, midi_out_path, pedal_events, piano_program=0):
    """Write a copy of a MIDI file with sustain pedal (CC 64) events added.

    Every instrument is set to ``piano_program``. For each ``(start, end)``
    pair a CC 64 value 127 is placed 20 ms before ``start`` (clamped at 0)
    and a value 0 is placed 20 ms after ``end``, on the first instrument
    (one is created when the file has none).
    """
    midi = pretty_midi.PrettyMIDI(midi_in_path)

    for track in midi.instruments:
        track.program = piano_program

    if midi.instruments:
        pedal_track = midi.instruments[0]
    else:
        pedal_track = pretty_midi.Instrument(program=piano_program)
        midi.instruments.append(pedal_track)

    for start, end in pedal_events:
        pedal_down = pretty_midi.ControlChange(number=64, value=127, time=max(0.0, start - 0.02))
        pedal_up = pretty_midi.ControlChange(number=64, value=0, time=end + 0.02)
        pedal_track.control_changes.append(pedal_down)
        pedal_track.control_changes.append(pedal_up)

    # Keep control changes in chronological order on every track
    for track in midi.instruments:
        track.control_changes.sort(key=lambda cc: cc.time)

    midi.write(midi_out_path)

print("✓ 페달 검출 함수 로드 완료")

✓ 페달 검출 함수 로드 완료


In [None]:
# ============================================================
# 셀 6: FluidSynth 렌더링 및 유틸리티
# ============================================================

def render_midi_to_wav(midi_path, wav_out_path, sf2_path, sample_rate=44100, timeout=300):
    """Render a MIDI file to WAV by invoking the ``fluidsynth`` command.

    Returns:
        ``(wav_out_path, log)`` on success, otherwise ``(None, reason)`` where
        ``reason`` is ``"sf2_not_found"``, the captured FluidSynth output, or
        a timeout/exception description. Errors are never raised.
    """
    if sf2_path is None:
        return None, "sf2_not_found"
    if not os.path.exists(sf2_path):
        return None, "sf2_not_found"

    try:
        completed = subprocess.run(
            ['fluidsynth', '-ni', sf2_path, midi_path, '-F', wav_out_path, '-r', str(sample_rate)],
            check=False,
            timeout=timeout,
            capture_output=True,
            text=True,
        )
        log = completed.stdout + "\n" + completed.stderr

        if completed.returncode != 0:
            return None, log

        # Exit status 0 is not enough: make sure the WAV file really exists.
        if os.path.exists(wav_out_path):
            return wav_out_path, log
        return None, "fluidsynth finished but wav not created\n" + log

    except subprocess.TimeoutExpired as e:
        return None, f"fluidsynth timeout: {e}"
    except Exception as e:
        return None, f"fluidsynth exception: {e}\n{traceback.format_exc()}"

def resolve_sf2_path(sf2_choice, uploaded_sf2=None, sf2_library_dir=DRIVE_SF2_DIR):
    """Resolve which SoundFont file to use.

    Priority: an existing explicit path > an uploaded file > the first
    ``.sf2`` file in the library. The library lookup prefers files directly
    inside ``sf2_library_dir`` and then falls back to a recursive search, so
    SoundFonts that only exist in sub-folders (for example extracted
    archives) are found as well.

    Args:
        sf2_choice: Explicit path, or ``None`` / ``"None"`` for no choice.
        uploaded_sf2: An uploaded file, or a list of them; each item is a
            path string or a dict with a ``'name'`` key. Only the first item
            is used, and it is copied into the library if not already there.
        sf2_library_dir: Directory holding the SoundFont library.

    Returns:
        Path of the chosen SoundFont, or ``None`` when nothing is available.
    """
    # 1) Explicit path
    if sf2_choice and sf2_choice != "None" and os.path.exists(sf2_choice):
        return sf2_choice

    # 2) Uploaded file (a single item is accepted as well as a list)
    if uploaded_sf2:
        up = uploaded_sf2[0] if isinstance(uploaded_sf2, (list, tuple)) else uploaded_sf2
        src = up['name'] if isinstance(up, dict) and 'name' in up else up
        if isinstance(src, str) and os.path.exists(src):
            os.makedirs(sf2_library_dir, exist_ok=True)
            dst = os.path.join(sf2_library_dir, os.path.basename(src))
            if not os.path.exists(dst):
                shutil.copy(src, dst)
            return dst

    # 3) First file in the library
    if os.path.isdir(sf2_library_dir):
        top_level = sorted(
            os.path.join(sf2_library_dir, f)
            for f in os.listdir(sf2_library_dir)
            if f.endswith('.sf2')
        )
        if top_level:
            return top_level[0]

        # Nothing at the top level: look into sub-folders too.
        nested = sorted(
            os.path.join(root, f)
            for root, _dirs, files in os.walk(sf2_library_dir)
            for f in files
            if f.endswith('.sf2')
        )
        if nested:
            return nested[0]

    return None

def load_state():
    """Read the pipeline state from ``STATE_FILE``.

    Returns the parsed JSON content, or a fresh empty state when the file
    does not exist yet.
    """
    if not os.path.exists(STATE_FILE):
        return {"processed": [], "failed": [], "items": {}, "last_update": None}

    with open(STATE_FILE, "r", encoding="utf-8") as handle:
        return json.load(handle)

def save_state(state):
    """Write the pipeline state to ``STATE_FILE`` as UTF-8 JSON.

    ``state["last_update"]`` is set to the current time before writing. The
    state is serialized completely before the file is opened, so a
    serialization error can no longer truncate an existing state file.

    Raises:
        TypeError / ValueError: If ``state`` is not JSON-serializable.
        OSError: If the file cannot be written.
    """
    state["last_update"] = time.time()
    serialized = json.dumps(state, ensure_ascii=False, indent=2)
    with open(STATE_FILE, "w", encoding="utf-8") as f:
        f.write(serialized)

print("✓ FluidSynth 렌더링 및 유틸리티 로드 완료")

✓ FluidSynth 렌더링 및 유틸리티 로드 완료


In [None]:
# ============================================================
# 셀 7: YouTube 다운로드 함수
# ============================================================

from urllib.parse import urlparse, parse_qs

def sanitize_filename(filename):
    """Make ``filename`` safe to use as a file name.

    Characters that are invalid on Windows/Linux (``<>:"/\\|?*``) become
    ``_``, whitespace runs collapse to one space, leading/trailing spaces and
    dots are removed, and the result is limited to 200 characters. The
    trailing characters are stripped again after truncation so the cut can
    never leave a name ending in a space or dot.
    """
    # Single pass replacement of every invalid character
    replacements = str.maketrans({ch: '_' for ch in '<>:"/\\|?*'})
    cleaned = filename.translate(replacements)
    # Collapse whitespace runs
    cleaned = ' '.join(cleaned.split())
    # Strip surrounding spaces and dots
    cleaned = cleaned.strip('. ')
    # Length limit (200 characters)
    if len(cleaned) > 200:
        cleaned = cleaned[:200].rstrip('. ')
    return cleaned

def download_youtube_audio_single(url, outdir, fmt="mp3", audio_bitrate="192", retries=3, timeout=120):
    """Download the audio of a single YouTube video with yt-dlp.

    The output file is named after the video title (yt-dlp's
    ``--restrict-filenames`` keeps the name filesystem-safe).

    Args:
        url: YouTube URL.
        outdir: Output directory; created if missing.
        fmt: Audio format (default: mp3).
        audio_bitrate: Value passed to ``--audio-quality``.
        retries: Number of attempts (also passed to ``--extractor-retries``).
        timeout: Seconds allowed for each yt-dlp invocation.

    Returns:
        Path of the downloaded file.

    Raises:
        RuntimeError: When every attempt failed; the message includes the
            last error that was seen.
    """
    os.makedirs(outdir, exist_ok=True)

    # Name the file after the video title
    out_template = os.path.join(outdir, "%(title)s.%(ext)s")

    # yt-dlp options (including the ones meant to reduce bot detection)
    cmd = [
        "yt-dlp",
        "-x",  # extract audio only
        "--audio-format", fmt,
        "--audio-quality", audio_bitrate,
        "-o", out_template,
        "--no-warnings",
        "--quiet",
        "--no-playlist",
        "--extractor-retries", str(retries),
        "--socket-timeout", "30",
        "--user-agent", "Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36",
        "--sleep-interval", "2",
        "--max-sleep-interval", "5",
        "--restrict-filenames",  # filesystem-safe file names
        url
    ]

    # Remember what was already in the folder: the modification time alone is
    # not a reliable way to spot the new file (yt-dlp can set it from the
    # server's Last-Modified header).
    pattern = os.path.join(glob.escape(outdir), f"*.{fmt}")
    existing = set(glob.glob(pattern))
    last_error = ""

    for attempt in range(retries):
        try:
            print(f"  다운로드 시도 {attempt + 1}/{retries}...")
            code, out, err = run_cmd(cmd, timeout=timeout)

            if code == 0:
                files = sorted(glob.glob(pattern), key=os.path.getmtime, reverse=True)
                new_files = [f for f in files if f not in existing]
                # Prefer files created by this call; fall back to the most
                # recent file when yt-dlp reused an existing download.
                candidates = new_files or files
                if candidates:
                    downloaded_file = candidates[0]
                    print(f"  ✓ 다운로드 완료: {os.path.basename(downloaded_file)}")
                    return downloaded_file
                last_error = "yt-dlp exited successfully but no output file was found"
            else:
                last_error = (err or out or "").strip()

            # Wait before the next attempt
            if attempt < retries - 1:
                wait_time = (attempt + 1) * 5
                print(f"  실패. {wait_time}초 대기 중...")
                time.sleep(wait_time)

        except Exception as e:
            last_error = str(e)
            print(f"  다운로드 예외 (시도 {attempt + 1}): {e}")
            if attempt < retries - 1:
                time.sleep((attempt + 1) * 5)

    # Every attempt failed
    detail = f"\nLast error: {last_error}" if last_error else ""
    raise RuntimeError(
        f"YouTube 다운로드 실패 (모든 재시도 소진): {url}\n"
        f"YouTube가 Colab IP를 차단했을 가능성이 있습니다.\n"
        f"해결 방법: 오디오 파일을 직접 업로드하거나 나중에 다시 시도하세요."
        f"{detail}"
    )

def extract_video_id_from_url(url):
    """Extract the YouTube video ID from ``url``.

    Supported forms: ``youtube.com/watch?v=ID``, ``youtube.com/shorts/ID``,
    ``/embed/ID``, ``/live/ID``, ``/v/ID`` and ``youtu.be/ID``.

    Returns:
        The video ID, or ``None`` when the URL is not a recognised YouTube
        URL or carries no ID.
    """
    try:
        p = urlparse(url)
        host = p.hostname
        if not host:
            return None

        segments = [seg for seg in p.path.split("/") if seg]

        if "youtube" in host:
            qs = parse_qs(p.query)
            if qs.get("v"):
                return qs["v"][0]
            # Path-based forms such as /shorts/<id> or /embed/<id>
            if len(segments) >= 2 and segments[0] in ("shorts", "embed", "live", "v"):
                return segments[1]

        if "youtu.be" in host:
            # Only the first path segment is the ID; an empty path means no ID.
            return segments[0] if segments else None
    except Exception:
        return None
    return None

def expand_playlist_to_video_urls(playlist_url):
    """Expand a playlist URL into a list of watch URLs.

    Runs ``yt-dlp --flat-playlist -J`` and builds one
    ``https://www.youtube.com/watch?v=<id>`` URL per entry that has an ID.

    Returns:
        List of video URLs (empty when the JSON contains no usable entries).

    Raises:
        RuntimeError: If yt-dlp exits with a non-zero status.
    """
    cmd = ["yt-dlp", "--flat-playlist", "-J", playlist_url]
    code, out, err = run_cmd(cmd)

    if code != 0:
        raise RuntimeError(f"yt-dlp playlist expand failed: {err}")

    j = json.loads(out)
    # Tolerate a missing or null "entries" value and non-dict entries
    # (presumably possible for unavailable videos) instead of crashing.
    entries = j.get("entries") or []

    return [
        f"https://www.youtube.com/watch?v={e['id']}"
        for e in entries
        if isinstance(e, dict) and e.get("id")
    ]

print("✓ YouTube 다운로드 함수 로드 완료")

✓ YouTube 다운로드 함수 로드 완료


In [None]:
# ============================================================
# 셀 8: 통합 처리 함수
# ============================================================

def _copy_unless_same_file(src, dst):
    """Copy ``src`` to ``dst`` unless both paths already name the same file."""
    if os.path.abspath(src) != os.path.abspath(dst):
        shutil.copy(src, dst)
    return dst


def process_item_from_url_or_path(
    item,
    sf2_choice=None,
    uploaded_sf2=None,
    use_pedal_model=False,
    pedal_model_path=None,
    tmp_dir=None
):
    """Run the full pipeline for one item: audio -> MIDI -> pedal CC -> (optional) WAV.

    Args:
        item: YouTube URL or path of a local audio file.
        sf2_choice: SoundFont path forwarded to ``resolve_sf2_path``.
        uploaded_sf2: Uploaded SoundFont forwarded to ``resolve_sf2_path``.
        use_pedal_model: Whether to combine the pedal model with the rules.
        pedal_model_path: Checkpoint path for the pedal model.
        tmp_dir: Working directory. When omitted, a unique folder under
            ``WORK_ROOT`` is created and removed again after a successful run
            (it is kept on failure to help debugging).

    Returns:
        dict with the keys status, mp3, midi, wav, pedals, transcribe_log,
        render_log, error, trace. ``status`` is ``"ok"`` or ``"error"``;
        exceptions are reported through ``error``/``trace``, not raised.
        ``mp3`` holds the saved audio path, which keeps the source file's
        real extension.
    """
    owns_tmp_dir = tmp_dir is None
    if owns_tmp_dir:
        tmp_dir = os.path.join(WORK_ROOT, "tmp_" + uuid.uuid4().hex)
    os.makedirs(tmp_dir, exist_ok=True)

    result = {
        "status": "error",
        "mp3": None,
        "midi": None,
        "wav": None,
        "pedals": [],
        "transcribe_log": "",
        "render_log": "",
        "error": "",
        "trace": ""
    }

    try:
        # 1) Obtain the audio
        if isinstance(item, str) and item.startswith("http"):
            mp3 = download_youtube_audio_single(item, tmp_dir, fmt="mp3")
        else:
            mp3 = item
        result["mp3"] = mp3

        # 2) Transcription (Transkun v2)
        mid = transcribe_file(mp3, outfolder=tmp_dir)
        result["transcribe_log"] = "Transcription completed"
        result["midi"] = mid

        # 3) Pedal detection
        pedals = detect_pedal_lightweight(
            mp3,
            use_model=use_pedal_model,
            model_path=pedal_model_path
        )
        result["pedals"] = pedals

        # 4) Insert the pedal CC events into the MIDI
        mid_pedal = os.path.join(tmp_dir, os.path.splitext(os.path.basename(mid))[0] + "_pedal.mid")
        insert_pedal_cc_into_midi(mid, mid_pedal, pedals, piano_program=0)
        result["midi"] = mid_pedal

        # 5) Save to Drive (audio, midi). Keep the real audio extension: a
        #    local .wav/.flac input must not be stored under a .mp3 name.
        audio_ext = os.path.splitext(mp3)[1] or ".mp3"
        saved_mp3 = os.path.join(DRIVE_RESULTS_DIR, f"{remove_extension(os.path.basename(mp3))}{audio_ext}")
        _copy_unless_same_file(mp3, saved_mp3)
        result["mp3"] = saved_mp3

        saved_midi = os.path.join(DRIVE_RESULTS_DIR, f"{remove_extension(os.path.basename(mid_pedal))}.mid")
        _copy_unless_same_file(mid_pedal, saved_midi)
        result["midi"] = saved_midi

        # 6) Rendering (optional) - resolve the SF2 path first
        chosen_sf2 = resolve_sf2_path(sf2_choice, uploaded_sf2)
        saved_wav = None
        render_log = None

        if chosen_sf2:
            wav_out = os.path.join(tmp_dir, f"{remove_extension(os.path.basename(mid_pedal))}.wav")
            rendered, render_log = render_midi_to_wav(mid_pedal, wav_out, chosen_sf2)
            if rendered:
                saved_wav = os.path.join(DRIVE_RESULTS_DIR, f"{remove_extension(os.path.basename(rendered))}.wav")
                _copy_unless_same_file(rendered, saved_wav)

        result["wav"] = saved_wav
        result["render_log"] = render_log or ""
        result["status"] = "ok"

    except Exception as e:
        result["error"] = str(e)
        result["trace"] = traceback.format_exc()
        return result

    # All results now live in DRIVE_RESULTS_DIR; drop the scratch folder this
    # call created so batch runs do not fill up the local disk.
    if owns_tmp_dir:
        shutil.rmtree(tmp_dir, ignore_errors=True)
    return result

def process_files(
    gr_files,
    sf2_choice=None,
    uploaded_sf2=None,
    sustain_tolerance=0.2,
    pedal_energy_smooth=0.5,
    pedal_low_freq_cut=500,
    pedal_low_energy_weight=0.6,
    pedal_flux_weight=0.4,
    pedal_on_threshold=1.0,
    pedal_off_threshold=0.7,
    pedal_min_event_len=0.08,
    pedal_merge_gap=0.08
):
    """Process an uploaded audio file and return ``(midi_path, wav_path)``.

    Only the first uploaded file is processed. Returns ``(None, None)`` when
    nothing was uploaded and raises ``RuntimeError`` when processing fails.

    NOTE: ``sustain_tolerance`` and the ``pedal_*`` parameters are accepted
    but are not forwarded to the processing function by this implementation.
    """
    if not gr_files:
        return None, None

    # Take the first upload when a list is given
    if isinstance(gr_files, list):
        audio_file = gr_files[0]
    else:
        audio_file = gr_files

    # An upload is either a dict carrying the path under 'name' or the path itself
    if isinstance(audio_file, dict):
        audio_path = audio_file.get('name')
    else:
        audio_path = audio_file

    result = process_item_from_url_or_path(
        audio_path,
        sf2_choice=sf2_choice,
        uploaded_sf2=uploaded_sf2
    )

    if result.get("status") != "ok":
        raise RuntimeError(f"처리 실패: {result.get('error')}")

    return result.get("midi"), result.get("wav")

def playlist_pipeline_generator(playlist_text, sf2_choice_path, **kwargs):
    """Batch-process a list of URLs/paths, yielding progress as it goes.

    Each non-empty line of ``playlist_text`` is one item. Lines that start
    with "http" and contain "playlist" or "list=" are expanded into their
    individual video URLs first. Items already listed under ``processed`` in
    the state file are skipped; the state file is saved after every item.

    Args:
        playlist_text: Multi-line text with one URL or file path per line.
        sf2_choice_path: SoundFont path passed as ``sf2_choice`` to
            ``process_item_from_url_or_path``.
        **kwargs: Extra keyword arguments forwarded to
            ``process_item_from_url_or_path``.

    Yields:
        ``(message, result)`` tuples; ``result`` is the item's result dict
        when the item completed successfully and ``None`` otherwise.
    """
    lines = [ln.strip() for ln in str(playlist_text).splitlines() if ln.strip()]
    items = []

    # Expand playlists
    for ln in lines:
        if ln.startswith("http") and ("playlist" in ln or "list=" in ln):
            yield f"재생목록 확장 중: {ln[:50]}...", None
            try:
                urls = expand_playlist_to_video_urls(ln)
                if not urls:
                    # The line is dropped: nothing is queued for it
                    yield f"재생목록에서 URL을 찾지 못했습니다: {ln}", None
                else:
                    items.extend(urls)
                    yield f"재생목록에서 {len(urls)}개 항목 발견", None
            except Exception as e:
                yield f"재생목록 확장 실패: {e}", None
        else:
            items.append(ln)

    total = len(items)
    if total == 0:
        yield "처리할 항목이 없습니다.", None
        return

    # Load the persisted state
    try:
        state = load_state()
    except Exception as e:
        yield f"상태 파일 로드 실패: {e}", None
        return

    yield f"총 항목: {total}. 이미 처리된 항목: {len(state.get('processed', []))}", None

    # Process each item
    for idx, item in enumerate(items, start=1):
        # Skip items that were already processed
        if item in state.get("processed", []):
            yield f"[{idx}/{total}] 건너뜀(이미 처리됨): {item[:50]}...", None
            continue

        yield f"[{idx}/{total}] 다운로드/전사 시작: {item[:50]}...", None

        try:
            res = process_item_from_url_or_path(
                item,
                sf2_choice=sf2_choice_path,
                uploaded_sf2=None,
                **kwargs
            )
        except Exception as e:
            res = {"status": "error", "error": str(e)}

        if res.get("status") == "ok":
            # Record the success; only the number of pedal events is stored
            state.setdefault("processed", []).append(item)
            state.setdefault("items", {})[item] = {
                "mp3": res.get("mp3"),
                "midi": res.get("midi"),
                "wav": res.get("wav"),
                "pedals": len(res.get("pedals", [])),
                "time": time.time()
            }
            try:
                save_state(state)
            except Exception as e:
                # The item itself succeeded, so its result is still yielded
                yield f"[{idx}/{total}] 완료했으나 상태 저장 실패: {e}", res
                continue

            yield f"[{idx}/{total}] ✓ 완료: {os.path.basename(res.get('midi', 'unknown'))}", res
        else:
            # Record the failure together with its error message
            state.setdefault("failed", []).append({"item": item, "error": res.get("error")})
            try:
                save_state(state)
            except Exception as e:
                yield f"[{idx}/{total}] 실패: 에러 저장 실패: {e}", None
                continue

            yield f"[{idx}/{total}] ✗ 실패: {res.get('error')}", None

    yield f"모든 항목 처리 완료. 결과 위치: {DRIVE_RESULTS_DIR}", None

print("✓ 통합 처리 함수 로드 완료")

✓ 통합 처리 함수 로드 완료


In [None]:
# ============================================================
# 셀 9: Gradio UI
# ============================================================

import gradio as gr

# SF2 (SoundFont) library directory; an alias of DRIVE_SF2_DIR, which is
# defined outside this cell.
SF2_LIB_DIR = DRIVE_SF2_DIR

# 1) Build the (label, value) dropdown choices for the SF2 files in the library
def list_sf2_choices(sf2_dir=SF2_LIB_DIR):
    """Return dropdown choices for every ``.sf2`` file under *sf2_dir*.

    The directory is searched recursively. Each choice is a ``(label, value)``
    tuple whose label is the file's basename and whose value is the path as
    returned by ``glob``. The list always starts with the ``("None", "None")``
    placeholder, which is also the only entry when the directory does not
    exist or contains no ``.sf2`` files.
    """
    options = [("None", "None")]
    if os.path.exists(sf2_dir):
        pattern = os.path.join(sf2_dir, "**/*.sf2")
        for sf2_path in sorted(glob.glob(pattern, recursive=True)):
            options.append((os.path.basename(sf2_path), sf2_path))
    return options

# Dropdown choices, computed once when this cell runs; SF2 files added to the
# library afterwards are not picked up until the cell is re-run.
sf2_choices = list_sf2_choices()

# 2) Handle an uploaded SF2: copy the temporary upload into the library and return its path
def handle_uploaded_sf2(uploaded_sf2, sf2_library_dir=SF2_LIB_DIR):
    """Copy an uploaded SF2 file into the library directory.

    Args:
        uploaded_sf2: Value received from the upload component. May be a single
            item or a list/tuple (only the first item is used). An item may be
            a path, a dict carrying the path under ``'name'``, or a file-like
            object exposing the path as its ``.name`` attribute.
        sf2_library_dir: Directory the file is copied into; created if missing.

    Returns:
        The path of the file inside *sf2_library_dir*, or ``None`` when nothing
        was uploaded, the source path cannot be resolved or does not exist, or
        the directory creation / copy fails.
    """
    if not uploaded_sf2:
        return None
    up = uploaded_sf2[0] if isinstance(uploaded_sf2, (list, tuple)) else uploaded_sf2

    # Normalise the different shapes an upload item can take into a plain path.
    if isinstance(up, dict):
        src = up.get('name')
    elif isinstance(up, (str, os.PathLike)):
        src = up
    else:
        # e.g. a temporary-file wrapper object; its path lives in ``.name``.
        src = getattr(up, 'name', None)

    # Anything that is not path-like would make os.path.exists raise TypeError,
    # so reject it here and keep the "return None on failure" contract.
    if not isinstance(src, (str, os.PathLike)) or not os.path.exists(src):
        return None

    dst = os.path.join(sf2_library_dir, os.path.basename(src))
    try:
        os.makedirs(sf2_library_dir, exist_ok=True)
        # A file with the same name already in the library is reused as-is.
        if not os.path.exists(dst):
            shutil.copy(src, dst)
        return dst
    except OSError:
        # Best-effort: a filesystem failure means "no uploaded SF2 available".
        return None

# 3) Combine the dropdown value, the uploaded file and the typed path into the final SF2 path
def get_sf2_choice_value(choice, uploaded_sf2, direct_input):
    """Resolve which SF2 file to use.

    Sources are consulted in priority order:

    1. *uploaded_sf2* - an uploaded file, copied into the library by
       ``handle_uploaded_sf2``.
    2. *direct_input* - a path typed by the user.
    3. *choice* - the dropdown value (a path, or the string ``"None"``).

    A non-empty typed path takes precedence over the dropdown even when it
    does not exist; in that case ``None`` is returned.

    Returns:
        The selected path, or ``None`` when no source yields an existing path.
    """
    # An uploaded file wins over everything else.
    uploaded_path = handle_uploaded_sf2(uploaded_sf2)
    if uploaded_path:
        return uploaded_path

    # Typed path comes next. Strip surrounding whitespace first: pasted text
    # often carries stray spaces/newlines, and a whitespace-only box must count
    # as empty so the dropdown selection is still honoured.
    typed_path = direct_input.strip() if isinstance(direct_input, str) else direct_input
    if typed_path:
        return typed_path if os.path.exists(typed_path) else None

    # The dropdown passes the value half of its (label, value) pair.
    if choice and choice != "None":
        return choice if os.path.exists(choice) else None
    return None

# 4) Playlist wrapper: resolve the SF2 path, then delegate to the playlist generator
def playlist_wrapper_with_upload(playlist_text, sf2_choice_val, uploaded_sf2_val, sf2_direct_input):
    """Stream the playlist pipeline's progress using the resolved SF2 path.

    The SF2 path is resolved with ``get_sf2_choice_value`` and every item
    produced by ``playlist_pipeline_generator`` is relayed unchanged. If
    resolving the path or running the pipeline raises, a single
    ``(error message, None)`` pair is yielded instead.
    """
    try:
        sf2_path = get_sf2_choice_value(
            sf2_choice_val,
            uploaded_sf2_val,
            sf2_direct_input,
        )
        yield from playlist_pipeline_generator(playlist_text, sf2_path)
    except Exception as exc:
        yield f"재생목록 처리 중 예외 발생: {exc}", None

# 5) Upload-tab callback: resolve the SF2 path and pass it on to process_files
def on_run_upload(
    gr_files,
    sf2_choice_val,
    uploaded_sf2_val,
    sustain_tolerance,
    pedal_energy_smooth,
    pedal_low_freq_cut,
    pedal_low_energy_weight,
    pedal_flux_weight,
    pedal_on_threshold,
    pedal_off_threshold,
    pedal_min_event_len,
    pedal_merge_gap,
    sf2_direct_input
):
    """Run ``process_files`` on the uploaded audio with the selected SF2.

    The SF2 path is resolved from the dropdown value, the uploaded SF2 and the
    typed path via ``get_sf2_choice_value``; the remaining arguments are
    forwarded to ``process_files`` as keyword arguments of the same name.

    Returns:
        A ``(status, midi, wav)`` tuple. ``midi`` and ``wav`` are the two
        values returned by ``process_files`` (falsy values become ``None``).
        On any exception the status carries the error message plus the
        traceback and both other entries are ``None``.
    """
    try:
        sf2_path = get_sf2_choice_value(sf2_choice_val, uploaded_sf2_val, sf2_direct_input)
        if sf2_path:
            sf2_label = os.path.basename(sf2_path)
        else:
            sf2_label = "없음"
        status = f"업로드 처리 시작. SF2: {sf2_label} (경로: {sf2_path or 'None'})"

        # Tuning parameters forwarded verbatim to process_files.
        tuning = {
            "sustain_tolerance": sustain_tolerance,
            "pedal_energy_smooth": pedal_energy_smooth,
            "pedal_low_freq_cut": pedal_low_freq_cut,
            "pedal_low_energy_weight": pedal_low_energy_weight,
            "pedal_flux_weight": pedal_flux_weight,
            "pedal_on_threshold": pedal_on_threshold,
            "pedal_off_threshold": pedal_off_threshold,
            "pedal_min_event_len": pedal_min_event_len,
            "pedal_merge_gap": pedal_merge_gap,
        }

        # Only the already-resolved path is handed over; the upload itself was
        # consumed while resolving it, hence uploaded_sf2=None.
        final_midi, wav_path = process_files(
            gr_files,
            sf2_choice=sf2_path,
            uploaded_sf2=None,
            **tuning
        )

        # Either result may be missing; normalise falsy values to None.
        return status, final_midi or None, wav_path or None

    except Exception as exc:
        trace = traceback.format_exc()
        return f"업로드 처리 중 오류: {exc}\n{trace}", None, None

# 6) Gradio Blocks UI
with gr.Blocks() as combined_demo:
    gr.Markdown("## 통합: 파일 업로드 + 재생목록 배치 처리 (공통 SF2 선택)")

    # Shared header row: SF2 selection on the left, global status on the right.
    with gr.Row():
        with gr.Column(scale=1):
            sf2_dropdown = gr.Dropdown(
                choices=sf2_choices,
                value=sf2_choices[0][1],
                label="SF2 선택 (드라이브)"
            )
            sf2_upload = gr.Files(file_types=[".sf2"], label="또는 SF2 업로드 (우선)")
            sf2_direct = gr.Textbox(
                label="(선택) SF2 절대경로 직접 입력",
                placeholder="/content/drive/MyDrive/..."
            )
            gr.Markdown("**설명:** 업로드된 SF2가 있으면 업로드된 파일을 우선 사용합니다.")

        with gr.Column(scale=2):
            # Seed the initial text through the constructor's `value` argument
            # rather than assigning `.value` after the component is built.
            # DRIVE_RESULTS_DIR is looked up defensively in case the cell that
            # defines it has not been run.
            status_box = gr.Textbox(
                label="전역 상태",
                interactive=False,
                value=f"결과 폴더: {globals().get('DRIVE_RESULTS_DIR', 'Not Set')}"
            )

    with gr.Tabs():
        # Tab 1: process uploaded audio files with tunable parameters.
        with gr.TabItem("파일 업로드 처리"):
            upload_files = gr.Files(file_types=[".wav", ".mp3"], label="오디오 업로드")
            sustain = gr.Slider(0.05, 0.6, value=0.2, step=0.01, label="Sustain Tolerance")
            pedal_smooth = gr.Slider(0.1, 1.0, value=0.5, step=0.05, label="Pedal Energy Smooth")
            pedal_lowcut = gr.Slider(200, 2000, value=500, step=50, label="Pedal Low Freq Cut")
            pedal_low_w = gr.Slider(0.0, 1.0, value=0.6, step=0.05, label="Low Energy Weight")
            pedal_flux_w = gr.Slider(0.0, 1.0, value=0.4, step=0.05, label="Flux Weight")
            pedal_on = gr.Slider(0.2, 2.0, value=1.0, step=0.1, label="Pedal On Threshold")
            pedal_off = gr.Slider(0.0, 1.5, value=0.7, step=0.05, label="Pedal Off Threshold")
            pedal_min = gr.Slider(0.02, 0.5, value=0.08, step=0.01, label="Min Event Length")
            pedal_merge = gr.Slider(0.01, 0.3, value=0.08, step=0.01, label="Merge Gap")

            run_upload = gr.Button("업로드 파일 처리 시작")
            upload_result_file = gr.File(label="Download MIDI or ZIP")
            upload_preview = gr.Audio(label="Preview (WAV) - rendered with chosen SF2")

            # Input order must match the positional parameters of on_run_upload.
            run_upload.click(
                fn=on_run_upload,
                inputs=[
                    upload_files, sf2_dropdown, sf2_upload,
                    sustain, pedal_smooth, pedal_lowcut, pedal_low_w, pedal_flux_w,
                    pedal_on, pedal_off, pedal_min, pedal_merge, sf2_direct
                ],
                outputs=[status_box, upload_result_file, upload_preview]
            )

        # Tab 2: batch-process playlist / video URLs.
        with gr.TabItem("재생목록 배치 처리"):
            playlist_input = gr.Textbox(
                lines=6,
                label="재생목록 URL 또는 영상 URL (줄바꿈으로 구분)"
            )
            playlist_sf2_input = gr.Textbox(
                label="(선택) SF2 경로를 직접 입력하거나 상단 드롭다운/업로드 사용"
            )
            run_playlist_btn = gr.Button("재생목록 처리 시작")
            playlist_log = gr.Textbox(label="진행 로그")
            playlist_last = gr.JSON(label="마지막 항목 결과")

            # The wrapper is a generator; each yielded (log, result) pair is
            # mapped onto the two output components.
            run_playlist_btn.click(
                fn=playlist_wrapper_with_upload,
                inputs=[playlist_input, sf2_dropdown, sf2_upload, playlist_sf2_input],
                outputs=[playlist_log, playlist_last]
            )

    gr.Markdown(
        "**사용법:** 상단에서 SF2를 드롭다운으로 선택하거나 SF2 파일을 업로드하세요. "
        "업로드된 SF2가 있으면 업로드된 파일이 우선 사용됩니다."
    )

combined_demo.launch(share=True, debug=True)

Colab notebook detected. This cell will run indefinitely so that you can see errors and logs. To turn off, set debug=False in launch().
* Running on public URL: https://7cb4fa55def6094834.gradio.live

This share link expires in 1 week. For free permanent hosting and GPU upgrades, run `gradio deploy` from the terminal in the working directory to deploy to Hugging Face Spaces (https://huggingface.co/spaces)


  다운로드 시도 1/3...
  ✓ 다운로드 완료: OST_-_Sparkle_RADWIMPS_-_Sparkle_extreme_piano_cover.mp3
Keyboard interruption in main thread... closing server.
Killing tunnel 127.0.0.1:7860 <> https://7cb4fa55def6094834.gradio.live


