In [11]:
import os
import numpy as np
from sklearn.mixture import GaussianMixture
from sklearn.neighbors import KNeighborsClassifier
from sklearn.preprocessing import LabelEncoder
from sklearn.metrics import classification_report
from sklearn.model_selection import train_test_split
import joblib
from tqdm import tqdm
import pandas as pd
import librosa

Config

In [12]:
SR = 48000  # Sample rate
N_MFCC = 20  # MFCC features
GMM_COMPONENTS = 32  # For faster training; use 512 in production
IVECTOR_DIM = 100  # i-vector size (total variability space)
MFCC_DIR = r"trials\features\mfcc_extra_stats_2000_n" # r"trials\features\48k_mfcc_extra_hfcc_extra"
Y_DIR = r"trials\features\mfcc_extra_stats_2000_n" # r"trials\features\48k_mfcc_extra_hfcc_extra"

filtered_metadata_path = os.path.join(".", "data", "filtered_data_labeled.tsv")
audio_dir = os.path.join(".", "data", "filtered_clips")
df = pd.read_csv(filtered_metadata_path, sep='\t')

Utils

In [13]:
def load_mfccs(df):
    """
    Generator that yields MFCC features for each audio file in the dataframe.
    """
    mfccs_list = []
    for index, row in tqdm(df.iterrows(), total=len(df), desc="Loading MFCCs"):
        filename = row['path']
        filepath = os.path.join(audio_dir, filename)
        audio, sr = librosa.load(filepath, sr=SR)
        mfccs = librosa.feature.mfcc(y=audio, sr=sr, n_mfcc=N_MFCC).T
        mfccs_list.append(mfccs)
    return mfccs_list

def train_ubm(mfccs_list, n_components=GMM_COMPONENTS):
    all_feats = np.vstack(mfccs_list)
    ubm = GaussianMixture(n_components=n_components, covariance_type='diag', max_iter=100)
    ubm.fit(all_feats)
    return ubm

def train_t_matrix(mfccs_list, ubm, R=100, n_iter=5):
    K, D = ubm.means_.shape
    T = np.random.randn(K * D, R).astype(np.float32) * 0.1

    # Precompute per-utterance stats
    stats = []
    for mfccs in tqdm(mfccs_list, desc="Computing Baum-Welch stats"):
        N, F = compute_bw_stats(mfccs, ubm)
        # Convert to float32 to reduce memory usage
        stats.append((N.astype(np.float32), F.astype(np.float32)))

    for iteration in range(n_iter):
        T_num = np.zeros((K * D, R), dtype=np.float32)
        T_den = np.zeros((R, R), dtype=np.float32)

        for N, F in tqdm(stats, desc=f"T-matrix EM iter {iteration+1}"):
            # Centered stats
            S = np.zeros((K, D), dtype=np.float32)
            for k in range(K):
                S[k] = F[k] - N[k] * ubm.means_[k]
            S = S.flatten()

            # Inverse sigma (diagonal covs)
            sigma = ubm.covariances_.reshape(K, D).flatten().astype(np.float32) + 1e-6
            inv_sigma = 1. / sigma
            T_invSigma = T.T * inv_sigma[None, :]

            # E-step: compute posterior for i-vector w
            cov_w = np.linalg.inv(T_invSigma @ T + np.eye(R, dtype=np.float32))
            mean_w = cov_w @ (T_invSigma @ S)

            # M-step accumulators - memory-efficient implementation
            # Avoid creating large temporary arrays with np.outer
            for r in range(R):
                T_num[:, r] += S * mean_w[r]
            
            T_den += N.sum() * (np.outer(mean_w, mean_w) + cov_w)

        # Update T
        T = T_num @ np.linalg.inv(T_den)

    return T, stats

def compute_bw_stats(mfccs, ubm):
    if mfccs.ndim == 1:
        mfccs = mfccs.reshape(1, -1)
    responsibilities = ubm.predict_proba(mfccs)
    N = np.sum(responsibilities, axis=0)  # [K]
    F = np.dot(responsibilities.T, mfccs)  # [K, D]
    return N, F

