# Harmony Line Isolator

## MVP Progress Report: Phase 1 (Steps 0–7)

The overall goal of the Harmony Line Isolator MVP is to create a tool that can automatically separate and isolate individual vocal lines from a 3- or 4-part harmony recording — even when the voices are blended together in a single mix — so that each part can be soloed, muted, or repurposed for practice, learning, or creative rearrangement.

Phase 1 concentrates on the building blocks needed to reach that goal, using recordings where each vocal part is already available as a separate stem. In this phase, we track the pitch of each part over time, assign musical roles (high, mid, low), and export them as MIDI for quick playback and analysis. This provides a reliable, testable foundation before tackling the more complex step of isolating roles directly from a mixed recording.

### Step 0: Download audio samples via index file
**What it does:** Reads `audio_file_list.txt` from your GitHub Pages `audio_samples/` folder and downloads each listed `.wav` into a local `audio_samples/` directory in Colab. Skips files already present and prints a summary.
**Why it matters:** Keeps your Colab environment in sync with the repo without manual uploads.
**Expected output:** A short report showing how many files were downloaded, skipped, or failed, plus a few sample filenames.

### Step 1: Environment setup and dataset indexing
**What it does:** Installs needed libraries, scans `audio_samples` for `.wav` files, validates the `gw_phraseNN_part.wav` naming, and builds an index of phrases, each with a single mix and multiple stems. Saves `dataset_index.csv`.
**Why it matters:** Ensures the dataset is consistent, which avoids hard-to-debug errors later.
**Expected output:** A table of phrases with counts and filenames, and a message confirming how many phrases are ready.

### Step 2: Load one phrase and validate alignment
**What it does:** Loads one phrase’s mix and stems at a common sample rate, prints durations, and trims or pads stems to match the mix length within a small tolerance.
**Why it matters:** Even small timing mismatches can throw off pitch tracking and comparisons.
**Expected output:** Durations for the mix and each stem, plus a note indicating if any stems were trimmed or padded.

### Step 3: Pitch tracking with CREPE (mix and stems)
**What it does:** Runs a deep learning pitch detector at 10 ms intervals on the mix and each stem, filtering out low-confidence frames. Reports the percentage of “voiced” frames.
**Why it matters:** Produces the pitch contours that power role assignment and MIDI export.
**Expected output:** Frame count, analysis duration, voiced coverage for the mix and each stem, and confirmation that pitch arrays were created.

### Step 4: Role assignment from stem pitch
**What it does:** For each time step, sorts the available stem pitches and assigns roles (high, mid, low) based on relative pitch. Smoothing reduces small jitters. Reports which stem most often supplies each role.
**Why it matters:** Converts raw pitch into usable musical roles that match how singers think about parts.
**Expected output:** Voiced coverage per role and a mapping like “high → alto” or “low → tenor” most of the time.

### Step 5: Export roles as MIDI
**What it does:** Segments each role’s pitch contour into notes and writes one MIDI per role, enforcing a minimum note length to reduce blips.
**Why it matters:** MIDI is a compact and editable representation you can audition in a DAW or notation tool.
**Expected output:** File names of the created MIDIs and note counts for each.

### Step 6: Quick audio previews from MIDI
**What it does:** Renders simple sine-wave previews from the role MIDIs and saves WAVs so you can listen in Colab without external tools.
**Why it matters:** Lets you quickly hear if the extracted lines feel musically correct before attempting vocal-like resynthesis.
**Expected output:** Confirmation of generated preview WAVs and inline audio players if running in Colab.

### Step 7: MIDI vs stem pitch validation
**What it does:** Compares each role’s MIDI to the matching stem by extracting pitch from both on the same timeline. Reports pitch error in cents (mean, median, 90th percentile) and the percentage of frames where both are voiced.
**Why it matters:** Gives an objective accuracy check and highlights where settings might need tuning.
**Expected output:** A small table with metrics per role. As a rule of thumb, a median error under ~25 cents is very good; 25–50 cents is acceptable; higher suggests further tuning.



### Step 0 — Download audio samples via index file

This step reads a plain‑text index (`audio_file_list.txt`) hosted in your GitHub Pages `audio_samples/` folder, then downloads each listed `.wav` into a local `audio_samples/` directory in Colab. It skips files that already exist, prints a summary, and fails fast if the index cannot be fetched. Run this once per session before the later steps.

In [None]:
# Step 0: Read audio_file_list.txt from GitHub Pages and download WAVs to audio_samples/

!pip -q install requests

import os
import requests
from urllib.parse import urljoin

# Point to your GitHub Pages location
BASE_URL = "https://harmonyisolator.github.io/mvp/audio_samples/"
INDEX_NAME = "audio_file_list.txt"   # the file you uploaded
INDEX_URL = urljoin(BASE_URL, INDEX_NAME)

