# **Step 01: Setup Google Drive and Imports**

In [12]:
# Mount Google Drive
from google.colab import drive
drive.mount('/content/drive')


Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [24]:
import torch
import torch.nn as nn
import torch.nn.functional as F
import torch.optim as optim
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score
import pandas as pd
import json
import os
from pathlib import Path
from typing import Tuple
import numpy as np


In [14]:
# ============ CONFIG ============
DEVICE = torch.device("cuda" if torch.cuda.is_available() else "cpu")
EPOCHS = 15
BATCH_SIZE = 512
LR = 1e-3
FEATURES = ["voiced", "fricative", "nasal"]
NUM_LAYERS = 13
EMB_DIM = 768  # WavLM embedding size
HIDDEN_DIM = 200



# ============ PATHS =============

# Paths for dataset and results
ROOT_DIR = "/content/drive/MyDrive/00_RESEARCH_MSC_00/Final_Phonetic_Identification"
DATA_DIR = f"{ROOT_DIR}/01_layer_datasets"
MODEL_DIR = f"{ROOT_DIR}/02_Trained_models"
METRIC_DIR = f"{ROOT_DIR}/03_Evaluation_metrics_of_probes"

# Create output folders if they don't exist
import os
os.makedirs(MODEL_DIR, exist_ok=True)
os.makedirs(METRIC_DIR, exist_ok=True)

# **Step 02: Define the PyTorch MLP Model**

In [15]:
import torch
import torch.nn as nn

class MLPProbe(nn.Module):
    """
    One-hidden-layer MLP probe: 768 → 200 → 1 (with sigmoid)
    """
    def __init__(self, input_dim=EMB_DIM, hidden_dim=HIDDEN_DIM):
        super(MLPProbe, self).__init__()
        self.hidden = nn.Linear(input_dim, hidden_dim)
        self.output = nn.Linear(hidden_dim, 1)
        self.relu = nn.ReLU()
        self.sigmoid = nn.Sigmoid()  # Used only for inference

    def forward(self, x):
        x = self.hidden(x)
        x = self.relu(x)
        x = self.output(x)  # No sigmoid here for training (BCEWithLogits)
        return x



# **Step 3: Load Dataset for a Specific Layer + Feature for training and testing**

In [16]:
def load_dataset(layer: int, feature: str):
    """
    Load training and test data for a specific layer and feature.
    """
    train_df = pd.read_pickle(f"{DATA_DIR}/layer_{layer}_train.pkl")
    test_df = pd.read_pickle(f"{DATA_DIR}/layer_{layer}_test.pkl")

    X_train = torch.tensor(np.stack(train_df["embedding"]), dtype=torch.float32)
    X_test  = torch.tensor(np.stack(test_df["embedding"]), dtype=torch.float32)
    y_train = torch.tensor(train_df[feature].values, dtype=torch.float32).unsqueeze(1)
    y_test  = torch.tensor(test_df[feature].values, dtype=torch.float32).unsqueeze(1)

    return X_train.to(DEVICE), y_train.to(DEVICE), X_test.to(DEVICE), y_test.to(DEVICE)


# **Step 4: Train the MLP Model**

In [17]:
import torch
import torch.nn as nn
from torch.utils.data import DataLoader, TensorDataset

def train_model(model: nn.Module,
                X_train: torch.Tensor,
                y_train: torch.Tensor,
                epochs: int = 25,
                batch_size: int = 512,
                lr: float = 1e-3) -> nn.Module:
    """
    Trains a 1-hidden-layer MLP binary classifier using PyTorch.

    Parameters:
    -----------
    model : nn.Module
        The MLP model to be trained (input → hidden → output).

    X_train : torch.Tensor
        Training features (shape: [num_samples, 768]).

    y_train : torch.Tensor
        Binary labels (shape: [num_samples, 1]), values 0 or 1.

    epochs : int
        Number of training epochs (full passes over the dataset).

    batch_size : int
        Number of samples per mini-batch.

    lr : float
        Learning rate for the Adam optimizer.

    Returns:
    --------
    model : nn.Module
        The trained model with updated weights.
    """

    # Loss function: Binary cross-entropy with sigmoid built-in
    criterion = nn.BCEWithLogitsLoss()

    # Optimizer: Adam adjusts learning based on gradients
    optimizer = torch.optim.Adam(model.parameters(), lr=lr)

    # Combine input features and labels into a single dataset
    dataset = TensorDataset(X_train, y_train)

    # Load the dataset in mini-batches (shuffled for better learning)
    dataloader = DataLoader(dataset, batch_size=batch_size, shuffle=True)

    # Set model in training mode (important for things like dropout or batchnorm)
    model.train()

    # Enable gradient calculation (required during training)
    with torch.set_grad_enabled(True):
        # Loop over the number of full passes over the dataset
        for epoch in range(epochs):
            total_loss = 0  # Keep track of total loss in this epoch

            # Go through the dataset in mini-batches
            for xb, yb in dataloader:
                optimizer.zero_grad()        # Reset gradients from the last step
                logits = model(xb)           # Forward pass: get raw predictions (logits)
                loss = criterion(logits, yb) # Calculate loss between prediction and truth
                loss.backward()              # Backward pass: compute gradients
                optimizer.step()             # Update weights using gradients

                total_loss += loss.item()    # Track total loss

            # Print average loss for this epoch
            avg_loss = total_loss / len(dataloader)
            print(f"Epoch {epoch+1}/{epochs} - Loss: {avg_loss:.4f}")

    return model  # Return the trained model


