In [22]:
import librosa
import numpy as np
from skimage.feature import local_binary_pattern, hog
from src.music_recommender.config import Config
from pathlib import Path
from typing import Tuple, Dict, Any
from sklearn.model_selection import RandomizedSearchCV, GridSearchCV, train_test_split
from xgboost import XGBRegressor
from sklearn.svm import SVR
from sklearn.ensemble import RandomForestRegressor
from sklearn.pipeline import Pipeline
from sklearn.base import BaseEstimator, TransformerMixin
import pandas as pd
import librosa.display
import matplotlib.pyplot as plt
import joblib
from tqdm import tqdm
from sklearn.metrics import mean_squared_error, r2_score
from loguru import logger
from scipy.signal import find_peaks
import hashlib


pd.set_option("display.max_columns", None)
pd.set_option("display.max_colwidth", None)

In [23]:
cfg = Config()
asp = cfg.paths.processed / "audio"
prc = cfg.paths.processed
intr = cfg.paths.interim

In [24]:
class SpectrogramExtractor(BaseEstimator, TransformerMixin):
    def __init__(
        self,
        new_sr: int = 22050,
        new_ch: int = 1,
        n_fft: int = 2048,
        hop_length: int = 512,
        n_mels: int = 40,
        target_duration: float = 30.0,
        cache_dir: Path = intr / "spectrogram_cache",
        save_cache: bool = False,
    ) -> None:
        super().__init__()
        self.new_ch = new_ch
        self.new_sr = new_sr
        self.n_fft = n_fft
        self.hop_length = hop_length
        self.n_mels = n_mels
        self.target_duration = target_duration
        self.cache_dir = cache_dir
        self.save_cache = save_cache
        cache_dir.mkdir(parents=True, exist_ok=True)

    @staticmethod
    def _resample(
        aud_sr: Tuple[np.ndarray, float], new_sr: int
    ) -> Tuple[np.ndarray, float]:
        aud, sr = aud_sr
        if sr == new_sr:
            return aud, sr
        if aud.ndim == 1:
            res_aud = librosa.resample(aud, orig_sr=sr, target_sr=new_sr)
        else:
            res_aud = np.stack(
                [
                    librosa.resample(channel, orig_sr=sr, target_sr=new_sr)
                    for channel in aud
                ]
            )
        return res_aud, new_sr

    @staticmethod
    def _rechannel(
        aud_sr: Tuple[np.ndarray, float], new_ch: int
    ) -> Tuple[np.ndarray, float]:
        aud, sr = aud_sr
        n_ch = 1 if aud.ndim == 1 else aud.shape[0]
        if n_ch == new_ch:
            return aud_sr
        if new_ch == 1:
            res_aud = np.mean(aud, axis=0, keepdims=True)
            return res_aud, sr
        if new_ch == 2:
            res_aud = np.stack([aud, aud])
            return res_aud, sr
        else:
            raise ValueError(f"Unsupported number of channels: {new_ch}")

    @staticmethod
    def _pad_or_truncate(
        aud_sr: Tuple[np.ndarray, float], target_duration: float
    ) -> Tuple[np.ndarray, float]:
        aud, sr = aud_sr

        target_samples = int(sr * target_duration)
        current_samples = aud.shape[-1] if aud.ndim > 1 else len(aud)

        if current_samples > target_samples:
            if aud.ndim > 1:
                return aud[:, :target_samples], sr
            else:
                return aud[:target_samples], sr
        elif current_samples < target_samples:
            pad_samples = target_samples - current_samples
            if aud.ndim > 1:
                pad_width = ((0, 0), (0, pad_samples))
            else:
                pad_width = (0, pad_samples)
            return np.pad(aud, pad_width, mode="constant", constant_values=0), sr
        else:
            return aud, sr

    def _get_cache_path(self, audio_path: Path) -> Path:
        params_hash = f"{self.n_fft}_{self.hop_length}_{self.n_mels}_{self.new_sr}_{self.new_ch}_{self.target_duration}"
        return self.cache_dir / f"{audio_path.stem}_{params_hash}.npz"

    def _load_spectr(
        self,
        audio_path: Path,
        n_fft: int = 2048,
        hop_length: int = 512,
        n_mels: int = 40,
    ) -> Dict[str, Any]:
        if self.save_cache:
            cache_path = self._get_cache_path(audio_path)
            if cache_path.exists():
                cached = np.load(cache_path)
                return {
                    "mel_spectrogram": cached["mel_spectrogram"],
                    "stft_spectrogram": cached["stft_spectrogram"],
                    "audio": cached["audio"],
                    "sr": float(cached["sr"]),
                    "path": audio_path,
                }
        try:
            aud_sr = librosa.load(audio_path)
            aud_sr = self._pad_or_truncate(
                aud_sr=aud_sr, target_duration=self.target_duration
            )
            aud_sr = self._rechannel(aud_sr=aud_sr, new_ch=self.new_ch)
            aud_sr = self._resample(aud_sr=aud_sr, new_sr=self.new_sr)
            aud, sr = aud_sr
            
            # Compute STFT for features that need it
            stft = librosa.stft(aud, n_fft=n_fft, hop_length=hop_length)
            stft_mag = np.abs(stft)
            
            # Compute mel spectrogram
            mel_spect = librosa.feature.melspectrogram(
                y=aud, sr=sr, n_fft=n_fft, hop_length=hop_length, n_mels=n_mels
            )
            mel_spect_db = librosa.power_to_db(mel_spect, ref=np.max)

            if self.save_cache:
                cache_path = self._get_cache_path(audio_path)
                np.savez_compressed(
                    cache_path, 
                    mel_spectrogram=mel_spect_db, 
                    stft_spectrogram=stft_mag,
                    audio=aud, 
                    sr=sr
                )

            return {
                "mel_spectrogram": mel_spect_db,
                "stft_spectrogram": stft_mag,
                "audio": aud,
                "sr": sr,
                "path": audio_path
            }
        except Exception as e:
            raise ValueError(f"Error loading audio from {audio_path}: {e}")

    def fit(self, X, y=None):
        return self

    def transform(self, X) -> np.ndarray:
        results = []
        for path in tqdm(X, desc="Loading spectrograms"):
            result = self._load_spectr(
                path,
                n_fft=self.n_fft,
                hop_length=self.hop_length,
                n_mels=self.n_mels,
            )
            results.append(result)
        return np.array(results, dtype=object)