LOCAL_DIR = "audio_samples"
os.makedirs(LOCAL_DIR, exist_ok=True)

def fetch_index_lines(url, timeout=30):
    r = requests.get(url, timeout=timeout)
    if r.status_code != 200:
        raise RuntimeError(f"Failed to fetch index: {url} (status {r.status_code})")
    # Keep only non-empty, non-comment lines
    lines = [ln.strip() for ln in r.text.splitlines()]
    lines = [ln for ln in lines if ln and not ln.startswith("#")]
    return lines

def download_file(url, out_path, timeout=60, chunk=1<<16):
    with requests.get(url, stream=True, timeout=timeout) as r:
        if r.status_code != 200:
            raise RuntimeError(f"GET {url} -> {r.status_code}")
        with open(out_path, "wb") as f:
            for blk in r.iter_content(chunk_size=chunk):
                if blk:
                    f.write(blk)

print(f"[INFO] Fetching index: {INDEX_URL}")
filenames = fetch_index_lines(INDEX_URL)
if not filenames:
    raise RuntimeError("Index is empty. Ensure audio_file_list.txt lists one filename per line.")

print(f"[INFO] {len(filenames)} files listed in index.")
downloaded = 0
skipped = 0
errors = []

for fname in filenames:
    # Basic guard: only accept .wav files in this workflow
    if not fname.lower().endswith(".wav"):
        print(f"[SKIP] Not a .wav entry: {fname}")
        continue
    file_url = urljoin(BASE_URL, fname)
    out_path = os.path.join(LOCAL_DIR, os.path.basename(fname))
    if os.path.exists(out_path):
        skipped += 1
        continue
    try:
        print(f"[DL] {fname}")
        download_file(file_url, out_path)
        downloaded += 1
    except Exception as e:
        errors.append((fname, str(e)))

print("\n[INFO] Download summary")
print("   downloaded:", downloaded)
print("   skipped (already present):", skipped)
print("   errors:", len(errors))
for fname, msg in errors[:10]:
    print("   -", fname, "->", msg)

local_wavs = [f for f in os.listdir(LOCAL_DIR) if f.lower().endswith(".wav")]
print(f"[INFO] Local folder now has {len(local_wavs)} .wav files in '{LOCAL_DIR}/'")

# Optional: show a few names
print("[INFO] Sample files:", local_wavs[:8])

### Step 1 — Environment Setup and Dataset Indexing

This step prepares the Colab environment for the Harmony Line Isolator MVP.  
It installs required libraries, scans the `audio_samples` folder for `.wav` files, validates the naming pattern, and groups files by phrase.  
The result is a clean index showing each phrase’s mix file and stems, ready for processing in later steps.


In [None]:
# Install dependencies used across the MVP
!pip -q install librosa crepe pretty_midi soundfile matplotlib pandas
print("Dependencies installed")

In [None]:
# Step 1: Environment setup and dataset indexing

# 1) Imports
import os
import re
from collections import defaultdict, Counter
import pandas as pd

# 2) Configuration
AUDIO_DIR = "audio_samples"   # set this to your folder with .wav files
INDEX_CSV = "dataset_index.csv"

# 3) Helpers
PATTERN = re.compile(r"^(gw_phrase\d+)_([^.]+)\.wav$", re.IGNORECASE)

def log(msg):
    print(f"[INFO] {msg}")

def list_wav_files(folder):
    if not os.path.isdir(folder):
        raise FileNotFoundError(f"Folder not found: {folder}")
    return sorted([f for f in os.listdir(folder) if f.lower().endswith(".wav")])

# 4) Scan folder
wav_files = list_wav_files(AUDIO_DIR)
log(f"Found {len(wav_files)} .wav files in '{AUDIO_DIR}'")

# 5) Parse filenames and group by phrase
phrase_groups = defaultdict(lambda: {"mix": [], "stems": []})
unmatched = []

for fname in wav_files:
    m = PATTERN.match(fname)
    if not m:
        unmatched.append(fname)
        continue
    phrase = m.group(1).lower()
    part = m.group(2).lower()
    if part == "mix":
        phrase_groups[phrase]["mix"].append(fname)
    else:
        phrase_groups[phrase]["stems"].append((part, fname))

# 6) Basic validations
if unmatched:
    log("Some files did not match 'gw_phraseNN_part.wav':")
    for u in unmatched[:20]:
        print("   -", u)
    if len(unmatched) > 20:
        print(f"   ... and {len(unmatched) - 20} more")

missing_mix = [p for p, g in phrase_groups.items() if len(g["mix"]) == 0]
duplicate_mix = [p for p, g in phrase_groups.items() if len(g["mix"]) > 1]

if missing_mix:
    log("Phrases with no mix file detected:")
    for p in sorted(missing_mix):
        print("   -", p)

if duplicate_mix:
    log("Phrases with more than one mix file detected:")
    for p in sorted(duplicate_mix):
        print("   -", p, "->", phrase_groups[p]["mix"])