# **Step 5: Evaluate the Model**

In [18]:
import torch
from sklearn.metrics import accuracy_score, precision_score, recall_score, f1_score

def evaluate_model(model: torch.nn.Module,
                   X_test: torch.Tensor,
                   y_test: torch.Tensor) -> dict:
    """
    Evaluates a trained binary classifier on test data and returns key metrics.

    Parameters:
    -----------
    model : nn.Module
        The trained PyTorch MLP model.

    X_test : torch.Tensor
        Test features (shape: [num_samples, 768]).

    y_test : torch.Tensor
        True binary labels (shape: [num_samples, 1]), values are 0 or 1.

    Returns:
    --------
    metrics : dict
        A dictionary containing accuracy, precision, recall, and F1-score.
    """

    # Put the model in evaluation mode (disables dropout, etc.)
    model.eval()

    # We don’t need to compute gradients during evaluation
    with torch.no_grad():
        # Get the model's output logits (before sigmoid)
        logits = model(X_test)

        # Apply sigmoid to get probabilities between 0 and 1
        probs = torch.sigmoid(logits)

        # Convert probabilities to binary predictions (1 if ≥ 0.5, else 0)
        preds = (probs >= 0.5).float()

    # Move predictions and labels to CPU so we can use sklearn
    y_true = y_test.cpu().numpy().ravel()
    y_pred = preds.cpu().numpy().ravel()

    # Compute evaluation metrics
    accuracy  = accuracy_score (y_true, y_pred)
    precision = precision_score(y_true, y_pred, zero_division=0)
    recall    = recall_score   (y_true, y_pred, zero_division=0)
    f1        = f1_score       (y_true, y_pred, zero_division=0)

    # Store all metrics in a dictionary
    metrics = {
        "accuracy": accuracy,
        "precision": precision,
        "recall": recall,
        "f1": f1
    }

    # Print them in a readable format
    print(f"📊 Evaluation Results - Acc: {accuracy:.3f}, Prec: {precision:.3f}, "
          f"Rec: {recall:.3f}, F1: {f1:.3f}")

    return metrics


# **Step 6: Save Model and Config AND Evaluation metric**

In [19]:
def save_model(model, layer: int, feature: str):
    layer_dir = Path(MODEL_DIR) / f"layer_{layer}"
    layer_dir.mkdir(parents=True, exist_ok=True)

    torch.save(model.state_dict(), layer_dir / f"{feature}_model.pt")
    with open(layer_dir / f"{feature}_config.json", "w") as f:
        json.dump({"input_dim": 768, "hidden_dim": 200}, f)


In [20]:
def save_metrics(all_metrics: dict, layer: int):
    with open(Path(METRIC_DIR) / f"layer_{layer}_metrics.json", "w") as f:
        json.dump(all_metrics, f, indent=2)


# **07. Step 07: Create layers vs f1 score/accuracy**

In [21]:
import pandas as pd
from pathlib import Path

def save_score_tables(metrics: list, save_dir: Path):
    """
    Creates and saves two CSV files: one for F1 scores and one for accuracy scores.

    Parameters:
    -----------
    metrics : list of dict
        The full list of metrics from evaluate_model() with keys:
        ['layer', 'feature', 'accuracy', 'f1', ...]

    save_dir : Path
        Directory to save the CSV files (e.g., METRIC_DIR)
    """

    # Create a list of unique layers and features
    layers = sorted(set(m["layer"] for m in metrics))
    features = ["voiced", "fricative", "nasal"]

    # Initialize empty DataFrames
    f1_df = pd.DataFrame(columns=["layer"] + features)
    acc_df = pd.DataFrame(columns=["layer"] + features)

    # Fill the DataFrames row by row
    for L in layers:
        row_f1 = {"layer": L}
        row_acc = {"layer": L}
        for feat in features:
            # Find the matching metric dict
            m = next((m for m in metrics if m["layer"] == L and m["feature"] == feat), None)
            if m:
                row_f1[feat] = round(m["f1"], 4)
                row_acc[feat] = round(m["accuracy"], 4)
        f1_df = pd.concat([f1_df, pd.DataFrame([row_f1])], ignore_index=True)
        acc_df = pd.concat([acc_df, pd.DataFrame([row_acc])], ignore_index=True)

    # Save the CSVs
    f1_path = save_dir / "f1_scores.csv"
    acc_path = save_dir / "accuracy_scores.csv"
    f1_df.to_csv(f1_path, index=False)
    acc_df.to_csv(acc_path, index=False)

    print(f"✅ F1 scores saved to: {f1_path}")
    print(f"✅ Accuracy scores saved to: {acc_path}")


