# Feature Extraction

ML and DL models need to be fed with features. In this notebook, we will extract some characteristics from audio files that can be used to train these kinds of models.

In [1]:
import pandas as pd
import librosa
import numpy as np
import pickle
import os

from scipy.signal import get_window
from audiomentations import Compose, AddGaussianNoise, TimeStretch, PitchShift
from sklearn.model_selection import train_test_split

SEED = 2706

We have built a metadata file that will help us with the feature extraction. However, for representational issues we will delete the rows corresponding with the emotions 'Calm' and 'Surprised'.

In [2]:
audios = pd.read_csv("./audio_data/complete_metadata.csv")
audios = audios[~((audios['Emotion'] == 'Calm') | (audios['Emotion'] == 'Surprised'))]
audios

Unnamed: 0,Dataset,Path,Emotion,Sex,Emotional_level
0,CREMA-D,./audio_data/crema-d/1001_DFA_ANG_XX.wav,Angry,Male,XX
1,CREMA-D,./audio_data/crema-d/1001_DFA_DIS_XX.wav,Disgust,Male,XX
2,CREMA-D,./audio_data/crema-d/1001_DFA_FEA_XX.wav,Fearful,Male,XX
3,CREMA-D,./audio_data/crema-d/1001_DFA_HAP_XX.wav,Happy,Male,XX
4,CREMA-D,./audio_data/crema-d/1001_DFA_NEU_XX.wav,Neutral,Male,XX
...,...,...,...,...,...
9907,MESD,./audio_data/mesd/Sadness_M_B_ira.wav,Sad,Male,XX
9908,MESD,./audio_data/mesd/Sadness_M_B_llanto.wav,Sad,Male,XX
9909,MESD,./audio_data/mesd/Sadness_M_B_locura.wav,Sad,Male,XX
9910,MESD,./audio_data/mesd/Sadness_M_B_metralleta.wav,Sad,Male,XX


Now we have to make some adjustment to data to standardise them. We have to make the audios be sampled at the same rate, normalise them (since they may not have been recorded at the same conditions) and unify the channel. Duration will not be changed for reasons explained later.

In [3]:
def process_audio(path: str, target_sr: int = 16000) -> np.ndarray:
    """
    Load and preprocess an audio file.

    Steps:
        - Load audio as mono
        - Resample to target_sr
        - Normalize by RMS energy
        - (Optionally) adjust to a fixed duration with trimming or padding

    Args:
        path: Path to the audio file.
        target_sr: Target sampling rate.

    Returns:
        np.ndarray: Preprocessed audio signal.
    """
    # Load audio and force mono channel
    audio, sr = librosa.load(path, sr=None, mono=True)

    # Resample if the sampling rate does not match target_sr
    if sr != target_sr:
        audio = librosa.resample(audio, orig_sr=sr, target_sr=target_sr)
        sr = target_sr

    # Normalize using RMS energy to ensure consistent loudness
    rms = np.sqrt(np.mean(audio**2))
    if rms > 0:
        audio = audio / rms

    return audio


## Feature Extraction

Now, let's start to extract some features from the audios. We will explore the following propierties:
* Total Spectrum Power: $\log{\int_0^{w_0}|F(\omega)|^2\,d\omega}$, where $|F(\omega)|^2$ is the power at frequency $\omega$ Hz and $\omega_0$ is half the sampling rate.
* Subband Powers: For intervals $[0, \omega_0/8],~[\omega_0/8, \omega_0/4],~[\omega_0/4, \omega_0/2],~[\omega_0/2, \omega_0]$, let $L_j,~H_j$ be the intervals extreme values for $j\in\{1,2,3,4\}$. Then, the subband powers are defined as the spectrum power for each interval.
* Brightness: Is the spectrum centroid, defined as $\omega_C=\int_0^{\omega_0}\omega\cdot|F(\omega)|^2\,d\omega\left/\int_0^{\omega_0}|F(\omega)|^2\,d\omega\right.$.
* Bandwidth: $B=\sqrt{\int_0^{\omega_0}(\omega-\omega_0)\cdot|F(\omega)|^2\,d\omega}\left/\sqrt{\int_0^{\omega_0}|F(\omega)|^2\,d\omega}\right.$.
* Pitch Frequency: $P(x)=x$ if $x>P_0$ and $P(x)=0$ in any other case, for a fixed $P_0$ chosen empirically.
* MFCC: Coefficients of the Fast Fourier Transform filtered by the Mel function. We will be using Librosa to extract them.
* Zero-crossing rate: We will be using Librosa for this one too.