# 7) Build a flat index DataFrame
rows = []
for phrase, group in sorted(phrase_groups.items()):
    mix_files = group["mix"]
    stems = group["stems"]
    stem_names = [s[0] for s in stems]
    rows.append({
        "phrase": phrase,
        "mix_count": len(mix_files),
        "mix_file": mix_files[0] if mix_files else "",
        "stem_count": len(stems),
        "stem_names": ", ".join(sorted(stem_names)),
    })

df = pd.DataFrame(rows, columns=["phrase", "mix_count", "mix_file", "stem_count", "stem_names"]).sort_values("phrase")
display(df.head(20))

# 8) Save index
df.to_csv(INDEX_CSV, index=False)
log(f"Saved dataset index to {INDEX_CSV}")

# 9) Summary prints
all_stem_names = []
for g in phrase_groups.values():
    all_stem_names.extend([s[0] for s in g["stems"]])

stem_name_counts = Counter(all_stem_names)
log(f"Detected {len(phrase_groups)} phrases")
log(f"Distinct stem labels seen: {sorted(stem_name_counts.keys())}")
log("Stem label frequency (top 20):")
for name, count in stem_name_counts.most_common(20):
    print(f"   {name}: {count}")

ready_phrases = [p for p, g in phrase_groups.items() if len(g['mix']) == 1 and len(g['stems']) >= 1]
log(f"Phrases ready for processing (1 mix and at least 1 stem): {len(ready_phrases)}")

# 10) Keep data structures for later steps
PHRASE_GROUPS = phrase_groups
READY_PHRASES = sorted(ready_phrases)

print("\nExample ready phrases:", READY_PHRASES[:10])


### Step 2 — Load one phrase and validate alignment

This step loads a single phrase from the dataset to confirm files are readable and aligned. It selects a phrase, loads the mix and stems at native sample rate, prints basic stats, checks duration consistency, and optionally plots quick waveforms. This sanity check prevents chasing downstream errors caused by misaligned audio.


In [None]:
# Step 2: Load one phrase and validate alignment

import os
import re
import numpy as np
import librosa
import soundfile as sf
import matplotlib.pyplot as plt

# Choose a phrase to inspect. You can override this with a specific ID like "gw_phrase03".
phrase_id = READY_PHRASES[0] if READY_PHRASES else None
assert phrase_id is not None, "No ready phrases found. Check Step 1 output."

print(f"[INFO] Inspecting phrase: {phrase_id}")

# Gather file paths for this phrase
mix_path = os.path.join(AUDIO_DIR, PHRASE_GROUPS[phrase_id]["mix"][0])
stem_pairs = PHRASE_GROUPS[phrase_id]["stems"]  # list of (part, filename)

# Load mix at native sample rate, mono for simplicity
y_mix, sr = librosa.load(mix_path, sr=None, mono=True)
dur_mix = len(y_mix) / sr

print(f"[INFO] Loaded mix: {os.path.basename(mix_path)}")
print(f"[INFO] Sample rate: {sr} Hz")
print(f"[INFO] Mix duration: {dur_mix:.3f} s")
print(f"[INFO] Found {len(stem_pairs)} stems:", [p for p, _ in stem_pairs])

# Load stems and track durations
stems_audio = {}
durations = {}

for part, fname in stem_pairs:
    path = os.path.join(AUDIO_DIR, fname)
    y, sr_stem = librosa.load(path, sr=sr, mono=True)  # force same sr as mix
    stems_audio[part] = y
    durations[part] = len(y) / sr
    if sr_stem != sr:
        print(f"[WARN] {fname} sample rate was {sr_stem} and was resampled to {sr}")

# Alignment check: compare lengths
print("\n[INFO] Duration check (seconds):")
print(f"   mix: {dur_mix:.3f}")
for part in sorted(stems_audio.keys()):
    print(f"   {part}: {durations[part]:.3f}")

# Simple tolerance for small mismatches due to trims or silence
TOL = 0.02  # 20 ms

def trim_or_pad_to_match(y, target_len):
    if len(y) == target_len:
        return y
    if len(y) > target_len:
        return y[:target_len]
    pad = target_len - len(y)
    return np.pad(y, (0, pad), mode="constant")

target_len = len(y_mix)
fixed = {}

needs_fix = []
for part, y in stems_audio.items():
    if abs(len(y) - target_len) / sr > TOL:
        needs_fix.append((part, (len(y) - target_len) / sr))

if needs_fix:
    print("\n[WARN] Some stems differ from mix length by more than 20 ms:")
    for part, diff_s in needs_fix:
        print(f"   {part}: {diff_s:+.3f} s difference")
    print("[INFO] Trimming or padding stems to match mix length for this session view.")

# Create aligned copies for analysis
for part, y in stems_audio.items():
    fixed[part] = trim_or_pad_to_match(y, target_len)

