In [1]:
!pip install python_speech_features
!pip install pydub librosa scipy
!pip install librosa scipy tqdm





[notice] A new release of pip is available: 23.2.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.2.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip





[notice] A new release of pip is available: 23.2.1 -> 24.3.1
[notice] To update, run: python.exe -m pip install --upgrade pip


In [2]:
import os
import librosa
import numpy as np
from sklearn.preprocessing import StandardScaler
from python_speech_features import mfcc, delta

In [4]:
# !pip install librosa numpy sklearn tqdm

import os
import librosa
import numpy as np
import glob
from sklearn.preprocessing import StandardScaler
from sklearn.decomposition import IncrementalPCA
import json
from sklearn.model_selection import train_test_split
import pickle
from tqdm import tqdm

input_dir = r'C:\Users\User\Documents\NUS\CS5242\Final Project\EDA_libraspeech\LibriSpeech\train-clean-100'  # Use raw strings
output_dir = r'C:\Users\User\Documents\NUS\CS5242\Final Project\EDA_libraspeech\processed_data' 
transcriptions = {}

os.makedirs(output_dir, exist_ok=True)
def load_transcriptions(input_dir):
    for root, _, _ in os.walk(input_dir):
        for trans_file in glob.glob(os.path.join(root, "*.trans.txt")):
            with open(trans_file, "r") as f:
                for line in f:
                    parts = line.strip().split(" ", 1)
                    if len(parts) == 2:
                        file_id, transcription = parts
                        transcriptions[file_id] = transcription.lower() 

load_transcriptions(input_dir)
audio_files = []
file_ids = []
for root, _, files in os.walk(input_dir):
    for file in files:
        if file.endswith('.flac'):
            file_path = os.path.join(root, file)
            file_id = os.path.splitext(file)[0] 
            if file_id in transcriptions:
                audio_files.append(file_path)
                file_ids.append(file_id)

train_files, test_files, train_file_ids, test_file_ids = train_test_split(
    audio_files, file_ids, test_size=0.2, random_state=42
)

print(f"Total files: {len(audio_files)}, Training: {len(train_files)}, Testing: {len(test_files)}")

def process_file(file_path, file_id):
    try:
        y, sr = librosa.load(file_path, sr=16000)
        y, _ = librosa.effects.trim(y)
        y = y - np.mean(y)

        mfcc_features = librosa.feature.mfcc(y=y, sr=sr, n_mfcc=13)
        delta_features = librosa.feature.delta(mfcc_features, order=1)
        delta2_features = librosa.feature.delta(mfcc_features, order=2)
        features = np.vstack((mfcc_features, delta_features, delta2_features)).T

        return features, file_id
    except Exception as e:
        print(f"Error processing {file_path}: {e}")
        return None, file_id

batch_size = 1000  

scaler = StandardScaler()
ipca = IncrementalPCA(n_components=20)

def training_data_generator(files, file_ids, batch_size):
    total_files = len(files)
    for i in range(0, total_files, batch_size):
        batch_files = files[i:i+batch_size]
        batch_file_ids = file_ids[i:i+batch_size]
        batch_features = []
        batch_lengths = []
        for file_path, file_id in zip(batch_files, batch_file_ids):
            features, fid = process_file(file_path, file_id)
            if features is not None:
                batch_features.append(features)
                batch_lengths.append(len(features))
            else:
                print(f"Failed to process {file_id}")
        if batch_features:
            batch_features_concat = np.concatenate(batch_features, axis=0)
            yield batch_features_concat

print("Processing and fitting StandardScaler and IncrementalPCA on training data...")

for batch_features in tqdm(training_data_generator(train_files, train_file_ids, batch_size), total=int(len(train_files)/batch_size)+1):
    scaler.partial_fit(batch_features)

for batch_features in tqdm(training_data_generator(train_files, train_file_ids, batch_size), total=int(len(train_files)/batch_size)+1):
    batch_features_scaled = scaler.transform(batch_features)
    ipca.partial_fit(batch_features_scaled)