Since we are processing audio signals, their properties change within its duration, because they are not stationary waves. In order to capture better the features, we will use Hamming windows. This is why we do not need to pad the audios with silence.

In [4]:
# Total Spectrum Power
def total_spectrum_power(signal: np.ndarray, **kwargs) -> float:
    """
    Compute the total spectral power of a signal.

    The function calculates the power spectral density using the FFT,
    integrates it across frequencies, and returns the logarithm of
    the total spectral power.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).

    Returns:
        float: Logarithm of the total spectral power.
    """
    sr = kwargs.get("sr", 16000)

    # Compute spectral power density using the real FFT
    power = np.abs(np.fft.rfft(signal))**2

    # Frequency bins corresponding to the FFT
    freq = np.fft.rfftfreq(n=len(signal), d=1 / sr)

    # Frequency resolution (delta between adjacent frequency bins)
    delta_freq = freq[1] - freq[0]

    # Integrate spectral power over all frequencies
    sigma = np.sum(power) * delta_freq

    # Return log of total power with numerical stability
    return np.log(sigma + 1e-10)


In [5]:
# Subband powers
def subband_powers(signal: np.ndarray, n_intervals: int = 4, **kwargs) -> list[float]:
    """
    Compute logarithmic power for multiple frequency subbands.

    The signal spectrum is divided into logarithmically spaced subbands
    up to the Nyquist frequency (sr/2). For each subband, the power is
    computed by integrating the spectral density, and the logarithm of
    that power is returned.

    Args:
        signal: Input audio signal.
        n_intervals: Number of logarithmic subbands to compute (default is 4).
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).
            - w_0 (float): Maximum frequency limit (default is sr/2).

    Returns:
        list[float]: Logarithmic power of each subband.
    """
    sr = kwargs.get("sr", 16000)
    w_0 = kwargs.get("w_0", sr / 2)

    # Define logarithmically spaced frequency intervals
    intervals = (
        [(0, w_0 / 2 ** (n_intervals - 1))]
        + [
            (w_0 / 2 ** (n_intervals - i), w_0 / 2 ** (n_intervals - i - 1))
            for i in range(1, n_intervals)
        ]
    )

    # Compute spectral power density
    power = np.abs(np.fft.rfft(signal))**2

    # Frequency bins corresponding to FFT
    freq = np.fft.rfftfreq(n=len(signal), d=1 / sr)

    log_band_power = []
    for (L_i, H_i) in intervals:
        # Select frequency bins within the subband
        band_mask = (freq >= L_i) & (freq <= H_i)

        # Frequency resolution
        delta_f = freq[1] - freq[0]

        # Integrate spectral power within subband
        band_power = np.sum(power[band_mask]) * delta_f

        # Store logarithmic subband power with numerical stability
        log_band_power.append(np.log(band_power + 1e-10))

    return log_band_power

In [6]:
# Brightness
def brightness(signal: np.ndarray, **kwargs) -> np.ndarray:
    """
    Compute the spectral centroid of a signal, a measure of its "brightness."

    The spectral centroid represents the center of mass of the spectrum,
    which correlates with the perceived brightness of the sound.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).
            - n_fft (int): Length of the FFT window (default is 512).
            - hop_length (int): Number of samples between successive frames (default is 160).
            - window (str or tuple or np.ndarray): Window function for FFT (default is "hann").

    Returns:
        np.ndarray: Spectral centroid of the signal (shape: [1, n_frames]).
    """
    # Compute spectral centroid using librosa
    return librosa.feature.spectral_centroid(
        y=signal,
        sr=kwargs.get("sr", 16000),
        n_fft=kwargs.get("n_fft", 512),
        hop_length=kwargs.get("hop_length", 160),
        window=kwargs.get("window", "hann")
    )