# Optional quick plots
PLOT = True
if PLOT:
    # Plot mix and up to three stems to keep it readable
    to_plot = ["mix"] + sorted(list(fixed.keys()))[:3]
    plt.figure(figsize=(12, 6))
    ax = plt.gca()
    t = np.arange(len(y_mix)) / sr
    ax.plot(t, y_mix, linewidth=0.8, label="mix")
    for part in to_plot[1:]:
        ax.plot(t, fixed[part], linewidth=0.6, alpha=0.8, label=part)
    ax.set_title(f"Waveforms for {phrase_id} (mix and first stems)")
    ax.set_xlabel("Time (s)")
    ax.set_ylabel("Amplitude")
    ax.legend()
    plt.show()

print("\n[INFO] Step 2 complete.")
print("[INFO] Variables available for next step:")
print("   phrase_id      ->", phrase_id)
print("   y_mix, sr      -> NumPy array audio and sample rate")
print("   stems_audio    -> dict of raw stems by part")
print("   fixed          -> dict of length-aligned stems by part")


### Step 3 — Pitch tracking with CREPE on mix and stems

This step estimates fundamental frequency (f0) for the selected phrase using CREPE. It runs on the mix and on each stem, aligns frame lengths, applies a confidence mask, and reports voiced coverage. Quick plots help verify that tracks look sensible before role assignment. Results are saved for the next step.


In [None]:
# Step 3: Pitch tracking with CREPE on mix and stems

import numpy as np
import librosa
import crepe
import matplotlib.pyplot as plt

# Use variables from Step 2: phrase_id, y_mix, sr, fixed (aligned stems)
print(f"[INFO] Pitch tracking for {phrase_id}")

TARGET_SR = 16000
STEP_MS = 10
CONF_THR = 0.5
USE_VITERBI = True

def estimate_f0_crepe(y, sr, step_ms=10, conf_thr=0.5, viterbi=True):
    """Resample to 16 kHz, run CREPE, and mask low-confidence frames."""
    y16 = librosa.resample(y, orig_sr=sr, target_sr=TARGET_SR)
    time, freq, conf, _ = crepe.predict(
        y16, TARGET_SR, step_size=step_ms, viterbi=viterbi, model_capacity="full"
    )
    freq = freq.astype(np.float32)
    conf = conf.astype(np.float32)
    freq_masked = freq.copy()
    freq_masked[conf < conf_thr] = np.nan
    return time, freq_masked, conf

# 1) Mix f0
t_mix, f0_mix, c_mix = estimate_f0_crepe(y_mix, sr, step_ms=STEP_MS, conf_thr=CONF_THR, viterbi=USE_VITERBI)
T_len = len(f0_mix)

# 2) Stem f0s
f0_stems = {}
c_stems = {}
for part in sorted(fixed.keys()):
    t_s, f0_s, c_s = estimate_f0_crepe(fixed[part], sr, step_ms=STEP_MS, conf_thr=CONF_THR, viterbi=USE_VITERBI)
    # Keep lengths consistent by trimming to the shortest
    min_len = min(T_len, len(f0_s))
    if min_len != T_len:
        T_len = min_len
    f0_stems[part] = f0_s
    c_stems[part] = c_s

# 3) Align all to common length
f0_mix = f0_mix[:T_len]
c_mix = c_mix[:T_len]
t_crepe = t_mix[:T_len]  # CREPE time base at 16 kHz

for part in list(f0_stems.keys()):
    f0_stems[part] = f0_stems[part][:T_len]
    c_stems[part] = c_stems[part][:T_len]

# 4) Coverage stats
def voiced_pct(f0):
    total = len(f0)
    voiced = np.sum(~np.isnan(f0))
    return 100.0 * voiced / max(total, 1)

print(f"[INFO] Frames (CREPE): {T_len}, step {STEP_MS} ms, duration ~ {T_len * STEP_MS / 1000:.2f} s")
print(f"[INFO] Mix voiced coverage: {voiced_pct(f0_mix):.1f}%")
for part in sorted(f0_stems.keys()):
    print(f"   {part:>8} voiced coverage: {voiced_pct(f0_stems[part]):.1f}%")

# 5) Quick visualization
PLOT = True
if PLOT:
    plt.figure(figsize=(12, 6))
    # Plot mix f0
    plt.plot(t_crepe, f0_mix, linewidth=1.2, label="mix f0 (Hz)")
    # Plot up to three stems for readability
    for i, part in enumerate(sorted(f0_stems.keys())[:3]):
        plt.plot(t_crepe, f0_stems[part], linewidth=0.9, alpha=0.9, label=f"{part} f0 (Hz)")
    plt.xlabel("Time (s)")
    plt.ylabel("Frequency (Hz)")
    plt.title(f"CREPE f0 for {phrase_id} (mix and stems)")
    plt.legend()
    plt.show()

