
# Personalized Contrastive Learning for Dysarthric Keyword Recognition

## Contrastive Learning Development

This notebook contains the code used in James Dana's final project for the class DATA-0297 (Spring 2025).


## Goals

In [None]:
from google.colab import drive

#from datasets import Dataset
from transformers import Wav2Vec2Model, Wav2Vec2Processor
import torch
import torch
import torch.nn as nn
from torch.utils.data import Dataset, DataLoader

import librosa

import pandas as pd
import numpy as np

import time
from tqdm import tqdm

import matplotlib.pyplot as plt
import seaborn as sns

from sklearn.metrics import confusion_matrix, classification_report
from sklearn.neighbors import KNeighborsClassifier
from sklearn.metrics import accuracy_score, f1_score

In [None]:
# Mount Google Drive so the project data under /content/drive is readable.
drive.mount('/content/drive')

# Audio table. Later cells read its `audio_path`, `audio_array` and
# `audio_array_length` columns.
df_torgo = pd.read_parquet("/content/drive/MyDrive/torgo_data/flat_torgo.parquet")

# Split metadata: the training split, the two holdout splits ("gen" and "per"),
# and the anchor/positive/negative triplets used for contrastive training.
df_train = pd.read_csv("/content/drive/MyDrive/torgo_data/train_data.csv")
df_test_gen = pd.read_csv("/content/drive/MyDrive/torgo_data/holdout_gen.csv")
df_test_per = pd.read_csv("/content/drive/MyDrive/torgo_data/holdout_per.csv")
df_triplets = pd.read_csv("/content/drive/MyDrive/torgo_data/triplet_data.csv")

Mounted at /content/drive


In [None]:
# Preview the first rows of the training metadata.
df_train.head()

Unnamed: 0,path,speaker_id,gender,speech_status,transcription,speech_id
0,F03_1_arrayMic_0201.wav,F03,female,dysarthria,air,1_0201
1,F03_1_headMic_0201.wav,F03,female,dysarthria,air,1_0201
2,F03_1_arrayMic_0038.wav,F03,female,dysarthria,air,1_0038
3,F03_1_headMic_0038.wav,F03,female,dysarthria,air,1_0038
4,F03_1_arrayMic_0013.wav,F03,female,dysarthria,knew,1_0013


In [None]:
# Guard against data leakage: collect the file names used by each split.
triplet_paths = set().union(
    *(df_triplets[column].tolist() for column in ("anchor", "positive", "negative"))
)
train_paths = set(df_train["path"].tolist())
gen_paths = set(df_test_gen["path"].tolist())
per_paths = set(df_test_per["path"].tolist())

# Every file referenced by a triplet must belong to the training split.
assert triplet_paths.issubset(train_paths)

# The training split and both holdout splits must be pairwise disjoint.
assert train_paths.isdisjoint(gen_paths)
assert train_paths.isdisjoint(per_paths)
assert gen_paths.isdisjoint(per_paths)

In [None]:
import numpy as np

# Clip lengths (in samples) for every recording in the audio table.
array_lengths = df_torgo["audio_array_length"].values

# Upper-tail percentiles, used to pick a fixed pad/crop length.
cutoff_90, cutoff_95, cutoff_98 = np.percentile(array_lengths, [90, 95, 98])

# Report each cutoff in samples and in seconds (samples / 16000).
print("\n".join(
    f"{pct}% of samples <= {cutoff} samples ({cutoff/16000:.2f} sec)"
    for pct, cutoff in ((90, cutoff_90), (95, cutoff_95), (98, cutoff_98))
))

90% of samples <= 52800.0 samples (3.30 sec)
95% of samples <= 57600.0 samples (3.60 sec)
98% of samples <= 62760.0 samples (3.92 sec)


To optimize handling of files, it's good to set a reasonable cutoff/padding length that covers as many entries as reasonably possible. Given these results, a cutoff of 60000 samples (3.75 sec at 16 kHz) is reasonable.

In [None]:
# Fraction of recordings whose length is at most 60000 samples.
np.mean(df_torgo["audio_array_length"].values<=60000)

np.float64(0.9664948453608248)

