# Monte Carlo Subsampling

## Setup

In [1]:
from __future__ import division
from __future__ import print_function
from os import path
from pathlib import Path
import os, glob, torch,torchaudio, re
from python_speech_features import delta
from python_speech_features import mfcc
import matplotlib.pyplot as plt
import numpy as np
import scipy.io.wavfile as wav
import sys
import speech_dtw.qbe as qbe
from transformers import WavLMModel
from sklearn.decomposition import PCA
import random
import statistics
from collections import Counter

sys.path.append("..")
sys.path.append(path.join("..", "utils"))

SAMPLE_RATE = 16000 
WAVLM_LAYER_INDEX = 6

device = "cpu"
model = WavLMModel.from_pretrained("microsoft/wavlm-base-plus").to(device).eval()

def cmvn(X):
    # X: [T, D] NumPy
    mu = X.mean(axis=0, keepdims=True)
    sd = X.std(axis=0, keepdims=True)
    return (X - mu) / (sd + 1e-8)

def getWavLMFeatures(file): #A function which extracts MFCCs features from a given audio file
    sig, rate = torchaudio.load(file) #Reads the audio file, extracting the sample rate and signal data (as an array)

    #Check if sampled as correct sampling rate, if not - resample
    if rate != SAMPLE_RATE:
        print("Resampling", file ,"at 16kHz.\n")
        sig = torchaudio.functional.resample(sig, rate, SAMPLE_RATE)

    #Extracts layer 6 features
    sig = sig.to(device)
    with torch.inference_mode():
        out = model(sig, output_hidden_states=True)
        features = out.hidden_states[WAVLM_LAYER_INDEX].squeeze(0)  # [T, D] torch

    #Convert to numpy
    features = features.numpy()

    #Apply CMVN
    features = cmvn(features) #Applies cepstral mean and variance normalization to features
    
    return features

def getMinimumCost(queryFile, templateFile, FeatureType):
    #Loading the features
    if FeatureType == "MFCCs":
        queryFeatures = getMFCCsFeatures(queryFile) #Extract features for query data
    elif FeatureType == "WavLM":
        queryFeatures = getWavLMFeatures(queryFile) #Extract features for query data
        
    templateFeatures = torch.load(templateFile) #Load the template's feature file
    templateFeatures = templateFeatures["features"].numpy() #Extract the features as numpy arrays

    #Make both feature sets 2D, float64, contiguous:
    queryFeatures = np.ascontiguousarray(queryFeatures, dtype=np.float64)
    templateFeatures = np.ascontiguousarray(templateFeatures, dtype=np.float64)

    distance = qbe.dtw_sweep_min(queryFeatures, templateFeatures) #Calculate the minimum sweeping DTW distance between the two feature sets

    return distance

  from .autonotebook import tqdm as notebook_tqdm


## Monte Carlo Function Definition

In [2]:
# ---------------- Speaker / template indexing ----------------

def label_from_filename(p: Path):
    """
    Extract the digit label from filename like '3_CAM001_05.pt' -> 3.
    Adjust if your naming is different.
    """
    parts = p.stem.split("_")
    return int(parts[0]) if parts and parts[0].isdigit() else None

def index_templates_by_folder(templateFolder: str):
    """
    Assumes layout:
      templateFolder/
        C01/
          0_*.pt
          1_*.pt
          ...
        C02/
          ...
    Returns:
      template_info: dict[path_str] -> (label:int, speaker_id:str)
      speakers: sorted list of unique speaker IDs (folder names)
    """
    template_info = {}
    speakers = set()

    root = Path(templateFolder)
    # one level of speaker folders: C01, C02, ...
    for speaker_dir in root.iterdir():
        if not speaker_dir.is_dir():
            continue
        spk = speaker_dir.name  # e.g., "C01"
        # collect all .pt files under this speaker
        for pt_path in speaker_dir.rglob("*.pt"):
            lab = label_from_filename(pt_path)
            if lab is None:
                continue
            template_info[str(pt_path)] = (lab, spk)
            speakers.add(spk)

    speakers = sorted(list(speakers))
    if not speakers:
        raise ValueError(f"No speakers found under {templateFolder}. Check the path and file names.")
    return template_info, speakers

# ---------------- Core evaluation ----------------

def evaluate_with_template_subset(testFolder: str, subset_templates: dict, ks, feature_name="WavLM"):
    correct = {k: 0 for k in ks}
    total = 0
    class_counts = Counter()
    correct_per_class = {k: Counter() for k in ks}

    template_labels = {tp: lab for tp, (lab, _) in subset_templates.items()}

    for testFile in Path(testFolder).rglob("*.wav"):
        parts = testFile.stem.split("_")
        prefix = parts[0]
        true_label = int(prefix) if prefix.isdigit() else "No Number"
        class_counts[true_label] += 1
        total += 1

        distances = []
        for temp_path, lab in template_labels.items():
            dist = getMinimumCost(str(testFile), temp_path, feature_name)
            distances.append((dist, lab))

        if not distances:
            continue

        distances.sort(key=lambda x: x[0])

        for k in ks:
            if k <= 0:
                continue
            knn = distances[:k]
            labels = [lab for d, lab in knn]
            predicted_label = Counter(labels).most_common(1)[0][0]
            if predicted_label == true_label:
                correct[k] += 1
                correct_per_class[k][true_label] += 1

    results = {}
    for k in ks:
        accuracy = (correct[k] / total) * 100 if total > 0 else 0.0
        print(f"\n[Trial] Classification Results for k={k}:")
        print("Class\tCounts\tCorrect per Class")
        classes = sorted([c for c in class_counts if isinstance(c, int)]) + (["No Number"] if "No Number" in class_counts else [])
        for cls in classes:
            print(f"{cls}\t{class_counts.get(cls, 0)}\t{correct_per_class[k].get(cls, 0)}")
        results[k] = {
            "accuracy": accuracy,
            "class_counts": dict(class_counts),
            "correct_per_class": dict(correct_per_class[k])
        }
    return results