In [25]:
audio_data = pd.read_csv(prc / "matched_metadata.csv")
audio_data.head()

Unnamed: 0,track_id,track_title,artist_name,album_title,name,artists,album,year,release_date,danceability,energy,key,loudness,mode,speechiness,acousticness,instrumentalness,liveness,valence,tempo,duration_ms,time_signature
0,10,Freeway,Kurt Vile,Constant Hitmaker,Freeway,['Kurt Vile'],Constant Hitmaker,2008,2008-03-04,0.606,0.916,6,-8.162,1,0.0371,0.14,0.356,0.132,0.889,111.563,161173,4.0
1,237,Garbage and (Garbage and Fire),Barnacled,6,Garbage and (garbage On Fire),['Barnacled'],6,2003,2003-01-01,0.28,0.64,11,-7.799,0,0.123,0.349,0.675,0.136,0.0537,140.368,449467,4.0
2,238,France Attacks,Barnacled,6,France Attacks,['Barnacled'],6,2003,2003-01-01,0.192,0.411,2,-9.445,1,0.0655,0.539,0.709,0.0909,0.139,56.929,820707,4.0
3,459,Machines and Muscles,CAVE,Butthash,Machines and Muscles,['Cave'],Psychic Psummer,2009,2009-05-26,0.584,0.918,7,-9.883,1,0.0345,0.0254,0.77,0.348,0.114,108.305,236960,5.0
4,459,Machines and Muscles,CAVE,Butthash,Machines and Muscles,['Cave'],Release,2014,2014-10-21,0.415,0.646,2,-12.022,1,0.0399,0.0189,0.948,0.0965,0.123,93.887,303680,5.0


