# BPM Detection Algorithm Visualizer

This notebook re-implements and visualizes each stage of the C++ BPM detection pipeline:

1. **Audio Loading** — Load and display the raw waveform
2. **Onset Detection** — Mel-frequency spectral flux
3. **Tempo Estimation** — Autocorrelation with log-Gaussian prior
4. **Beat Tracking** — Dynamic programming (Ellis 2007)
5. **Meter Detection** — Accent pattern analysis for time signature detection
6. **Metronome Overlay** — Synthesized click track with accented downbeats

All algorithm parameters match the C++ implementation exactly. See `IMPLEMENTATION_NOTES.md` for detailed comparisons with the research literature.

## 1. Setup & Audio Loading

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import librosa
import subprocess
import tempfile
import os
from scipy.signal import get_window

plt.rcParams['figure.figsize'] = (14, 4)
plt.rcParams['figure.dpi'] = 100

# --- Configuration ---
# Set AUDIO_PATH to a local file OR a YouTube URL
AUDIO_PATH = 'https://www.youtube.com/watch?v=WdSGEvDGZAo&themeRefresh=1'
MIN_BPM = 50.0
MAX_BPM = 220.0
KNOWN_BPM = None  # Set to None if unknown

In [None]:
# Download from YouTube if URL, otherwise load directly
_temp_wav = None

# Install yt-dlp if not already installed
try:
    subprocess.run(['yt-dlp', '--version'], capture_output=True, check=True)
except FileNotFoundError:
    print('yt-dlp not found, installing...')
    subprocess.run(['pip', 'install', 'yt-dlp'], capture_output=True, check=True)
    print('yt-dlp installed.')

if '://' in AUDIO_PATH:
    print(f'Downloading from YouTube: {AUDIO_PATH}')
    _temp_wav = os.path.join(tempfile.gettempdir(), 'bpm_viz_audio.wav')
    _temp_dl = os.path.join(tempfile.gettempdir(), 'bpm_viz_download')

    # Get video title
    result = subprocess.run(
        ['yt-dlp', '--get-title', '--no-playlist', AUDIO_PATH],
        capture_output=True, text=True)
    video_title = result.stdout.strip() if result.returncode == 0 else ''
    print(f'Title: {video_title}')

    # Download best audio
    subprocess.run(
        ['yt-dlp', '-f', 'bestaudio', '--no-playlist', '-o', _temp_dl, AUDIO_PATH],
        capture_output=True)

    # Convert to WAV via ffmpeg
    subprocess.run(
        ['ffmpeg', '-y', '-i', _temp_dl, '-vn', '-acodec', 'pcm_s16le',
         '-ar', '44100', '-ac', '2', _temp_wav],
        capture_output=True)

    if os.path.exists(_temp_dl):
        os.remove(_temp_dl)

    audio_file = _temp_wav
    print(f'Downloaded and converted to WAV')
else:
    audio_file = AUDIO_PATH

# Load audio (stereo)
y_stereo, sr = librosa.load(audio_file, sr=None, mono=False)

# Clean up temp file
if _temp_wav and os.path.exists(_temp_wav):
    os.remove(_temp_wav)

# Handle mono files loaded by librosa
if y_stereo.ndim == 1:
    y_stereo = np.stack([y_stereo, y_stereo])

# Convert to mono (average channels)
y_mono = np.mean(y_stereo, axis=0)

duration = len(y_mono) / sr
print(f'Sample rate: {sr} Hz')
print(f'Channels: {y_stereo.shape[0]}')
print(f'Frames: {len(y_mono)}')
print(f'Duration: {duration:.2f} sec')

In [None]:
# Plot stereo waveform
time_axis = np.arange(len(y_mono)) / sr

fig, axes = plt.subplots(2, 1, figsize=(14, 5), sharex=True)
for ch in range(y_stereo.shape[0]):
    axes[ch].plot(time_axis, y_stereo[ch], linewidth=0.3, color='steelblue')
    axes[ch].set_ylabel(f'Ch {ch+1}')
    axes[ch].set_ylim(-1, 1)
axes[0].set_title('Stereo Waveform')
axes[1].set_xlabel('Time (s)')
plt.tight_layout()
plt.show()

# Plot mono waveform
fig, ax = plt.subplots(figsize=(14, 3))
ax.plot(time_axis, y_mono, linewidth=0.3, color='darkblue')
ax.set_title('Mono Waveform')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Amplitude')
ax.set_ylim(-1, 1)
plt.tight_layout()
plt.show()

## 2. Onset Detection (Mel-Frequency Spectral Flux)

The onset detector converts raw audio into a 1D **onset strength function** that peaks at note onsets.

**Algorithm** (matches `src/onset_detector.cpp`):
1. Apply a Hann window (2048 samples) and compute the FFT with 512-sample hops
2. Compute the power spectrum: $|X[k]|^2$
3. Apply a 40-band mel filterbank (30–8000 Hz) to get perceptually-weighted energy
4. Log-compress: $\log_{10}(\text{energy} + 10^{-10})$
5. Compute half-wave rectified difference between consecutive frames (spectral flux)
6. Sum flux across all mel bands to get one value per frame
7. Normalize to zero mean, unit variance

In [None]:
# --- Onset Detection Parameters (match C++) ---
FFT_SIZE = 2048
HOP_SIZE = 512
MEL_BANDS = 40
MEL_LOW_HZ = 30.0
MEL_HIGH_HZ = 8000.0

def hz_to_mel(hz):
    return 2595.0 * np.log10(1.0 + hz / 700.0)

def mel_to_hz(mel):
    return 700.0 * (10.0 ** (mel / 2595.0) - 1.0)

