In [2]:
import torch
import torch.nn as nn
from transformers import (
    EarlyStoppingCallback, Wav2Vec2FeatureExtractor, Wav2Vec2Processor, 
    TrainingArguments, Trainer, AutoConfig, AutoModelForCTC, Wav2Vec2PhonemeCTCTokenizer, AutoFeatureExtractor, Wav2Vec2Model, HubertModel, WavLMModel )
from datasets import load_from_disk
import numpy as np
from sklearn.decomposition import PCA
import matplotlib.pyplot as plt
from safetensors.torch import load_file
from tqdm import tqdm
import pandas as pd
from sklearn.metrics import mean_squared_error, r2_score


  from .autonotebook import tqdm as notebook_tqdm


In [None]:
# Pronunciation scorer built on a plain (non-CTC) speech encoder
class PronunciationScoringModel(nn.Module):
    """Speech encoder followed by a linear head that emits four scores.

    The encoder class is picked from ``model_name`` by substring match
    ('wav2vec2', 'hubert', 'wavlm', tested in that order).

    Args:
        model_name: Checkpoint identifier passed to ``from_pretrained``.
        model_save_path: Accepted for call-site compatibility; not used here.

    Raises:
        ValueError: If ``model_name`` contains none of the supported substrings.
    """

    def __init__(self, model_name, model_save_path=None):
        super().__init__()
        encoder_config = AutoConfig.from_pretrained(model_name)

        # Ordered lookup: the first substring found in the name decides the class
        backbones = (
            ('wav2vec2', Wav2Vec2Model),
            ('hubert', HubertModel),
            ('wavlm', WavLMModel),
        )
        backbone_cls = next(
            (cls for key, cls in backbones if key in model_name), None
        )
        if backbone_cls is None:
            raise ValueError(f"Unsupported model type for model name: {model_name}")

        self.model = backbone_cls.from_pretrained(model_name, config=encoder_config)
        # Linear scoring head: encoder hidden size -> 4 outputs
        self.score_predictor = nn.Linear(encoder_config.hidden_size, 4)

    def forward(self, input_values):
        """Return the scoring head's output for ``input_values``.

        The encoder's ``last_hidden_state`` is averaged over dim 1
        (presumably the time axis - TODO confirm) before the linear head.
        """
        encoded = self.model(input_values=input_values).last_hidden_state
        utterance_embedding = encoded.mean(dim=1)
        return self.score_predictor(utterance_embedding)

In [None]:
# CTC-based pronunciation-scoring model
class PronunciationScoringModelforCTC(nn.Module):
    """CTC model whose time-averaged logits feed a linear head with four outputs.

    Args:
        model_name: Checkpoint identifier passed to ``AutoModelForCTC.from_pretrained``.
        model_save_path: Accepted for call-site compatibility; not used here.
    """

    def __init__(self, model_name, model_save_path=None):
        super().__init__()

        self.model = AutoModelForCTC.from_pretrained(model_name)
        # The head consumes pooled CTC logits, so its input width has to match the
        # vocabulary size of the loaded checkpoint rather than a hard-coded 32.
        self.score_predictor = nn.Linear(self.model.config.vocab_size, 4)

    def forward(self, input_values, labels=None, output_hidden_states=True, return_dict=True):
        """Return the scoring head's output, shape ``[batch, 4]``.

        Args:
            input_values: Model input forwarded to the wrapped CTC model.
            labels: Accepted for interface compatibility; not used.
            output_hidden_states: Forwarded to the wrapped model. The hidden
                states are not needed for scoring, so ``False`` is also valid.
            return_dict: Forwarded to the wrapped model; both dict-style and
                tuple-style outputs are handled.
        """
        outputs = self.model(input_values=input_values,
                             output_hidden_states=output_hidden_states,
                             return_dict=return_dict)
        # Tuple outputs (return_dict=False) carry the logits first because no
        # labels are passed to the wrapped model, so no loss precedes them.
        logits = outputs.logits if hasattr(outputs, "logits") else outputs[0]
        pooled_output = logits.mean(dim=1)  # [batch, vocab_size]
        return self.score_predictor(pooled_output)

In [None]:
# Preprocessed speechocean762 test split, read back from its on-disk location
test_ds_path = "/data2/haeyoung/speechocean762/preprocess/speechocean_test_ds"
test_ds = load_from_disk(dataset_path=test_ds_path)


In [None]:
model_name = "facebook/wav2vec2-large"
model_save_path = "/data2/haeyoung/finetuned/wav2vec2/general/01_wav2vec2-large/model.safetensors"