# 6) Persist results for the next step (role assignment)
F0_TIME = t_crepe
F0_MIX = f0_mix
CONF_MIX = c_mix
F0_STEMS = f0_stems
CONF_STEMS = c_stems

print("\n[INFO] Step 3 complete.")
print("[INFO] Variables available for next step:")
print("   F0_TIME    -> time base from CREPE (seconds)")
print("   F0_MIX     -> f0 track for mix (Hz, NaN for unvoiced)")
print("   F0_STEMS   -> dict of f0 tracks per part")
print("   CONF_MIX   -> CREPE confidence for mix")
print("   CONF_STEMS -> dict of CREPE confidence per part")


### Step 4 — Role assignment from stem f0 tracks

This step assigns musical roles per frame using the stem f0 tracks. It ranks available pitches and labels them as low, mid, or high, depending on how many parts are present. It applies light smoothing, reports coverage, and plots the result. Outputs feed the MIDI export in the next step.


In [None]:
# Step 4: Role assignment from stem f0 tracks (adaptive to 2, 3, or 4 parts)

import numpy as np
import pandas as pd
import matplotlib.pyplot as plt

# Uses from Step 3:
#   phrase_id, F0_TIME, F0_STEMS (dict part->f0 ndarray), F0_MIX (optional), CONF_STEMS
print(f"[INFO] Role assignment for {phrase_id}")

# 1) Pack stems into a matrix [P, T]
part_names = sorted(F0_STEMS.keys())
assert len(part_names) >= 2, "Need at least two stems for role assignment."

F = np.vstack([F0_STEMS[p] for p in part_names])  # shape [P, T], Hz with NaN for unvoiced
T = F.shape[1]

def median_smooth_vec(x, k=7):
    s = pd.Series(x)
    s = s.fillna(method="ffill").fillna(method="bfill")
    return s.rolling(k, center=True, min_periods=1).median().values

# 2) Optional light smoothing on each stem f0 to reduce jitter
F_smooth = F.copy()
for i in range(F.shape[0]):
    mask = ~np.isnan(F[i])
    if mask.any():
        F_smooth[i, mask] = median_smooth_vec(F[i, mask], k=7)

# 3) Assign roles per frame by sorting available pitches
roles_avail = ["low", "mid", "high"]  # we will drop "mid" when only 2 parts exist

role_f0 = {r: np.full(T, np.nan, dtype=float) for r in roles_avail}
role_src = {r: np.full(T, -1, dtype=int) for r in roles_avail}  # index of source part per frame

for t in range(T):
    vals = [(i, F_smooth[i, t]) for i in range(F_smooth.shape[0]) if not np.isnan(F_smooth[i, t])]
    if not vals:
        continue
    vals.sort(key=lambda x: x[1])  # by frequency ascending
    if len(vals) >= 3:
        mapping = [("low", 0), ("mid", 1), ("high", -1)]
    elif len(vals) == 2:
        mapping = [("low", 0), ("high", -1)]  # no "mid"
    else:  # single pitch present
        mapping = [("high", -1)]
    for role, pos in mapping:
        idx = vals[pos][0]
        role_src[role][t] = idx
        role_f0[role][t] = vals[pos][1]

# 4) Post smoothing on role f0 to improve continuity
for r in list(role_f0.keys()):
    f = role_f0[r]
    if np.all(np.isnan(f)):
        continue
    mask = ~np.isnan(f)
    role_f0[r][mask] = median_smooth_vec(f[mask], k=11)

# 5) Coverage and basic stats
def voiced_pct(x):
    return 100.0 * np.sum(~np.isnan(x)) / max(len(x), 1)

print("[INFO] Role voiced coverage:")
for r in roles_avail:
    if np.all(np.isnan(role_f0[r])):
        continue
    print(f"   {r:>4}: {voiced_pct(role_f0[r]):.1f}%")

# Map roles to most common source part for a quick sanity check
for r in roles_avail:
    src = role_src[r]
    src = src[src >= 0]
    if len(src) == 0:
        continue
    counts = pd.Series(src).value_counts()
    top_idx = counts.index[0]
    print(f"[INFO] Role '{r}' most often comes from stem: {part_names[top_idx]}  ({int(100*counts.iloc[0]/len(src))}% of voiced frames)")

# 6) Plot roles vs stems for a quick visual check
PLOT = True
if PLOT:
    plt.figure(figsize=(12, 6))
    # plot stems faint
    for i, p in enumerate(part_names):
        plt.plot(F0_TIME, F_smooth[i], linewidth=0.8, alpha=0.5, label=f"{p} stem f0")
    # plot roles thicker
    color_order = {"low": None, "mid": None, "high": None}  # let matplotlib pick defaults
    for r in roles_avail:
        if np.all(np.isnan(role_f0[r])):
            continue
        plt.plot(F0_TIME, role_f0[r], linewidth=2.0, label=f"{r} role f0")
    plt.xlabel("Time (s)")
    plt.ylabel("Frequency (Hz)")
    plt.title(f"Role assignment for {phrase_id}")
    plt.legend()
    plt.show()