In [7]:
# Bandwidth
def bandwidth(signal: np.ndarray, **kwargs) -> float:
    """
    Compute the spectral bandwidth of a signal.

    The spectral bandwidth is a measure of the spread of the spectrum
    around the origin, weighted by frequency and spectral power.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).
            - w_0 (float): Maximum frequency for normalization (default is sr/2).

    Returns:
        float: Spectral bandwidth of the signal.
    """
    sr = kwargs.get("sr", 16000)
    w_0 = kwargs.get("w_0", sr / 2)

    # Compute spectral power density
    power = np.abs(np.fft.rfft(signal))**2

    # Frequency bins corresponding to the FFT
    freq = np.fft.rfftfreq(n=len(signal), d=1 / sr)

    # Frequency resolution (spacing between FFT bins)
    delta_freq = freq[1] - freq[0]

    # Compute weighted spectral spread and normalize by w_0
    sigma = np.sum(freq * power) * delta_freq * w_0

    # Return the square root of the weighted sum as bandwidth
    return np.sqrt(sigma)

In [8]:
# Pitch
def pitch(signal: np.ndarray, **kwargs) -> np.ndarray:
    """
    Estimate the pitch (fundamental frequency) of an audio signal using autocorrelation.

    The function frames the signal, removes the DC component from each frame,
    computes the normalized autocorrelation, and estimates the pitch for each frame
    based on the location of the maximum autocorrelation within a valid period range.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).
            - n_fft (int): Frame length for analysis (default is 512).
            - hop_length (int): Hop length between frames (default is 160).
            - threshold (float): Minimum normalized autocorrelation to consider a valid pitch (default is 0.3).
            - min_freq (float): Minimum detectable frequency in Hz (default is 50).
            - max_freq (float): Maximum detectable frequency in Hz (default is 500).

    Returns:
        np.ndarray: Estimated pitch track (Hz) for each frame. Frames with no detected pitch are 0.
    """
    sr = kwargs.get("sr", 16000)
    frame_length = kwargs.get("n_fft", 512)
    hop_length = kwargs.get("hop_length", 160)
    threshold = kwargs.get("threshold", 0.3)
    min_freq = kwargs.get("min_freq", 50)
    max_freq = kwargs.get("max_freq", 500)

    # Convert frequency limits to period limits in samples
    min_period = int(sr / max_freq)
    max_period = int(sr / min_freq)

    # Frame the signal for analysis
    frames = librosa.util.frame(signal, frame_length=frame_length, hop_length=hop_length).T
    pitch_track = []

    for frame in frames:
        # Remove DC component
        frame = frame - np.mean(frame)

        # Skip silent frames
        if np.all(frame == 0):
            pitch_track.append(0)
            continue

        # Compute normalized autocorrelation
        r = librosa.autocorrelate(frame)
        r = r / (np.max(np.abs(r)) + 1e-9)

        # Ignore lags outside the valid pitch range
        r[:min_period] = 0
        r[max_period:] = 0

        # Find the lag corresponding to maximum autocorrelation
        T = np.argmax(r)
        p = r[T]

        # Estimate pitch if autocorrelation exceeds threshold
        pitch_value = 1 / T if p > threshold else 0
        pitch_track.append(pitch_value)

    return np.array(pitch_track)

In [9]:
# MFCC
def mfcc(signal: np.ndarray, n_mfcc: int = 20, **kwargs) -> np.ndarray:
    """
    Compute the mean and std of Mel-frequency cepstral coefficients (MFCCs) of an audio signal.

    Returns:
        np.ndarray: Concatenated mean and std MFCC coefficients (shape: [2 * n_mfcc]).
    """
    # Compute MFCCs using librosa
    mfccs = librosa.feature.mfcc(
        y=signal,
        sr=kwargs.get("sr", 16000),
        n_mfcc=n_mfcc,
        n_fft=kwargs.get("n_fft", 512),
        hop_length=kwargs.get("hop_length", 160),
        window=kwargs.get("window", "hann")
    )

    # Transpose to shape (frames, n_mfcc)
    mfccs_T = mfccs.T

    # Compute mean and std across frames
    mean = np.mean(mfccs_T, axis=0)
    std = np.std(mfccs_T, axis=0)

    return {'Mean': mean, "SD": std}

