<a href="https://colab.research.google.com/github/Bosy-Ayman/DSAI-456-Speech/blob/main/assignment-solutions/assignment%204/Assignment4_Speech.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **GMM for Speaker Identification – Implementation Assignment**

##  Objectives
The goal of this assignment is to:
- Implement the **Expectation-Maximization (EM) Algorithm**
- Build **Gaussian Mixture Models (GMMs)** for speaker recognition
- Gain skills in iterative algorithm development and probabilistic modeling

---

## Dataset
We will use the **VCTK Corpus** from Kaggle:
- Contains speech recordings from **110 speakers**
- Each speaker reads about **400 English sentences**
- Single-sentence samples → suitable for classification where we train **one GMM per speaker**

Dataset link: https://www.kaggle.com/datasets/pratt3000/vctk-corpus

---

##  Task Requirements

###  Download Dataset
- Download the **VCTK Corpus** from Kaggle
- Organize recordings by speaker (each class = one speaker)

---

###  Feature Extraction
Extract useful audio features, such as:
- **MFCCs**
- **Mel Filter Banks**

> MFCCs are recommended for speaker recognition
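
The two feature types are closely related: MFCCs are commonly obtained by applying a type-II DCT along the mel axis of the log mel filter bank energies and keeping the first few coefficients. The following is a minimal NumPy sketch of that last step only. It assumes a precomputed log-mel matrix of shape `(n_frames, n_mels)`; the helper names `dct2_ortho` and `mfcc_from_log_mel` are hypothetical, and random numbers stand in for real log-mel energies.

```python
import numpy as np

def dct2_ortho(x):
    """Orthonormal type-II DCT along the last axis of x."""
    n = x.shape[-1]
    k = np.arange(n)[:, None]                   # output coefficient index
    m = np.arange(n)[None, :]                   # input bin index
    basis = np.cos(np.pi * (m + 0.5) * k / n)   # (n, n), row k = k-th cosine
    scale = np.full(n, np.sqrt(2.0 / n))
    scale[0] = np.sqrt(1.0 / n)
    return (x @ basis.T) * scale

def mfcc_from_log_mel(log_mel, n_mfcc=13):
    """Keep the first n_mfcc DCT coefficients of each frame."""
    return dct2_ortho(log_mel)[:, :n_mfcc]

# Placeholder data: 5 frames, 40 mel bands (not real audio features).
rng = np.random.default_rng(0)
log_mel = rng.normal(size=(5, 40))
mfcc = mfcc_from_log_mel(log_mel)
print(mfcc.shape)  # (5, 13)
```

Truncating to the first coefficients keeps the smooth spectral envelope and discards fine detail, which is one reason the two feature sets can behave differently in the bonus comparison.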

---

###   Implement EM Algorithm
You **must implement EM yourself**
(Not allowed: sklearn `GaussianMixture` or other library GMM implementations.)

Follow **slide 22 in Lecture 4**, including:
- Initialize GMM parameters:  
  - Mixing weights (πₖ)
  - Means (μₖ)
  - Covariance matrices (Σₖ)
- Iteratively update using:
  - **E-Step** → compute γ(zₙₖ)
  - **M-Step** → update parameters based on γ values
- Stop when the convergence criterion is reached
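
For reference, the steps above correspond to the standard textbook EM updates for a GMM with K components fitted to N frames x_n. This is the usual form and is not copied from the lecture slide, so the notation on slide 22 may differ slightly.

```latex
\begin{align}
\text{E-step:}\quad
\gamma(z_{nk}) &= \frac{\pi_k \,\mathcal{N}(x_n \mid \mu_k, \Sigma_k)}
                      {\sum_{j=1}^{K} \pi_j \,\mathcal{N}(x_n \mid \mu_j, \Sigma_j)} \\
\text{M-step:}\quad
N_k &= \sum_{n=1}^{N} \gamma(z_{nk}) \\
\mu_k^{\text{new}} &= \frac{1}{N_k} \sum_{n=1}^{N} \gamma(z_{nk})\, x_n \\
\Sigma_k^{\text{new}} &= \frac{1}{N_k} \sum_{n=1}^{N} \gamma(z_{nk})\,
    (x_n - \mu_k^{\text{new}})(x_n - \mu_k^{\text{new}})^{\top} \\
\pi_k^{\text{new}} &= \frac{N_k}{N} \\
\text{Log-likelihood:}\quad
\ln p(X) &= \sum_{n=1}^{N} \ln \sum_{k=1}^{K} \pi_k \,\mathcal{N}(x_n \mid \mu_k, \Sigma_k)
\end{align}
```

The log-likelihood in the last line is the quantity typically monitored for convergence.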

---

###  Train One GMM per Speaker
- Each speaker has **multiple utterances** → combine extracted features
- Train a **separate GMM** for each speaker
- Store estimated parameters for testing

---

###   Speaker Prediction (Evaluation)
For each test audio sample:
1. Extract MFCC features
2. Compute the **log likelihood** of the sample under each speaker’s GMM
3. **Classification** → choose the speaker with the **highest likelihood**
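
Written as a formula, and assuming the T frames of a test sample are treated as independent given the speaker, the decision rule in steps 2 and 3 can be sketched as follows. The speaker index s on the parameters is notation introduced here for illustration.

```latex
\hat{s} = \arg\max_{s} \sum_{t=1}^{T} \ln \sum_{k=1}^{K}
          \pi_{sk}\, \mathcal{N}(x_t \mid \mu_{sk}, \Sigma_{sk})
```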

---

##  Final Deliverables
- Code for:
  - Feature extraction
  - EM for GMM training
  - Speaker identification testing
- Accuracy evaluation
- Short explanation/report on results

---

##  Bonus (Optional)
- Experiment with different numbers of mixtures (e.g., K = 8, 16, 32)
- Compare MFCCs vs. Mel filter bank performance

---


In [None]:
import os
import numpy as np
import librosa
from sklearn.cluster import KMeans
from tqdm import tqdm
import math
import kagglehub


In [None]:
path = kagglehub.dataset_download("pratt3000/vctk-corpus")

print("Path to dataset files:", path)

Using Colab cache for faster access to the 'vctk-corpus' dataset.
Path to dataset files: /kaggle/input/vctk-corpus


In [None]:
KAGGLE_DOWNLOAD_PATH = "/kaggle/input/vctk-corpus"
DATASET_PATH = os.path.join(KAGGLE_DOWNLOAD_PATH, "VCTK-Corpus", "VCTK-Corpus", "wav48")

In [None]:
N_SPEAKERS = 10
N_MFCC = 13
N_MIXTURES = 8  # Number of GMM components (K)
MAX_ITER = 20
TOLERANCE = 1e-4

# **Steps**
*Audio → MFCCs → Train GMM per Speaker → Log-Likelihood Test → Prediction*

# GMM Log-Likelihood

In [None]:
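def multivariate_gaussian_log_pdf(X, mean, cov):
    """Return log N(x | mean, cov) for every row x of X, as an array of shape (N,)."""
    D = X.shape[1]
    diff = X - mean

    # Log-determinant of the covariance; slogdet avoids overflow in det().
    try:
        sign, logdet = np.linalg.slogdet(cov)
    except np.linalg.LinAlgError:
        cov_reg = cov + np.eye(D) * 1e-6
        sign, logdet = np.linalg.slogdet(cov_reg)

    # A non-positive determinant means cov is not positive definite.
    if sign <= 0:
        return -np.inf * np.ones(X.shape[0])

    try:
        inv_cov = np.linalg.inv(cov)
    except np.linalg.LinAlgError:
        inv_cov = np.linalg.inv(cov + np.eye(D) * 1e-6)

    # Quadratic term: -0.5 * (x - mean)^T cov^{-1} (x - mean) for each row.
    quad_term = -0.5 * np.sum(diff @ inv_cov * diff, axis=1)

    log_pdf = -0.5 * D * np.log(2 * np.pi) - 0.5 * logdet + quad_term
    return log_pdf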
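
An alternative sometimes preferred for numerical stability is to obtain both the log-determinant and the quadratic term from a single Cholesky factorization instead of an explicit inverse. The sketch below is one possible variant, not part of the notebook's pipeline: the name `gaussian_log_pdf_chol` is hypothetical, and it assumes `cov` is symmetric positive definite (otherwise `np.linalg.cholesky` raises `LinAlgError`).

```python
import numpy as np

def gaussian_log_pdf_chol(X, mean, cov):
    """Log N(x | mean, cov) for each row of X via a Cholesky factor."""
    D = X.shape[1]
    L = np.linalg.cholesky(cov)                 # cov = L @ L.T
    # Solve L z = (x - mean) for all rows at once; z has shape (D, N).
    z = np.linalg.solve(L, (X - mean).T)
    maha = np.sum(z ** 2, axis=0)               # squared Mahalanobis distance
    logdet = 2.0 * np.sum(np.log(np.diag(L)))   # log|cov|
    return -0.5 * (D * np.log(2 * np.pi) + logdet + maha)

# Standard 2-D normal evaluated at its mean: log-density is -log(2*pi).
value = gaussian_log_pdf_chol(np.zeros((1, 2)), np.zeros(2), np.eye(2))
print(value)  # approximately [-1.8379]
```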


In [None]:

def compute_gmm_log_likelihood(X, weights, means, covs):

    K = len(weights)
    N = X.shape[0]
    log_probs = np.zeros((N, K))

    for k in range(K):
        log_probs[:, k] = np.log(weights[k] + 1e-10) + multivariate_gaussian_log_pdf(X, means[k], covs[k])

    max_log_probs = np.max(log_probs, axis=1, keepdims=True)
    log_sum_exp = max_log_probs + np.log(np.sum(np.exp(log_probs - max_log_probs), axis=1, keepdims=True))

    total_log_likelihood = np.sum(log_sum_exp)
    return total_log_likelihood
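
The max-subtraction in the function above is the log-sum-exp trick. A small illustration of why it matters, using made-up log-probabilities that are very negative, as per-frame values often are for high-dimensional features:

```python
import numpy as np

# Two component log-probabilities for one frame (illustrative values only).
log_probs = np.array([[-1000.0, -1001.0]])

# Naive version: exp(-1000) underflows to 0, so the log becomes -inf.
with np.errstate(divide="ignore"):
    naive = np.log(np.sum(np.exp(log_probs), axis=1))

# Stable version: factor out the row maximum before exponentiating.
m = np.max(log_probs, axis=1, keepdims=True)
stable = (m + np.log(np.sum(np.exp(log_probs - m), axis=1, keepdims=True))).ravel()

print(naive)   # [-inf]
print(stable)  # approximately [-999.6867]
```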

# Feature Extraction (MFCCs)


In [None]:
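def extract_features(audio_path, n_mfcc=N_MFCC):
    """Load an audio file and return its MFCCs with shape (n_frames, n_mfcc)."""
    # sr=None keeps the file's native sampling rate instead of resampling.
    y, sr = librosa.load(audio_path, sr=None)
    mfccs = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=n_mfcc)
    # librosa returns (n_mfcc, n_frames); transpose so each row is one frame.
    mfccs = mfccs.T
    return mfccs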


# Collect Speaker Data


Collects all MFCCs for the specified number of speakers.

**Returns:** {speaker_id: combined_mfcc_data}


In [None]:

def collect_speaker_data(dataset_path, n_speakers):

    speaker_data = {}
    speaker_dirs = sorted([d for d in os.listdir(dataset_path) if os.path.isdir(os.path.join(dataset_path, d))])

    print(f"Collecting data for {min(n_speakers, len(speaker_dirs))} speakers...")

    for speaker_id in tqdm(speaker_dirs[:n_speakers], desc="Collecting features"):
        speaker_path = os.path.join(dataset_path, speaker_id)
        all_mfccs = []

        for filename in os.listdir(speaker_path):
            if filename.endswith(".wav"):
                audio_path = os.path.join(speaker_path, filename)
                mfccs = extract_features(audio_path, n_mfcc=N_MFCC)
                if mfccs is not None and mfccs.size > 0:
                    all_mfccs.append(mfccs)

        if all_mfccs:
            speaker_data[speaker_id] = np.vstack(all_mfccs)
        else:
            print(f"Warning: No valid data found for speaker {speaker_id}")

    return speaker_data

# Initialization Using K-Means

Initializes GMM parameters using KMeans clustering.

- `X`: feature matrix of shape (N_samples, N_features)
- `K`: number of mixtures


In [None]:
def initialize_gmm_parameters(X, K):

    N, D = X.shape

    # 1. Initialize Means (mu_k)
    kmeans = KMeans(n_clusters=K, random_state=42, n_init=10, max_iter=100).fit(X)
    initial_means = kmeans.cluster_centers_

    # 2. Initialize Covariances
    global_cov = np.cov(X, rowvar=False)
    initial_covs = np.array([global_cov] * K)

    # 3. Initialize Weights (pi_k)
    counts = np.bincount(kmeans.labels_, minlength=K)
    initial_weights = counts / N

    initial_weights = np.maximum(initial_weights, 1e-10)
    initial_weights /= np.sum(initial_weights)

    return initial_weights, initial_means, initial_covs

# EM Algorithm



*   **E-Step (Expectation):** For each MFCC frame, compute a soft assignment to every Gaussian:

    γ(zₙₖ) = "probability that frame n belongs to Gaussian k"

*   **M-Step (Maximization):** Update the Gaussian parameters using the γ values:
    * Update means → γ-weighted average of the MFCC frames
    * Update covariances → γ-weighted spread of the frames around each new mean
    * Update weights → fraction of frames (softly) assigned to each Gaussian
  





In [None]:
def em_gmm(X, K, max_iter=MAX_ITER, tolerance=TOLERANCE):

    N, D = X.shape

    weights, means, covs = initialize_gmm_parameters(X, K)
    log_likelihood_history = []

    for i in tqdm(range(max_iter)):
        # -------------- E-STEP ---------------
        # Unnormalized log responsibilities: log pi_k + log N(x_n | mu_k, Sigma_k)
        log_probs = np.zeros((N, K))
        for k in range(K):

            log_probs[:, k] = np.log(weights[k]) + multivariate_gaussian_log_pdf(X, means[k], covs[k])

        max_log_probs = np.max(log_probs, axis=1, keepdims=True)
        log_denominator = max_log_probs + np.log(np.sum(np.exp(log_probs - max_log_probs), axis=1, keepdims=True))
        log_gamma = log_probs - log_denominator
        gamma = np.exp(log_gamma)

        # ---------------- M-STEP -------------
        N_k = np.sum(gamma, axis=0)

        # 1. Update Weights (pi_k)
        new_weights = N_k / N

        # 2. Update Means (mu_k)
        new_means = (gamma.T @ X) / N_k[:, np.newaxis] # (K, D)

        # 3. Update Covariances (Sigma_k)
        new_covs = np.zeros((K, D, D))
        for k in range(K):
            diff = X - new_means[k]
            new_covs[k] = (diff.T * gamma[:, k]) @ diff / N_k[k]
            new_covs[k] += np.eye(D) * 1e-6

        weights, means, covs = new_weights, new_means, new_covs
        current_log_likelihood = np.sum(log_denominator)
        log_likelihood_history.append(current_log_likelihood)

        if i > 0:
            change = current_log_likelihood - log_likelihood_history[-2]
            if abs(change) < tolerance:
                print(f"EM converged at iteration {i} (Change: {change:.6f}).")
                break

    return weights, means, covs, log_likelihood_history
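
The stopping rule above compares the absolute change in the total log-likelihood against the tolerance. Because the total is a sum over all frames, its change scales with the number of frames, so a threshold of `1e-4` may rarely be met for speakers with around 10^5 frames; this is consistent with the training log in this notebook, where every progress bar reaches 20/20. One option is to test the per-frame average instead. The sketch below is an illustration under that assumption; `has_converged` is a hypothetical helper and the history values are made up.

```python
def has_converged(history, n_frames, tol=1e-4):
    """True if the per-frame log-likelihood changed by less than tol."""
    if len(history) < 2:
        return False
    change_per_frame = (history[-1] - history[-2]) / n_frames
    return abs(change_per_frame) < tol

# Illustrative numbers: total log-likelihood improves by 5.0 over 100,000 frames.
history = [-5_000_000.0, -4_999_995.0]
print(has_converged(history, n_frames=100_000))  # True (5e-5 per frame)
print(abs(history[-1] - history[-2]) < 1e-4)     # False under the absolute rule
```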

# Train One GMM per Speaker

Trains a separate GMM for each speaker.

**Returns:** {speaker_id: (weights, means, covs)}


In [None]:
def train_gmm_models(speaker_data, K=N_MIXTURES):
    gmm_models = {}
    print(f"\nTraining GMMs (K={K}) for {len(speaker_data)} speakers...")

    for speaker_id, X in speaker_data.items():
        print(f"Training GMM for speaker {speaker_id} ({X.shape[0]} frames)...")
        if X.shape[0] < K:
            print(f"Skipping speaker {speaker_id}: not enough data frames.")
            continue

        weights, means, covs, _ = em_gmm(X, K)
        gmm_models[speaker_id] = (weights, means, covs)

    return gmm_models

## Speaker Prediction (Evaluation)

1. Compute log likelihood under each speaker's GMM.
2. Choose the speaker with the maximum log likelihood.

In [None]:

def predict_speaker(audio_features, trained_gmms):
    best_log_likelihood = -np.inf
    predicted_speaker = None

    for speaker_id, (weights, means, covs) in trained_gmms.items():
        log_likelihood = compute_gmm_log_likelihood(audio_features, weights, means, covs)

        if log_likelihood > best_log_likelihood:
            best_log_likelihood = log_likelihood
            predicted_speaker = speaker_id

    return predicted_speaker


# Evaluation

In [None]:

def evaluate_performance(test_data, trained_gmms):
    correct_predictions = 0
    total_samples = 0

    speaker_ids = list(test_data.keys())

    print("\n--- Evaluation Phase ---")
    for true_speaker_id in tqdm(speaker_ids, desc="Evaluating Speakers"):

        test_features = test_data.get(true_speaker_id)

        if test_features is None or test_features.size == 0:
            continue

        # 1. Predict speaker for the test features
        predicted_speaker_id = predict_speaker(test_features, trained_gmms)

        # 2. Classification
        if predicted_speaker_id == true_speaker_id:
            correct_predictions += 1

        total_samples += 1

    accuracy = (correct_predictions / total_samples) if total_samples > 0 else 0.0

    return accuracy, total_samples


---

In [None]:

speaker_mfcc_data = collect_speaker_data(DATASET_PATH, N_SPEAKERS)


Collecting data for 10 speakers...


Collecting features: 100%|██████████| 10/10 [01:48<00:00, 10.87s/it]


In [None]:
train_data = {}
test_data = {}

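# Frame-level 80/20 split: the first 80% of each speaker's pooled frames are
# used for training; the remaining 20% form one pooled test sample per speaker.
for speaker_id, all_frames in speaker_mfcc_data.items():
    split_idx = int(0.8 * all_frames.shape[0])
    train_data[speaker_id] = all_frames[:split_idx]
    test_data[speaker_id] = all_frames[split_idx:]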
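
Because the split is made on pooled frames, the 80% boundary can fall inside an utterance, and each speaker contributes a single pooled test sample. If per-utterance testing is wanted, as the task description suggests, one option is to keep the features of each utterance separate and split the list of utterances instead. The sketch below assumes a hypothetical per-speaker structure, a list of `(n_frames, n_mfcc)` arrays with at least two utterances; `split_utterances` is not part of the notebook, and random arrays stand in for real MFCCs.

```python
import numpy as np

def split_utterances(utterances, train_fraction=0.8):
    """Split a list of per-utterance feature arrays at an utterance boundary."""
    n_train = int(train_fraction * len(utterances))
    train_frames = np.vstack(utterances[:n_train])  # pooled frames for EM training
    test_utterances = utterances[n_train:]          # kept separate for scoring
    return train_frames, test_utterances

# Placeholder data: 5 utterances with different frame counts, 13 MFCCs each.
rng = np.random.default_rng(0)
utterances = [rng.normal(size=(n, 13)) for n in (30, 40, 50, 20, 60)]
train_frames, test_utts = split_utterances(utterances)
print(train_frames.shape, len(test_utts))  # (140, 13) 1
```

Each held-out utterance could then be scored separately, giving an accuracy over utterances rather than over speakers.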

In [None]:
trained_gmms = train_gmm_models(train_data, K=N_MIXTURES)



Training GMMs (K=8) for 10 speakers...
Training GMM for speaker p225 (74240 frames)...


100%|██████████| 20/20 [00:08<00:00,  2.32it/s]


Training GMM for speaker p226 (117072 frames)...


100%|██████████| 20/20 [00:07<00:00,  2.68it/s]


Training GMM for speaker p227 (108445 frames)...


100%|██████████| 20/20 [00:05<00:00,  3.87it/s]


Training GMM for speaker p228 (113811 frames)...


100%|██████████| 20/20 [00:07<00:00,  2.78it/s]


Training GMM for speaker p229 (91193 frames)...


100%|██████████| 20/20 [00:04<00:00,  4.45it/s]


Training GMM for speaker p230 (115446 frames)...


100%|██████████| 20/20 [00:05<00:00,  3.57it/s]


Training GMM for speaker p231 (90292 frames)...


100%|██████████| 20/20 [00:05<00:00,  3.58it/s]


Training GMM for speaker p232 (94600 frames)...


100%|██████████| 20/20 [00:04<00:00,  4.15it/s]


Training GMM for speaker p233 (101736 frames)...


100%|██████████| 20/20 [00:05<00:00,  3.33it/s]


Training GMM for speaker p234 (91596 frames)...


100%|██████████| 20/20 [00:04<00:00,  4.59it/s]


In [None]:
accuracy, total_speakers_tested = evaluate_performance(test_data, trained_gmms)



--- Evaluation Phase ---


Evaluating Speakers: 100%|██████████| 10/10 [00:04<00:00,  2.03it/s]


In [None]:
print(f"Total Speakers Used: {N_SPEAKERS}")
print(f"Number of MFCCs (D): {N_MFCC}")
print(f"Number of Mixtures (K): {N_MIXTURES}")
print(f"EM Max Iterations: {MAX_ITER}")
print(f"Total Speakers Tested: {total_speakers_tested}")
print(f"Correctly Identified Speakers: {round(accuracy * total_speakers_tested)}")
print(f"Speaker Identification Accuracy: {accuracy * 100:.2f}%")


Total Speakers Used: 10
Number of MFCCs (D): 13
Number of Mixtures (K): 8
EM Max Iterations: 20
Total Speakers Tested: 10
Correctly Identified Speakers: 10
Speaker Identification Accuracy: 100.00%