#### Define: Triplet Loss Module, TripletAudioDataset

In [None]:
from torch import nn

# Processor for the wav2vec2-base checkpoint; the embedding step in
# run_model_no_curriculum uses it to pad/truncate batches of raw audio.
processor = Wav2Vec2Processor.from_pretrained("facebook/wav2vec2-base")

class TripletLoss(nn.Module):
    """Triplet margin loss using the L2 (p=2) distance.

    A thin ``nn.Module`` wrapper around ``nn.TripletMarginLoss`` that keeps
    the configured ``margin`` available as an attribute.
    """

    def __init__(self, margin=1):
        super().__init__()
        self.margin = margin
        self.loss_fn = nn.TripletMarginLoss(margin=margin, p=2)

    def forward(self, anchor, positive, negative):
        """Return the triplet margin loss for the three embedding tensors."""
        criterion = self.loss_fn
        return criterion(anchor, positive, negative)

# Define a Triplet Dataset.
class TripletAudioDataset(torch.utils.data.Dataset):
    """Serve (anchor, positive, negative) waveforms at a fixed length.

    Parameters
    ----------
    df_triplets : pandas.DataFrame
        One row per triplet. The ``anchor``, ``positive`` and ``negative``
        columns hold keys that are looked up in ``df_torgo["audio_path"]``.
    df_torgo : pandas.DataFrame
        Audio table with ``audio_path`` and ``audio_array`` columns.
    target_length : int, default 60000
        Number of samples each returned waveform is padded or cropped to.
    """

    def __init__(self, df_triplets, df_torgo, target_length=60000):
        # store triplets information
        self.df_triplets = df_triplets
        self.target_length = target_length

        # Only the files referenced by these triplets are needed, so skip
        # converting the rest of the corpus to tensors.
        needed_paths = set()
        for column in ("anchor", "positive", "negative"):
            needed_paths.update(df_triplets[column].tolist())

        # Build a fast lookup: filename -> audio tensor. Zipping the two
        # columns avoids the per-row Series that ``iterrows`` constructs.
        self.audio_lookup = {
            path: torch.tensor(audio, dtype=torch.float32)
            for path, audio in zip(df_torgo["audio_path"], df_torgo["audio_array"])
            if path in needed_paths
        }

    def __len__(self):
        """Number of triplets."""
        return len(self.df_triplets)

    def _process_audio(self, audio):
        """Zero-pad (on the right) or randomly crop ``audio`` to ``target_length``."""
        length = audio.shape[0]
        if length < self.target_length:
            # Pad with zeros
            pad_amount = self.target_length - length
            audio = torch.nn.functional.pad(audio, (0, pad_amount))
        elif length > self.target_length:
            # Random crop: the window start is drawn from torch's global RNG,
            # so it follows torch.manual_seed.
            start_idx = torch.randint(0, length - self.target_length + 1, (1,)).item()
            audio = audio[start_idx:start_idx + self.target_length]
        return audio

    def __getitem__(self, idx):
        """Return a dict of three fixed-length waveforms for triplet ``idx``.

        Raises ``KeyError`` if a referenced file is absent from the lookup.
        """
        row = self.df_triplets.iloc[idx]
        return {
            role: self._process_audio(self.audio_lookup[row[role]])
            for role in ("anchor", "positive", "negative")
        }

import torch