In [10]:
# Zero-crossing rate
def zeros_rate(signal: np.ndarray, **kwargs) -> np.ndarray:
    """
    Compute the zero-crossing rate of an audio signal.

    The zero-crossing rate measures the rate at which the signal changes
    sign, which is often used as a simple measure of signal noisiness or
    percussiveness.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - frame_length (int): Length of the analysis frame (default is 400).
            - hop_length (int): Number of samples between successive frames (default is 160).

    Returns:
        np.ndarray: Zero-crossing rate for each frame (shape: [1, n_frames]).
    """
    # Compute zero-crossing rate using librosa
    return librosa.feature.zero_crossing_rate(
        y=signal,
        frame_length=kwargs.get("frame_length", 400),
        hop_length=kwargs.get("hop_length", 160)
    )

In [11]:
# Chroma_stft
def chroma(signal: np.ndarray, **kwargs) -> np.ndarray:
    """
    Compute the mean chroma feature of an audio signal.

    Chroma features represent the energy distribution across the 12
    pitch classes of the musical octave, often used in music analysis
    and key detection.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).
            - n_fft (int): FFT window size (default is 512).
            - hop_length (int): Hop length between frames (default is 160).
            - window (str or tuple or np.ndarray): Window function for FFT (default is "hann").

    Returns:
        np.ndarray: Mean chroma features across all frames (shape: [12]).
    """
    # Compute magnitude STFT
    stft = np.abs(librosa.stft(signal))

    # Compute chroma features from the STFT
    chroma_features = librosa.feature.chroma_stft(
        S=stft,
        sr=kwargs.get("sr", 16000),
        n_fft=kwargs.get("n_fft", 512),
        hop_length=kwargs.get("hop_length", 160),
        window=kwargs.get("window", "hann")
    )

    # Return mean chroma across frames
    return {"Mean": np.mean(chroma_features.T, axis=0), "SD": np.std(chroma_features.T, axis=0)}

In [12]:
# Mel Spectrogram
def mel_spectro(signal: np.ndarray, **kwargs) -> np.ndarray:
    """
    Compute the mean Mel-spectrogram of an audio signal.

    The Mel-spectrogram represents the signal's short-term power spectrum
    mapped onto the Mel scale, which is perceptually motivated. Useful
    for speech and audio analysis.

    Args:
        signal: Input audio signal.
        **kwargs: Optional keyword arguments.
            - sr (int): Sampling rate of the signal (default is 16000).
            - n_fft (int): FFT window size (default is 512).
            - hop_length (int): Hop length between frames (default is 160).
            - window (str or tuple or np.ndarray): Window function for FFT (default is "hann").

    Returns:
        np.ndarray: Mean Mel-spectrogram across all frames.
    """
    # Compute Mel-spectrogram using librosa
    mel_spec = librosa.feature.melspectrogram(
        y=signal,
        sr=kwargs.get("sr", 16000),
        n_fft=kwargs.get("n_fft", 512),
        hop_length=kwargs.get("hop_length", 160),
        window=kwargs.get("window", "hann")
    )

    # Return mean Mel-spectrogram across frames
    return {"Mean": np.mean(mel_spec.T, axis=0), "SD": np.std(mel_spec.T, axis=0)}

In [13]:
# Hamming window for spectral power
def frame_signal(signal: np.ndarray, frame_length: int, hop_length: int) -> np.ndarray:
    """
    Split a signal into overlapping frames using stride tricks.

    Args:
        signal: Input audio signal.
        frame_length: Number of samples per frame.
        hop_length: Number of samples to advance between frames.

    Returns:
        np.ndarray: 2D array of frames with shape (n_frames, frame_length).
    """
    n_frames = 1 + (len(signal) - frame_length) // hop_length

    # Use numpy stride tricks to create a 2D view of overlapping frames
    frames = np.lib.stride_tricks.as_strided(
        signal,
        shape=(n_frames, frame_length),
        strides=(signal.strides[0] * hop_length, signal.strides[0])
    )

    return frames