with open(os.path.join(output_dir, 'scaler.pkl'), 'wb') as f:
    pickle.dump(scaler, f)
with open(os.path.join(output_dir, 'ipca.pkl'), 'wb') as f:
    pickle.dump(ipca, f)
print("Scaler and IncrementalPCA models saved.")

def process_transform_save(file_path, file_id):
    features, fid = process_file(file_path, file_id)
    if features is not None:
        features_scaled = scaler.transform(features)
        features_pca = ipca.transform(features_scaled)
        output_file = os.path.join(output_dir, f"{file_id}_features.npy")
        np.save(output_file, features_pca)
    else:
        print(f"Failed to process {file_id}")

print("Processing, transforming, and saving training features...")
for file_path, file_id in tqdm(zip(train_files, train_file_ids), total=len(train_files)):
    process_transform_save(file_path, file_id)

print("Processing, transforming, and saving testing features...")
for file_path, file_id in tqdm(zip(test_files, test_file_ids), total=len(test_files)):
    process_transform_save(file_path, file_id)

train_transcriptions = {file_id: transcriptions[file_id] for file_id in train_file_ids}
test_transcriptions = {file_id: transcriptions[file_id] for file_id in test_file_ids}

with open(os.path.join(output_dir, "train_transcriptions.json"), "w") as f:
    json.dump(train_transcriptions, f)

with open(os.path.join(output_dir, "test_transcriptions.json"), "w") as f:
    json.dump(test_transcriptions, f)

print("Transcriptions saved.")
print("Preprocessing complete.")


Total files: 28539, Training: 22831, Testing: 5708
Processing and fitting StandardScaler and IncrementalPCA on training data...


100%|██████████| 23/23 [06:43<00:00, 17.54s/it]
100%|██████████| 23/23 [07:00<00:00, 18.27s/it]


Scaler and IncrementalPCA models saved.
Processing, transforming, and saving training features...


100%|██████████| 22831/22831 [08:33<00:00, 44.49it/s]


Processing, transforming, and saving testing features...


100%|██████████| 5708/5708 [02:07<00:00, 44.65it/s]


Transcriptions saved.
Preprocessing complete.


In [3]:
import os
import numpy as np
import json
from hmmlearn import hmm
import pickle
import multiprocessing
import nltk

nltk.download('cmudict')
from nltk.corpus import cmudict

output_dir = r'C:\Users\User\Documents\NUS\CS5242\Final Project\EDA_libraspeech\processed_data'

# Global variables
X_train = []
lengths_train = []
y_train = []

with open(os.path.join(output_dir, "train_transcriptions.json"), "r") as f:
    train_transcriptions = json.load(f)

pronunciation_dict = cmudict.dict()

# todo: to refine here?
phoneme_groups = {
    'vowels': ['AA', 'AE', 'AH', 'AO', 'AW', 'AY', 'EH', 'ER', 'EY', 'IH', 'IY', 'OW', 'OY', 'UH', 'UW'],
    'stops': ['P', 'B', 'T', 'D', 'K', 'G'],
    'fricatives': ['F', 'V', 'TH', 'DH', 'S', 'Z', 'SH', 'ZH', 'HH'],
    'affricates': ['CH', 'JH'],
    'nasals': ['M', 'N', 'NG'],
    'liquids_glides': ['L', 'R', 'W', 'Y'],
    'silence': ['SIL']
}

phoneme_to_group = {}
for group, phonemes in phoneme_groups.items():
    for phoneme in phonemes:
        phoneme_to_group[phoneme] = group

def words_to_phoneme_groups(words, pronunciation_dict, phoneme_to_group):
    phoneme_groups_sequence = []
    for word in words:
        word = word.lower()
        if word in pronunciation_dict:
            phonemes = pronunciation_dict[word][0]
            for phoneme in phonemes:
                phoneme = ''.join([char for char in phoneme if char.isalpha()])  # Remove stress markers
                phoneme = phoneme.upper()
                group = phoneme_to_group.get(phoneme)
                if group:
                    phoneme_groups_sequence.append(group)
                else:
                    pass
        else:
            pass
    return phoneme_groups_sequence