# ---------------- Monte Carlo over speaker folders ----------------

def monte_carlo_speaker_subsampling(
    testFolder: str,
    templateFolder: str,
    ks=(1,3,5),
    speaker_counts=(2,4,6,8),
    n_trials=4,
    feature_name="WavLM",
    base_seed=42
):
    """
    Randomly sample m speaker folders (e.g., C01, C02, ...), use only their templates,
    evaluate on the fixed test set, repeat n_trials times. Summarize mean±stdev per k.
    """
    template_info, all_speakers = index_templates_by_folder(templateFolder)

    out = {}
    for m in speaker_counts:
        m_eff = min(m, len(all_speakers))
        print(f"\n=== Monte Carlo: using {m_eff} speaker(s) (requested {m}) ===")
        trial_records = []

        for trial_idx in range(n_trials):
            seed = base_seed + (m * 1000) + trial_idx
            rng = random.Random(seed)
            chosen_speakers = rng.sample(all_speakers, m_eff)

            subset = {tp: (lab, spk)
                      for tp, (lab, spk) in template_info.items()
                      if spk in chosen_speakers}

            print(f"\n--- Trial {trial_idx+1}/{n_trials} | m={m_eff}, seed={seed} ---")
            print(f"Speakers: {chosen_speakers}")

            results = evaluate_with_template_subset(
                testFolder=testFolder,
                subset_templates=subset,
                ks=ks,
                feature_name=feature_name
            )

            acc_by_k = {k: results[k]["accuracy"] for k in ks}
            trial_records.append({
                "seed": seed,
                "speakers": chosen_speakers,
                "acc_by_k": acc_by_k
            })

        # Summaries
        summary = {}
        for k in ks:
            series = [t["acc_by_k"][k] for t in trial_records]
            mean = statistics.mean(series) if series else 0.0
            stdev = statistics.pstdev(series) if len(series) > 1 else 0.0
            summary[k] = {"mean": mean, "stdev": stdev}

        print(f"\n>>> Summary for m={m_eff} speakers over {n_trials} trials:")
        for k in ks:
            s = summary[k]
            print(f"  k={k}: mean={s['mean']:.2f}%  stdev={s['stdev']:.2f}%")

        out[m_eff] = {"trials": trial_records, "summary": summary}

    return out

Usage:

In [3]:
ks_to_test = [3]
results = monte_carlo_speaker_subsampling(
    testFolder="TestingData/OnlyNumbers/English",
    templateFolder="TrainingData/TrainingFeatures/WavLMBase+/English/",
    ks=ks_to_test,
    speaker_counts=(1,2,3,4,5,6,7,8,9),
    n_trials=4,
    feature_name="WavLM", 
    base_seed=42
)


=== Monte Carlo: using 1 speaker(s) (requested 1) ===

--- Trial 1/4 | m=1, seed=1042 ---
Speakers: ['C04']





[Trial] Classification Results for k=3:
Class	Counts	Correct per Class
0	9	1
1	10	0
2	9	4
3	10	0
4	12	0
5	9	0
6	10	0
7	12	8
8	10	8
9	11	7

--- Trial 2/4 | m=1, seed=1043 ---
Speakers: ['C10']

[Trial] Classification Results for k=3:
Class	Counts	Correct per Class
0	9	7
1	10	0
2	9	0
3	10	3
4	12	10
5	9	8
6	10	6
7	12	3
8	10	6
9	11	3

--- Trial 3/4 | m=1, seed=1044 ---
Speakers: ['C04']

[Trial] Classification Results for k=3:
Class	Counts	Correct per Class
0	9	1
1	10	0
2	9	4
3	10	0
4	12	0
5	9	0
6	10	0
7	12	8
8	10	8
9	11	7

--- Trial 4/4 | m=1, seed=1045 ---
Speakers: ['C01']

[Trial] Classification Results for k=3:
Class	Counts	Correct per Class
0	9	7
1	10	9
2	9	3
3	10	4
4	12	6
5	9	3
6	10	7
7	12	1
8	10	5
9	11	7

>>> Summary for m=1 speakers over 4 trials:
  k=3: mean=37.75%  stdev=10.50%

=== Monte Carlo: using 2 speaker(s) (requested 2) ===

--- Trial 1/4 | m=2, seed=2042 ---
Speakers: ['C06', 'C04']

[Trial] Classification Results for k=3:
Class	Counts	Correct per Class
0	9	1
1	10	3
2	

Note, that in the report calculations, the standard deviation have been recalculated. This code outputs the population standard deviation, whereas the sample standard deviation is more appropriate as there are only four samples.