def extract_ivec(N, F, ubm, T):
    """
    Extract an i-vector using full per-component covariances.
    N: [K] - zero order stats
    F: [K, D] - first order stats
    T: [K*D, R] - total variability matrix
    """
    K, D = ubm.means_.shape
    R = T.shape[1]

    # Flattened UBM means and covariances
    m = ubm.means_.flatten()
    covs = ubm.covariances_.reshape(K, D)  # still diagonal, but per component
    T_blocks = T  # shape: (K*D, R)

    # Compute centered stats
    F_dev = (F - N[:, None] * ubm.means_).flatten()  # (K*D,)

    # Construct precision matrix (inverse of covariance)
    inv_sigma = 1.0 / covs.flatten()  # (K*D,)
    T_transpose_Sigma_inv = T_blocks.T * inv_sigma[None, :]  # (R, K*D)

    # Compute posterior covariance of i-vector (R x R)
    cov_i = np.linalg.inv(T_transpose_Sigma_inv @ T_blocks + np.eye(R))

    # Compute posterior mean of i-vector (R,)
    mean_i = cov_i @ (T_transpose_Sigma_inv @ F_dev)

    return mean_i

Main Pipeline

In [20]:
from sklearn.model_selection import train_test_split

def main():
    print("Loading Splits...")
    # df_train, df_temp = train_test_split(df, test_size=0.10, random_state=42)
    # df_val, df_test = train_test_split(df_temp, test_size=0.5, random_state=42)

    print("Loading MFCCs...")
    # Stratified split to train and test
    mfccs_all = joblib.load(os.path.join(MFCC_DIR, "X.joblib"))

    print("Loading labels...")
    # y_train = df_train['label'].values
    # y_val = df_val['label'].values
    # y_test = df_test['label'].values
    y_all = joblib.load(os.path.join(Y_DIR, "y.joblib"))

    mfccs_train, mfccs_val = train_test_split(mfccs_all, test_size=0.1, random_state=42, stratify=y_all) 
    y_train, y_val = train_test_split(y_all, test_size=0.1, random_state=42, stratify=y_all)

    print("Training UBM...")
    ubm = train_ubm(mfccs_train)

    print("Training T-matrix...")
    T, stats = train_t_matrix(mfccs_train, ubm, R=IVECTOR_DIM, n_iter=5)

    print("Extracting i-vectors...")
    ivecs = []
    for (N, F) in tqdm(stats, desc="Extracting i-vectors"):
        ivec = extract_ivec(N, F, ubm, T)
        ivecs.append(ivec)
    ivecs = np.vstack(ivecs)

    print("Training classifier...")
    # Train classifier using the extracted i-vectors
    clf = KNeighborsClassifier(n_neighbors=4)
    clf.fit(ivecs, y_train)

    # For test data, need to extract i-vectors first
    print("Loading test MFCCs...")
    # mfcc_val = load_mfccs(df_val)

    val_stats = []
    for mfcc in tqdm(mfccs_val, desc="Computing test stats"):
        N, F = compute_bw_stats(mfcc, ubm)
        val_stats.append((N, F))

    val_ivecs = []
    for (N, F) in tqdm(val_stats, desc="Extracting test i-vectors"):
        ivec = extract_ivec(N, F, ubm, T)
        val_ivecs.append(ivec)
    val_ivecs = np.vstack(val_ivecs)

    # Evaluate on i-vectors
    print("Evaluating classifier...")
    preds = clf.predict(val_ivecs)
    print(classification_report(y_val, preds, target_names=["M_20s", "F_20s", "M_50s", "F_50s"]))

if __name__ == "__main__":
    main()

Loading Splits...
Loading MFCCs...
Loading labels...
Training UBM...
Training T-matrix...


Computing Baum-Welch stats: 100%|██████████| 7200/7200 [00:09<00:00, 759.56it/s]
T-matrix EM iter 1: 100%|██████████| 7200/7200 [05:31<00:00, 21.71it/s]
T-matrix EM iter 2: 100%|██████████| 7200/7200 [05:25<00:00, 22.12it/s]
T-matrix EM iter 3: 100%|██████████| 7200/7200 [05:25<00:00, 22.14it/s]
T-matrix EM iter 4: 100%|██████████| 7200/7200 [05:26<00:00, 22.07it/s]
T-matrix EM iter 5: 100%|██████████| 7200/7200 [05:35<00:00, 21.45it/s]


Extracting i-vectors...


Extracting i-vectors: 100%|██████████| 7200/7200 [03:05<00:00, 38.81it/s]


Training classifier...
Loading test MFCCs...


Computing test stats: 100%|██████████| 800/800 [00:01<00:00, 675.73it/s]
Extracting test i-vectors: 100%|██████████| 800/800 [00:22<00:00, 35.87it/s]


Evaluating classifier...
              precision    recall  f1-score   support

       M_20s       0.62      0.69      0.65       200
       F_20s       0.71      0.81      0.75       200
       M_50s       0.69      0.60      0.64       200
       F_50s       0.76      0.65      0.70       200

    accuracy                           0.69       800
   macro avg       0.69      0.69      0.69       800
weighted avg       0.69      0.69      0.69       800