# **MAIN function**

In [22]:
def main():
    # Step 1: Define all 3 phonetic features you're probing
    FEATURES = ["voiced", "fricative", "nasal"]

    # Directory to save results (customize if needed)
    METRIC_DIR = Path("/content/drive/MyDrive/00_RESEARCH_MSC_00/Final_Phonetic_Identification/03_Evaluation_metrics_of_probes")

    # Store all metrics for all layers and features
    all_metrics = []

    # Step 2: Loop over all 13 WavLM layers (0 to 12)
    for layer in range(13):
        print(f"\n🔍 Processing Layer {layer}")
        layer_metrics = {}  # Dictionary to store metrics for all 3 features in this layer

        # Step 3: Loop over each binary feature (1 model per feature)
        for feature in FEATURES:
            print(f"  ▶ Training for Feature: {feature}")

            # Step 3.1: Load the dataset (X: 768D embeddings, y: 0/1 binary label)
            X_train, y_train, X_test, y_test = load_dataset(layer, feature)

            # Step 3.2: Initialize a fresh model
            model = MLPProbe(input_dim=768, hidden_dim=200)

            # Step 3.3: Train the model using the training data
            train_model(model, X_train, y_train)

            # Step 3.4: Evaluate the trained model using test data
            metrics = evaluate_model(model, X_test, y_test)

            # Step 3.5: Print the evaluation results
            print(f"     ➤ F1-score: {metrics['f1']:.3f}, Accuracy: {metrics['accuracy']:.3f}")

            # Step 3.6: Save the trained model
            save_model(model, layer, feature)

            # Step 3.7: Store this feature’s metrics under the layer
            layer_metrics[feature] = metrics

            # Step 3.8: Add full info to all_metrics (including layer + feature)
            metrics_with_meta = metrics.copy()
            metrics_with_meta.update({"layer": layer, "feature": feature})
            all_metrics.append(metrics_with_meta)

        # Step 4: Save the complete metrics for this layer (all 3 features) as JSON
        save_metrics(layer_metrics, layer)

    # ✅ FINAL STEP: Save overall F1 and accuracy tables as CSVs
    save_score_tables(all_metrics, METRIC_DIR)

    print("\n✅ All layers and features have been trained, evaluated, and saved successfully.")


In [25]:
main()


🔍 Processing Layer 0
  ▶ Training for Feature: voiced
Epoch 1/25 - Loss: 0.5476
Epoch 2/25 - Loss: 0.3047
Epoch 3/25 - Loss: 0.2093
Epoch 4/25 - Loss: 0.1755
Epoch 5/25 - Loss: 0.1585
Epoch 6/25 - Loss: 0.1462
Epoch 7/25 - Loss: 0.1365
Epoch 8/25 - Loss: 0.1291
Epoch 9/25 - Loss: 0.1220
Epoch 10/25 - Loss: 0.1166
Epoch 11/25 - Loss: 0.1110
Epoch 12/25 - Loss: 0.1067
Epoch 13/25 - Loss: 0.1036
Epoch 14/25 - Loss: 0.0981
Epoch 15/25 - Loss: 0.0946
Epoch 16/25 - Loss: 0.0904
Epoch 17/25 - Loss: 0.0861
Epoch 18/25 - Loss: 0.0835
Epoch 19/25 - Loss: 0.0800
Epoch 20/25 - Loss: 0.0770
Epoch 21/25 - Loss: 0.0744
Epoch 22/25 - Loss: 0.0718
Epoch 23/25 - Loss: 0.0693
Epoch 24/25 - Loss: 0.0669
Epoch 25/25 - Loss: 0.0647
📊 Evaluation Results - Acc: 0.871, Prec: 0.859, Rec: 0.909, F1: 0.883
     ➤ F1-score: 0.883, Accuracy: 0.871
  ▶ Training for Feature: fricative
Epoch 1/25 - Loss: 0.5213
Epoch 2/25 - Loss: 0.2737
Epoch 3/25 - Loss: 0.1888
Epoch 4/25 - Loss: 0.1570
Epoch 5/25 - Loss: 0.1368
Epo

  f1_df = pd.concat([f1_df, pd.DataFrame([row_f1])], ignore_index=True)
  acc_df = pd.concat([acc_df, pd.DataFrame([row_acc])], ignore_index=True)


✅ F1 scores saved to: /content/drive/MyDrive/00_RESEARCH_MSC_00/Final_Phonetic_Identification/03_Evaluation_metrics_of_probes/f1_scores.csv
✅ Accuracy scores saved to: /content/drive/MyDrive/00_RESEARCH_MSC_00/Final_Phonetic_Identification/03_Evaluation_metrics_of_probes/accuracy_scores.csv

✅ All layers and features have been trained, evaluated, and saved successfully.