# Prefer the GPU, but fall back to CPU so this cell does not crash without CUDA
device = "cuda" if torch.cuda.is_available() else "cpu"

# Initialize model and load fine-tuned weights
model = PronunciationScoringModel(model_name)
model.load_state_dict(load_file(model_save_path))
model.to(device)
model.eval()  # inference mode: disables dropout and similar training-only behavior
# Initialize corresponding feature extractor
feature_extractor = AutoFeatureExtractor.from_pretrained(model_name)


Some weights of Wav2Vec2Model were not initialized from the model checkpoint at facebook/wav2vec2-large and are newly initialized: ['wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original1', 'wav2vec2.encoder.pos_conv_embed.conv.parametrizations.weight.original0']
You should probably TRAIN this model on a down-stream task to be able to use it for predictions and inference.


In [None]:
def extract_scores(model, feature_extractor, batch):
    """Run ``model`` on one dataset example and return its output as a NumPy array.

    Args:
        model: Callable scoring model; when it exposes ``parameters()``, the
            inputs are moved to the device of its first parameter.
        feature_extractor: Callable that accepts the raw audio array plus
            ``return_tensors`` / ``sampling_rate`` and returns an object with
            an ``input_values`` tensor.
        batch: Mapping where ``batch["audio"]["array"]`` holds the waveform
            (assumed to be sampled at 16 kHz - TODO confirm against the dataset).

    Returns:
        The model output copied to CPU as a ``numpy.ndarray``.
    """
    # Follow the model's device instead of assuming CUDA, so CPU inference works too
    params = model.parameters() if hasattr(model, "parameters") else iter(())
    first_param = next(iter(params), None)
    if first_param is not None:
        device = first_param.device
    else:
        # Parameter-less model: keep the previous CUDA preference when available
        device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

    features = feature_extractor(batch["audio"]["array"], return_tensors="pt", sampling_rate=16000)
    input_values = features.input_values.to(device)
    with torch.no_grad():
        scores_pred = model(input_values)
    return scores_pred.detach().cpu().numpy()

def get_labels(batch):
    """Collect the four reference scores of one example into a length-4 array.

    The output order is accuracy, fluency, prosodic, total.
    """
    score_keys = ('accuracy', 'fluency', 'prosodic', 'total')
    target = np.zeros([len(score_keys)])
    for slot, key in enumerate(score_keys):
        target[slot] = batch[key]
    return target

def compute_metrics(preds, labels):
    """Compute per-column MSE and Pearson correlation between predictions and labels.

    Args:
        preds: 2-D array of predictions, one column per score dimension.
        labels: 2-D array of reference values, indexed the same way as ``preds``.

    Returns:
        Dict with keys ``mse_<i>`` and ``pcc_<i>`` for every column ``i`` of
        ``preds``. A correlation that evaluates to NaN (e.g. a constant
        column) is reported as 0.0.
    """
    mse = mean_squared_error(labels, preds, multioutput='raw_values')

    pcc = []
    for i in range(preds.shape[1]):
        # Compute the correlation once per column and reuse it for the NaN check
        r = np.corrcoef(labels[:, i], preds[:, i])[0, 1]
        pcc.append(0.0 if np.isnan(r) else r)

    metrics = {}
    for i, (mse_val, pcc_val) in enumerate(zip(mse, pcc)):
        metrics[f"mse_{i}"] = mse_val
        metrics[f"pcc_{i}"] = pcc_val

    return metrics

In [8]:
# Reference score vectors, one row per test example
labels = list(map(get_labels, tqdm(test_ds)))
all_labels = np.vstack(labels)

# Model outputs for the same examples, in the same order
preds = [
    extract_scores(model, feature_extractor, sample)
    for sample in tqdm(test_ds)
]
all_preds = np.vstack(preds)


100%|██████████| 2500/2500 [00:01<00:00, 1569.14it/s]
100%|██████████| 2500/2500 [01:08<00:00, 36.68it/s]


In [9]:
# Per-column MSE / PCC; columns 0-3 follow get_labels: accuracy, fluency, prosodic, total
metrics = compute_metrics(preds=all_preds, labels=all_labels)
print(metrics)

{'mse_0': 1.535458551992261, 'pcc_0': 0.6934507827453273, 'mse_1': 0.9483399665499579, 'pcc_1': 0.7925950153078747, 'mse_2': 0.9918817185076663, 'pcc_2': 0.7857253303544679, 'mse_3': 1.3997101610395375, 'pcc_3': 0.7283804352867991}