# 7) Persist for next step (MIDI export)
ROLE_F0 = role_f0       # dict role -> f0 array
ROLE_SRC = role_src     # dict role -> source index over time
ROLE_PART_NAMES = part_names

print("\n[INFO] Step 4 complete.")
print("[INFO] Variables available for next step:")
print("   ROLE_F0          -> dict of role f0 tracks: 'low', 'mid', 'high' (some may be NaN)")
print("   ROLE_SRC         -> dict of per-frame source part indices for each role")
print("   ROLE_PART_NAMES  -> list mapping source indices to part names")


### Step 5 — Export roles as MIDI for quick listening and verification

This step converts role f0 tracks into simple note sequences and writes a MIDI file per role. MIDI makes it easy to check pitch and phrasing in a DAW or notation app, and is a lightweight way to evaluate separation quality before attempting audio resynthesis. Files are saved alongside your audio.


In [None]:
# Step 5: Export roles as MIDI

import numpy as np
import pretty_midi

# Replace the deprecated fillna(method=...) usage from Step 4, if you re-use it later
def median_smooth_vec(x, k=7):
    import pandas as pd
    s = pd.Series(x)
    s = s.ffill().bfill()
    return s.rolling(k, center=True, min_periods=1).median().values

def f0_to_midi_notes(times_s, f0_hz, min_note_ms=80):
    """
    Convert an f0 curve to monophonic notes with simple segmentation.
    Drops NaNs. Merges short blips. Returns a list of (start, end, midi_pitch).
    """
    if len(times_s) == 0 or len(f0_hz) == 0:
        return []
    assert len(times_s) == len(f0_hz)
    dt = np.median(np.diff(times_s)) if len(times_s) > 1 else 0.01
    min_frames = max(1, int((min_note_ms / 1000.0) / dt))

    notes = []
    cur_pitch = None
    cur_start = None

    for i, hz in enumerate(f0_hz):
        if np.isnan(hz):
            # close any open note
            if cur_pitch is not None:
                # end at current frame time
                end_t = times_s[i]
                # enforce minimum length
                if int((end_t - cur_start) / dt) >= min_frames:
                    notes.append((cur_start, end_t, cur_pitch))
                cur_pitch, cur_start = None, None
            continue

        pitch = int(round(pretty_midi.hz_to_note_number(hz)))

        if cur_pitch is None:
            cur_pitch, cur_start = pitch, times_s[i]
        elif pitch != cur_pitch:
            # close previous
            end_t = times_s[i]
            if int((end_t - cur_start) / dt) >= min_frames:
                notes.append((cur_start, end_t, cur_pitch))
            # start new
            cur_pitch, cur_start = pitch, times_s[i]

    # close tail
    if cur_pitch is not None:
        end_t = times_s[-1] + dt
        if int((end_t - cur_start) / dt) >= min_frames:
            notes.append((cur_start, end_t, cur_pitch))

    return notes

def write_role_midi(role_name, times_s, f0_hz, out_path, program=54):
    """
    Writes a single-track MIDI file for a role.
    Default program 54 is a choir-like synth in General MIDI.
    """
    # Guard: skip entirely empty role
    if np.all(np.isnan(f0_hz)):
        print(f"[INFO] Role '{role_name}' has no voiced frames. Skipping MIDI.")
        return False

    pm = pretty_midi.PrettyMIDI()
    inst = pretty_midi.Instrument(program=program, name=role_name)

    notes = f0_to_midi_notes(times_s, f0_hz, min_note_ms=80)
    for start, end, pitch in notes:
        inst.notes.append(pretty_midi.Note(velocity=80, pitch=pitch, start=start, end=end))

    if len(inst.notes) == 0:
        print(f"[INFO] Role '{role_name}' produced zero notes after filtering. Skipping.")
        return False

    pm.instruments.append(inst)
    pm.write(out_path)
    print(f"[OK] Wrote {out_path} with {len(inst.notes)} notes.")
    return True

# Export available roles for the current phrase
exported = []
for role in ["low", "mid", "high"]:
    if role not in ROLE_F0:
        continue
    out_mid = f"{phrase_id}_{role}.mid"
    ok = write_role_midi(role, F0_TIME, ROLE_F0[role], out_mid, program=54)
    if ok:
        exported.append(out_mid)

print("\n[INFO] Step 5 complete.")
print("[INFO] Exported MIDI files:", exported)
print("[INFO] You can drag these into a DAW or MuseScore to inspect the lines.")


### Step 6 — Quick audio previews from MIDI (listen in Colab)