train_phoneme_sequences = {}
for file_id, transcription in train_transcriptions.items():
    words = transcription.strip().split()
    phoneme_groups_sequence = words_to_phoneme_groups(words, pronunciation_dict, phoneme_to_group)
    train_phoneme_sequences[file_id] = phoneme_groups_sequence

#mapping from phoneme groups to labels
unique_phoneme_groups = set()
for groups in train_phoneme_sequences.values():
    unique_phoneme_groups.update(groups)

phoneme_group_to_label = {group: idx for idx, group in enumerate(sorted(unique_phoneme_groups))}
label_to_phoneme_group = {idx: group for group, idx in phoneme_group_to_label.items()}

with open(os.path.join(output_dir, "phoneme_group_to_label.json"), "w") as f:
    json.dump(phoneme_group_to_label, f)

def align_phonemes_to_frames(num_frames, phonemes):
    frames_per_phoneme = max(int(num_frames / len(phonemes)), 1)
    labels = []
    for phoneme in phonemes:
        labels.extend([phoneme] * frames_per_phoneme)
    if len(labels) < num_frames:
        labels.extend([phonemes[-1]] * (num_frames - len(labels)))
    else:
        labels = labels[:num_frames]
    return labels

for file_id in train_transcriptions.keys():
    features = np.load(os.path.join(output_dir, f"{file_id}_features.npy"))
    num_frames = features.shape[0]
    
    phoneme_groups = train_phoneme_sequences[file_id]
    
    labels = align_phonemes_to_frames(num_frames, phoneme_groups)
    labels = [phoneme_group_to_label[group] for group in labels]
    
    X_train.append(features)
    lengths_train.append(num_frames)
    y_train.extend(labels)

def train_phoneme_group_model(args):
    group, label = args
    try:
        group_features = []
        for i, features in enumerate(X_train):
            start = sum(lengths_train[:i])
            end = start + lengths_train[i]
            labels = y_train[start:end]
            
            group_indices = [idx for idx, l in enumerate(labels) if l == label]
            if group_indices:
                group_features.extend(features[group_indices])
        
        group_features = np.array(group_features)
        
        if len(group_features) > 0:
            model = hmm.GaussianHMM(n_components=10, covariance_type='diag', n_iter=100)
            model.fit(group_features)
            with open(os.path.join(output_dir, f"{group}_model.pkl"), "wb") as f:
                pickle.dump(model, f)
            print(f"Trained model for group {group}")
        else:
            print(f"No data for group {group}")
    except Exception as e:
        print(f"Error training model for group {group}: {e}")

if __name__ == '__main__':
    group_args = [(group, label) for group, label in phoneme_group_to_label.items()]
    
    #train models sequentially since parallel is not working apparently
    for args in group_args:
        train_phoneme_group_model(args)



[nltk_data] Downloading package cmudict to
[nltk_data]     C:\Users\User\AppData\Roaming\nltk_data...
[nltk_data]   Package cmudict is already up-to-date!
Model is not converging.  Current: -2835955.849360123 is not greater than -2835955.5610560817. Delta is -0.2883040411397815


Trained model for group affricates
Trained model for group fricatives


Model is not converging.  Current: -28676227.404248703 is not greater than -28675989.35240077. Delta is -238.0518479347229


Trained model for group liquids_glides


Model is not converging.  Current: -32491031.962262094 is not greater than -32490690.52566058. Delta is -341.43660151585937


Trained model for group nasals
Trained model for group stops


Model is not converging.  Current: -96825341.2409979 is not greater than -96822600.59224094. Delta is -2740.6487569510937


Trained model for group vowels


In [4]:
import numpy as np