def apply_function_per_frame(signal: np.ndarray, func, **kwargs) -> np.ndarray:
    """
    Apply a feature extraction function to each frame of a signal.

    The signal is split into overlapping frames, optionally windowed,
    and the given function is applied to each frame.

    Args:
        signal: Input audio signal.
        func: Function to apply to each frame.
        **kwargs: Optional keyword arguments.
            - frame_length (int): Number of samples per frame.
            - hop_length (int): Number of samples between frames.
            - window (str): Type of window to apply to each frame (default is "hann").

    Returns:
        np.ndarray: Array of features computed per frame.
    """
    frame_length = kwargs.get("frame_length")
    hop_length = kwargs.get("hop_length")
    window_type = kwargs.get("window", "hann")

    # Split signal into frames
    frames = frame_signal(signal, frame_length, hop_length)

    # Create window to apply to each frame
    hamming_window = get_window(window_type, frame_length, fftbins=True)

    features = []
    for frame in frames:
        # Apply window to frame and compute feature
        windowed = frame * hamming_window
        features.append(func(windowed, **kwargs))

    return np.array(features)

Finally, we can extract the features.

In [14]:
def extract(signal: np.ndarray, params: dict) -> list:
    """
    Extract a comprehensive set of audio features from a signal.

    Features extracted:
        - Total spectral power
        - Subband powers
        - Spectral brightness
        - Spectral bandwidth
        - Pitch track
        - MFCCs
        - Zero-crossing rate
        - Chroma features
        - Mel-spectrogram

    Args:
        signal: Input audio signal.
        params: Dictionary of optional parameters for feature extraction functions,
            such as 'sr', 'frame_length', 'hop_length', 'n_fft', 'window', etc.

    Returns:
        list: A list containing arrays for each feature in the order listed above.
    """
    # Compute features per frame or for the entire signal
    total_power = apply_function_per_frame(signal, total_spectrum_power, **params)
    subband_power = apply_function_per_frame(signal, subband_powers, **params)
    bright = brightness(signal, **params)
    band = apply_function_per_frame(signal, bandwidth, **params)
    pitches = pitch(signal, **params)
    coeff = mfcc(signal, n_mfcc=10, **params)
    zero_crossing = zeros_rate(signal, **params)
    chroma_stft = chroma(signal, **params)
    mel = mel_spectro(signal, **params)

    return [
        total_power,
        subband_power,
        bright,
        band,
        pitches,
        coeff,
        zero_crossing,
        chroma_stft,
        mel
    ]

To improve the performance of the models we will apply data augmentation. For each *selected* file, we will add some noise, strect its duration and lightly shift its pitch. It is important to explain that *selection* is necessary to avoid any data leakage: if we were to train with augmented files generated from test data, we would be learning something about test data, which is not the correct way to proceed. Thus, before extracting the features we will split the dataset.

In [15]:
# =============== AUGMENTATIONS ===============
augment = Compose([
    AddGaussianNoise(min_amplitude=0.001, max_amplitude=0.01, p=0.5),
    TimeStretch(min_rate=0.9, max_rate=1.1, p=0.5),
    PitchShift(min_semitones=-2, max_semitones=2, p=0.5),
])
# ===============================================

In [16]:
params = {
    "w_0": 8000,
    "sr": 16000,
    "frame_length": 400,
    "hop_length": 160,
    "n_fft": 400,
    "window": "hamming"
}

NUM_AUGMENTATIONS = 2  # cuántas veces aumentar cada audio
AUGMENTATION_ENABLED = True
SAVE_PATH = "./features/ML_features.pickle"

At this moment we have defined everything needed. We proceed then.