This step turns each role’s MIDI into a simple synthesized preview so you can hear the separation without any heavy models. It renders sine‑based audio with a tiny fade to avoid clicks, saves WAVs, and embeds players in Colab. This is just for auditing pitch and phrasing before fancier resynthesis.


In [None]:
# Step 6: Render simple audio previews from role MIDIs and play them in Colab

import numpy as np
import pretty_midi
import soundfile as sf
from IPython.display import Audio, display

SR_PREVIEW = 16000  # lightweight sample rate for quick previews

def render_sine(f_hz, dur_s, sr=SR_PREVIEW):
    n = int(dur_s * sr)
    t = np.arange(n) / sr
    y = np.sin(2 * np.pi * f_hz * t)
    # tiny fade to avoid clicks
    fade = int(0.01 * sr)
    if fade > 0 and n > 2 * fade:
        ramp = np.linspace(0, 1, fade)
        y[:fade] *= ramp
        y[-fade:] *= ramp[::-1]
    return y

def midi_to_preview_wav(midi_path, out_wav, sr=SR_PREVIEW):
    pm = pretty_midi.PrettyMIDI(midi_path)
    dur = pm.get_end_time()
    if dur <= 0:
        print(f"[WARN] {midi_path} has zero duration.")
        return None
    y = np.zeros(int(np.ceil(dur * sr)), dtype=np.float32)
    for inst in pm.instruments:
        for note in inst.notes:
            f = pretty_midi.note_number_to_hz(note.pitch)
            start = int(note.start * sr)
            end = int(note.end * sr)
            y[start:end] += 0.25 * render_sine(f, (end - start) / sr, sr=sr)
    # normalize gently
    peak = np.max(np.abs(y)) or 1.0
    y = (y / peak).astype(np.float32)
    sf.write(out_wav, y, sr)
    return out_wav

exported = [f"{phrase_id}_low.mid", f"{phrase_id}_mid.mid", f"{phrase_id}_high.mid"]
made = []
for m in exported:
    try:
        wav = m.replace(".mid", "_preview.wav")
        path = midi_to_preview_wav(m, wav, sr=SR_PREVIEW)
        if path:
            made.append(path)
            print(f"[OK] Wrote {path}")
    except FileNotFoundError:
        pass

print("\n[INFO] Inline players below (if any previews were created):")
for w in made:
    display(Audio(w, rate=SR_PREVIEW))


### Step 7 — MIDI vs stem pitch validation (stats + plot)

This cell compares each role’s MIDI to its matching stem audio. It extracts f0 from the stem with CREPE, converts MIDI notes to f0 on a 10 ms grid, and computes pitch error in cents on frames where both are voiced. It reports coverage and error stats and plots the two curves for a quick visual check.


In [None]:
# Step 7: Validate MIDI vs stem with pitch stats

# Requirements: pretty_midi, librosa, crepe, matplotlib already installed
import os
import numpy as np
import pandas as pd
import librosa
import pretty_midi
import matplotlib.pyplot as plt
import crepe

# Configuration
STEP_MS = 10          # analysis hop, must match your earlier CREPE step
CONF_THR = 0.5        # confidence threshold for stem f0
TARGET_SR = 16000     # CREPE expects 16 kHz
ROLES_TO_CHECK = ["low", "mid", "high"]  # will skip roles without MIDI

# Uses these from earlier cells:
# - AUDIO_DIR
# - phrase_id
# - PHRASE_GROUPS
# - ROLE_SRC, ROLE_PART_NAMES  (optional, used to auto-map role -> stem)

def estimate_f0_crepe(y, sr, step_ms=10, thr=0.5):
    y16 = librosa.resample(y, orig_sr=sr, target_sr=TARGET_SR)
    times, f0, conf, _ = crepe.predict(
        y16, TARGET_SR, step_size=step_ms, viterbi=True, model_capacity="full"
    )
    f0 = f0.astype(np.float32)
    conf = conf.astype(np.float32)
    f0[conf < thr] = np.nan
    return times, f0, conf

def midi_to_f0_series(pm, step_ms=10):
    """Rasterize MIDI to an f0 series on uniform time steps."""
    dur = pm.get_end_time()
    if dur <= 0:
        return np.array([0.0]), np.array([np.nan], dtype=np.float32)
    dt = step_ms / 1000.0
    times = np.arange(0.0, dur + dt/2, dt)
    f0 = np.full_like(times, np.nan, dtype=np.float32)
    # Assume monophonic per role
    for inst in pm.instruments:
        for note in inst.notes:
            start_idx = int(np.floor(note.start / dt))
            end_idx = int(np.ceil(note.end / dt))
            hz = pretty_midi.note_number_to_hz(note.pitch)
            f0[start_idx:end_idx] = hz
    return times, f0

