# Assignment 3: Hidden Markov Models

---

## Task 1) Isolated Word Recognition

In this assignment, we'll be revising word recognition, this time using Hidden Markov Models (HMM).
As with [assignment 1](https://github.com/seqlrn/assignments/tree/master/1-dynamic-programming), we'll be using the [free spoken digits](https://github.com/Jakobovski/free-spoken-digit-dataset) dataset.
We will be using the [`pandas`](https://pandas.pydata.org/docs/) library for data handling and [`hmmlearn`](https://hmmlearn.readthedocs.io/en/latest/index.html) library for HMMs which depends on `numpy`.
Install the `pandas` and `hmmlearn` packages in your working environment and get familiar with these modules.


### Data

Download Zohar Jackson's [free spoken digit](https://github.com/Jakobovski/free-spoken-digit-dataset) dataset.
There's no need to clone, feel free to use a revision, like [v1.0.10](https://github.com/Jakobovski/free-spoken-digit-dataset/archive/refs/tags/v1.0.10.tar.gz).
The file naming convention is `{digitLabel}_{speakerName}_{index}.wav`.

### Basic Setup

As you can learn from the [tutorial](https://hmmlearn.readthedocs.io/en/latest/tutorial.html#), `hmmlearn` provides us with the base implementation of Hidden Markov Models; we'll be using the `hmm.GaussianHMM`, which implements HMMs with a single Gaussian emission probability per state.
For a starter, build a basic isolated word recognizer that uses a separate model for each digit.

*In this Jupyter Notebook, we will provide the steps to solve this task and give hints via functions & comments. However, code modifications (e.g., function naming, arguments) and implementation of additional helper functions & classes are allowed. The code aims to help you get started.*

---

In [None]:
# Dependencies
import os
import librosa as lr
import numpy as np
import pandas as pd
from hmmlearn import hmm
import requests
import tarfile
import math
import re
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import confusion_matrix
from functools import reduce
import random

### Prepare the Data

1.1 To facilitate the selection of samples for speakers and digits, consider how you can store the data within a `pandas.DataFrame`.

1.2 Compute the MFCC features for the complete data set (3000 recordings; use `n_mfcc=13`).

1.3 Apply per-speaker feature normalization (e.g., standardization).

In [None]:
NUM_SAMPLES = 50 # recordings per speaker & digit
DIGITS = [0, 1, 2, 3, 4, 5, 6, 7, 8, 9]
SPEAKERS = ["george", "jackson", "lucas", "nicolas", "theo", "yweweler"]

DIGITS_URL = "https://github.com/Jakobovski/free-spoken-digit-dataset/archive/refs/tags/v1.0.10.tar.gz"
DATA_PATH = "data"
DIGITS_TARBALL_PATH = DATA_PATH + "/free-spoken-digit-dataset-1.0.10.tar.gz"
DIGITS_PATH = DATA_PATH + "/free-spoken-digit-dataset-1.0.10"
RECORDINGS_PATH = DIGITS_PATH + "/recordings"

if not os.path.exists(DIGITS_TARBALL_PATH):
    with open(DIGITS_TARBALL_PATH, "wb") as fp:
        fp.write(requests.get(DIGITS_URL).content)
if not os.path.exists(DIGITS_PATH):
    with tarfile.open(DIGITS_TARBALL_PATH) as tar:
        tar.extractall(DATA_PATH)

In [None]:
### Notice: a good default value is 25ms for FFT window and 10ms for hop length
### Notice: be careful as librosa takes the number of samples as input!      

N_MFCC = 13
HOP_LENGTH_S =0.01
WINDOW_LENGTH_S = 0.025

def compute_features(file):
    """Computes the features for a recording file."""
    ### YOUR CODE HERE
    
    signal, sr = lr.load(file)
    return lr.feature.mfcc(
        y=signal,
        sr=sr,
        n_mfcc=N_MFCC,
        hop_length=round(HOP_LENGTH_S * sr),
        n_fft=round(WINDOW_LENGTH_S * sr)
    )
    
    ### END YOUR CODE


def load_dataframe(input_dir: str) -> pd.DataFrame:
    """Loads the recordings into a pandas.DataFrame."""
    ### YOUR CODE HERE

    digits = []
    speakers = []
    indices = []
    features = []
    for file in os.listdir(input_dir):
        m = re.match(r"^(\d)_(\w+)_(\d+).wav$", file)
        if m is not None:
            digits.append(int(m.group(1)))
            speakers.append(m.group(2))
            indices.append(int(m.group(3)))
            features.append(compute_features(f"{input_dir}/{file}"))
    return pd.DataFrame({"digit": digits, "speaker": speakers, "index": indices, "features": features})
    
    ### END YOUR CODE


def normalize_features(dataframe: pd.DataFrame) -> pd.DataFrame:
    """Applies per-speaker feature normalization."""
    ### YOUR CODE HERE
    
    dataframe = dataframe.copy()
    for speaker in dataframe["speaker"].unique():
        is_speaker = dataframe["speaker"] == speaker
        trn = np.vstack([dataframe["features"][i].T for i in filter(lambda ind: is_speaker[ind], range(len(is_speaker)))])
        scaler = StandardScaler()
        scaler.fit(trn)
        for i in filter(lambda ind: is_speaker[ind], range(len(is_speaker))):
            dataframe.loc[i, "features"] = (scaler.transform(dataframe["features"][i].T).T,) # type: ignore
    return dataframe
    
    ### END YOUR CODE

In [None]:
INPUT_DIR = RECORDINGS_PATH
dataframe = load_dataframe(input_dir=INPUT_DIR)
dataframe_w_norm = normalize_features(dataframe=dataframe)

### Notice: just for test purposes

# print("Num recordings: {}".format(len(dataframe)))
# for speaker in SPEAKERS:
#     print("### {}".format(speaker))
#     data_speaker = dataframe[dataframe["speaker"] == speaker]

#     print(data_speaker["digit"].value_counts())
#     print()

### Train and Evaluate

2.1 Implement a 6-fold [cross-validation](https://en.wikipedia.org/wiki/Cross-validation_(statistics)) loop for the 6 speakers to (later) figure out, which test speaker performs best/worst. That is, each speaker acts as test speaker while the others are used for training (with each possible combination).

2.2 Inside the cross-validation loop, train an individual HMM with linear topology for each digit. There are several points to consider:

*The [`fit`](https://github.com/hmmlearn/hmmlearn/blob/38b3cece4a6297e978a204099ae6a0a99555ec01/lib/hmmlearn/base.py#L439) expects features to be sequential in a single array with `X` as (n_train_samples, n_features). Furthermore, we need to pass the lengths of each recording into the function with`lengths` as (n_samples,):*

```python
### you can flatten the features of the train data as follows

# input: [(rec_samples_1, n_feats), ..., (rec_samples_N, n_feats)]
# output: (all_rec_samples, n_feats)
features = [features for features in dataframe["features"].values]
flatten = np.concatenate(features, axis=0)

lengths = np.array([...])

# train HMM
hmm.fit(X=flatten, lengths=lengths)
```

*For the HMM, it is necessary to choose a meaningful number of states. How many states (`n_components`) do you choose, and why?*

*With respect to the used `hmmlearn` library. How can you enforce a linear topology?*

*You might find that certain digits perform particularly bad; what could be a reason and how to mitigate it?*
    
2.3 Compute the [confusion matrix](https://en.wikipedia.org/wiki/Confusion_matrix) for each speaker and for the overall dataset by combining the predictions of the cross-validation. You can use the [`scikit-learn`](https://scikit-learn.org/stable/auto_examples/model_selection/plot_confusion_matrix.html) library.

2.4 Additional experiment: Compare the results without and with per-speaker feature normalization. How does the performance change?

In [None]:
### TODO:
### 1. set the `n_components` for all digits (choose a meaningful number of states)

n_comps = {i: 0 for i in DIGITS}

### YOUR CODE HERE

for i in DIGITS:
    n_comps[i] = 8

### END YOUR CODE

In [None]:
### TODO: 
### 1. implement the 6-fold cross-validation loop
### 2. allocate and initialize the HMMs, one for each digit; set a linear topology
### 3. train the HMMs using the fit method; data needs to be concatenated
### 4. evaluate the trained models on the test speaker; how do you decide which word
###    was spoken?

### YOUR CODE HERE

STAY_PROB = 0.8

def init_model(n_components: int, stay_prob: float) -> hmm.GaussianHMM:
    model = hmm.GaussianHMM(n_components, covariance_type="diag", init_params="cm", params="cmt")
    startprob = np.zeros((n_components,))
    startprob[0] = 1
    model.startprob_ = startprob
    transmat = np.zeros((n_components, n_components))
    for i in range(n_components - 1):
        transmat[i, i] = stay_prob
        transmat[i, i + 1] = 1 - stay_prob
    transmat[-1, -1] = 1
    model.transmat_ = transmat
    return model

### END YOUR CODE

In [None]:
models = {}
confusion_matrices: dict[str, np.ndarray] = {}
accuracies: dict[str, float] = {}
precisions: dict[str, np.ndarray] = {}
recalls: dict[str, np.ndarray] = {}
f1_scores: dict[str, np.ndarray] = {}

for speaker in SPEAKERS:
    test = dataframe_w_norm[dataframe_w_norm["speaker"] == speaker]
    fold_models = {i: init_model(n, STAY_PROB) for i, n in n_comps.items()}
    models[speaker] = fold_models
    for i in DIGITS:
        train = list(dataframe_w_norm[(dataframe_w_norm["speaker"] != speaker) & (dataframe_w_norm["digit"] == i)]["features"])
        fold_models[i].fit(np.vstack([x.T for x in train]), [x.shape[1] for x in train])
    y = np.empty((len(test),), dtype=np.int32)
    y_pred = np.empty((len(test),), dtype=np.int32)
    for i, d in enumerate(test["digit"]):
        y[i] = d
    for i, x, in enumerate(test["features"]):
        scores = np.empty((len(DIGITS,)))
        for d in DIGITS:
            scores[d], _ = fold_models[d].decode(x.T)
        y_pred[i] = np.argmax(scores)
    
    C = confusion_matrix(y, y_pred)
    acc = (C.diagonal().sum() / C.sum()).item()
    prec = C.diagonal() / C.sum(axis=0)
    rec = C.diagonal() / C.sum(axis=1)
    f1 = 2 / (1 / (prec + 1e-24) + 1 / (rec + 1e-24))
    confusion_matrices[speaker] = C
    accuracies[speaker] = acc
    precisions[speaker] = prec
    recalls[speaker] = rec
    f1_scores[speaker] = f1

In [None]:
### TODO: 
### 1. based on the results, compute and display the confusion matrix for 
###    each test speaker 
### 2. compute and display the confusion matrix for the overall dataset

### YOUR CODE HERE

C_sum = reduce(lambda a, b: a + b, confusion_matrices.values())
acc_sum = C_sum.diagonal().sum() / C_sum.sum()
prec_sum = C_sum.diagonal() / C_sum.sum(axis=0)
rec_sum = C_sum.diagonal() / C_sum.sum(axis=1)
f1_sum = 2 / (1 / (prec_sum + 1e-24) + 1 / (rec_sum + 1e-24))

for i, speaker, in enumerate(SPEAKERS):
    print(f"cross validation report fold {i + 1} / {len(SPEAKERS)}, speaker {speaker}")
    print("confusion matrix:")
    print(confusion_matrices[speaker])
    print(f"accuracy: {accuracies[speaker]}")
    print(f"precisions: {precisions[speaker]}")
    print(f"recalls: {recalls[speaker]}")
    print(f"F1 scores: {f1_scores[speaker]}")
    print()

print("overall report")
print("confusion matrix:")
print(C_sum)
print(f"accuracy: {acc_sum}")
print(f"precisions: {prec_sum}")
print(f"recalls: {rec_sum}")
print(f"F1 scores: {f1_sum}")

### END YOUR CODE

---

## Task 2) Decoding Sequences of Digits

The example above can't handle sequences of spoken digits.
In this part of the assignment, you'll build a basic decoder that is able to decode arbitrary sequences of digits (without a prior, though).
The `decode` method in `hmmlearn` only works for a single HMM.
There are two ways how to solve this assignment:

- Construct a "meta" HMM from the previously trained digit HMMs, by allowing state transitions from one digit to another; the resulting HMM can be decoded using the existing `decode` method (don't forget to re-map the state ids to the originating digit).

- (Optional) Implement a real (time-synchronous) decoder using beam search. The straight-forward way is to maintain a (sorted) list of active hypotheses (ie. state history and current log-likelihood) that is first expanded and then pruned in each time step. The tricky part is at the "end" of a model: do you loop or expand new words?

---

### Generate Test Sequences

3.1 Generate a few test sequences of random length in between 3 and 6 digits; use [`numpy.random.randint`](https://numpy.org/doc/stable/reference/random/generated/numpy.random.randint.html) and be sure to also retain the digits sequence since we need to compute edit distance between reference and hypotheses later.

In [None]:
def create_digit_sequence(speaker_dataframe: pd.DataFrame, min_digits: int, max_digits: int) -> tuple[list[int], np.ndarray]:
    """
    Creates a sequence of spoken digits from a speaker and returns the
    features and reference label.
    """
    ### YOUR CODE HERE
    
    digits = [random.randint(DIGITS[0], DIGITS[-1]) for _ in range(round(random.randint(min_digits, max_digits)))]
    segments = []
    for d in digits:
        options = speaker_dataframe[speaker_dataframe["digit"] == d]["features"]
        segments.append(options[random.randint(0, len(options))])
    return digits, np.hstack(segments)
    
    ### END YOUR CODE

In [None]:
### Notice: just for test purposes

# speaker = "george"
# data_george = dataframe[dataframe["speaker"] == speaker]
# for i in range(20):
#     data_seq, digits = create_digit_sequence(data_george)
#     print("Digits: {}".format(digits))

### Create "meta" HMM

4.1 Combine the previously trained HMMs to a single "meta" HMM, altering the transition probabilities to make a circular graph that allows each word to follow another.

4.2 Implement a method that converts a state sequence relating to the meta HMM into a sequence of actual digits.

4.3 Decode your test sequences and compute the [word error rate](https://en.wikipedia.org/wiki/Word_error_rate) (WER) with [JiWER](https://pypi.org/project/jiwer/) (install the package in your working environment).

4.4 Compute an overall WER; ie. over the cross-validation.

4.5 (Optional) Implement a basic time-synchronous beam search; how do the results compare to the above viterbi decoding in terms of accuracy and time?

In [None]:
### YOUR CODE HERE

def meta_hmm(models: list[hmm.GaussianHMM]) -> tuple[hmm.GaussianHMM, list[int]]:
    c = sum(m.n_components for m in models)
    feature_dim = models[0].means_.shape[1]
    covars = np.empty((c, feature_dim, feature_dim))
    means = np.empty((c, feature_dim))
    start_probs = np.zeros(c)
    transmat = np.zeros((c, c))
    start_prob = 1 / len(models)
    trans_prob = (1 - STAY_PROB) / len(models)
    start_idcs = []
    
    i = 0
    for m in models:
        start_idcs.append(i)
        j = i + m.n_components
        covars[i:j, :, :] = m.covars_
        means[i:j, :] = m.means_
        start_probs[i] = start_prob
        transmat[i:j-1, i:j] = m.transmat_[:-1, :]
        transmat[j, j] = STAY_PROB
        k = 0
        for m_ in models:
            transmat[j, k] = trans_prob
            k += m_.n_components
        i = j

    result = hmm.GaussianHMM(c, models[0].covariance_type, params="")
    result.startprob_ = start_probs
    result.transmat_ = transmat
    result.covars_ = covars
    result.means_ = means
    return result, start_idcs

class SequenceDecoder:
    @property
    def keys(self) -> list[int]:
        return list(self.keys)

    def __init__(self, models: dict[int, hmm.GaussianHMM]):
        self.__keys = list(models.keys())        
        self.__model, self.__start_idcs = meta_hmm([models[k] for k in self.keys])
        self.__lookup = {self.__start_idcs[i]: self.__keys[i] for i in range(len(self.__keys))}

    def decode(self, sequence: np.ndarray) -> list[int]:
        result = []
        _, states = self.__model.decode(sequence)
        it = iter(states)
        last = it.next()
        result.append(self.__lookup[last])
        for s in it:
            if s != last and s in self.__lookup:
                result.append(self.__lookup[s])
            last = s
        return result


### END YOUR CODE

