In [None]:
'''
This is a simple pipeline of the BiRA device's actual use case for pronunciation
evaluation. Take note that audio inputs will come from the device's microphone, and
the reference phonemes (denoted as variable 'ref') will come from the pronunciation lexicon.

Users first select the Filipino word they would like to practice. Once a word is
selected, the system looks up its reference phoneme sequence in the lexicon and asks
the user to pronounce the word. It then records the user's voice, extracts its
features, and evaluates the pronunciation using the BiRA model.

The actual device's evaluation has a scoring system and remarks based on the reference
and predicted phoneme alignment in conjunction with the alignment result itself.
'''

In [None]:
!pip install python-Levenshtein

Collecting textgrid
  Downloading TextGrid-1.6.1.tar.gz (9.4 kB)
  Preparing metadata (setup.py) ... [?25l[?25hdone
Building wheels for collected packages: textgrid
  Building wheel for textgrid (setup.py) ... [?25l[?25hdone
  Created wheel for textgrid: filename=TextGrid-1.6.1-py3-none-any.whl size=10146 sha256=aa3f4ec0012903cd5aa9a18b8733648aadeacff8c264decc84de0c8c7bc5c689
  Stored in directory: /root/.cache/pip/wheels/7a/c5/96/5e43aa4c640995fbbb0b9a7b98e6007bfd777add3c7e56d70a
Successfully built textgrid
Installing collected packages: textgrid
Successfully installed textgrid-1.6.1
Collecting python-Levenshtein
  Downloading python_levenshtein-0.27.1-py3-none-any.whl.metadata (3.7 kB)
Collecting Levenshtein==0.27.1 (from python-Levenshtein)
  Downloading levenshtein-0.27.1-cp311-cp311-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.6 kB)
Collecting rapidfuzz<4.0.0,>=3.9.0 (from Levenshtein==0.27.1->python-Levenshtein)
  Downloading rapidfuzz-3.13.0-cp311-cp311-manylin

In [None]:
import re
import numpy as np
import librosa
import scipy.fftpack
import torch
import torch.nn as nn
from torch.serialization import add_safe_globals
from Levenshtein import distance as levenshtein

# LOAD MODEL

In [None]:
# phoneme to id dictionary
# Ids are assigned by position in the lists below, so the ORDER of these lists is
# part of the mapping: vowels (with stress digits) take 0-22, consonants 23-42,
# and the CTC blank symbol comes last at 43.
_VOWELS_WITH_STRESS = [
    "AA0", "AA1", "AA2", "AW0", "AW1", "AY0", "AY1",
    "EH0", "EH1", "ER0", "EY1",
    "IH1", "IY0", "IY1", "IY2",
    "OW0", "OW1", "OW2", "OY0", "OY1",
    "UW0", "UW1", "UW2",
]
# Consonants (no stress markers)
_CONSONANTS = [
    "B", "D", "F", "G", "H", "JH", "K", "L", "M", "N",
    "NG", "P", "R", "S", "SH", "T", "V", "W", "Y", "Z",
]
phoneme_to_id = {
    symbol: index
    for index, symbol in enumerate(_VOWELS_WITH_STRESS + _CONSONANTS + ["<BLANK>"])
}

In [None]:
# Hyperparameters
# These describe the BiLSTM_CTC architecture. `num_layers` and `dropout` are also
# picked up as the default arguments of BiLSTM_CTC.__init__.
# Feature width per frame: extract_full_features stacks 13 MFCCs, their deltas and
# delta-deltas (39 columns), then one f0 column and one energy column = 41.
input_dim = 41
hidden_dim = 128
# One output class per entry in the phoneme table, including the "<BLANK>" symbol.
output_dim = len(phoneme_to_id)
num_layers = 2
dropout = 0.4

In [None]:
# Run on the GPU when CUDA is available, otherwise fall back to the CPU.
device = torch.device("cuda") if torch.cuda.is_available() else torch.device("cpu")

In [None]:
class BiLSTM_CTC(nn.Module):
    """Bidirectional LSTM followed by a linear layer and log-softmax.

    The per-frame log-probabilities it returns are what ``ctc_greedy_decode``
    consumes.

    Args:
        input_dim: Number of features per input frame.
        hidden_dim: Hidden size of each LSTM direction.
        output_dim: Number of output classes per frame.
        num_layers: Number of stacked LSTM layers (defaults to the module-level
            ``num_layers`` hyperparameter).
        dropout: Dropout passed to ``nn.LSTM`` (defaults to the module-level
            ``dropout`` hyperparameter).
    """

    def __init__(self, input_dim, hidden_dim, output_dim, num_layers=num_layers, dropout=dropout):
        super().__init__()

        self.hidden_dim = hidden_dim
        self.num_layers = num_layers

        self.lstm = nn.LSTM(input_dim, hidden_dim, num_layers, batch_first=True, bidirectional=True, dropout=dropout)
        # Forward and backward hidden states are concatenated, hence hidden_dim * 2.
        self.fc = nn.Linear(hidden_dim * 2, output_dim)
        self.log_softmax = nn.LogSoftmax(dim=-1)

    def forward(self, x):
        """Return log-probabilities over the output classes for every frame.

        Args:
            x: Batch-first input tensor (the LSTM is built with ``batch_first=True``),
                with ``input_dim`` features in the last dimension.

        Returns:
            Tensor with the same leading dimensions as ``x`` and ``output_dim``
            log-probabilities in the last dimension.
        """
        # No explicit (h0, c0): nn.LSTM starts from zero states when none are given,
        # created on the input's own device and dtype. The previous version built
        # them on the module-level `device` global, which failed whenever the model
        # and input lived on a different device than that global.
        x, _ = self.lstm(x)
        x = self.fc(x)
        return self.log_softmax(x)

In [None]:
# load model
# add_safe_globals expects a list of classes/callables, not a dict. It only affects
# weights_only=True loads, but is kept so the class is registered correctly.
add_safe_globals([BiLSTM_CTC])

# NOTE(security): weights_only=False performs a full pickle load, which can execute
# arbitrary code. Only load checkpoints that come from a trusted source.
# map_location remaps the stored tensors onto `device`, so a checkpoint saved on a
# GPU still loads when this runtime has fallen back to the CPU.
trained_model = torch.load("/content/drive/path/to/model.pt", map_location=device, weights_only=False)
trained_model.to(device)
trained_model.eval()

BiLSTM_CTC(
  (lstm): LSTM(41, 128, num_layers=2, batch_first=True, dropout=0.4, bidirectional=True)
  (fc): Linear(in_features=256, out_features=44, bias=True)
  (log_softmax): LogSoftmax(dim=-1)
)

# FUNCTIONS FOR EVALUATION

In [None]:
def mel_filterbank(sr=16000, n_fft=512, n_mels=26, fmin=0, fmax=None):
    """Build a bank of triangular filters whose edges are evenly spaced on the mel scale.

    Args:
        sr: Sample rate in Hz.
        n_fft: FFT size; the bank has ``n_fft // 2 + 1`` columns.
        n_mels: Number of filters (rows of the bank).
        fmin: Lowest edge frequency in Hz.
        fmax: Highest edge frequency in Hz; ``None`` means ``sr // 2``.

    Returns:
        Array of shape ``(n_mels, n_fft // 2 + 1)``. Each row ramps linearly up to
        its centre bin and back down; no area normalisation is applied.
    """
    top_hz = sr // 2 if fmax is None else fmax

    # n_mels filters need n_mels + 2 edges: a left foot, a peak and a right foot
    # each, with neighbouring filters sharing edges.
    mel_edges = np.linspace(librosa.hz_to_mel(fmin), librosa.hz_to_mel(top_hz), n_mels + 2)
    edge_bins = np.floor((n_fft + 1) * librosa.mel_to_hz(mel_edges) / sr).astype(int)

    bank = np.zeros((n_mels, int(n_fft // 2 + 1)))
    edge_triples = zip(edge_bins[:-2], edge_bins[1:-1], edge_bins[2:])
    for row, (left, peak, right) in enumerate(edge_triples):
        rise = peak - left
        fall = right - peak
        # Rising slope: 0 at the left foot, approaching 1 just below the peak.
        for k in range(left, peak):
            bank[row, k] = (k - left) / rise
        # Falling slope: 1 at the peak, approaching 0 just below the right foot.
        for k in range(peak, right):
            bank[row, k] = (right - k) / fall
    return bank

# mel filterbank frequency warping (VTLN)
def apply_piecewise_warp(filters, warp_factor, pivot_freq, sr=16000, n_fft=512):
    """Warp every filter in a bank along the frequency-bin axis, piecewise-linearly.

    Bins at or below the pivot keep their position. For bins above it, the distance
    from the pivot is multiplied by ``warp_factor``. Each filter is then resampled
    at the original bin positions by linear interpolation, reading 0 outside the
    warped range.

    Args:
        filters: 2-D array, one filter per row, one FFT bin per column.
        warp_factor: Scale applied to bin offsets above the pivot.
        pivot_freq: Pivot frequency in Hz.
        sr: Sample rate in Hz, used to turn ``pivot_freq`` into a bin index.
        n_fft: FFT size, used to turn ``pivot_freq`` into a bin index.

    Returns:
        Array with the same shape and dtype as ``filters``.
    """
    pivot_bin = int(np.floor((n_fft + 1) * pivot_freq / sr))

    # The bin mapping is the same for every filter, so it is built once.
    bins = np.arange(filters.shape[1])
    stretched_bins = np.where(
        bins <= pivot_bin,
        bins,
        pivot_bin + (bins - pivot_bin) * warp_factor
    )

    warped = np.zeros_like(filters)
    for row in range(filters.shape[0]):
        warped[row] = np.interp(bins, stretched_bins, filters[row], left=0, right=0)
    return warped

def extract_mfcc_vtln(signal, sr=16000, warp_factor=0.85, n_mfcc=13, n_mels=26, n_fft=512, hop_length=160):
    """Compute MFCCs from a frequency-warped (VTLN) mel filterbank.

    Steps: pre-emphasis (0.97), Hamming-windowed STFT with a 400-sample window,
    power spectrum, warped mel filterbank (pivot at 1500 Hz), dB conversion, and an
    orthonormal type-II DCT across the filterbank channels.

    Args:
        signal: 1-D audio samples.
        sr: Sample rate in Hz.
        warp_factor: Warp factor passed to ``apply_piecewise_warp``.
        n_mfcc: Number of cepstral coefficients kept.
        n_mels: Number of mel filters built (the first and last are dropped).
        n_fft: FFT size.
        hop_length: Hop between STFT frames, in samples.

    Returns:
        Array with one row per STFT frame and up to ``n_mfcc`` columns.
    """
    pre_emphasis = 0.97
    # First sample is kept as-is; every later sample has 0.97 * its predecessor removed.
    boosted = np.append(signal[0], signal[1:] - pre_emphasis * signal[:-1])

    spectrum = librosa.stft(boosted, n_fft=n_fft, hop_length=hop_length, win_length=400, window='hamming')
    power = np.abs(spectrum) ** 2

    warped_bank = apply_piecewise_warp(
        mel_filterbank(sr=sr, n_fft=n_fft, n_mels=n_mels),
        warp_factor,
        pivot_freq=1500,
        sr=sr,
        n_fft=n_fft,
    )

    n_bins = int(n_fft // 2 + 1)
    band_energy = np.dot(warped_bank, power[:n_bins, :])
    # The first and last filterbank channels are discarded before the log/DCT.
    # NOTE(review): the reason is not visible here; keep in step with training.
    band_energy = band_energy[1:-1, :]

    log_band_energy = librosa.power_to_db(band_energy)
    cepstra = scipy.fftpack.dct(log_band_energy, axis=0, type=2, norm='ortho')
    return cepstra[0:n_mfcc].T

def extract_full_features(signal, sr=16000, warp_factor=0.85):
    """Assemble the per-frame feature matrix fed to the model.

    Columns, in order: VTLN MFCCs, their first- and second-order deltas (these
    three groups are mean/variance normalised per column over the utterance),
    then the pYIN f0 track and the RMS energy, both left unnormalised.

    Args:
        signal: 1-D audio samples.
        sr: Sample rate in Hz.
        warp_factor: VTLN warp factor forwarded to ``extract_mfcc_vtln``.

    Returns:
        2-D array with one row per frame. All streams are truncated to the
        shortest one before stacking.
    """
    cepstra = extract_mfcc_vtln(signal, sr, warp_factor=warp_factor)
    # NOTE(review): `cepstra` is (frames, coefficients) and librosa.feature.delta
    # differentiates along axis=-1 by default, so these deltas appear to run across
    # coefficients rather than across time. Left as-is because the trained model
    # presumably saw the same features; confirm against the training pipeline
    # before changing.
    delta1 = librosa.feature.delta(cepstra)
    delta2 = librosa.feature.delta(cepstra, order=2)

    pitch, _, _ = librosa.pyin(signal, fmin=80, fmax=400, sr=sr, frame_length=1024, hop_length=160)
    # NaNs in the pitch track are replaced with 0.0, then it becomes a column vector.
    pitch = np.nan_to_num(pitch, nan=0.0).reshape(-1, 1)
    rms = librosa.feature.rms(y=signal, frame_length=400, hop_length=160).T

    # The streams can differ in length; keep only the frames they all share.
    n_frames = min(track.shape[0] for track in (cepstra, delta1, delta2, pitch, rms))

    spectral = np.hstack([cepstra[:n_frames], delta1[:n_frames], delta2[:n_frames]])
    # Per-utterance CMVN; the small constant guards against a zero-variance column.
    col_mean = np.mean(spectral, axis=0)
    col_std = np.std(spectral, axis=0) + 1e-10
    spectral_cmvn = (spectral - col_mean) / col_std

    return np.hstack([spectral_cmvn, pitch[:n_frames], rms[:n_frames]])

In [None]:
# convert probability outputs into phoneme sequence
def ctc_greedy_decode(log_probs, blank=43, suppress_blanks=True):
    """Greedy (best-path) CTC decoding.

    For every batch item, takes the highest-scoring class at each time step,
    collapses consecutive repeats of the same non-blank class, and drops blanks.

    Args:
        log_probs: Tensor indexed as ``(time, batch, classes)``.
        blank: Class id of the CTC blank symbol.
        suppress_blanks: When False, every blank frame is kept in the output as
            ``blank`` (blank frames are not collapsed).

    Returns:
        A list with one list of class ids per batch item.
    """
    # argmax runs directly on the log scores: exp() is monotonic, so it cannot
    # change the winner, but it can underflow to 0.0 for very negative scores and
    # turn a clear winner into an all-zero tie.
    best_path = torch.argmax(log_probs, dim=-1)
    # One device-to-host transfer instead of an .item() call per element.
    per_item_paths = best_path.transpose(0, 1).tolist()

    decoded = []
    for path in per_item_paths:
        sequence = []
        prev_token = -1
        for token in path:
            if token != blank:
                # A repeat only counts as a new symbol if something else
                # (including a blank) came in between.
                if token != prev_token:
                    sequence.append(token)
            elif not suppress_blanks:
                sequence.append(blank)
            prev_token = token
        decoded.append(sequence)
    return decoded

In [None]:
# alignment with stress-aware matching
def strip_stress(p):
    """Return phoneme label ``p`` with every digit (the stress marker) removed."""
    return re.sub(r"\d", "", p)

def align_sequences(pred_seq, ref_seq):
    """Align a predicted phoneme sequence to a reference by minimum edit distance.

    Every mismatch (substitution, stress-only mismatch, insertion, deletion) costs
    one edit, so the number of edits is the plain Levenshtein distance. When several
    alignments share that minimum, the one with the fewest non-stress errors wins,
    so a pair such as IY1/IY0 is reported as a single 'stress' mismatch instead of
    being split into an unrelated insertion plus substitution. Any remaining tie is
    broken in the order diagonal, deletion, insertion.

    Args:
        pred_seq: Predicted phoneme labels (note: prediction comes first).
        ref_seq: Reference phoneme labels.

    Returns:
        List of ``(ref, pred, op)`` tuples in sequence order. ``op`` is one of
        'ok', 'stress', 'sub', 'del' (``pred`` is None) or 'ins' (``ref`` is None).
    """
    n, m = len(ref_seq), len(pred_seq)

    # dp[i][j] = (total edits, non-stress edits) for ref_seq[:i] vs pred_seq[:j].
    # Tuples compare lexicographically, which gives the tie-break described above.
    dp = [[(0, 0)] * (m + 1) for _ in range(n + 1)]
    backtrace = [[None] * (m + 1) for _ in range(n + 1)]

    # First column / row: only deletions / insertions are possible.
    for i in range(1, n + 1):
        dp[i][0] = (i, i)
        backtrace[i][0] = 'del'
    for j in range(1, m + 1):
        dp[0][j] = (j, j)
        backtrace[0][j] = 'ins'

    # Fill DP table
    for i in range(1, n + 1):
        ref_tok = ref_seq[i - 1]
        for j in range(1, m + 1):
            pred_tok = pred_seq[j - 1]
            if ref_tok == pred_tok:
                step, op = (0, 0), 'ok'
            elif strip_stress(ref_tok) == strip_stress(pred_tok):
                # Same phoneme, different stress: one edit, but not a "hard" one.
                step, op = (1, 0), 'stress'
            else:
                step, op = (1, 1), 'sub'

            diag, up, left = dp[i - 1][j - 1], dp[i - 1][j], dp[i][j - 1]
            options = [
                ((diag[0] + step[0], diag[1] + step[1]), op),
                ((up[0] + 1, up[1] + 1), 'del'),
                ((left[0] + 1, left[1] + 1), 'ins'),
            ]
            # min() returns the first minimal option, preserving the option order
            # as the final tie-break.
            dp[i][j], backtrace[i][j] = min(options, key=lambda x: x[0])

    # Backtrace from the bottom-right corner to the origin.
    i, j = n, m
    alignment = []
    while i > 0 or j > 0:
        op = backtrace[i][j]
        if op == 'del':
            alignment.append((ref_seq[i - 1], None, 'del'))
            i -= 1
        elif op == 'ins':
            alignment.append((None, pred_seq[j - 1], 'ins'))
            j -= 1
        else:  # 'ok', 'stress' or 'sub' all consume one token from each side
            alignment.append((ref_seq[i - 1], pred_seq[j - 1], op))
            i -= 1
            j -= 1

    alignment.reverse()
    return alignment

def visualize_alignment(ref_seq, pred_seq, phoneme_to_letter=None):
    """Print a three-line, column-aligned view of the reference/prediction alignment.

    The lines are the reference tokens (``REF :``), the predicted tokens
    (``PRED:``) and one status mark per column:
    ✅ match, 🔄 substitution, ➕ insertion, ➖ deletion, ⚠ stress-only mismatch,
    ``?`` for any other operation.

    Args:
        ref_seq: Reference phoneme labels.
        pred_seq: Predicted phoneme labels.
        phoneme_to_letter: Optional mapping used to relabel tokens for display;
            labels missing from it are printed unchanged.

    Returns:
        None. The three lines are written with ``print``.
    """
    # The mark literals were mis-encoded (UTF-8 emoji bytes decoded as Mac Roman);
    # these are the intended glyphs.
    marks = {
        "ok": "✅",
        "sub": "🔄",
        "ins": "➕",
        "del": "➖",
        "stress": "⚠",
    }

    # align_sequences takes the prediction first, the reference second.
    alignment = align_sequences(pred_seq, ref_seq)

    ref_line = "REF : "
    pred_line = "PRED: "
    mark_line = "      "

    for ref, pred, op in alignment:
        # Map phonemes using dictionary; None marks a gap and is left alone.
        if phoneme_to_letter:
            ref = phoneme_to_letter.get(ref, ref) if ref is not None else None
            pred = phoneme_to_letter.get(pred, pred) if pred is not None else None

        # Every column is five characters wide; a gap prints as blanks.
        ref_line += f"{ref:<5}" if ref is not None else "     "
        pred_line += f"{pred:<5}" if pred is not None else "     "
        mark_line += f"{marks.get(op, '?'):<5}"

    print(ref_line)
    print(pred_line)
    print(mark_line)

# PRONUNCIATION EVALUATION

In [None]:
# after obtaining user's voice record, load audio
# sr=16000 requests 16 kHz audio, matching the sr=16000 default of the
# feature-extraction functions. `sr` holds the sample rate librosa.load returns.
signal, sr = librosa.load("/content/mais.wav", sr=16000)
#signal.shape

In [None]:
# feature extraction of voice record
# Use the sample rate returned by librosa.load rather than a second literal, so the
# two cannot drift apart.
features = extract_full_features(signal, sr=sr)
# unsqueeze(0) adds the batch dimension the batch-first model expects.
features_tensor = torch.tensor(features, dtype=torch.float32).unsqueeze(0).to(device)

# model will now evaluate the pronunciation
# Take the blank id from the phoneme table instead of repeating the literal 43.
blank_id = phoneme_to_id["<BLANK>"]
with torch.no_grad():
    log_probs = trained_model(features_tensor)
    # The decoder indexes its input as (time, batch, classes), so the first two
    # axes of the batch-first model output are swapped.
    decoded = ctc_greedy_decode(log_probs.permute(1, 0, 2), blank=blank_id)

# Display labels: shorter vowel names for the alignment printout.
phoneme_to_letter = {
    "AA0": "A0", "AA1": "A1", "AA2": "A2", "EH0": "E0", "EH1": "E1", "IH1": "I1",
    "IY0": "I0", "IY1": "I1", "IY2": "I2", "UW0": "U0", "UW1": "U1", "UW2": "U2"
}

id2phoneme = {v: k for k, v in phoneme_to_id.items()}

# Reference transcriptions (based on the pronunciation lexicon). To try another
# entry, change `target_word` and load the matching recording in the audio cell.
sample_lexicon = {
    "mais": ["M", "AA0", "IY1", "S"],
    "kay ganda ng hikaw mo": ["K", "AY1", "G", "AA0", "N", "D", "AA1", "N", "AA1", "NG",
                              "H", "IY1", "K", "AW0", "M", "OW1"],
    "susi": ["S", "UW1", "S", "IY0"],
    "ampalaya": ["AA0", "M", "P", "AA0", "L", "AA0", "Y", "AA1"],
    "okra": ["OW1", "K", "R", "AA0"],
}
target_word = "mais"
ref = [phoneme_to_id[p] for p in sample_lexicon[target_word]]

# visualizing alignment
pred_str = [id2phoneme[i] for i in decoded[0]]
ref_str = [id2phoneme[i] for i in ref]
visualize_alignment(ref_str, pred_str, phoneme_to_letter)

REF : M    A0        I1   S    
PRED: M    A0   I0   E1   S    
      ✅    ✅    ➕    🔄    ✅    
