In [2]:
!pip install hmmlearn


Collecting hmmlearn
  Downloading hmmlearn-0.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl.metadata (3.0 kB)
Downloading hmmlearn-0.3.3-cp312-cp312-manylinux_2_17_x86_64.manylinux2014_x86_64.whl (165 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m166.0/166.0 kB[0m [31m4.2 MB/s[0m eta [36m0:00:00[0m
[?25hInstalling collected packages: hmmlearn
Successfully installed hmmlearn-0.3.3


In [4]:
import os
import glob
import numpy as np
import librosa
from sklearn.model_selection import train_test_split
from hmmlearn.hmm import GaussianHMM
from scipy.stats import multivariate_normal
import warnings
warnings.filterwarnings("ignore", category=DeprecationWarning)

# -----------------------------
# CONFIG
# -----------------------------
SAMPLE_RATE = 16000   # TIMIT uses 16k
N_MFCC = 13
HMM_COV_TYPE = 'diag'  # 'diag' or 'full'
RANDOM_STATE = 42

# -----------------------------
# HELPERS: Data loading & feature extraction
# -----------------------------
def load_phn_file(phn_path):
    """
    Parse a TIMIT .phn file. .phn lines: <start_sample> <end_sample> <phoneme_label>
    Returns list of tuples (start_sample, end_sample, phoneme_label)
    """
    items = []
    with open(phn_path, 'r') as f:
        for line in f:
            parts = line.strip().split()
            if len(parts) != 3:
                continue
            s, e, p = int(parts[0]), int(parts[1]), parts[2]
            items.append((s, e, p))
    return items

def extract_mfcc(wav_path, sr=SAMPLE_RATE, n_mfcc=N_MFCC, hop_length=160, n_fft=400):
    """
    Load wav and return MFCC feature matrix (T x n_mfcc)
    """
    x, sr_loaded = librosa.load(wav_path, sr=sr)
    mfcc = librosa.feature.mfcc(x, sr=sr, n_mfcc=n_mfcc, n_fft=n_fft, hop_length=hop_length)
    # transpose to (T, n_mfcc)
    return mfcc.T

def load_timit_word_files(timit_root, words_list=None, max_per_word=None):
    """
    Load TIMIT where each WAV has a .phn. Return dict:
      {word: [(wav_path, phn_path), ...]}
    This function assumes filename conventions: audio .WAV and .PHN with same basename.
    Customize to your layout if necessary.
    """
    pairs = {}
    # Search for all .wav files recursively
    wav_files = glob.glob(os.path.join(timit_root, '**', '*.wav'), recursive=True)
    for wav in wav_files:
        phn = os.path.splitext(wav)[0] + '.phn'
        if not os.path.exists(phn):
            continue
        # Try to find the word label for this file. TIMIT sentences contain words.
        # For this assignment, user likely pre-split each file to single word; otherwise you'd need to parse transcripts.
        # We'll use parent folder name as a proxy label if words_list not provided.
        word_label = os.path.basename(os.path.dirname(wav))
        if words_list is not None and word_label not in words_list:
            continue
        pairs.setdefault(word_label, []).append((wav, phn))
    if max_per_word:
        for w in list(pairs.keys()):
            pairs[w] = pairs[w][:max_per_word]
    return pairs

# -----------------------------
# HMM training utilities
# -----------------------------
def make_obs_list_for_word(pairs, feature_extractor=extract_mfcc):
    """
    For a list of (wav, phn) pairs, return list of observation arrays (T_i x D)
    """
    obs = []
    for wav, phn in pairs:
        feat = feature_extractor(wav)  # (T, D)
        obs.append(feat)
    return obs

def train_gaussian_hmm(observations, n_components=5, cov_type=HMM_COV_TYPE, n_iter=100):
    """
    Train a GaussianHMM on a list of observation sequences (list of arrays T_i x D).
    Returns trained model.
    """
    # hmmlearn expects concatenated data + lengths
    X = np.vstack(observations)
    lengths = [o.shape[0] for o in observations]
    model = GaussianHMM(n_components=n_components, covariance_type=cov_type,
                        n_iter=n_iter, random_state=RANDOM_STATE, verbose=False)
    model.fit(X, lengths)
    return model

# -----------------------------
# Emission log-probabilities (for Gaussian HMM states)
# -----------------------------
def emission_logprob(obs, means, covars, cov_type='diag'):
    """
    Compute emission log-probabilities:
      obs: (T, D)
      means: (n_states, D)
      covars: depends on cov_type:
        - diag: (n_states, D)
        - full: (n_states, D, D)
    Returns log_B: (T, n_states) where log_B[t, j] = log P(obs[t] | state=j)
    """
    T = obs.shape[0]
    n_states = means.shape[0]
    D = obs.shape[1]
    log_B = np.zeros((T, n_states))
    for j in range(n_states):
        mu = means[j]
        if cov_type == 'diag':
            cov = np.diag(covars[j]) if covars.shape[1] == D else np.diag(covars[j])
            # use multivariate_normal with cov
            rv = multivariate_normal(mean=mu, cov=np.diag(covars[j]), allow_singular=True)
        else:  # 'full'
            rv = multivariate_normal(mean=mu, cov=covars[j], allow_singular=True)
        # compute logpdf for all timesteps
        log_B[:, j] = rv.logpdf(obs)
    return log_B

# -----------------------------
# Your own Forward algorithm (log-space)
# -----------------------------
def forward_log(start_logprob, trans_logprob, log_emlik):
    """
    Log-forward algorithm (alpha in log-space).
    start_logprob: (n_states,)
    trans_logprob: (n_states, n_states) -- log of transition matrix
    log_emlik: (T, n_states) -- log emission likelihoods per timestep
    Returns:
      alpha_logs: (T, n_states), log_likelihood (scalar)
    """
    T, n_states = log_emlik.shape
    alpha = np.full((T, n_states), -np.inf)
    # init
    alpha[0] = start_logprob + log_emlik[0]
    for t in range(1, T):
        # For numeric stability use log-sum-exp
        prev = alpha[t-1]  # (n_states,)
        # compute next alpha[t, j] = logsum_k ( alpha[t-1,k] + trans_log[k,j] ) + log_emlik[t, j]
        for j in range(n_states):
            tmp = prev + trans_logprob[:, j]  # shape (n_states,)
            # log-sum-exp:
            m = np.max(tmp)
            if np.isfinite(m):
                alpha[t, j] = np.log(np.sum(np.exp(tmp - m))) + m + log_emlik[t, j]
            else:
                alpha[t, j] = -np.inf
    # log-likelihood = log-sum-exp over alpha[T-1]
    m = np.max(alpha[-1])
    if np.isfinite(m):
        loglik = np.log(np.sum(np.exp(alpha[-1] - m))) + m
    else:
        loglik = -np.inf
    return alpha, loglik

# -----------------------------
# Your own Viterbi algorithm (log-space)
# -----------------------------
def viterbi_log(start_logprob, trans_logprob, log_emlik):
    """
    Log-Viterbi to compute most likely state path.
    Returns path (length T) and path log-prob.
    """
    T, n_states = log_emlik.shape
    delta = np.full((T, n_states), -np.inf)  # best score up to t in state j
    psi = np.zeros((T, n_states), dtype=int)  # backpointers

    delta[0] = start_logprob + log_emlik[0]
    psi[0] = -1
    for t in range(1, T):
        for j in range(n_states):
            scores = delta[t-1] + trans_logprob[:, j]  # candidate scores
            best_prev = np.argmax(scores)
            delta[t, j] = scores[best_prev] + log_emlik[t, j]
            psi[t, j] = best_prev

    # backtrack
    path = np.zeros(T, dtype=int)
    path[T-1] = np.argmax(delta[T-1])
    for t in reversed(range(1, T)):
        path[t-1] = psi[t, path[t]]
    # path score
    path_score = delta[T-1, path[T-1]]
    return path, path_score

# -----------------------------
# Utilities for converting to log-space and using model params
# -----------------------------
def model_to_log_params(model):
    """
    Extract model parameters and convert to log-space
    """
    startprob = model.startprob_
    trans = model.transmat_
    start_log = np.log(startprob + 1e-300)
    trans_log = np.log(trans + 1e-300)
    means = model.means_  # (n_states, D)
    covars = model.covars_
    cov_type = model.covariance_type
    return start_log, trans_log, means, covars, cov_type

# -----------------------------
# Example pipeline
# -----------------------------
def example_pipeline(timit_root, words_to_model=None, max_files_per_word=20, n_hmm_states=5):
    """
    Full example pipeline:
     - Loads data for a small set of words (or directory labels)
     - Extracts MFCCs
     - Trains one GaussianHMM per word
     - Inspects the model
     - Runs own Forward and Viterbi on a held-out test sample
    """
    print("Scanning dataset...")
    pairs_by_word = load_timit_word_files(timit_root, words_list=words_to_model, max_per_word=max_files_per_word)
    models = {}
    for word, pairs in pairs_by_word.items():
        print(f"\nWord: {word}  |  files: {len(pairs)}")
        if len(pairs) < 2:
            print("  - Not enough data, skipping.")
            continue
        obs = make_obs_list_for_word(pairs)
        # train-test split for demonstration
        obs_train, obs_test = train_test_split(obs, test_size=0.2, random_state=RANDOM_STATE)
        n_states = n_hmm_states
        print(f"  - Training HMM with n_components={n_states}")
        model = train_gaussian_hmm(obs_train, n_components=n_states)
        models[word] = (model, obs_test)
        # Inspect
        print("  startprob shape:", model.startprob_.shape)
        print("  transmat shape:", model.transmat_.shape)
        print("  means shape:", model.means_.shape)
        print("  covars shape:", model.covars_.shape)
    # demo evaluation on a single test sample for first model
    if models:
        demo_word = next(iter(models.keys()))
        model, obs_test_list = models[demo_word]
        test_obs = obs_test_list[0]
        print(f"\nDemo on word '{demo_word}' test sample. Observations shape: {test_obs.shape}")

        # compute emission logprobs
        start_log, trans_log, means, covars, cov_type = model_to_log_params(model)
        log_eml = emission_logprob(test_obs, means, covars, cov_type)
        alpha, loglik = forward_log(start_log, trans_log, log_eml)
        path, path_score = viterbi_log(start_log, trans_log, log_eml)

        print(f"Forward log-likelihood: {loglik:.3f}")
        print(f"Viterbi path length: {len(path)}  Path score (log): {path_score:.3f}")
        print("Viterbi state sequence (first 20):", path[:20])
    else:
        print("No models trained. Check your dataset path and layout.")

    return models

# -----------------------------
# Main guard for script
# -----------------------------
if __name__ == "__main__":
    import argparse
    parser = argparse.ArgumentParser()
    parser.add_argument("--timit_root", type=str, required=False,
                        help="Path to TIMIT root (or any directory with .wav and .phn pairs).")
    parser.add_argument("--demo", action="store_true", help="Run demo pipeline (requires timit_root).")
    parser.add_argument("--n_states", type=int, default=7, help="HMM states per model")
    args = parser.parse_known_args()[0]

    if args.demo:
        if args.timit_root is None:
            print("Provide --timit_root path where .wav and .phn are located.")
        else:
            example_pipeline(args.timit_root, max_files_per_word=30, n_hmm_states=args.n_states)
    else:
        print("Script loaded. Use --demo --timit_root PATH to run the demo pipeline.")

Script loaded. Use --demo --timit_root PATH to run the demo pipeline.
