In [2]:
import random
from typing import Tuple

import librosa
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import seaborn as sns
from scipy.ndimage import gaussian_filter1d
from sklearn.decomposition import NMF
from sklearn.exceptions import ConvergenceWarning
from sklearn.preprocessing import MinMaxScaler
from tqdm.notebook import tqdm
import warnings

# Suppress specific warnings from ConvergenceWarning
warnings.filterwarnings("ignore", category=ConvergenceWarning)

In [3]:
def create_beat_grid(beats: np.ndarray, tempo: float, sr: int, hop_length: int, duration: float, unit: str = 'time') -> np.ndarray:
    """Generate beat grid within the duration of a song. Returns beat grid in time units"""
    first_beat_time = librosa.frames_to_time(beats[0], sr=sr, hop_length=hop_length)
    seconds_per_beat = 60.0 / tempo
    num_beats_forward = int((duration - first_beat_time) / seconds_per_beat)
    num_beats_backward = int(first_beat_time / seconds_per_beat) + 1
    beat_times_forward = first_beat_time + np.arange(num_beats_forward) * seconds_per_beat
    beat_times_backward = first_beat_time - np.arange(1, num_beats_backward) * seconds_per_beat
    beat_grid = np.concatenate((np.array([0.0]), beat_times_backward[::-1], beat_times_forward))
    if unit == 'frames':
        beat_grid = librosa.time_to_frames(beat_grid, sr=sr, hop_length=hop_length)
    return beat_grid

def beat_sync(y: np.ndarray, feature: np.ndarray, sr: int, hop_length: int, time_signature: int = 4) -> Tuple[np.ndarray, np.ndarray]:
    """Sync the features to a beat grid."""
    onset_env = librosa.onset.onset_strength(y=y, sr=sr, hop_length=hop_length)
    duration = librosa.get_duration(y=y, sr=sr)
    tempo, beats = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr, hop_length=hop_length)
    beat_grid = create_beat_grid(beats, sr=sr, hop_length=hop_length, tempo=tempo, duration=duration)
    beat_frames = librosa.util.fix_frames(librosa.time_to_frames(beat_grid, sr=sr, hop_length=hop_length))
    feature_beat_synced = librosa.util.sync(feature, librosa.time_to_frames(beat_grid, sr=sr, hop_length=hop_length), aggregate=np.mean)
    print(f"Audio features beat-synchronized. Tempo: {tempo:.2f} BPM.")
    return feature_beat_synced, beat_frames

def find_optimal_n_components(X, random_state=None, plot_reconstruction_error=False):
    """
    Find the optimal number of components for NMF by minimizing the reconstruction error with a penalty term.
    Plots the reconstruction error and the score (reconstruction error + penalty) 
    over different values of n_components.
    """
    n_features = X.shape[1]
    max_components = n_features
    reconstruction_errors = []
    scores = []
    
    # Calculate the penalty weight
    nmf = NMF(n_components=1, random_state=random_state, max_iter=5000, init='nndsvd')
    W = nmf.fit_transform(X)
    H = nmf.components_
    X_approx = W @ H
    reconstruction_error_1 = np.sum((X - X_approx) ** 2)
    penalty_weight = np.abs(reconstruction_error_1 / max_components)
    
    for n_components in range(1, max_components + 1):
        nmf = NMF(n_components=n_components, random_state=random_state, max_iter=5000, init='nndsvd')
        W = nmf.fit_transform(X)
        H = nmf.components_
        X_approx = W @ H
        reconstruction_error = np.sum((X - X_approx) ** 2)
        penalty = n_components * penalty_weight
        score = reconstruction_error + penalty
        reconstruction_errors.append(reconstruction_error)
        scores.append(score)
        
    # Plot reconstruction error and score over n_components
    if plot_reconstruction_error == True:
        fig, (ax1, ax2) = plt.subplots(2, 1, figsize=(10, 8))
        
        ax1.plot(range(1, max_components + 1), reconstruction_errors)
        ax1.set_xlabel('Number of Components')
        ax1.set_ylabel('Reconstruction Error')
        ax1.set_title('Reconstruction Error vs. Number of Components')
        
        ax2.plot(range(1, max_components + 1), scores)
        ax2.set_xlabel('Number of Components')
        ax2.set_ylabel('Score (Reconstruction Error + Penalty)')
        ax2.set_title('Score vs. Number of Components')
        
        plt.tight_layout()
        plt.show()
    
    # Find the optimal number of components
    optimal_n_components = np.argmin(scores) + 1
    # print(f"Optimal number of components: {optimal_n_components}")
    
    return optimal_n_components