The secret `HF_TOKEN` does not exist in your Colab secrets.
To authenticate with the Hugging Face Hub, create a token in your settings tab (https://huggingface.co/settings/tokens), set it as secret in your Google Colab and restart your session.
You will be able to reuse this secret in all of your notebooks.
Please note that authentication is recommended but still optional to access public models or datasets.


preprocessor_config.json:   0%|          | 0.00/159 [00:00<?, ?B/s]

tokenizer_config.json:   0%|          | 0.00/163 [00:00<?, ?B/s]

config.json:   0%|          | 0.00/1.84k [00:00<?, ?B/s]



vocab.json:   0%|          | 0.00/291 [00:00<?, ?B/s]

special_tokens_map.json:   0%|          | 0.00/85.0 [00:00<?, ?B/s]

In [None]:
import random

In [None]:
def run_model_no_curriculum(params, seedval):
    """Fine-tune wav2vec2-base with a triplet loss, then embed every clip in ``df_torgo``.

    Reads the notebook-level ``df_triplets``, ``df_torgo`` and ``processor``.

    Parameters
    ----------
    params : dict
        Must provide ``triplet_types`` (values of ``df_triplets['triplet_type']``
        to train on), ``margin``, ``learning_rate``, ``epochs`` and ``compress``
        (truthy: mean-pool the hidden states over time before the loss;
        falsy: apply the loss to the unpooled hidden states).
    seedval : int
        Seed applied to torch, ``random`` and numpy.

    Returns
    -------
    pandas.DataFrame
        Columns ``path`` (from ``df_torgo['audio_path']``) and ``embedding``
        (time-averaged final hidden state, stored as a list of floats).

    Raises
    ------
    ValueError
        If no triplet matches ``params['triplet_types']``.
    """
    # Seed every RNG in play so shuffling and random crops repeat for a given seed.
    torch.manual_seed(seedval)
    random.seed(seedval)
    np.random.seed(seedval)
    if torch.cuda.is_available():
        torch.cuda.manual_seed_all(seedval)

    print("Running Model For:")
    print(params)
    print()
    # Define the Data Subset
    df_subset = df_triplets[df_triplets['triplet_type'].isin(params['triplet_types'])]
    print(f"Size of Input Data: {df_subset.shape}")
    if len(df_subset) == 0:
        # Without this, the loss average below divides by zero.
        raise ValueError(f"No triplets match triplet_types={params['triplet_types']}")

    # Initialize Dataset
    dataset_trip = TripletAudioDataset(df_subset, df_torgo, target_length=60000)
    dataloader = torch.utils.data.DataLoader(dataset_trip, batch_size=16, shuffle=True)

    # Check if GPU is available, otherwise use CPU
    device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    # Load the model and move it to the GPU (if available)
    model = Wav2Vec2Model.from_pretrained("facebook/wav2vec2-base").to(device)

    # Set Up Optimizer + Loss function
    optimizer = torch.optim.Adam(model.parameters(), lr=params['learning_rate'])
    loss_fn = TripletLoss(margin=params['margin'])

    # The pooling choice is fixed for the whole run, so read it once.
    compress = params['compress']

    def embed(waveforms):
        """Run the encoder; mean-pool over the time axis when ``compress`` is set."""
        hidden = model(input_values=waveforms).last_hidden_state
        return hidden.mean(dim=1) if compress else hidden

    print("#########################\nTraining Model ...")
    loss_list = []
    start_time = time.time()
    # Run Training Procedure.
    # NOTE(review): training feeds the raw waveforms from TripletAudioDataset to
    # the model, while the embedding pass below goes through `processor`. Confirm
    # both paths apply the same preprocessing (normalisation, padding length).
    for epoch in range(params['epochs']):
        model.train()
        running_loss = 0.0
        batch_iterator = tqdm(dataloader, desc=f"Epoch [{epoch + 1}/{params['epochs']}]", leave=False)
        for batch in batch_iterator:
            batch = {key: value.to(device) for key, value in batch.items()}
            optimizer.zero_grad()

            # Forward Pass: Get embeddings from the model
            anchor_embeddings = embed(batch['anchor'])
            positive_embeddings = embed(batch['positive'])
            negative_embeddings = embed(batch['negative'])

            # Loss: Compute Triplet loss
            loss = loss_fn(anchor_embeddings, positive_embeddings, negative_embeddings)

            # Back : Backpropagate
            loss.backward()
            optimizer.step()

            running_loss += loss.item()
            # Show the latest batch loss on the progress bar.
            batch_iterator.set_postfix(loss=loss.item())

        avg_loss = running_loss / len(dataloader)
        loss_list.append(avg_loss)
        print(f"Epoch [{epoch + 1}/{params['epochs']}], Loss: {avg_loss:.4f}")
    end_time = time.time()
    total_time = end_time - start_time
    n_triplets_seen = len(df_subset) * params['epochs']
    print(f"\n Evaluated {n_triplets_seen} triplets files in {total_time:.2f} seconds.")
    if n_triplets_seen:  # epochs == 0 would otherwise divide by zero
        print(f"Avg time per triplet: {total_time / n_triplets_seen:.4f} seconds")

    print(f"Loss Over Time: {loss_list}")

    print("#########################\nEvaluating Model ...")
    model.eval()  # Set the model to evaluation mode
    batch_size = 32  # Set batch size depending on your available GPU memory
    start_time = time.time()

    # Pull the columns out positionally so the loop does not depend on
    # df_torgo carrying a 0..n-1 index.
    audio_arrays = df_torgo['audio_array'].tolist()
    path_list = df_torgo['audio_path'].tolist()

    all_embeddings = []
    for start in range(0, len(audio_arrays), batch_size):
        batch_speech = audio_arrays[start:start + batch_size]
        inputs = processor(batch_speech, sampling_rate=16000, max_length=60000,
                           return_tensors="pt", padding=True, truncation=True)

        # Move inputs to GPU
        inputs = {key: value.to(device) for key, value in inputs.items()}

        # Process embeddings
        with torch.no_grad():
            hidden = model(**inputs).last_hidden_state
        # Mean over the time axis keeps shape (batch, dim). No squeeze(), so a
        # final batch holding a single clip stays 2-D. Moving to CPU right away
        # frees GPU memory between batches.
        all_embeddings.append(hidden.mean(dim=1).cpu().numpy())

    end_time = time.time()
    total_time = end_time - start_time
    n_files = len(path_list)
    print(f"\nEmbedded {n_files} audio files in {total_time:.2f} seconds.")
    if n_files:
        print(f"Avg time per file: {total_time / n_files:.4f} seconds")

    # Set up matrix, dataframe
    embedding_matrix_cpu_np = np.vstack(all_embeddings)
    df_embeds = pd.DataFrame({
        'path': path_list,
        'embedding': embedding_matrix_cpu_np.tolist()
    })
    return df_embeds

def _load_split_with_embeddings(csv_path, df_param):
    """Read one split's metadata CSV and attach each clip's embedding by ``path``."""
    df_split = pd.merge(pd.read_csv(csv_path), df_param, on='path', how='left')
    # A left merge leaves NaN where df_param has no row for a path; fail here
    # with a clear message instead of inside np.vstack later.
    n_missing = int(df_split['embedding'].isna().sum())
    if n_missing:
        raise ValueError(f"{n_missing} rows of {csv_path} have no embedding in df_param")
    return df_split


def _score_subsets(y_true, y_pred, speech_status):
    """Return ``{subset: (accuracy, weighted F1)}`` for all, dysarthric and healthy rows."""
    y_true = np.asarray(y_true)
    y_pred = np.asarray(y_pred)
    status = np.asarray(speech_status)
    masks = {
        'all': np.ones(len(y_true), dtype=bool),
        'dys': status == 'dysarthria',
        'non_dys': status == 'healthy',
    }
    return {
        subset: (
            accuracy_score(y_true[mask], y_pred[mask]),
            f1_score(y_true[mask], y_pred[mask], average="weighted"),
        )
        for subset, mask in masks.items()
    }


def eval_embeddings(df_param,param_dict):
    """Fit a KNN on the training embeddings and print a markdown score table.

    Parameters
    ----------
    df_param : pandas.DataFrame
        Embedding table with ``path`` and ``embedding`` columns.
    param_dict : dict
        Run configuration; only printed above the table.

    The split metadata is re-read from Drive on each call. Accuracy and
    weighted F1 are printed for the train, holdout-gen and holdout-per splits,
    each broken down into all / dysarthria / healthy rows.

    Raises
    ------
    ValueError
        If any row of a split has no matching embedding in ``df_param``.
    """
    # 0. Load metadata and attach embeddings.
    splits = {
        'tr': _load_split_with_embeddings("/content/drive/MyDrive/torgo_data/train_data.csv", df_param),
        'gen': _load_split_with_embeddings("/content/drive/MyDrive/torgo_data/holdout_gen.csv", df_param),
        'per': _load_split_with_embeddings("/content/drive/MyDrive/torgo_data/holdout_per.csv", df_param),
    }

    # 1. Fit on train. Each embedding list becomes one row of the matrix.
    x_tr = np.vstack(splits['tr']['embedding'].values)
    y_tr = splits['tr']['transcription']
    knn = KNeighborsClassifier(n_neighbors=8)  # k is fixed at 8 here
    knn.fit(x_tr, y_tr)

    # 2. Predict on every split and score each subset.
    scores = {}
    for split_name, df_split in splits.items():
        y_hat = knn.predict(np.vstack(df_split['embedding'].values))
        scores[split_name] = _score_subsets(
            df_split['transcription'], y_hat, df_split['speech_status']
        )

    # 3. Print a table that can be pasted into markdown. Each row keeps the
    #    original cell padding, so the emitted text is unchanged.
    table_rows = [
        ("| **Train**    | All        ", 'tr', 'all', "        |"),
        ("|              | Dysarthria ", 'tr', 'dys', "   |"),
        ("|              | Healthy    ", 'tr', 'non_dys', "     |"),
        ("| **Hold-Gen** | All        ", 'gen', 'all', "     |"),
        ("|              | Dysarthria ", 'gen', 'dys', "     |"),
        ("|              | Healthy    ", 'gen', 'non_dys', "     |"),
        ("| **Hold-Per** | All        ", 'per', 'all', "     |"),
        ("|              | Dysarthria ", 'per', 'dys', "     |"),
        ("|              | Healthy    ", 'per', 'non_dys', "     |"),
    ]
    print(param_dict)
    print("| Dataset      | Subset     | Accuracy     | F1-Score  |")
    print("|--------------|------------|--------------|-----------|")
    for prefix, split_name, subset, tail in table_rows:
        acc, f1 = scores[split_name][subset]
        print(f"{prefix}| {round(acc,4)}       | {round(f1,4)}{tail}")


def eval_embeddings_cm(df_param,param_dict,itr_type):
    """Fit a KNN on the training embeddings and plot a 3x3 grid of confusion matrices.

    Rows of the grid are the all / healthy / dysarthria subsets; columns are the
    train, holdout-gen and holdout-per splits. Each panel title carries that
    facet's accuracy and weighted F1. The figure is saved to Drive as
    ``CS_M1_I{itr_type}_heatmap.png`` and then shown.

    Parameters
    ----------
    df_param : pandas.DataFrame
        Embedding table with ``path`` and ``embedding`` columns.
    param_dict : dict
        Unused here; kept so the signature matches ``eval_embeddings``.
    itr_type : str
        Iteration label used in the figure title and the output file name.
    """
    # 0. Load metadata and attach embeddings, keyed by the label used in the plot.
    split_files = {
        'train': "/content/drive/MyDrive/torgo_data/train_data.csv",
        'holdout-gen': "/content/drive/MyDrive/torgo_data/holdout_gen.csv",
        'holdout-per': "/content/drive/MyDrive/torgo_data/holdout_per.csv",
    }
    splits = {
        d_type: pd.merge(pd.read_csv(csv_path), df_param, on='path', how='left')
        for d_type, csv_path in split_files.items()
    }

    # 1. Fit a single KNN on train, with k equal to the number of distinct
    #    training labels. (An earlier fit with k=8 was immediately overwritten
    #    and has been dropped.)
    x_tr = np.vstack(splits['train']['embedding'].values)
    y_tr = splits['train']['transcription']
    num_classes = len(np.unique(y_tr))
    knn = KNeighborsClassifier(n_neighbors=num_classes)
    knn.fit(x_tr, y_tr)

    # 2. Predict on each split and collect one long frame of
    #    (y_true, y_hat, status, d_type) rows for every subset.
    frames = []
    for d_type, df_split in splits.items():
        y_true = df_split['transcription'].to_numpy()
        y_hat = knn.predict(np.vstack(df_split['embedding'].values))
        speech_status = df_split['speech_status'].to_numpy()
        subset_masks = {
            'all': np.ones(len(df_split), dtype=bool),
            'dysarthria': speech_status == 'dysarthria',
            'healthy': speech_status == 'healthy',
        }
        for status, mask in subset_masks.items():
            frames.append(pd.DataFrame({
                'y_true': y_true[mask],
                'y_hat': y_hat[mask],
                'status': status,
                'd_type': d_type
            }))
    df_concat = pd.concat(frames, ignore_index=True)

    # Ordered categoricals fix the row/column order of the facet grid.
    df_concat['status'] = pd.Categorical(df_concat['status'], categories=['all','healthy', 'dysarthria'], ordered=True)
    df_concat['d_type'] = pd.Categorical(df_concat['d_type'], categories=['train', 'holdout-gen', 'holdout-per'], ordered=True)
    plot_classes = ["sip","air","yes","sigh","no","knew","slip","leak"]

    def cmplot(x, y, **kwargs):
        """Draw a row-normalised confusion matrix (x = predictions, y = truth)."""
        cm = confusion_matrix(y,x,labels = plot_classes)
        # A class with no true samples in this facet has a zero row total;
        # leave that row at 0 instead of dividing by zero.
        row_totals = cm.sum(axis=1, keepdims=True)
        cm_normalized = np.divide(cm, row_totals,
                                  out=np.zeros(cm.shape, dtype=float),
                                  where=row_totals > 0)
        # plot confusion matrix
        cmap = sns.color_palette("Blues", as_cmap=True)
        plt.imshow(cm_normalized, interpolation='nearest', cmap=cmap)
        tick_marks = np.arange(len(plot_classes))
        plt.xticks(tick_marks, plot_classes, rotation=45, ha="right")
        plt.yticks(tick_marks, plot_classes)

    # FacetGrid creates its own figure. The previous plt.figure(figsize=(15, 15))
    # call only produced an extra empty figure and has been removed.
    # NOTE(review): if a 15x15 canvas was intended, pass height=5 to FacetGrid.
    g = sns.FacetGrid(df_concat,row="status",col="d_type",sharex=False,sharey=False)
    g.map(cmplot,"y_hat","y_true")
    plt.suptitle(f"Wav2Vec2 => KNN Model: Confusion Matrices\nContrastive Learning Model: Itr #{itr_type}")

    # set titles w/ scores
    for i, row_val in enumerate(g.row_names):
        for j, col_val in enumerate(g.col_names):
            df_facet = df_concat[(df_concat['status'] == row_val) & (df_concat['d_type'] == col_val)]
            if df_facet.empty:
                # Nothing was drawn for this facet, so there is nothing to score.
                continue
            acc = round(accuracy_score(df_facet['y_true'], df_facet['y_hat']),4)
            f1 = round(f1_score(df_facet['y_true'], df_facet['y_hat'], average='weighted'),4)
            title = f"{col_val} data ({row_val})\nAcc: {acc}, F1: {f1}"
            g.axes[i,j].set_title(title)
    plt.tight_layout()
    plt.savefig(f'/content/drive/MyDrive/torgo_data/CS_M1_I{itr_type}_heatmap.png', dpi=300)
    plt.show()

#eval_embeddings_cm(df_param,param_dict,"1")


In [None]:
# Hyperparameters shared by every seeded run below.
param_dict = dict(
    triplet_types=['1s', '1d', '2', '3s', '3d', '4s', '4d'],
    margin=.5,
    learning_rate=1e-5,
    epochs=10,
    compress=True,
)
# Repeat the train -> embed -> evaluate pipeline once per seed.
for ii, seedval in enumerate([100, 200, 300]):
    itr_str = str(ii + 1)
    df_param = run_model_no_curriculum(param_dict, seedval)
    eval_embeddings(df_param, param_dict)
    eval_embeddings_cm(df_param, param_dict, itr_str)

Running Model For:
{'triplet_types': ['1s', '1d', '2', '3s', '3d', '4s', '4d'], 'margin': 0.5, 'learning_rate': 1e-05, 'epochs': 10, 'compress': True}

Size of Input Data: (1232, 13)




#########################
Training Model ...


Epoch [1/10]:   0%|          | 0/77 [00:00<?, ?it/s]

compressing!!!




KeyboardInterrupt: 