In [17]:
# --------------------------------------------------
# 1. Train & Test split
# --------------------------------------------------
def stratified_split(
    df: "pd.DataFrame", y_col: str, test_frac: float = 0.3, seed: int = SEED
) -> "pd.DataFrame":
    """
    Perform a stratified train-test split on a DataFrame.

    Ensures that each class in `y_col` is represented proportionally
    in both the training and test sets.

    Args:
        df: Input DataFrame containing features and target column.
        y_col: Name of the target column used for stratification.
        test_frac: Fraction of samples to include in the test set (default is 0.2).
        seed: Random seed for reproducibility (default is 42).

    Returns:
        pd.DataFrame: DataFrame with an added 'split' column containing
                      values 'train' or 'test' for each row.
    """
    rng = np.random.default_rng(seed)

    # Shuffle the DataFrame
    df = df.sample(frac=1, random_state=seed).reset_index(drop=True)

    test_idx = []

    # Determine test indices for each class to preserve proportions
    for label, g in df.groupby(y_col):
        n_test = max(1, int(round(len(g) * test_frac)))
        test_idx.extend(g.sample(n=n_test, random_state=seed).index.tolist())

    # Initialize all rows as training set
    df["split"] = "train"

    # Assign test rows
    df.loc[test_idx, "split"] = "test"

    return df


audios = stratified_split(audios, y_col="Emotion", test_frac=0.2, seed=42)

In [18]:
# --------------------------------------------------
# 2. Feature extraction & Augmentations (Train)
# --------------------------------------------------
rows = []
for _, row in audios.iterrows():
    path, emotion, sex, split = row["Path"], row["Emotion"], row["Sex"], row["split"]
    signal = process_audio(path, target_sr=params["sr"])

    n_repeats = NUM_AUGMENTATIONS + 1 if (AUGMENTATION_ENABLED and split == "train") else 1
    for i in range(n_repeats):
        signal_aug = signal if i == 0 else augment(samples=signal, sample_rate=params["sr"])
        (total_power, subband_power, bright, band, pitches, coeff, zero_crossing,
         chroma_stft, mel) = extract(signal_aug, params)

        rows.append({
            "Emotion": emotion,
            "Sex": sex,
            "Split": split,
            "Augmented": i > 0,
            "Total_Spectrum_Power_mean": np.mean(total_power),
            "Total_Spectrum_Power_sd": np.std(total_power),
            "Subband_Power_1_mean": np.mean([f[0] for f in subband_power]),
            "Subband_Power_1_sd": np.std([f[0] for f in subband_power]),
            "Subband_Power_2_mean": np.mean([f[1] for f in subband_power]),
            "Subband_Power_2_sd": np.std([f[1] for f in subband_power]),
            "Subband_Power_3_mean": np.mean([f[2] for f in subband_power]),
            "Subband_Power_3_sd": np.std([f[2] for f in subband_power]),
            "Subband_Power_4_mean": np.mean([f[3] for f in subband_power]),
            "Subband_Power_4_sd": np.std([f[3] for f in subband_power]),
            "Brightness_mean": np.mean(bright),
            "Brightness_sd": np.std(bright),
            "Bandwidth_mean": np.mean(band) if isinstance(band, (np.ndarray, list)) else band,
            "Bandwidth_sd": np.std(band) if isinstance(band, (np.ndarray, list)) else 0,
            "Pitch_mean": np.mean(pitches[pitches > 0]) if np.any(pitches > 0) else 0,
            "Pitch_sd": np.std(pitches[pitches > 0]) if np.any(pitches > 0) else 0,
            "Zero_crossing_rate_mean": np.mean(zero_crossing),
            "Zero_crossing_rate_sd": np.std(zero_crossing),
            "MFCC_mean": coeff['Mean'],
            "MFCC_sd": coeff['SD'],
            "Chromogram_mean": chroma_stft['Mean'],
            "Chromogram_sd": chroma_stft['SD'],
            "Mel_spectrogram_mean": mel['Mean'],
            "Mel_spectrogram_sd": mel['SD']
        })

features_augmented = pd.DataFrame(rows)

  return pitch_tuning(


This features will be saved in a pickle file. They are in need for processing (for example, MFCC is winded). We will treat them later when preparing model training. 

In [19]:
with open(SAVE_PATH, "wb") as f:
    pickle.dump(features_augmented, f)