def cents_error(f_ref, f_est):
    mask = (~np.isnan(f_ref)) & (~np.isnan(f_est)) & (f_ref > 0) & (f_est > 0)
    if not mask.any():
        return np.array([]), mask
    err = 1200.0 * np.log2(f_est[mask] / f_ref[mask])
    return err, mask

def most_common_src_index(src_indices):
    src = src_indices[src_indices >= 0]
    if len(src) == 0:
        return None
    return int(pd.Series(src).mode().iloc[0])

def infer_role_to_stem(phrase_id):
    """Try to infer the best stem filename for each role using ROLE_SRC majority vote."""
    role_to_stem = {}
    if 'ROLE_SRC' in globals() and 'ROLE_PART_NAMES' in globals():
        for role, arr in ROLE_SRC.items():
            idx = most_common_src_index(arr)
            if idx is None:
                continue
            part_name = ROLE_PART_NAMES[idx]
            # find filename in phrase group
            for part, fname in PHRASE_GROUPS[phrase_id]["stems"]:
                if part == part_name:
                    role_to_stem[role] = fname
                    break
    return role_to_stem

# Build role -> stem map
role_to_stem = infer_role_to_stem(phrase_id)

# Fallback: if not inferred, try name heuristics
if not role_to_stem:
    heur = {
        "high": ["soprano", "alto", "tenor2", "tenor1", "tenor"],
        "mid":  ["tenor", "tenor1", "tenor2", "alto", "baritone"],
        "low":  ["baritone", "bass", "tenor", "tenor1"]
    }
    stems_here = dict(PHRASE_GROUPS[phrase_id]["stems"])
    for role in ROLES_TO_CHECK:
        for cand in heur.get(role, []):
            if cand in stems_here:
                role_to_stem[role] = stems_here[cand]
                break

print("[INFO] Role → stem mapping to evaluate:")
for r in ROLES_TO_CHECK:
    midi_file = f"{phrase_id}_{r}.mid"
    stem_file = role_to_stem.get(r)
    exists_midi = os.path.exists(midi_file)
    print(f"   {r:>4}: MIDI={exists_midi and midi_file or 'missing'}  |  stem={stem_file or 'not mapped'}")

results = []

for role in ROLES_TO_CHECK:
    midi_path = f"{phrase_id}_{role}.mid"
    stem_fname = role_to_stem.get(role)
    if not (os.path.exists(midi_path) and stem_fname):
        continue

    # Load stem audio
    stem_path = os.path.join(AUDIO_DIR, stem_fname)
    y, sr = librosa.load(stem_path, sr=None, mono=True)

    # f0 for stem
    t_stem, f0_stem, conf = estimate_f0_crepe(y, sr, step_ms=STEP_MS, thr=CONF_THR)

    # f0 for MIDI
    pm = pretty_midi.PrettyMIDI(midi_path)
    t_midi, f0_midi = midi_to_f0_series(pm, step_ms=STEP_MS)

    # Align to a common time base
    T = min(len(t_stem), len(t_midi))
    t = t_stem[:T]
    f0_s = f0_stem[:T]
    f0_m = f0_midi[:T]

    # Metrics
    err_cents, mask = cents_error(f0_s, f0_m)
    voiced_overlap_pct = 100.0 * np.mean(mask) if len(mask) else 0.0
    stats = {}
    if err_cents.size:
        stats = {
            "mean_abs_cents": float(np.mean(np.abs(err_cents))),
            "median_abs_cents": float(np.median(np.abs(err_cents))),
            "p90_abs_cents": float(np.percentile(np.abs(err_cents), 90)),
            "signed_mean_cents": float(np.mean(err_cents)),
            "voiced_overlap_pct": float(voiced_overlap_pct)
        }
    else:
        stats = {
            "mean_abs_cents": np.nan,
            "median_abs_cents": np.nan,
            "p90_abs_cents": np.nan,
            "signed_mean_cents": np.nan,
            "voiced_overlap_pct": float(voiced_overlap_pct)
        }

    results.append({"phrase": phrase_id, "role": role, "stem": stem_fname, **stats})

    # Plot
    plt.figure(figsize=(12, 5))
    plt.plot(t, f0_s, linewidth=1.2, label=f"stem f0: {stem_fname}")
    plt.plot(t, f0_m, linewidth=2.0, alpha=0.9, label=f"MIDI f0: {role}")
    plt.xlabel("Time (s)")
    plt.ylabel("Frequency (Hz)")
    plt.title(f"Pitch comparison: {phrase_id} — {role}")
    plt.legend()
    plt.show()

# Summary table
df_res = pd.DataFrame(results)
display(df_res)

# Save results
out_csv = f"{phrase_id}_midi_vs_stem_pitch_stats.csv"
df_res.to_csv(out_csv, index=False)
print(f"[OK] Saved stats to {out_csv}")

# Quick read on quality
if not df_res.empty:
    print("\n[INFO] Rough guide: <25 cents median is very good, 25–50 cents acceptable, >50 cents needs attention.")