def process_bound_times(bounds, beat_grid_times, duration):
    """
    Process the bounds indices and convert them to times using beat_grid_times.
    Include 0.0 as the starting time and the song duration as the ending time.
    Remove the second bound if the first bound is 0 and the difference between the first and second bounds is <= 7.
    If the difference between the last and second-to-last bound times is <= 8, merge the last bound with the second-to-last bound.
    """
    # Remove the second bound if the first bound is 0 and the difference between the first and second bounds is <= 7
    if bounds[0] == 0 and bounds[1] - bounds[0] <= 7:
        bounds = bounds[:1] + bounds[2:]

    # Convert bounds indices to times
    bound_times = [beat_grid_times[idx] for idx in bounds]

    # Include 0.0 as the starting time and the song duration as the ending time
    if bound_times[0] != 0:
        bound_times = [0.0] + bound_times + [duration]
    else:
        bound_times = bound_times + [duration]

    # Merge the last bound with the second-to-last bound if the difference between them is <= 8
    if len(bound_times) > 2 and bound_times[-1] - bound_times[-2] <= 8:
        bound_times = bound_times[:-2] + bound_times[-1:]

    return bound_times

In [5]:
# Constants
hop_length = 128 
sr = 12000
df = pd.read_csv('../data/clean_labeled.csv') 
segment_data = []

progress_bar = tqdm(df["SongID"].unique(), desc="Processing songs", unit="song", leave=True)
for song_id in progress_bar:
    # Load audio and extract features
    audio_file = f'../data/audio/{song_id}.mp3'
    y, _ = librosa.load(audio_file, sr=sr)
    y_harm, y_perc = librosa.effects.hpss(y)
    onset_env = librosa.onset.onset_strength(y=y_perc, sr=sr, hop_length=hop_length)
    duration = librosa.get_duration(y=y_perc, sr=sr)
    tempo, beats = librosa.beat.beat_track(onset_envelope=onset_env, sr=sr, hop_length=hop_length)
    beat_grid_frames = create_beat_grid(beats, sr=sr, hop_length=hop_length, tempo=tempo, duration=duration, unit='frames')
    beat_grid_times = create_beat_grid(beats, sr=sr, hop_length=hop_length, tempo=tempo, duration=duration, unit='time')

    # Extract segmentation bounds for normal chromagram
    chromagram = librosa.feature.chroma_cqt(y=y_harm, sr=sr, hop_length=hop_length, bins_per_octave=24)
    chroma_sync = librosa.util.sync(chromagram, beat_grid_frames, aggregate=np.mean)
    chroma_sync = MinMaxScaler().fit_transform(chroma_sync.T).T
    optimal_n_components = find_optimal_n_components(chroma_sync.T, plot_reconstruction_error=False)
    nmf = NMF(n_components=optimal_n_components, max_iter=5000, init='nndsvd')
    chroma_acts = nmf.fit_transform(chroma_sync.T).T
    chroma_acts = (chroma_acts - chroma_acts.min(axis=1, keepdims=True)) / (chroma_acts.max(axis=1, keepdims=True) - chroma_acts.min(axis=1, keepdims=True))
    bounds_chroma_acts = librosa.segment.agglomerative(chroma_acts, 9)

    # Extract bound times
    bound_times_chroma_acts = process_bound_times(bounds_chroma_acts, beat_grid_times, duration)

    # Create segments
    for segment_num, (start, end) in enumerate(zip(bound_times_chroma_acts[:-1], bound_times_chroma_acts[1:]), start=1):
        start_beat_idx = np.argmin(np.abs(beat_grid_times - start))
        end_beat_idx = np.argmin(np.abs(beat_grid_times - end))
        segment_data.append({
            'SongID': song_id,
            'segment_num': segment_num,
            'segment_start': start,
            'segment_end': end,
            'duration': end - start,
            'start_beat_idx': start_beat_idx,
            'end_beat_idx': end_beat_idx,
            'num_beats': end_beat_idx - start_beat_idx,
            'num_components': optimal_n_components,
            'seg_type': 'chroma_acts'
        })

df_segments = pd.DataFrame(segment_data)
df_segments.to_csv('../data/agglom_chroma_results.csv', index=False)

Processing songs:   0%|          | 0/332 [00:00<?, ?song/s]