In [26]:
X = audio_data["track_id"].map(lambda id: asp / f"{str(id).zfill(6)}.mp3")
y = audio_data[
    [
        "danceability",
        "energy",
        "key",
        "loudness",
        "mode",
        "speechiness",
        "acousticness",
        "instrumentalness",
        "liveness",
        "valence",
        "tempo",
    ]
]

In [27]:
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1)

In [28]:
aext = SpectrogramExtractor(save_cache=True)

In [None]:
import pickle


class StatsFeatureExtractor(BaseEstimator, TransformerMixin):
    def __init__(
        self,
        sr: float = 22050,
        hop_length: int = 512,
        n_fft: int = 2048,  
        n_mfcc: int = 13,   
        n_chroma: int = 12, 
        f0_min: float = 65.41,  
        f0_max: float = 2093.0,
        enable_cache: bool = True,
        cache_path: Path = intr / "stats_feat",
    ) -> None:
        super().__init__()
        self.sr = sr
        self.hop_length = hop_length
        self.n_fft = n_fft
        self.n_chroma = n_chroma
        self.n_mfcc = n_mfcc
        self.f0_min = f0_min
        self.f0_max = f0_max
        self.enable_cache = enable_cache
        self.cache_path = cache_path
        if self.enable_cache:
            self.cache_path.mkdir(parents=True, exist_ok=True)

    def _get_cache_key(self, audio_path: str) -> str:
        params_str = f"{audio_path}_{self.sr}_{self.hop_length}_{self.n_fft}_{self.n_chroma}_{self.n_mfcc}_{self.f0_min}_{self.f0_max}"
        return hashlib.md5(params_str.encode()).hexdigest()
    
    def _get_cache_file(self, cache_key: str) -> Path:
        return self.cache_path / f"{cache_key}.pkl"
    
    def _load_from_cache(self, cache_key: str):
        cache_file = self._get_cache_file(cache_key)
        if cache_file.exists():
            try:
                with open(cache_file, 'rb') as f:
                    return pickle.load(f)
            except:
                return None
        return None
    
    def _save_to_cache(self, cache_key: str, stats: dict):
        """Save features to cache"""
        cache_file = self._get_cache_file(cache_key)
        try:
            with open(cache_file, 'wb') as f:
                pickle.dump(stats, f)
        except:
            pass

    @staticmethod
    def make_stats(feature_array, features_name):
        return {
            f"{features_name}_mean": np.mean(feature_array),
            f"{features_name}_std": np.std(feature_array),
            f"{features_name}_min": np.min(feature_array),
            f"{features_name}_max": np.max(feature_array),
            f"{features_name}_median": np.median(feature_array),
            f"{features_name}_q25": np.percentile(feature_array, 25),
            f"{features_name}_q75": np.percentile(feature_array, 75),
        }

    @staticmethod
    def temporal_extract(
        audio: np.ndarray, stft_spectrogram: np.ndarray, sr: float, hop_length: int
    ):

        # Use STFT spectrogram for RMS
        rms = librosa.feature.rms(S=stft_spectrogram, hop_length=hop_length)
        zcr = librosa.feature.zero_crossing_rate(y=audio, hop_length=hop_length)

        envelope = rms[0]
        temporal_features = {}

        if len(envelope) > 0 and np.max(envelope) > 0:
            envelope_norm = envelope / np.max(envelope)

            # Attack time (time to reach 90% of max amplitude)
            attack_threshold = 0.9
            attack_frame = np.argmax(envelope_norm >= attack_threshold)
            temporal_features["attack_time"] = (attack_frame * hop_length) / sr

            # Temporal centroid (center of mass in time)
            times = np.arange(len(envelope_norm)) * hop_length / sr
            temporal_features["temporal_centroid"] = np.sum(times * envelope_norm) / (
                np.sum(envelope_norm) + 1e-8
            )
        else:
            temporal_features["attack_time"] = 0.0
            temporal_features["temporal_centroid"] = 0.0

        frame_features = {
            "zcr": zcr.flatten(),
            "rms": rms.flatten(),
        }

        frame_features.update(temporal_features)
        return frame_features
        
    @staticmethod
    def spectral_extract(
        audio: np.ndarray, stft_spectrogram: np.ndarray, sr: float, hop_length: int, n_mfcc: int
    ):

        spectral_centroid = librosa.feature.spectral_centroid(
            S=stft_spectrogram, sr=sr, hop_length=hop_length
        )
        spectral_bandwidth = librosa.feature.spectral_bandwidth(
            S=stft_spectrogram, sr=sr, hop_length=hop_length
        )
        spectral_rolloff = librosa.feature.spectral_rolloff(
            S=stft_spectrogram, sr=sr, hop_length=hop_length
        )
        spectral_flatness = librosa.feature.spectral_flatness(
            S=stft_spectrogram, hop_length=hop_length
        )
        spectral_flux = librosa.onset.onset_strength(S=stft_spectrogram, sr=sr)

        mfccs = librosa.feature.mfcc(
            S=librosa.power_to_db(stft_spectrogram**2),
            sr=sr,
            n_mfcc=n_mfcc,
            hop_length=hop_length,
        )

        return {
            "spectral_centroid": spectral_centroid.flatten(),
            "spectral_bandwidth": spectral_bandwidth.flatten(),
            "spectral_rolloff": spectral_rolloff.flatten(),
            "spectral_flatness": spectral_flatness.flatten(),
            "spectral_flux": spectral_flux,
            "mfccs": mfccs,  # Shape: (13, n_frames)
        }    
    @staticmethod
    def extract_rhythm_features(audio: np.ndarray, sr: float):
        tempo, beats = librosa.beat.beat_track(y=audio, sr=sr)
        onset_env = librosa.onset.onset_strength(y=audio, sr=sr)

        # Compute beat strength only if beats detected
        beat_strength = np.mean(onset_env[beats]) if len(beats) > 0 else 0.0
        onset_rate = len(beats) / (len(audio) / sr) if len(audio) > 0 else 0.0
        if isinstance(tempo,np.ndarray):
            tempo = float(np.median(tempo))
        else:
            tempo = float(tempo)
        return {
            "tempo": tempo,  # Scalar
            "beat_strength": beat_strength,  # Scalar
            "onset_rate": onset_rate,  # Scalar
            "onset_strength": onset_env,  # Time series
        }

    @staticmethod
    def extract_chroma_features(audio: np.ndarray, sr: float, hop_length: int,n_chroma:int):
        chroma = librosa.feature.chroma_stft(y=audio, sr=sr, hop_length=hop_length,n_chroma=n_chroma)

        return {
            "chroma": chroma,
        }

    @staticmethod
    def extract_hpss_features(audio: np.ndarray):
        y_harmonic, y_percussive = librosa.effects.hpss(audio)

        h_energy = np.mean(y_harmonic**2)
        p_energy = np.mean(y_percussive**2)

        return {
            "harmonic_percussive_ratio": h_energy / (p_energy + 1e-8),  # Scalar
            "harmonic_energy": h_energy,  # Scalar
            "percussive_energy": p_energy,  # Scalar
        }, y_harmonic  # Return harmonic component for further analysis

    @staticmethod
    def extract_harmonic_features(y_harmonic, sr, hop_length=512, n_fft=2048, f0_min=65.41, f0_max=2093.0):
        features = {}

        # Get fundamental frequency estimates
        f0, voiced_flag, voiced_probs = librosa.pyin(
            y_harmonic,
            fmin=f0_min,
            fmax=f0_max,
            sr=sr,
            hop_length=hop_length,
        )

        # Compute STFT
        stft = librosa.stft(y_harmonic, n_fft=n_fft, hop_length=hop_length)
        mag_spec = np.abs(stft)
        freqs = librosa.fft_frequencies(sr=sr, n_fft=n_fft)

        # Use only voiced frames
        valid_indices = np.where(voiced_flag)[0]
        valid_f0 = f0[voiced_flag]

        if len(valid_f0) > 0:
            inharmonicity_values = []
            t1_values = []
            t2_values = []
            t3_values = []

            for idx, f0_val in zip(valid_indices, valid_f0):
                if np.isnan(f0_val) or f0_val <= 0:
                    continue

                frame_spec = mag_spec[:, idx]

                # --- INHARMONICITY CALCULATION ---
                peaks, properties = find_peaks(
                    frame_spec, height=np.max(frame_spec) * 0.1
                )
                peak_freqs = freqs[peaks]
                peak_mags = properties["peak_heights"]

                if len(peak_freqs) > 0:
                    sorted_idx = np.argsort(peak_mags)[::-1]
                    peak_freqs = peak_freqs[sorted_idx[:10]]
                    peak_mags = peak_mags[sorted_idx[:10]]

                    deviations = []
                    for n in range(1, min(8, len(peak_freqs) + 1)):
                        expected_freq = f0_val * n
                        if expected_freq < freqs[-1]:
                            closest_idx = np.argmin(np.abs(peak_freqs - expected_freq))
                            deviation = (
                                np.abs(peak_freqs[closest_idx] - expected_freq)
                                / expected_freq
                            )
                            deviations.append(deviation)

                    if deviations:
                        inharmonicity_values.append(np.mean(deviations))

                # --- TRISTIMULUS CALCULATION ---
                harmonic_energies = []
                for n in range(1, 11):
                    harmonic_freq = f0_val * n
                    if harmonic_freq >= freqs[-1]:
                        break

                    bin_idx = np.argmin(np.abs(freqs - harmonic_freq))
                    start_bin = max(0, bin_idx - 3)
                    end_bin = min(len(frame_spec), bin_idx + 4)
                    energy = np.sum(frame_spec[start_bin:end_bin])
                    harmonic_energies.append(energy)

                if len(harmonic_energies) >= 5:
                    total_energy = np.sum(harmonic_energies) + 1e-8

                    t1 = harmonic_energies[0] / total_energy
                    t2 = np.sum(harmonic_energies[1:4]) / total_energy
                    t3 = np.sum(harmonic_energies[4:]) / total_energy

                    t1_values.append(t1)
                    t2_values.append(t2)
                    t3_values.append(t3)

            # Aggregate features
            features["inharmonicity"] = (
                np.mean(inharmonicity_values) if inharmonicity_values else 0.0
            )
            features["tristimulus_1"] = np.mean(t1_values) if t1_values else 0.0
            features["tristimulus_2"] = np.mean(t2_values) if t2_values else 0.0
            features["tristimulus_3"] = np.mean(t3_values) if t3_values else 0.0
            features["f0_mean"] = np.mean(valid_f0)
            features["f0_std"] = np.std(valid_f0)
            features["voiced_ratio"] = np.sum(voiced_flag) / len(voiced_flag)
        else:
            # No voiced frames
            features["inharmonicity"] = 0.0
            features["tristimulus_1"] = 0.0
            features["tristimulus_2"] = 0.0
            features["tristimulus_3"] = 0.0
            features["f0_mean"] = 0.0
            features["f0_std"] = 0.0
            features["voiced_ratio"] = 0.0

        return features
    def extract_all_features(self, audio: np.ndarray, stft_spectrogram: np.ndarray):
        all_features = {}

        all_features.update(
            self.temporal_extract(audio, stft_spectrogram, self.sr, self.hop_length)
        )

        all_features.update(
            self.spectral_extract(audio, stft_spectrogram, self.sr, self.hop_length, self.n_mfcc)  # ← Pass n_mfcc
        )

        all_features.update(self.extract_rhythm_features(audio, self.sr))

        all_features.update(
            self.extract_chroma_features(audio, self.sr, self.hop_length,self.n_chroma)
        )

        hpss_features, y_harmonic = self.extract_hpss_features(audio)
        all_features.update(hpss_features)

        all_features.update(
            self.extract_harmonic_features(y_harmonic, self.sr, self.hop_length, self.n_fft, self.f0_min, self.f0_max)  # ← Pass all params
        )

        return all_features
    
    def compute_statistics(self, features: Dict[str, Any]) -> Dict[str, float]:
        """Convert all features to statistics (scalars)"""
        stats = {}

        for key, val in features.items():
            # Handle different feature types
            if isinstance(val, (int, float, np.integer, np.floating)):
                # Already a scalar (tempo, attack_time, etc.)
                stats[key] = float(val)

            elif isinstance(val, np.ndarray):
                if val.ndim == 1:
                    # 1D time series (RMS, spectral_centroid, etc.)
                    stats.update(self.make_stats(val, key))

                elif val.ndim == 2:
                    # 2D features (MFCCs, chroma)
                    if key == "mfccs":
                        # MFCCs: shape (13, n_frames)
                        for i in range(val.shape[0]):  # Iterate over 13 coefficients
                            mfcc_i = val[i, :]
                            stats.update(self.make_stats(mfcc_i, f"mfcc_{i}"))

                    elif key == "chroma":
                        # Chroma: shape (12, n_frames)
                        for i in range(val.shape[0]):  # Iterate over 12 pitch classes
                            chroma_i = val[i, :]
                            stats.update(self.make_stats(chroma_i, f"chroma_{i}"))

                    else:
                        # Generic 2D handling
                        for i in range(val.shape[0]):
                            stats.update(self.make_stats(val[i, :], f"{key}_{i}"))

        return stats
    def fit(self, X, y=None):
        return self
    def transform(self, X):
        feature_vectors = []

        for item in tqdm(X, desc="Extracting features"):
            audio = item["audio"]
            stft_spectrogram = item["stft_spectrogram"]  # Use STFT, not mel
            audio_path = item["path"]
            
            stats = None
            
            if self.enable_cache:
                cache_key = self._get_cache_key(audio_path)
                stats = self._load_from_cache(cache_key)
            
            if stats is None:
                features = self.extract_all_features(audio, stft_spectrogram)
                stats = self.compute_statistics(features)
                
                if self.enable_cache:
                    cache_key = self._get_cache_key(audio_path)
                    self._save_to_cache(cache_key, stats)
            
            feature_vectors.append(stats)

        return pd.DataFrame(feature_vectors)

In [30]:
# Basic usage - extract features and train a model
from sklearn.pipeline import Pipeline
from sklearn.ensemble import RandomForestRegressor

# Extraction pipeline
extraction_pipeline = Pipeline([
    ('spectrogram', SpectrogramExtractor(
        new_sr=22050,
        save_cache=True
    )),
    ('features', StatsFeatureExtractor(
        sr=22050,
        enable_cache=True
    ))
])

# Extract features
X_train_extracted = extraction_pipeline.fit_transform(X_train)
X_test_extracted = extraction_pipeline.transform(X_test)

Loading spectrograms:  32%|███▏      | 493/1556 [02:23<05:09,  3.44it/s]


KeyboardInterrupt: 

In [None]:
type(X_train_extracted)

pandas.core.frame.DataFrame

In [None]:
# Train model on extracted features
model = RandomForestRegressor(n_estimators=100)
model.fit(X_train_extracted, y_train)

# Predict
predictions = model.predict(X_test_extracted)

# Evaluate
print(f"MSE: {mean_squared_error(y_test, predictions)}")
print(f"R²: {r2_score(y_test, predictions)}")

ValueError: The feature names should match those that were passed during fit.
Feature names seen at fit time, yet now missing:
- tempo