def viterbi_decode(features, emission_models, transition_matrix, start_prob):
    T = features.shape[0]
    N = len(emission_models)
    log_prob = np.zeros((T, N))
    backpointer = np.zeros((T, N), dtype=int)

    for s in range(N):
        group = label_to_phoneme_group[s]
        emission_log_prob = emission_models[group].score(features[0].reshape(1, -1))
        log_prob[0, s] = np.log(start_prob[s]) + emission_log_prob

    for t in range(1, T):
        for s in range(N):
            group = label_to_phoneme_group[s]
            emission_log_prob = emission_models[group].score(features[t].reshape(1, -1))
            transition_log_probs = log_prob[t - 1] + np.log(transition_matrix[:, s])
            best_prev_state = np.argmax(transition_log_probs)
            log_prob[t, s] = transition_log_probs[best_prev_state] + emission_log_prob
            backpointer[t, s] = best_prev_state

    best_path = np.zeros(T, dtype=int)
    best_path[T - 1] = np.argmax(log_prob[T - 1])
    for t in range(T - 2, -1, -1):
        best_path[t] = backpointer[t + 1, best_path[t + 1]]

    return best_path


In [5]:
from collections import defaultdict
import numpy as np

bigram_counts = defaultdict(lambda: defaultdict(int))
for groups in train_phoneme_sequences.values():
    for i in range(len(groups) - 1):
        bigram_counts[groups[i]][groups[i + 1]] += 1

phoneme_groups = sorted(phoneme_group_to_label.keys())
n_states = len(phoneme_groups)
transition_matrix = np.zeros((n_states, n_states))

for i, group_i in enumerate(phoneme_groups):
    total = sum(bigram_counts[group_i].values())
    if total > 0:
        for j, group_j in enumerate(phoneme_groups):
            count = bigram_counts[group_i][group_j]
            transition_matrix[i, j] = count / total
    else:
        transition_matrix[i, :] = 1 / n_states


In [6]:
import os
import numpy as np
import json
import pickle
import editdistance

with open(os.path.join(output_dir, "test_transcriptions.json"), "r") as f:
    test_transcriptions = json.load(f)

with open(os.path.join(output_dir, "phoneme_group_to_label.json"), "r") as f:
    phoneme_group_to_label = json.load(f)
label_to_phoneme_group = {int(idx): group for group, idx in phoneme_group_to_label.items()}

X_test = []
lengths_test = []
test_phoneme_sequences = {}

for file_id, transcription in test_transcriptions.items():
    features = np.load(os.path.join(output_dir, f"{file_id}_features.npy"))
    X_test.append(features)
    lengths_test.append(features.shape[0])

    words = transcription.strip().split()
    phoneme_groups_sequence = words_to_phoneme_groups(words, pronunciation_dict, phoneme_to_group)
    test_phoneme_sequences[file_id] = phoneme_groups_sequence

emission_models = {}
phoneme_groups = sorted(phoneme_group_to_label.keys())
n_states = len(phoneme_groups)

for group in phoneme_groups:
    model_path = os.path.join(output_dir, f"{group}_model.pkl")
    with open(model_path, "rb") as f:
        emission_models[group] = pickle.load(f)

predicted_sequences = {}
start_prob = np.full(n_states, 1 / n_states)

for idx, features in enumerate(X_test):
    best_path = viterbi_decode(features, emission_models, transition_matrix, start_prob)
    predicted_phoneme_groups = [label_to_phoneme_group[state] for state in best_path]

    collapsed_sequence = [predicted_phoneme_groups[0]]
    for group in predicted_phoneme_groups[1:]:
        if group != collapsed_sequence[-1]:
            collapsed_sequence.append(group)

    file_id = list(test_transcriptions.keys())[idx]
    predicted_sequences[file_id] = collapsed_sequence

total_errors = 0
total_length = 0

for file_id in test_transcriptions.keys():
    predicted = predicted_sequences[file_id]
    ground_truth = test_phoneme_sequences[file_id]

    distance = editdistance.eval(predicted, ground_truth)
    total_errors += distance
    total_length += len(ground_truth)

phoneme_group_error_rate = total_errors / total_length
print(f"Phoneme Group Error Rate: {phoneme_group_error_rate * 100:.2f}%")


Phoneme Group Error Rate: 179.66%