def build_mel_filterbank(sr, fft_size, n_mels, low_hz, high_hz):
    """Build triangular mel filterbank matching the C++ implementation."""
    low_mel = hz_to_mel(low_hz)
    high_mel = hz_to_mel(high_hz)
    mel_points = np.linspace(low_mel, high_mel, n_mels + 2)
    hz_points = mel_to_hz(mel_points)
    bin_points = np.floor((fft_size + 1) * hz_points / sr).astype(int)
    bin_points = np.clip(bin_points, 0, fft_size // 2)

    n_fft_bins = fft_size // 2 + 1
    filters = np.zeros((n_mels, n_fft_bins))

    for band in range(n_mels):
        left = bin_points[band]
        center = bin_points[band + 1]
        right = bin_points[band + 2]
        if center == left:
            center = left + 1
        if right == center:
            right = center + 1
        for b in range(left, center):
            if 0 <= b <= fft_size // 2:
                filters[band, b] = (b - left) / (center - left)
        for b in range(center, right):
            if 0 <= b <= fft_size // 2:
                filters[band, b] = (right - b) / (right - center)

    return filters

mel_filters = build_mel_filterbank(sr, FFT_SIZE, MEL_BANDS, MEL_LOW_HZ, MEL_HIGH_HZ)
print(f'Mel filterbank shape: {mel_filters.shape} (bands x FFT bins)')

In [None]:
# Visualize the mel filterbank
fig, ax = plt.subplots(figsize=(14, 4))
freqs = np.arange(FFT_SIZE // 2 + 1) * sr / FFT_SIZE
for band in range(MEL_BANDS):
    ax.plot(freqs, mel_filters[band], linewidth=0.8)
ax.set_title(f'Mel Filterbank ({MEL_BANDS} bands, {MEL_LOW_HZ}-{MEL_HIGH_HZ} Hz)')
ax.set_xlabel('Frequency (Hz)')
ax.set_ylabel('Weight')
ax.set_xlim(0, MEL_HIGH_HZ + 500)
plt.tight_layout()
plt.show()

In [None]:
# --- Compute onset strength (matching C++ exactly) ---
window = np.hanning(FFT_SIZE)

n_frames = 1 + (len(y_mono) - FFT_SIZE) // HOP_SIZE if len(y_mono) >= FFT_SIZE else 0
print(f'Number of STFT frames: {n_frames}')

# Compute mel spectrogram and onset strength
mel_spectrogram = np.zeros((n_frames, MEL_BANDS))
onset_strength = np.zeros(n_frames)
prev_mel = np.zeros(MEL_BANDS)

for i in range(n_frames):
    offset = i * HOP_SIZE
    frame = y_mono[offset:offset + FFT_SIZE] * window

    # FFT and power spectrum
    spectrum = np.fft.rfft(frame)
    power = np.abs(spectrum) ** 2

    # Mel energy
    mel_energy = np.log10(mel_filters @ power + 1e-10)
    mel_spectrogram[i] = mel_energy

    # Half-wave rectified spectral flux
    diff = mel_energy - prev_mel
    flux = np.sum(np.maximum(diff, 0.0))
    onset_strength[i] = flux
    prev_mel = mel_energy.copy()

# Normalize to zero mean, unit variance
mean = np.mean(onset_strength)
std = np.std(onset_strength)
if std > 1e-6:
    onset_strength = (onset_strength - mean) / std

print(f'Onset strength: {len(onset_strength)} frames')
print(f'Frame rate: {sr / HOP_SIZE:.2f} fps')

In [None]:
# Plot mel spectrogram
onset_times = np.arange(n_frames) * HOP_SIZE / sr

fig, ax = plt.subplots(figsize=(14, 5))
img = ax.imshow(mel_spectrogram.T, aspect='auto', origin='lower',
                extent=[0, onset_times[-1], 0, MEL_BANDS],
                cmap='magma', interpolation='nearest')
ax.set_title('Mel Spectrogram (log energy per mel band)')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Mel Band')
plt.colorbar(img, ax=ax, label='log10(energy)')
plt.tight_layout()
plt.show()

In [None]:
# Plot onset strength overlaid on waveform
fig, ax1 = plt.subplots(figsize=(14, 4))

ax1.plot(time_axis, y_mono, linewidth=0.2, color='lightgray', label='Waveform')
ax1.set_ylabel('Amplitude', color='gray')
ax1.set_ylim(-1, 1)

ax2 = ax1.twinx()
ax2.plot(onset_times, onset_strength, linewidth=0.6, color='red', label='Onset strength')
ax2.set_ylabel('Onset Strength (normalized)', color='red')

ax1.set_title('Onset Strength Function overlaid on Waveform')
ax1.set_xlabel('Time (s)')
plt.tight_layout()
plt.show()

## 3. Tempo Estimation (Autocorrelation + Log-Gaussian Prior)

The tempo estimator finds the dominant periodicity in the onset strength function.

**Algorithm** (matches `src/tempo_estimator.cpp`):
1. Compute normalized autocorrelation for candidate lags (derived from BPM range)
2. Apply a log-Gaussian tempo prior centered at 120 BPM ($\sigma = 1.0$ octave)
3. Select the lag with highest weighted score
4. **Octave correction**: iteratively halve the lag if the half-lag peak is genuine
5. **Half-tempo cap**: if BPM > 200, double the lag
6. **Parabolic interpolation** for sub-lag BPM precision
7. Collect top 5 candidate periods for multi-candidate beat tracking

In [None]:
# --- Tempo Estimation (matching C++ exactly) ---
frame_rate = sr / HOP_SIZE

max_lag = int(np.floor(60.0 * frame_rate / max(MIN_BPM, 1.0)))
min_lag = int(np.ceil(60.0 * frame_rate / MAX_BPM))
min_lag = max(min_lag, 1)
max_lag = min(max_lag, len(onset_strength) - 1)

print(f'Frame rate: {frame_rate:.2f} fps')
print(f'Lag range: {min_lag} to {max_lag} frames')
print(f'BPM range: {60.0 * frame_rate / max_lag:.1f} to {60.0 * frame_rate / min_lag:.1f}')

In [None]:
# Compute normalized autocorrelation
autocorr = np.zeros(max_lag + 1)
for lag in range(min_lag, max_lag + 1):
    count = len(onset_strength) - lag
    if count > 0:
        autocorr[lag] = np.sum(onset_strength[lag:] * onset_strength[:count]) / count

# Apply log-Gaussian tempo prior
PRIOR_CENTER = 120.0  # BPM
PRIOR_SIGMA = 1.0     # octaves

weighted = np.zeros_like(autocorr)
prior_values = np.zeros_like(autocorr)

for lag in range(min_lag, max_lag + 1):
    bpm = 60.0 * frame_rate / lag
    if bpm > 0:
        log_ratio = np.log2(bpm / PRIOR_CENTER)
        prior = np.exp(-0.5 * log_ratio**2 / PRIOR_SIGMA**2)
        prior_values[lag] = prior
        weighted[lag] = autocorr[lag] * prior

best_lag = min_lag + np.argmax(weighted[min_lag:max_lag + 1])
print(f'Best lag before octave correction: {best_lag} ({60.0 * frame_rate / best_lag:.1f} BPM)')

In [None]:
# Plot raw autocorrelation
lags = np.arange(min_lag, max_lag + 1)
bpms = 60.0 * frame_rate / lags

fig, axes = plt.subplots(3, 1, figsize=(14, 10))

# Raw autocorrelation
ax = axes[0]
ax.plot(bpms, autocorr[min_lag:max_lag + 1], linewidth=0.8, color='steelblue')
ax.set_title('Raw Autocorrelation')
ax.set_xlabel('BPM')
ax.set_ylabel('Autocorrelation')
ax.invert_xaxis()

# Tempo prior
ax = axes[1]
ax.plot(bpms, prior_values[min_lag:max_lag + 1], linewidth=1.5, color='orange')
ax.axvline(PRIOR_CENTER, color='red', linestyle='--', alpha=0.7, label=f'Center: {PRIOR_CENTER} BPM')
ax.set_title(f'Log-Gaussian Tempo Prior (center={PRIOR_CENTER} BPM, sigma={PRIOR_SIGMA} octave)')
ax.set_xlabel('BPM')
ax.set_ylabel('Prior Weight')
ax.legend()
ax.invert_xaxis()

# Weighted autocorrelation
ax = axes[2]
ax.plot(bpms, weighted[min_lag:max_lag + 1], linewidth=0.8, color='green')
best_bpm_pre = 60.0 * frame_rate / best_lag
ax.axvline(best_bpm_pre, color='red', linestyle='--', alpha=0.8,
           label=f'Best: {best_bpm_pre:.1f} BPM (lag={best_lag})')
ax.set_title('Weighted Autocorrelation (autocorr x prior)')
ax.set_xlabel('BPM')
ax.set_ylabel('Weighted Score')
ax.legend()
ax.invert_xaxis()

plt.tight_layout()
plt.show()

In [None]:
# --- Octave Correction ---
# Compute median weighted score as noise floor
sorted_scores = np.sort(weighted[min_lag:max_lag + 1])
median_weighted = sorted_scores[len(sorted_scores) // 2]

octave_steps = []  # Track corrections for visualization

while True:
    half_center = best_lag // 2
    search_lo = max(min_lag, half_center - 2)
    search_hi = min(max_lag, half_center + 2)

    best_half = -1
    best_half_score = -np.inf
    for lag in range(search_lo, search_hi + 1):
        if weighted[lag] > best_half_score:
            best_half_score = weighted[lag]
            best_half = lag

    if best_half < min_lag:
        break

    parent_score = weighted[best_lag]
    ratio = best_half_score / parent_score if parent_score > 0 else 0

    octave_steps.append({
        'parent_lag': best_lag,
        'parent_bpm': 60.0 * frame_rate / best_lag,
        'half_lag': best_half,
        'half_bpm': 60.0 * frame_rate / best_half,
        'ratio': ratio,
        'accepted': best_half_score > median_weighted and best_half_score > 0.5 * parent_score
    })

    if best_half_score > median_weighted and best_half_score > 0.5 * parent_score:
        best_lag = best_half
    else:
        break

# Half-tempo cap
candidate_bpm = 60.0 * frame_rate / best_lag
if candidate_bpm > 200.0:
    double_lag = best_lag * 2
    if double_lag <= max_lag:
        print(f'Half-tempo correction: {candidate_bpm:.1f} BPM -> {60.0 * frame_rate / double_lag:.1f} BPM')
        best_lag = double_lag

# Parabolic interpolation
def parabolic_interpolate(data, peak, lo, hi):
    if peak <= lo or peak >= hi:
        return float(peak)
    a = data[peak - 1]
    b = data[peak]
    c = data[peak + 1]
    denom = a - 2.0 * b + c
    if abs(denom) < 1e-12:
        return float(peak)
    delta = 0.5 * (a - c) / denom
    return float(peak) + delta

refined_lag = parabolic_interpolate(autocorr, best_lag, min_lag, max_lag)
estimated_bpm = 60.0 * frame_rate / refined_lag

print('Octave correction steps:')
if not octave_steps:
    print('  (half-lag outside search range, no correction attempted)')
for step in octave_steps:
    status = 'ACCEPTED' if step['accepted'] else 'REJECTED'
    print(f"  {step['parent_bpm']:.1f} BPM -> {step['half_bpm']:.1f} BPM "
          f"(ratio={step['ratio']:.3f}, {status})")

print(f'\nFinal lag: {best_lag} (refined: {refined_lag:.3f})')
print(f'Estimated BPM: {estimated_bpm:.1f}')

In [None]:
# Visualize octave correction
fig, ax = plt.subplots(figsize=(14, 4))
ax.plot(bpms, weighted[min_lag:max_lag + 1], linewidth=0.8, color='green', alpha=0.6)

colors = ['blue', 'orange', 'purple', 'brown']
for i, step in enumerate(octave_steps):
    color = colors[i % len(colors)]
    marker = 'o' if step['accepted'] else 'x'
    ax.axvline(step['parent_bpm'], color=color, linestyle=':', alpha=0.5)
    ax.plot(step['half_bpm'], weighted[step['half_lag']], marker=marker, markersize=10,
            color=color, label=f"Step {i+1}: {step['parent_bpm']:.0f}->{step['half_bpm']:.0f} "
            f"(ratio={step['ratio']:.2f}, {'OK' if step['accepted'] else 'STOP'})")

ax.axvline(estimated_bpm, color='red', linewidth=2, linestyle='--',
           label=f'Final: {estimated_bpm:.1f} BPM')
ax.axhline(median_weighted, color='gray', linestyle=':', alpha=0.5, label='Median (noise floor)')
ax.set_title('Octave Correction Visualization')
ax.set_xlabel('BPM')
ax.set_ylabel('Weighted Score')
ax.legend(fontsize=8)
ax.invert_xaxis()
plt.tight_layout()
plt.show()

In [None]:
# --- Collect top 5 candidate periods ---
peaks = sorted([(weighted[lag], lag) for lag in range(min_lag, max_lag + 1)], reverse=True)

candidate_periods = [best_lag]
for score, lag in peaks:
    if len(candidate_periods) >= 5:
        break
    if all(abs(lag - existing) >= 3 for existing in candidate_periods):
        candidate_periods.append(lag)

print('Candidate periods for beat tracking:')
for cp in candidate_periods:
    cp_bpm = 60.0 * frame_rate / cp
    print(f'  lag={cp} ({cp_bpm:.1f} BPM) weighted={weighted[cp]:.6f}')

# Plot candidates on weighted autocorrelation
fig, ax = plt.subplots(figsize=(14, 4))
ax.plot(bpms, weighted[min_lag:max_lag + 1], linewidth=0.8, color='green', alpha=0.6)
for i, cp in enumerate(candidate_periods):
    cp_bpm = 60.0 * frame_rate / cp
    color = 'red' if i == 0 else 'blue'
    ax.axvline(cp_bpm, color=color, linestyle='--', alpha=0.7)
    ax.plot(cp_bpm, weighted[cp], 'o', color=color, markersize=8,
            label=f'#{i+1}: {cp_bpm:.1f} BPM')
ax.set_title('Tempo Candidates on Weighted Autocorrelation')
ax.set_xlabel('BPM')
ax.set_ylabel('Weighted Score')
ax.legend(fontsize=8)
ax.invert_xaxis()
plt.tight_layout()
plt.show()

## 4. Beat Tracking (Dynamic Programming — Ellis 2007)

The beat tracker finds the globally optimal sequence of beat positions by maximizing:

$$\text{Score} = \sum_i \text{onset}[b_i] - \alpha \cdot \left(\ln\frac{b_i - b_{i-1}}{\tau}\right)^2$$

where $\tau$ is the expected beat period and $\alpha = 680$ controls tightness.

**Algorithm** (matches `src/beat_tracker.cpp`):
1. Forward DP pass: for each frame, find the best predecessor within $[0.5\tau, 2\tau]$
2. Backtrace from the best-scoring frame in the last 10% of the signal
3. Run for each candidate period, pick the one with the highest normalized score

In [None]:
# --- Beat Tracker (matching C++ exactly) ---
ALPHA = 680.0

def beat_track_dp(onset, period_frames, alpha=ALPHA):
    """Ellis 2007 DP beat tracker, matching src/beat_tracker.cpp."""
    n = len(onset)
    if period_frames <= 0 or n == 0:
        return [], 0.0, np.zeros(0)

    min_lag = max(1, round(period_frames * 0.5))
    max_lag = max(min_lag + 1, round(period_frames * 2.0))

    dp = np.full(n, -np.inf)
    prev = np.full(n, -1, dtype=int)

    for t in range(n):
        best_score = onset[t]
        best_prev = -1

        start = max(0, t - max_lag)
        end = max(0, t - min_lag)
        for p in range(start, end + 1):
            lag = t - p
            if lag <= 0:
                continue
            log_ratio = np.log(lag / period_frames)
            penalty = alpha * log_ratio ** 2
            score = dp[p] + onset[t] - penalty
            if score > best_score:
                best_score = score
                best_prev = p

        dp[t] = best_score
        prev[t] = best_prev

    # Backtrace from best in last 10%
    search_start = max(0, min(int(n * 0.9), n - 1))
    best_end = search_start + np.argmax(dp[search_start:])

    beat_frames = []
    idx = best_end
    while idx >= 0:
        beat_frames.append(idx)
        idx = prev[idx]
    beat_frames.reverse()

    beat_samples = [f * HOP_SIZE for f in beat_frames]
    return beat_samples, dp[best_end], dp

print(f'Running DP beat tracker with period={best_lag} frames...')

In [None]:
# Visualize the penalty function
fig, ax = plt.subplots(figsize=(10, 4))
deltas = np.linspace(best_lag * 0.3, best_lag * 2.5, 500)
penalties = ALPHA * np.log(deltas / best_lag) ** 2
ax.plot(deltas, penalties, linewidth=1.5, color='purple')
ax.axvline(best_lag, color='red', linestyle='--', label=f'Expected period ({best_lag} frames)')
ax.axvline(best_lag * 0.5, color='gray', linestyle=':', alpha=0.5, label='Search bounds')
ax.axvline(best_lag * 2.0, color='gray', linestyle=':', alpha=0.5)
ax.set_title(f'DP Penalty Function: alpha * (ln(delta/period))^2 (alpha={ALPHA})')
ax.set_xlabel('Inter-beat interval (frames)')
ax.set_ylabel('Penalty')
ax.legend()
plt.tight_layout()
plt.show()

In [None]:
# --- Multi-candidate evaluation (matching pipeline.cpp) ---
primary_bpm = estimated_bpm
PRIMARY_MARGIN = 1.05  # non-primary candidates must exceed primary's score by 5%
results = []
primary_norm_score = -np.inf

for cp in candidate_periods:
    cp_bpm = 60.0 * frame_rate / cp if cp > 0 else 0.0

    # Filter: only compare within +/-30% of primary BPM
    ratio = cp_bpm / primary_bpm
    if ratio < 0.7 or ratio > 1.3:
        print(f'  Candidate period={cp} ({cp_bpm:.1f} BPM) -- SKIPPED (outside +/-30%)')
        continue

    beats, score, dp_array = beat_track_dp(onset_strength, cp)
    norm_score = score / len(beats) if len(beats) > 0 else 0.0
    results.append({
        'period': cp,
        'bpm': cp_bpm,
        'beats': beats,
        'score': score,
        'norm_score': norm_score,
        'dp': dp_array,
        'is_primary': cp == best_lag
    })
    if cp == best_lag:
        primary_norm_score = norm_score
    print(f'  Candidate period={cp} ({cp_bpm:.1f} BPM) '
          f'score={score:.2f} beats={len(beats)} norm={norm_score:.4f}')

# Pick best by normalized score, requiring non-primary candidates to exceed
# the primary's score by a margin (sub-harmonics can achieve slightly inflated
# per-beat scores due to wider DP search windows).
best_result = None
best_beat_score = -np.inf
for r in results:
    threshold = best_beat_score
    if not r['is_primary'] and primary_norm_score > -np.inf:
        threshold = max(threshold, primary_norm_score * PRIMARY_MARGIN)
    if r['norm_score'] > threshold:
        best_beat_score = r['norm_score']
        best_result = r

final_beats = best_result['beats']
final_period = best_result['period']
final_bpm = 60.0 * frame_rate / final_period

print(f'\nWinner: period={final_period} ({final_bpm:.1f} BPM) '
      f'with {len(final_beats)} beats')
if KNOWN_BPM:
    error = abs(final_bpm - KNOWN_BPM) / KNOWN_BPM * 100
    print(f'Known BPM: {KNOWN_BPM}, Error: {error:.1f}%')

In [None]:
# Multi-candidate comparison with PRIMARY_MARGIN threshold
if len(results) > 1:
    fig, ax = plt.subplots(figsize=(10, 5))
    labels = [f"{r['bpm']:.1f} BPM\n(lag={r['period']})" for r in results]
    scores = [r['norm_score'] for r in results]

    # Color: winner=red, primary (if different)=green, rejected by margin=orange, others=steelblue
    colors = []
    for r in results:
        if r is best_result:
            colors.append('red')
        elif r['is_primary']:
            colors.append('green')
        elif (not r['is_primary'] and primary_norm_score > -np.inf
              and r['norm_score'] > primary_norm_score
              and r['norm_score'] <= primary_norm_score * PRIMARY_MARGIN):
            colors.append('orange')
        else:
            colors.append('steelblue')

    bars = ax.bar(labels, scores, color=colors, edgecolor='black', linewidth=0.5)

    # Draw the margin threshold line if primary was evaluated
    if primary_norm_score > -np.inf:
        margin_threshold = primary_norm_score * PRIMARY_MARGIN
        ax.axhline(margin_threshold, color='darkorange', linestyle='--', linewidth=1.5,
                   label=f'Override threshold ({PRIMARY_MARGIN:.0%} of primary = {margin_threshold:.4f})')
        ax.axhline(primary_norm_score, color='green', linestyle=':', linewidth=1,
                   label=f'Primary score = {primary_norm_score:.4f}')

    # Annotate bars with score values
    for bar, score in zip(bars, scores):
        ax.text(bar.get_x() + bar.get_width() / 2, bar.get_height() + 0.02,
                f'{score:.4f}', ha='center', va='bottom', fontsize=9)

    # Build legend entries for the color coding
    from matplotlib.patches import Patch
    legend_handles = [
        Patch(facecolor='red', edgecolor='black', label='Winner'),
    ]
    if any(r['is_primary'] and r is not best_result for r in results):
        legend_handles.append(Patch(facecolor='green', edgecolor='black', label='Primary (tempo estimator)'))
    if any(c == 'orange' for c in colors):
        legend_handles.append(Patch(facecolor='orange', edgecolor='black',
                                    label=f'Rejected (within {PRIMARY_MARGIN:.0%} margin)'))
    if any(c == 'steelblue' for c in colors):
        legend_handles.append(Patch(facecolor='steelblue', edgecolor='black', label='Other candidates'))
    # Add line legend entries
    legend_handles.extend(ax.get_legend_handles_labels()[0])
    ax.legend(handles=legend_handles, fontsize=8, loc='upper right')

    ax.set_title('Multi-Candidate Comparison (Normalized DP Score)')
    ax.set_ylabel('Score / Beat Count')
    plt.tight_layout()
    plt.show()

In [None]:
# Plot DP score array
fig, ax = plt.subplots(figsize=(14, 3))
ax.plot(onset_times, best_result['dp'], linewidth=0.5, color='purple')
ax.set_title('DP Cumulative Score Over Time')
ax.set_xlabel('Time (s)')
ax.set_ylabel('DP Score')
plt.tight_layout()
plt.show()

In [None]:
# Plot onset strength with detected beat positions
beat_times = np.array(final_beats) / sr

fig, ax = plt.subplots(figsize=(14, 4))
ax.plot(onset_times, onset_strength, linewidth=0.5, color='steelblue', label='Onset strength')
for bt in beat_times:
    ax.axvline(bt, color='red', alpha=0.4, linewidth=0.5)
# Add one labeled line for the legend
ax.axvline(beat_times[0], color='red', alpha=0.4, linewidth=0.5, label=f'Beats ({len(final_beats)})')
ax.set_title(f'Detected Beats on Onset Strength ({final_bpm:.1f} BPM)')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Onset Strength')
ax.legend()
plt.tight_layout()
plt.show()

# Zoomed view (first 10 seconds)
fig, ax = plt.subplots(figsize=(14, 4))
mask_onset = onset_times < 10
mask_beats = beat_times < 10
ax.plot(onset_times[mask_onset], onset_strength[mask_onset], linewidth=0.8, color='steelblue')
for bt in beat_times[mask_beats]:
    ax.axvline(bt, color='red', alpha=0.6, linewidth=1)
ax.set_title('Zoomed: First 10 seconds')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Onset Strength')
plt.tight_layout()
plt.show()

## 5. Meter Detection (Time Signature)

The meter detector analyzes accent patterns in the detected beats to identify the time signature.

**Algorithm** (matches `src/meter_detector.cpp`):
1. Collect onset strength at each beat position
2. Test groupings of 2, 3, and 4 beats at all phase offsets
3. Compute **accent contrast**: how much stronger the proposed downbeat is vs other beats, normalized by stddev
4. Compute **beat-level autocorrelation** at each grouping lag for periodicity confirmation
5. Combined score: $0.7 \times \text{accent\_contrast} + 0.3 \times \text{autocorr}$
6. **2/4 vs 4/4 disambiguation**: prefer 4/4 when a meaningful 4-beat accent hierarchy exists
7. **6/8 detection**: check for compound (ternary) subdivision between beats
8. Low-confidence fallback to 4/4 (the most common time signature)

In [None]:
# --- Meter Detection (matching C++ exactly) ---
ACCENT_WEIGHT = 0.7
AUTOCORR_WEIGHT = 0.3

num_beats = len(final_beats)
onset_len = len(onset_strength)

# Collect onset strengths at beat positions
onset_at_beat = np.zeros(num_beats)
for i, bs in enumerate(final_beats):
    frame = bs // HOP_SIZE
    onset_at_beat[i] = onset_strength[frame] if 0 <= frame < onset_len else 0.0

def accent_score(onset_at_beat, grouping, phase):
    """Accent contrast: how much position 0 stands out as the downbeat."""
    n = len(onset_at_beat)
    if n < grouping:
        return 0.0
    position_sum = np.zeros(grouping)
    position_count = np.zeros(grouping, dtype=int)
    for i in range(n):
        pos = (i - phase) % grouping
        position_sum[pos] += onset_at_beat[i]
        position_count[pos] += 1
    if position_count[0] == 0:
        return 0.0
    downbeat_mean = position_sum[0] / position_count[0]
    other_sum = np.sum(position_sum[1:])
    other_count = np.sum(position_count[1:])
    if other_count == 0:
        return 0.0
    other_mean = other_sum / other_count
    stddev = np.std(onset_at_beat)
    return (downbeat_mean - other_mean) / (stddev + 1e-6)

def beat_autocorrelation(onset_at_beat, lag):
    """Normalized autocorrelation of beat-level onsets at given lag."""
    n = len(onset_at_beat)
    if lag <= 0 or lag >= n:
        return 0.0
    r0 = np.sum(onset_at_beat ** 2)
    if r0 < 1e-12:
        return 0.0
    r_lag = np.sum(onset_at_beat[:n - lag] * onset_at_beat[lag:])
    scale = n / (n - lag)
    return (r_lag * scale) / r0

def check_compound_subdivision(beat_samples, onset_strength, hop_size):
    """Check if subdivision between beats is ternary (compound -> 6/8).

    Both ternary and binary averages are z-score normalized.  When the
    ternary average is non-positive, there is no elevated energy at
    ternary positions, so default to binary (not compound).  When both
    are positive, require a 10% margin to avoid borderline false positives.
    """
    n = len(beat_samples)
    if n < 4:
        return False
    onset_len = len(onset_strength)
    ternary_total = 0.0
    binary_total = 0.0
    count = 0
    for i in range(n - 1):
        start = beat_samples[i]
        end = beat_samples[i + 1]
        span = end - start
        if span <= 0:
            continue
        frame_t1 = round((start + span / 3) / hop_size)
        frame_t2 = round((start + 2 * span / 3) / hop_size)
        frame_b = round((start + span / 2) / hop_size)
        if not (0 <= frame_t1 < onset_len and 0 <= frame_t2 < onset_len and 0 <= frame_b < onset_len):
            continue
        t_strength = (onset_strength[frame_t1] + onset_strength[frame_t2]) / 2
        b_strength = onset_strength[frame_b]
        ternary_total += t_strength
        binary_total += b_strength
        count += 1
    if count < 4:
        return False
    ternary_avg = ternary_total / count
    binary_avg = binary_total / count
    if ternary_avg <= 0.0:
        return False
    if binary_avg <= 0.0:
        return True  # ternary positive, binary not
    return ternary_avg > 1.1 * binary_avg

# Test candidate groupings
best_grouping = 4
best_phase = 0
best_score = -1e9
best_accent = 0.0
all_results = {}

if num_beats >= 8:
    for g in [2, 3, 4]:
        autocorr_g = beat_autocorrelation(onset_at_beat, g)
        for phi in range(g):
            accent = accent_score(onset_at_beat, g, phi)
            score = ACCENT_WEIGHT * accent + AUTOCORR_WEIGHT * autocorr_g
            all_results[(g, phi)] = {
                'accent': accent, 'autocorr': autocorr_g, 'score': score
            }
            if score > best_score:
                best_score = score
                best_grouping = g
                best_phase = phi
                best_accent = accent

    # 2/4 vs 4/4 disambiguation
    if best_grouping == 2:
        autocorr4 = beat_autocorrelation(onset_at_beat, 4)
        best4_accent = -1e9
        best4_phase = 0
        for phi in range(4):
            a = accent_score(onset_at_beat, 4, phi)
            if a > best4_accent:
                best4_accent = a
                best4_phase = phi
        score4 = ACCENT_WEIGHT * best4_accent + AUTOCORR_WEIGHT * autocorr4
        if best4_accent > 0.1 or score4 > best_score * 0.8:
            print(f'Preferring 4/4 over 2/4 (4-beat accent={best4_accent:.4f}, score={score4:.4f})')
            best_grouping = 4
            best_phase = best4_phase
            best_accent = best4_accent
            best_score = score4

confidence = max(0.0, min(1.0, best_accent / 2.0))

# Low-confidence fallback: only fall back to 4/4 when the winning non-4/4
# grouping doesn't clearly outperform grouping=4 (by >10%).
if confidence < 0.15 and best_grouping != 4:
    best4_fallback_score = -1e9
    best4_fallback_phase = 0
    autocorr4 = beat_autocorrelation(onset_at_beat, 4)
    for phi in range(4):
        a = accent_score(onset_at_beat, 4, phi)
        s = ACCENT_WEIGHT * a + AUTOCORR_WEIGHT * autocorr4
        if s > best4_fallback_score:
            best4_fallback_score = s
            best4_fallback_phase = phi
    if best_score < best4_fallback_score * 1.1:
        print(f'Low confidence ({confidence:.4f}), falling back to 4/4 '
              f'(winner={best_score:.4f} vs 4/4={best4_fallback_score:.4f})')
        best_grouping = 4
        best_phase = best4_fallback_phase
    else:
        print(f'Low confidence ({confidence:.4f}) but winner clearly beats 4/4 '
              f'(score={best_score:.4f} vs 4/4={best4_fallback_score:.4f}), '
              f'keeping {best_grouping}-grouping')

# Map grouping to time signature
ts_map = {2: '2/4', 3: '3/4', 4: '4/4'}
detected_ts = ts_map.get(best_grouping, '4/4')
beats_per_measure = best_grouping

# 6/8 check for grouping=2: compound subdivision
if best_grouping == 2:
    if check_compound_subdivision(final_beats, onset_strength, HOP_SIZE):
        detected_ts = '6/8'
        print('Compound subdivision detected -> 6/8')

# 6/8 check for grouping=3: compound subdivision only (no BPM heuristic —
# fast waltzes at >160 BPM are legitimate 3/4).
# A full 6/8 measure = 2 groups of 3 beats = 6 beats.
if best_grouping == 3:
    if check_compound_subdivision(final_beats, onset_strength, HOP_SIZE):
        detected_ts = '6/8'
        beats_per_measure = 6
        print('Compound subdivision detected (3/4 -> 6/8)')

# Extract downbeat positions
downbeat_samples = [final_beats[i] for i in range(best_phase, num_beats, beats_per_measure)]
downbeat_times = np.array(downbeat_samples) / sr

print(f'\nDetected time signature: {detected_ts}')
print(f'Beats per measure: {beats_per_measure}')
print(f'Downbeat phase: {best_phase}')
print(f'Confidence: {confidence:.4f}')
print(f'Downbeats: {len(downbeat_samples)}')

In [None]:
# Visualize meter detection results
fig, axes = plt.subplots(2, 1, figsize=(14, 8))

# 1. Accent contrast per grouping/phase
ax = axes[0]
if all_results:
    labels = [f'{g}/{phi}' for (g, phi) in sorted(all_results.keys())]
    accents = [all_results[k]['accent'] for k in sorted(all_results.keys())]
    scores = [all_results[k]['score'] for k in sorted(all_results.keys())]
    x = np.arange(len(labels))
    w = 0.35
    bars1 = ax.bar(x - w/2, accents, w, label='Accent contrast', color='steelblue')
    bars2 = ax.bar(x + w/2, scores, w, label='Combined score', color='coral')
    # Highlight winner
    win_key = (best_grouping, best_phase)
    if win_key in all_results:
        win_idx = sorted(all_results.keys()).index(win_key)
        ax.bar(x[win_idx] - w/2, accents[win_idx], w, color='darkblue', edgecolor='gold', linewidth=2)
        ax.bar(x[win_idx] + w/2, scores[win_idx], w, color='darkred', edgecolor='gold', linewidth=2)
    ax.set_xticks(x)
    ax.set_xticklabels([f'G={g}, P={phi}' for (g, phi) in sorted(all_results.keys())], rotation=45, fontsize=8)
    ax.set_ylabel('Score')
    ax.set_title(f'Meter Detection: Accent Analysis (Winner: {detected_ts}, confidence={confidence:.2f})')
    ax.legend()

# 2. Onset strength with downbeat markers
ax = axes[1]
ax.plot(onset_times, onset_strength, linewidth=0.5, color='steelblue', alpha=0.7)
for bt in beat_times:
    ax.axvline(bt, color='lightcoral', alpha=0.3, linewidth=0.5)
for dt in downbeat_times:
    ax.axvline(dt, color='red', alpha=0.7, linewidth=1.5)
ax.axvline(beat_times[0], color='lightcoral', alpha=0.3, linewidth=0.5, label='Beats')
ax.axvline(downbeat_times[0], color='red', alpha=0.7, linewidth=1.5, label='Downbeats')
ax.set_title(f'Beats and Downbeats ({detected_ts})')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Onset Strength')
ax.set_xlim(0, min(15, onset_times[-1]))
ax.legend()

plt.tight_layout()
plt.show()

## 6. Metronome Click & Final Result

The metronome synthesizes a short click sound (20ms exponentially-decaying sine) and overlays it at each beat position. **Downbeats get a higher-pitched click** (1500 Hz) to distinguish them from regular beats (1000 Hz).

**Click waveform** (matches `src/metronome.cpp`):
$$\text{click}(t) = V \cdot \sin(2\pi f t) \cdot e^{-\delta t}$$
where $V=0.5$, $\delta=200$, duration = 20 ms. $f=1000$ Hz for normal beats, $f=1500$ Hz for downbeats.

In [None]:
# --- Metronome Click Synthesis (matching C++ exactly) ---
CLICK_FREQ = 1000.0   # Hz (normal beats)
DOWNBEAT_FREQ = 1500.0  # Hz (downbeats)
CLICK_DURATION = 0.02  # seconds
CLICK_DECAY = 200.0
CLICK_VOLUME = 0.5

click_length = max(1, round(CLICK_DURATION * sr))
t_click = np.arange(click_length) / sr
click_normal = CLICK_VOLUME * np.sin(2 * np.pi * CLICK_FREQ * t_click) * np.exp(-CLICK_DECAY * t_click)
click_downbeat = CLICK_VOLUME * np.sin(2 * np.pi * DOWNBEAT_FREQ * t_click) * np.exp(-CLICK_DECAY * t_click)

# Plot both click waveforms
fig, axes = plt.subplots(1, 2, figsize=(14, 3))
axes[0].plot(t_click * 1000, click_normal, linewidth=1.5, color='darkred')
axes[0].set_title(f'Normal Click ({CLICK_FREQ} Hz)')
axes[0].set_xlabel('Time (ms)')
axes[0].set_ylabel('Amplitude')
axes[0].axhline(0, color='gray', linewidth=0.5)

axes[1].plot(t_click * 1000, click_downbeat, linewidth=1.5, color='darkblue')
axes[1].set_title(f'Downbeat Click ({DOWNBEAT_FREQ} Hz)')
axes[1].set_xlabel('Time (ms)')
axes[1].set_ylabel('Amplitude')
axes[1].axhline(0, color='gray', linewidth=0.5)

plt.tight_layout()
plt.show()

In [None]:
# Final waveform with beat markers
fig, ax = plt.subplots(figsize=(14, 4))
ax.plot(time_axis, y_mono, linewidth=0.2, color='steelblue')
for bt in beat_times:
    ax.axvline(bt, color='red', alpha=0.3, linewidth=0.5)
ax.set_title(f'Waveform with Beat Positions ({final_bpm:.1f} BPM, {len(final_beats)} beats)')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Amplitude')
plt.tight_layout()
plt.show()

# Zoomed view showing click alignment
fig, ax = plt.subplots(figsize=(14, 4))
t_start, t_end = 5.0, 8.0  # 3 second window
mask = (time_axis >= t_start) & (time_axis <= t_end)
ax.plot(time_axis[mask], y_mono[mask], linewidth=0.5, color='steelblue', label='Audio')
for bt in beat_times[(beat_times >= t_start) & (beat_times <= t_end)]:
    ax.axvline(bt, color='red', alpha=0.7, linewidth=1.5)
ax.set_title(f'Zoomed: {t_start}-{t_end}s with beat markers')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Amplitude')
plt.tight_layout()
plt.show()

In [None]:
# --- Mix click track into stereo audio and play ---
from IPython.display import Audio, display
import scipy.io.wavfile as wavfile

# Copy stereo audio and overlay clicks with downbeat differentiation
y_with_clicks = y_stereo.copy()
n_ch, n_samples = y_with_clicks.shape

# Two-pointer scan for downbeat detection (matching C++ metronome overlay)
downbeat_set = set(downbeat_samples)

for beat_sample in final_beats:
    click = click_downbeat if beat_sample in downbeat_set else click_normal
    for i in range(len(click)):
        idx = beat_sample + i
        if idx >= n_samples:
            break
        for ch in range(n_ch):
            y_with_clicks[ch, idx] += click[i]

# Clamp to [-1, 1]
y_with_clicks = np.clip(y_with_clicks, -1.0, 1.0)

# Write full WAV files to disk
def write_wav(filename, audio, sample_rate):
    """Write float audio (channels x samples) to 16-bit WAV."""
    int16 = np.clip(audio.T, -1.0, 1.0)  # (samples, channels)
    int16 = (int16 * 32767).astype(np.int16)
    wavfile.write(filename, sample_rate, int16)

write_wav('output_original.wav', y_stereo, sr)
write_wav('output_with_clicks.wav', y_with_clicks, sr)
print(f'Saved full audio to: output_original.wav, output_with_clicks.wav')

# Inline playback: use a 30-second preview to avoid huge base64 embedding
PREVIEW_SEC = 30
preview_samples = int(PREVIEW_SEC * sr)
# Start preview from the first beat (skip silence/intro)
preview_start = final_beats[0] if len(final_beats) > 0 else 0
preview_end = min(preview_start + preview_samples, n_samples)

preview_original = y_stereo[:, preview_start:preview_end]
preview_clicks = y_with_clicks[:, preview_start:preview_end]

preview_t = preview_start / sr
print(f'\nInline preview: {PREVIEW_SEC}s starting at {preview_t:.1f}s (from first beat)')
print(f'For full audio, open the WAV files saved above.\n')

print('Original audio (preview):')
display(Audio(preview_original, rate=sr))
print('With click track (preview):')
display(Audio(preview_clicks, rate=sr))

In [None]:
# --- Download full WAV files ---
from IPython.display import FileLink, display

try:
    from google.colab import files as colab_files
    _on_colab = True
except ImportError:
    _on_colab = False

if _on_colab:
    print('Downloading files via Colab...')
    colab_files.download('output_original.wav')
    colab_files.download('output_with_clicks.wav')
else:
    print('Download full WAV files:')
    display(FileLink('output_original.wav', result_html_prefix='Original: '))
    display(FileLink('output_with_clicks.wav', result_html_prefix='With click track: '))

## 7. Full Pipeline Summary

In [None]:
# Summary panel: 4 key plots
fig, axes = plt.subplots(4, 1, figsize=(14, 14))

# 1. Waveform
ax = axes[0]
ax.plot(time_axis, y_mono, linewidth=0.2, color='steelblue')
ax.set_title('1. Audio Waveform (mono)')
ax.set_ylabel('Amplitude')

# 2. Onset strength
ax = axes[1]
ax.plot(onset_times, onset_strength, linewidth=0.5, color='red')
ax.set_title('2. Onset Strength (mel-frequency spectral flux, normalized)')
ax.set_ylabel('Strength')

# 3. Weighted autocorrelation
ax = axes[2]
ax.plot(bpms, weighted[min_lag:max_lag + 1], linewidth=0.8, color='green')
ax.axvline(final_bpm, color='red', linestyle='--', linewidth=1.5,
           label=f'{final_bpm:.1f} BPM')
ax.set_title('3. Tempo Estimation (weighted autocorrelation)')
ax.set_xlabel('BPM')
ax.set_ylabel('Score')
ax.invert_xaxis()
ax.legend()

# 4. Beats on waveform
ax = axes[3]
ax.plot(time_axis, y_mono, linewidth=0.2, color='lightgray')
for bt in beat_times:
    ax.axvline(bt, color='red', alpha=0.3, linewidth=0.5)
ax.set_title(f'4. Beat Tracking ({len(final_beats)} beats at {final_bpm:.1f} BPM)')
ax.set_xlabel('Time (s)')
ax.set_ylabel('Amplitude')

plt.tight_layout()
plt.show()

In [None]:
# Print summary
print('=' * 50)
print('BPM DETECTION SUMMARY')
print('=' * 50)
print(f'Audio: {AUDIO_PATH}')
print(f'Duration: {duration:.2f} sec')
print(f'Sample rate: {sr} Hz')
print(f'Onset frames: {len(onset_strength)}')
print(f'Frame rate: {frame_rate:.2f} fps')
print(f'---')
print(f'Detected BPM: {final_bpm:.1f}')
print(f'Time signature: {detected_ts} (confidence: {confidence:.2f})')
print(f'Beat count: {len(final_beats)}')
print(f'Downbeats: {len(downbeat_samples)}')
print(f'Winning period: {final_period} frames')
if KNOWN_BPM:
    error = abs(final_bpm - KNOWN_BPM) / KNOWN_BPM * 100
    print(f'Known BPM: {KNOWN_BPM}')
    print(f'Error: {error:.1f}%')
print('=' * 50)