In [1]:
#@title

# lab1_tools.py

import numpy as np


# - Functions given by the exercise --------------------------------------------

def tidigit2labels(tidigitsarray):
    """
    Return a list of labels including gender, speaker, digit and repetition information for each
    utterance in tidigitsarray. Useful for plots.
    """
    labels = []
    nex = len(tidigitsarray)
    for ex in range(nex):
        labels.append(tidigitsarray[ex]['gender'] + '_' + 
                      tidigitsarray[ex]['speaker'] + '_' + 
                      tidigitsarray[ex]['digit'] + '_' + 
                      tidigitsarray[ex]['repetition'])
    return labels


def dither(samples, level=1.0):
    """
    Applies dithering to the samples. Adds Gaussian noise to the samples to avoid numerical
        errors in the subsequent FFT calculations.

        samples: array of speech samples
        level: decides the amount of dithering (see code for details)

    Returns:
        array of dithered samples (same shape as samples)
    """
    return samples + level*np.random.normal(0,1, samples.shape)
    

def lifter(mfcc, lifter=22):
    """
    Applies liftering to improve the relative range of MFCC coefficients.

       mfcc: NxM matrix where N is the number of frames and M the number of MFCC coefficients
       lifter: lifering coefficient

    Returns:
       NxM array with lifeterd coefficients
    """
    nframes, nceps = mfcc.shape
    cepwin = 1.0 + lifter/2.0 * np.sin(np.pi * np.arange(nceps) / lifter)
    return np.multiply(mfcc, np.tile(cepwin, nframes).reshape((nframes,nceps)))


def hz2mel(f):
    """Convert an array of frequency in Hz into mel."""
    return 1127.01048 * np.log(f/700 +1)


def trfbank(fs, nfft, lowfreq=133.33, linsc=200/3., logsc=1.0711703, nlinfilt=13, nlogfilt=27, equalareas=False):
    """Compute triangular filterbank for MFCC computation.

    Inputs:
    fs:         sampling frequency (rate)
    nfft:       length of the fft
    lowfreq:    frequency of the lowest filter
    linsc:      scale for the linear filters
    logsc:      scale for the logaritmic filters
    nlinfilt:   number of linear filters
    nlogfilt:   number of log filters

    Outputs:
    res:  array with shape [N, nfft], with filter amplitudes for each column.
            (N=nlinfilt+nlogfilt)
    From scikits.talkbox"""
    # Total number of filters
    nfilt = nlinfilt + nlogfilt

    #------------------------
    # Compute the filter bank
    #------------------------
    # Compute start/middle/end points of the triangular filters in spectral
    # domain
    freqs = np.zeros(nfilt+2)
    freqs[:nlinfilt] = lowfreq + np.arange(nlinfilt) * linsc
    freqs[nlinfilt:] = freqs[nlinfilt-1] * logsc ** np.arange(1, nlogfilt + 3)
    if equalareas:
        heights = np.ones(nfilt)
    else:
        heights = 2./(freqs[2:] - freqs[0:-2])

    # Compute filterbank coeff (in fft domain, in bins)
    fbank = np.zeros((nfilt, nfft))
    # FFT bins (in Hz)
    nfreqs = np.arange(nfft) / (1. * nfft) * fs
    for i in range(nfilt):
        low = freqs[i]
        cen = freqs[i+1]
        hi = freqs[i+2]

        lid = np.arange(np.floor(low * nfft / fs) + 1,
                        np.floor(cen * nfft / fs) + 1, dtype=np.int)
        lslope = heights[i] / (cen - low)
        rid = np.arange(np.floor(cen * nfft / fs) + 1,
                        np.floor(hi * nfft / fs) + 1, dtype=np.int)
        rslope = heights[i] / (hi - cen)
        fbank[i][lid] = lslope * (nfreqs[lid] - low)
        fbank[i][rid] = rslope * (hi - nfreqs[rid])

    return fbank


In [2]:
#@title

# lab1_proto.py

import matplotlib.pyplot as plt
from scipy.signal import lfilter, hamming
from scipy.fftpack import fft
from scipy.fftpack.realtransforms import dct


# Function given by the exercise ----------------------------------

def mspec(samples, winlen = 400, winshift = 200, preempcoeff=0.97, nfft=512, samplingrate=20000):
    """Computes Mel Filterbank features.

    Args:
        samples: array of speech samples with shape (N,)
        winlen: lenght of the analysis window
        winshift: number of samples to shift the analysis window at every time step
        preempcoeff: pre-emphasis coefficient
        nfft: length of the Fast Fourier Transform (power of 2, >= winlen)
        samplingrate: sampling rate of the original signal

    Returns:
        N x nfilters array with mel filterbank features (see trfbank for nfilters)
    """
    frames = enframe(samples, winlen, winshift)
    preemph = preemp(frames, preempcoeff)
    windowed = windowing(preemph)
    spec = powerSpectrum(windowed, nfft)
    return logMelSpectrum(spec, samplingrate)


def mfcc(samples, winlen = 400, winshift = 200, preempcoeff=0.97, nfft=512, nceps=13, samplingrate=20000, liftercoeff=22):
    """Computes Mel Frequency Cepstrum Coefficients.

    Args:
        samples: array of speech samples with shape (N,)
        winlen: lenght of the analysis window
        winshift: number of samples to shift the analysis window at every time step
        preempcoeff: pre-emphasis coefficient
        nfft: length of the Fast Fourier Transform (power of 2, >= winlen)
        nceps: number of cepstrum coefficients to compute
        samplingrate: sampling rate of the original signal
        liftercoeff: liftering coefficient used to equalise scale of MFCCs

    Returns:
        N x nceps array with lifetered MFCC coefficients
    """
    mspecs = mspec(samples, winlen, winshift, preempcoeff, nfft, samplingrate)
    ceps = cepstrum(mspecs, nceps)
    return lifter(ceps, liftercoeff)


# Functions to be implemented ----------------------------------

def enframe(samples, winlen, winshift):
    """
    Slices the input samples into overlapping windows.

    Args:
        winlen: window length in samples.
        winshift: shift of consecutive windows in samples
    Returns:
        numpy array [N x winlen], where N is the number of windows that fit
        in the input signal
    """
    if len(samples) < winlen:
        raise ValueError('Too long winlen wrt input signal.')

    N = 1 + int((len(samples) - winlen) / winshift)     # first window + how many times the window can be shifted
    frames = np.zeros((N, winlen))

    # init window
    start = 0
    end = start + winlen

    for i in range(N):
        # save frame
        frames[i] = samples[start:end]

        # shift window
        start = start + winshift
        end = start + winlen
    return frames


def preemp(input, p=0.97):
    """
    Pre-emphasis filter.

    Args:
        input: array of speech frames [N x M] where N is the number of frames and
               M the samples per frame
        p: preemhasis factor (defaults to the value specified in the exercise)

    Output:
        output: array of pre-emphasised speech samples
    Note (you can use the function lfilter from scipy.signal)
    """
    # y[n] = x[n] - p*x[n-1]
    num = [1, -p]
    den = [1]
    return lfilter(num, den, input)


def windowing(input):
    """
    Applies hamming window to the input frames.

    Args:
        input: array of speech samples [N x M] where N is the number of frames and
               M the samples per frame
    Output:
        array of windowed speech samples [N x M]
    Note (you can use the function hamming from scipy.signal, include the sym=False option
    if you want to get the same results as in the example)
    """
    w = hamming(input.shape[1], sym=False)

    # window shape (for explanation)
    # plt.figure()
    # plt.plot(w)
    # plt.title('Hamming window')
    # plt.xlabel('sample')
    # plt.ylabel('amplitude')
    # plt.show()

    return input * w


def powerSpectrum(input, nfft):
    """
    Calculates the power spectrum of the input signal, that is the square of the modulus of the FFT

    Args:
        input: array of speech samples [N x M] where N is the number of frames and
               M the samples per frame
        nfft: length of the FFT
    Output:
        array of power spectra [N x nfft]
    Note: you can use the function fft from scipy.fftpack
    """
    return np.absolute(fft(input, n=nfft)) ** 2


def logMelSpectrum(input, samplingrate):
    """
    Calculates the log output of a Mel filterbank when the input is the power spectrum

    Args:
        input: array of power spectrum coefficients [N x nfft] where N is the number of frames and
               nfft the length of each spectrum
        samplingrate: sampling rate of the original signal (used to calculate the filterbank shapes)
    Output:
        array of Mel filterbank log outputs [N x nmelfilters] where nmelfilters is the number
        of filters in the filterbank
    Note: use the trfbank function provided in lab1_tools.py to calculate the filterbank shapes and
          nmelfilters
    """
    nfft = input.shape[1]
    filterbank = trfbank(samplingrate, nfft)

    # filters (for explanation)
    # plt.figure()
    # plt.plot(filterbank[::5])
    # plt.title('Filterbank')
    # plt.xlabel('frequency')
    # plt.ylabel('amplitude')
    # plt.show()

    return np.log(input @ filterbank.T)


def cepstrum(input, nceps):
    """
    Calulates Cepstral coefficients from mel spectrum applying Discrete Cosine Transform

    Args:
        input: array of log outputs of Mel scale filterbank [N x nmelfilters] where N is the
               number of frames and nmelfilters the length of the filterbank
        nceps: number of output cepstral coefficients
    Output:
        array of Cepstral coefficients [N x nceps]
    Note: you can use the function dct from scipy.fftpack.realtransforms
    """
    cepstrum = dct(input)
    cepstrum = cepstrum[:, :nceps]
    return cepstrum


def dtw(x, y, dist):
    """Dynamic Time Warping.

    Args:
        x, y: arrays of size NxD and MxD respectively, where D is the dimensionality
              and N, M are the respective lenghts of the sequences
        dist: distance function (can be used in the code as dist(x[i], y[j]))

    Outputs:
        d: global distance between the sequences (scalar) normalized to len(x)+len(y)
        LD: local distance between frames from x and y (NxM matrix)
        AD: accumulated distance between frames of x and y (NxM matrix)
        path: best path through AD

    Note that you only need to define the first output for this exercise.
    """
    N = x.shape[0]
    M = y.shape[0]

    # local distance between frames (frame-wise distance)
    LD = np.zeros((N, M))
    for i in range(N):
        for j in range(M):
            LD[i, j] = dist(x[i], y[j])

    # accumulated distances
    AD = np.zeros((N, M))
    pred = np.zeros((N, M, 2), dtype='uint8')   # the predecessor is represented as tuple of coordinates
    for i in range(N):
        for j in range(M):
            if i != 0 or j != 0:
                # find minimum and save predecessor
                candidates_pred = []
                ad_candidates_pred = []
                if i > 0:
                    candidates_pred.append((i-1, j))
                    ad_candidates_pred.append(AD[i-1, j])
                if i > 0 and j > 0:
                    candidates_pred.append((i-1, j-1))
                    ad_candidates_pred.append(AD[i-1, j-1])
                if j > 0:
                    candidates_pred.append((i, j-1))
                    ad_candidates_pred.append(AD[i, j-1])
                m = min(ad_candidates_pred)
                idx_candidate = ad_candidates_pred.index(m)
                pred[i, j] = candidates_pred[idx_candidate]
            else:
                m = 0

            # compute accumulated distance
            AD[i, j] = LD[i, j] + m

    # global distance
    d = AD[-1, -1] / (N + M)

    # best path (backtracking)
    path = [(N-1, M-1)]
    current = path[0]
    while current != (0, 0):
        path.insert(0, tuple(pred[current]))
        current = path[0]

    # show best path (debugging)
    # LD_ = LD.copy()
    # for node in path:
    #     LD_[node] = 0   # to be visible in black
    # plt.figure()
    # plt.pcolormesh(LD_)
    # plt.show()

    return d, LD, AD, path


In [3]:
#@title

# lab2_tools.py

import numpy as np


def logsumexp(arr, axis=0):
    """Computes the sum of arr assuming arr is in the log domain.
    Returns log(sum(exp(arr))) while minimizing the possibility of
    over/underflow.
    """
    arr = np.rollaxis(arr, axis)
    # Use the max to normalize, as with the log this is what accumulates
    # the less errors
    vmax = arr.max(axis=0)
    if vmax.ndim > 0:
        vmax[~np.isfinite(vmax)] = 0
    elif not np.isfinite(vmax):
        vmax = 0
    with np.errstate(divide="ignore"):
        out = np.log(np.sum(np.exp(arr - vmax), axis=0))
        out += vmax
        return out


def log_multivariate_normal_density_diag(X, means, covars):
    """Compute Gaussian log-density at X for a diagonal model

    Args:
        X: array like, shape (n_observations, n_features)
        means: array like, shape (n_components, n_features)
        covars: array like, shape (n_components, n_features)

    Output:
        lpr: array like, shape (n_observations, n_components)
    From scikit-learn/sklearn/mixture/gmm.py
    """
    n_samples, n_dim = X.shape
    lpr = -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1)
                  + np.sum((means ** 2) / covars, 1)
                  - 2 * np.dot(X, (means / covars).T)
                  + np.dot(X ** 2, (1.0 / covars).T))
    return lpr


In [4]:
#@title

# lab2_proto.py

def concatTwoHMMs(hmm1, hmm2):
    """ Concatenates 2 HMM models

    Args:
       hmm1, hmm2: two dictionaries with the following keys:
           name: phonetic or word symbol corresponding to the model
           startprob: M+1 array with priori probability of state
           transmat: (M+1)x(M+1) transition matrix
           means: MxD array of mean vectors
           covars: MxD array of variances

    D is the dimension of the feature vectors
    M is the number of emitting states in each HMM model (could be different for each)

    Output
       dictionary with the same keys as the input but concatenated models:
          startprob: K+1 array with priori probability of state
          transmat: (K+1)x(K+1) transition matrix
             means: KxD array of mean vectors
            covars: KxD array of variances

    K is the sum of the number of emitting states from the input models
   
    Example:
       twoHMMs = concatHMMs(phoneHMMs['sil'], phoneHMMs['ow'])

    See also: the concatenating_hmms.pdf document in the lab package
    """
    M1 = len(hmm1['startprob']) - 1
    M2 = len(hmm2['startprob']) - 1
    K = M1 + M2

    hmm_concat = {
        'name': hmm1['name'] + hmm2['name'],
        'startprob': np.zeros(K+1),
        'transmat': np.zeros((K+1, K+1)),
        'means': np.concatenate((hmm1['means'], hmm2['means'])),
        'covars': np.concatenate((hmm1['covars'], hmm2['covars']))
    }

    hmm_concat['startprob'][:M1] = hmm1['startprob'][:-1]
    hmm_concat['startprob'][M1:] = hmm1['startprob'][-1] * hmm2['startprob']

    hmm_concat['transmat'][:M1, :M1] = hmm1['transmat'][:-1, :-1]
    hmm_concat['transmat'][:M1, M1:] = hmm1['transmat'][:M1, -1].reshape(-1, 1) * hmm2['startprob'].reshape(1, -1)
    hmm_concat['transmat'][M1:, M1:] = hmm2['transmat']

    return hmm_concat


# this is already implemented, but based on concat2HMMs() above
def concatHMMs(hmmmodels, namelist):
    """ Concatenates HMM models in a left to right manner

    Args:
       hmmmodels: dictionary of models indexed by model name. 
       hmmmodels[name] is a dictionaries with the following keys:
           name: phonetic or word symbol corresponding to the model
           startprob: M+1 array with priori probability of state
           transmat: (M+1)x(M+1) transition matrix
           means: MxD array of mean vectors
           covars: MxD array of variances
       namelist: list of model names that we want to concatenate

    D is the dimension of the feature vectors
    M is the number of emitting states in each HMM model (could be
      different in each model)

    Output
       combinedhmm: dictionary with the same keys as the input but
                    combined models:
         startprob: K+1 array with priori probability of state
          transmat: (K+1)x(K+1) transition matrix
             means: KxD array of mean vectors
            covars: KxD array of variances

    K is the sum of the number of emitting states from the input models

    Example:
       wordHMMs['o'] = concatHMMs(phoneHMMs, ['sil', 'ow', 'sil'])
    """
    concat = hmmmodels[namelist[0]]
    for idx in range(1,len(namelist)):
        concat = concatTwoHMMs(concat, hmmmodels[namelist[idx]])
    return concat


def gmmloglik(log_emlik, weights):
    """Log Likelihood for a GMM model based on Multivariate Normal Distribution.

    Args:
        log_emlik: array like, shape (N, K).
            contains the log likelihoods for each of N observations and
            each of K distributions
        weights:   weight vector for the K components in the mixture

    Output:
        gmmloglik: scalar, log likelihood of data given the GMM model.
    """


def forward(log_emlik, log_startprob, log_transmat):
    """Forward (alpha) probabilities in log domain.

    Args:
        log_emlik: NxM array of emission log likelihoods, N frames, M states
        log_startprob: log probability to start in state i
        log_transmat: log transition probability from state i to j

    Output:
        forward_prob: NxM array of forward log probabilities for each of the M states in the model
    """
    log_alpha = np.zeros(log_emlik.shape)
    log_startprob = log_startprob[:-1]      # remove non-emitting state
    log_transmat = log_transmat[:-1, :-1]

    log_alpha[0] = log_startprob + log_emlik[0]
    for n in range(1, len(log_emlik)):
        log_alpha[n] = logsumexp(log_alpha[n-1].reshape(-1, 1) + log_transmat) + log_emlik[n]

    loglik = logsumexp(log_alpha[-1])
    return log_alpha, loglik


def backward(log_emlik, log_startprob, log_transmat):
    """Backward (beta) probabilities in log domain.

    Args:
        log_emlik: NxM array of emission log likelihoods, N frames, M states
        log_startprob: log probability to start in state i
        log_transmat: transition log probability from state i to j

    Output:
        backward_prob: NxM array of backward log probabilities for each of the M states in the model
    """
    log_beta = np.zeros(log_emlik.shape)
    log_startprob = log_startprob[:-1]  # remove non-emitting state
    log_transmat = log_transmat[:-1, :-1]

    log_beta[-1] = 0
    for n in range(len(log_emlik)-2, -1, -1):
        log_beta[n] = logsumexp(log_transmat + log_emlik[n+1] + log_beta[n+1], axis=1)

    loglik = logsumexp(log_emlik[0] + log_beta[0] + log_startprob)
    return log_beta, loglik


def viterbi(log_emlik, log_startprob, log_transmat, forceFinalState=True):
    """Viterbi path.

    Args:
        log_emlik: NxM array of emission log likelihoods, N frames, M states
        log_startprob: log probability to start in state i
        log_transmat: transition log probability from state i to j
        forceFinalState: if True, start backtracking from the final state in
                  the model, instead of the best state at the last time step

    Output:
        viterbi_loglik: log likelihood of the best path
        viterbi_path: best path
    """
    N, M = log_emlik.shape
    logV = np.zeros((N, M))
    B = np.zeros((N, M), dtype='uint32')    # the first row is not used but kept for simplicity
    log_startprob = log_startprob[:-1]      # remove non-emitting state
    log_transmat = log_transmat[:-1, :-1]

    logV[0] = log_startprob + log_emlik[0]
    for n in range(1, len(log_emlik)):
        aux = logV[n - 1].reshape(-1, 1) + log_transmat
        logV[n] = np.max(aux, axis=0) + log_emlik[n]
        B[n] = np.argmax(aux, axis=0)
    viterbi_loglik = np.max(logV[-1])

    # backtracking
    viterbi_path = np.zeros(N, dtype='uint32')
    if forceFinalState:
        viterbi_path[-1] = M-1
    else:
        viterbi_path[-1] = np.argmax(logV[-1])
    for n in range(N-2, -1, -1):
        viterbi_path[n] = B[n+1, viterbi_path[n+1]]

    return viterbi_loglik, viterbi_path


def statePosteriors(log_alpha, log_beta):
    """State posterior (gamma) probabilities in log domain.

    Args:
        log_alpha: NxM array of log forward (alpha) probabilities
        log_beta: NxM array of log backward (beta) probabilities
    where N is the number of frames, and M the number of states

    Output:
        log_gamma: NxM array of gamma probabilities for each of the M states in the model
    """
    log_gamma = log_alpha + log_beta - logsumexp(log_alpha[-1])
    return log_gamma


def updateMeanAndVar(X, log_gamma, varianceFloor=5.0):
    """ Update Gaussian parameters with diagonal covariance

    Args:
         X: NxD array of feature vectors
         log_gamma: NxM state posterior probabilities in log domain
         varianceFloor: minimum allowed variance scalar
    were N is the lenght of the observation sequence, D is the
    dimensionality of the feature vectors and M is the number of
    states in the model

    Outputs:
         means: MxD mean vectors for each state
         covars: MxD covariance (variance) vectors for each state
    """
    N, D = X.shape
    M = log_gamma.shape[1]
    means = np.zeros((M, D))
    covars = np.zeros((M, D))

    for j in range(M):
        means[j] = np.sum(np.exp(log_gamma[:, j]) * X.T, axis=1) / np.exp(logsumexp(log_gamma[:, j]))
        covars[j] = np.sum(np.exp(log_gamma[:, j]) * (X - means[j]).T ** 2, axis=1) / np.exp(logsumexp(log_gamma[:, j]))
        covars[covars < varianceFloor] = varianceFloor
    return means, covars


In [5]:
#@title

# lab3_tools.py

import numpy as np
import os
from pysndfile import sndio


def path2info(path):
    """
    path2info: parses paths in the TIDIGIT format and extracts information
               about the speaker and the utterance

    Example:
    path2info('tidigits/disc_4.1.1/tidigits/train/man/ae/z9z6531a.wav')
    """
    rest, filename = os.path.split(path)
    rest, speakerID = os.path.split(rest)
    rest, gender = os.path.split(rest)
    digits = filename[:-5]
    repetition = filename[-5]
    return gender, speakerID, digits, repetition


def loadAudio(filename):
    """
    loadAudio: loads audio data from file using pysndfile

    Note that, by default pysndfile converts the samples into floating point
    numbers and rescales them in the range [-1, 1]. This is avoided by specifying
    the option dtype=np.int16 which keeps both the original data type and range
    of values.
    """
    sndobj = sndio.read(filename, dtype=np.int16)
    samplingrate = sndobj[1]
    samples = np.array(sndobj[0])
    return samples, samplingrate


def frames2trans(sequence, outfilename=None, timestep=0.01):
    """
    Outputs a standard transcription given a frame-by-frame
    list of strings.

    Example (using functions from Lab 1 and Lab 2):
    phones = ['sil', 'sil', 'sil', 'ow', 'ow', 'ow', 'ow', 'ow', 'sil', 'sil']
    trans = frames2trans(phones, 'oa.lab')

    Then you can use, for example wavesurfer to open the wav file and the transcription
    """
    sym = sequence[0]
    start = 0
    end = 0
    trans = ''
    for t in range(len(sequence)):
        if sequence[t] != sym:
            trans = trans + str(start) + ' ' + str(end) + ' ' + sym + '\n'
            sym = sequence[t]
            start = end
        end = end + timestep
    trans = trans + str(start) + ' ' + str(end) + ' ' + sym + '\n'
    if outfilename != None:
        with open(outfilename, 'w') as f:
            f.write(trans)
    return trans


ModuleNotFoundError: No module named 'pysndfile'

In [None]:
#@title

# lab3_proto

def words2phones(wordList, pronDict, addSilence=True, addShortPause=True):
    """ word2phones: converts word level to phone level transcription adding silence

    Args:
       wordList: list of word symbols
       pronDict: pronunciation dictionary. The keys correspond to words in wordList
       addSilence: if True, add initial and final silence
       addShortPause: if True, add short pause model "sp" at end of each word
    Output:
       list of phone symbols
    """
    phones = []

    for word in wordList:
        phones.extend(pronDict[word])       # insert phones of words
        if addShortPause:
            phones.append('sp')             # add short pause between words

    if addSilence:
        phones.insert(0, 'sil')             # add initial silence
        phones.append('sil')                # add final silence

    return phones

def forcedAlignment(lmfcc, phoneHMMs, phoneTrans):
    """ forcedAlignmen: aligns a phonetic transcription at the state level

    Args:
       lmfcc: NxD array of MFCC feature vectors (N vectors of dimension D)
              computed the same way as for the training of phoneHMMs
       phoneHMMs: set of phonetic Gaussian HMM models
       phoneTrans: list of phonetic symbols to be aligned including initial and
                   final silence

    Returns:
       list of strings in the form phoneme_index specifying, for each time step
       the state from phoneHMMs corresponding to the viterbi path.
    """
    # phone transcription => state transcription
    phones = sorted(phoneHMMs.keys())
    nstates = {phone: phoneHMMs[phone]['means'].shape[0] for phone in phones}
    stateTrans = [p + '_' + str(i) for p in phoneTrans for i in range(nstates[p])]

    # combined HMM for utterance
    utteranceHMM = concatHMMs(phoneHMMs, phoneTrans)

    # Viterbi decoder
    obsloglik = log_multivariate_normal_density_diag(lmfcc, utteranceHMM['means'], utteranceHMM['covars'])
    viterbiPath = viterbi(obsloglik, np.log(utteranceHMM['startprob']), np.log(utteranceHMM['transmat']))[1]

    # time alignment (frame-by-frame state transcription)
    viterbiStateTrans = [stateTrans[s] for s in viterbiPath]

    return viterbiStateTrans

def hmmLoop(hmmmodels, namelist=None):
    """ Combines HMM models in a loop

    Args:
       hmmmodels: list of dictionaries with the following keys:
           name: phonetic or word symbol corresponding to the model
           startprob: M+1 array with priori probability of state
           transmat: (M+1)x(M+1) transition matrix
           means: MxD array of mean vectors
           covars: MxD array of variances
       namelist: list of model names that we want to combine, if None,
                 all the models in hmmmodels are used

    D is the dimension of the feature vectors
    M is the number of emitting states in each HMM model (could be
      different in each model)

    Output
       combinedhmm: dictionary with the same keys as the input but
                    combined models
       stateMap: map between states in combinedhmm and states in the
                 input models.

    Examples:
       phoneLoop = hmmLoop(phoneHMMs)
       wordLoop = hmmLoop(wordHMMs, ['o', 'z', '1', '2', '3'])
    """


In [None]:
#@title

import warnings
import matplotlib.pyplot as plt

def pcolormesh(matrix, title, ylabel):
    plt.pcolormesh(matrix.T)
    plt.title(title)
    plt.ylabel(ylabel)
    plt.xlabel('time step')
    plt.yticks(np.arange(matrix.shape[1]) + .5, range(matrix.shape[1]))
    plt.xticks(np.arange(0, matrix.shape[0], 10) + .5, range(0, matrix.shape[0], 10))

prondict = {
    'o': ['ow'],
    'z': ['z', 'iy', 'r', 'ow'],
    '1': ['w', 'ah', 'n'],
    '2': ['t', 'uw'],
    '3': ['th', 'r', 'iy'],
    '4': ['f', 'ao', 'r'],
    '5': ['f', 'ay', 'v'],
    '6': ['s', 'ih', 'k', 's'],
    '7': ['s', 'eh', 'v', 'ah', 'n'],
    '8': ['ey', 't'],
    '9': ['n', 'ay', 'n'],
}

warnings.filterwarnings(action='ignore', category=RuntimeWarning)
np.random.seed(1)

###############################
# 4.1 Target class definition #
###############################

# unique states (=> classes for DNN)
phoneHMMs = np.load('data/lab2_models_all.npz', allow_pickle=True)['phoneHMMs'].item()
phones = sorted(phoneHMMs.keys())
nstates = {phone: phoneHMMs[phone]['means'].shape[0] for phone in phones}
stateList = [p + '_' + str(i) for p in phones for i in range(nstates[p])]

In [None]:
#@title Debugging of forced alignment

########################
# 4.2 Forced alignment #
########################

# load correct example
example = np.load('data/lab3_example.npz', allow_pickle=True)['example'].item()

# feature extraction
filename = 'data/tidigits/disc_4.1.1/tidigits/train/man/nw/z43a.wav'
samples, samplingrate = loadAudio(filename)
lmfcc = mfcc(samples)

# transcription
wordTrans = list(path2info(filename)[2])            # word transcription (contained in the filename)
phoneTrans = words2phones(wordTrans, prondict)      # word transcription => phone transcription
stateTrans = [p + '_' + str(i) for p in phoneTrans
              for i in range(nstates[p])]           # phone transcription => state transcription

# combined HMM for utterance
utteranceHMM = concatHMMs(phoneHMMs, phoneTrans)

# Viterbi decoder
obsloglik = log_multivariate_normal_density_diag(lmfcc, utteranceHMM['means'], utteranceHMM['covars'])
viterbiLoglik, viterbiPath = viterbi(obsloglik, np.log(utteranceHMM['startprob']), np.log(utteranceHMM['transmat']))

# time alignment (frame-by-frame state transcription)
viterbiStateTrans = [stateTrans[s] for s in viterbiPath]

# save in standard format (to use it, put it in the same directory of .wav and open .wav with wavesurfer)
frames2trans(viterbiStateTrans, outfilename='z43a.lab')

# check results
plt.figure()
pcolormesh(lmfcc, 'MFCC - computed', ylabel='MFCC')
plt.figure()
pcolormesh(example['lmfcc'], 'MFCC - example', ylabel='MFCC')

plt.figure()
pcolormesh(obsloglik, 'Emission probabilities - computed', ylabel='state')
plt.figure()
pcolormesh(example['obsloglik'], 'Emission probabilities - example', ylabel='state')

plt.figure()
plt.plot(np.arange(len(viterbiPath)) + .5, example['viterbiPath'] + .5, 'k', linewidth=2, label='example')
plt.plot(np.arange(len(viterbiPath)) + .5, viterbiPath + .5, 'r--', linewidth=1, label='computed')
plt.legend()
plt.title('Viterbi path')
plt.show()

print('Word transcription, computed:', wordTrans)
print('Word transcription, example:', example['wordTrans'])
print()

print('Phone transcription, computed:', phoneTrans)
print('Phone transcription, example:', example['phoneTrans'])
print()

print('State transcription, computed:', stateTrans)
print('State transcription, example:', example['stateTrans'])
print()

print('Viterbi log-likelihood, computed:', viterbiLoglik)
print('Viterbi log-likelihood, example:', example['viterbiLoglik'])
print()

print('Forced alignment, computed:', viterbiStateTrans)
print('Forced alignment, example:', example['viterbiStateTrans'])
print()

In [None]:
#@title

##########################
# 4.3 Feature extraction #
##########################

def extract_feature(path, states):
    data = []
    for root, dirs, files in os.walk(path):
        for file in files:
            if file.endswith('.wav'):
                filename = os.path.join(root, file)
                samples, samplingrate = loadAudio(filename)
                print(filename + '... ', end='')

                # feature extraction (=> inputs for DNN)
                lmfcc = mfcc(samples)
                mspec_ = mspec(samples)

                # forced alignment (=> targets for DNN)
                wordTrans = list(path2info(filename)[2])                # word transcription (contained in the filename)
                phoneTrans = words2phones(wordTrans, prondict)          # word transcription => phone transcription
                targets = forcedAlignment(lmfcc, phoneHMMs, phoneTrans)
                targets = np.array([states.index(t) for t in targets])  # save targets as indeces

                data.append({'filename': filename, 'lmfcc': lmfcc, 'mspec': mspec_, 'targets': targets})
                print('done')
    return np.array(data)


# training set
filename = 'data/traindata.npz'
if os.path.isfile(filename):
    print('loading training set... ', end='')
    traindata = np.load(filename, allow_pickle=True)['traindata']
    print('done')
else:
    traindata = extract_feature('data/tidigits/disc_4.1.1/tidigits/train', stateList)
    np.savez(filename, traindata=traindata)

# test set
filename = 'data/testdata.npz'
if os.path.isfile(filename):
    print('loading test set... ', end='')
    testdata = np.load(filename, allow_pickle=True)['testdata']
    print('done')
else:
    testdata = extract_feature('data/tidigits/disc_4.2.1/tidigits/test', stateList)
    np.savez(filename, testdata=testdata)

In [None]:
#@title

import random

####################################
# 4.4 Training and validation sets #
####################################

def split_data_gender(data, percentage):
    n = len(data)
    speakers = list({path2info(u['filename'])[1] for u in data})    # set => no duplicates!
    random.shuffle(speakers)                                        # shuffle to randomly select speakers
    val_data = []

    # add to validation set (all utterances of) speakers until the percentage is reached
    for s in speakers:
        speaker_data = [u for u in data if path2info(u['filename'])[1] == s]
        val_data.extend(speaker_data)

        # check if percentage is reached
        if len(val_data) > percentage * n:
            break

    # add rest to training set
    train_data = [u for u in data if u not in val_data]

    return train_data, val_data


def split_data(data, percentage):
    train_data = []
    val_data = []

    # stratified sampling:
    # 1. split data by gender
    # 2. sample from these partitions preserving the distribution

    # split data by gender
    men_data = [u for u in data if path2info(u['filename'])[0] == 'man']
    women_data = [u for u in data if path2info(u['filename'])[0] == 'woman']

    # add utterances of men
    train_data_, val_data_ = split_data_gender(men_data, percentage)
    train_data.extend(train_data_)
    val_data.extend(val_data_)

    # add utterances of women
    train_data_, val_data_ = split_data_gender(women_data, percentage)
    train_data.extend(train_data_)
    val_data.extend(val_data_)

    return np.array(train_data), np.array(val_data)


print('extracting validation set... ', end='')
traindata, valdata = split_data(traindata, .1)
print('done')

In [None]:
#@title

###########################################
# 4.5 Acoustic context (dynamic features) #
###########################################

def stack_acoustic_context(features):
    length = np.shape(features)[0]
    idx_list = list(range(length))
    idx_list = idx_list[1:4][::-1] + idx_list + idx_list[-4:-1][::-1]
    output = [features[idx_list[i:i + 7]].reshape(-1) for i in range(length)]
    return np.array(output)

In [None]:
#@title

from sklearn.preprocessing import StandardScaler
from tensorflow.keras.utils import to_categorical

###############################
# 4.6 Feature standardisation #
###############################

def prepare_matrices(data, K, feature_type, dynamic_features, scaler=None):
    if feature_type != 'lmfcc' and feature_type != 'mspec':
        raise ValueError('Invalid feature type. Choose among lmfcc and mspec.')

    N = sum([len(u['targets']) for u in data])      # total number of frames
    D = data[0][feature_type].shape[1]
    if dynamic_features:
        D = D*7

    x = np.zeros((N, D), dtype='float32')           # float32 to save memory
    y = np.zeros(N)

    # flatten (i.e. concatenate frames and targets of all utterances)
    i = 0
    for u in data:
        n = len(u['targets'])                                       # number of frames for utterance
        if dynamic_features:
            u_features = stack_acoustic_context(u[feature_type])    # add dynamic features
        else:
            u_features = u[feature_type]

        x[i:i+n] = u_features
        y[i:i+n] = u['targets']
        i += n

    # standardise
    if scaler is not None:
        x = scaler.transform(x)

    # one-hot encoding
    y = to_categorical(y, K)

    return x, y


feature_type = 'lmfcc'      # select here features to use
dynamic_features = True     # select here whether to use dynamic features

K = len(stateList)          # number of classes (don't touch)
print('preparing matrices... ', end='')

# matrices for training set
train_x, train_y = prepare_matrices(traindata, K, feature_type, dynamic_features=dynamic_features)

# standardisation of training set
scaler = StandardScaler()
scaler.fit(train_x)
train_x = scaler.transform(train_x)

# matrices for validation and test sets (already standardised)
val_x, val_y = prepare_matrices(valdata, K, feature_type, dynamic_features=dynamic_features, scaler=scaler)
test_x, test_y = prepare_matrices(testdata, K, feature_type, dynamic_features=dynamic_features, scaler=scaler)
print('done')

In [None]:
#@title Training

from tensorflow.keras.models import Sequential
from tensorflow.keras.layers import Dense, Dropout, Activation, BatchNormalization
from tensorflow.keras.callbacks import ModelCheckpoint
from tensorflow.keras.models import load_model

####################################
# 5. Phoneme recognition with DNNs #
####################################

model_dir = 'models'
model_name = 'dnn_4hl_256u_dlmfcc_relu_bn'
model_filepath = os.path.join(model_dir, model_name + '.h5')

if not os.path.isfile(model_filepath):
    # create directory to save model
    try:
        os.mkdir(model_dir)
        print('Directory', model_dir, 'created')
    except FileExistsError:
        print('Directory', model_dir, 'already existing')

    # architecture
    dnn = Sequential()
    for i in range(4):
        if i == 0:
            dnn.add(Dense(units=256, input_shape=(train_x.shape[1],)))
        else:
            dnn.add(Dense(units=256))
        dnn.add(BatchNormalization())
        dnn.add(Activation(activation='relu'))
    dnn.add(Dense(units=K, activation='softmax'))
    dnn.summary()

    # compile
    dnn.compile(optimizer='adam', loss='categorical_crossentropy', metrics=['accuracy'])

    # train
    checkpoint = ModelCheckpoint(model_filepath, verbose=1, save_best_only=True)
    history = dnn.fit(x=train_x, y=train_y, batch_size=256, epochs=10, validation_data=(val_x, val_y), callbacks=[checkpoint])

    # plot loss
    plt.figure()
    plt.plot(history.history['loss'], label='train')
    plt.plot(history.history['val_loss'], label='val')
    plt.xlabel('epoch')
    plt.ylabel('loss')
    plt.legend()
    plt.title('Loss')
    plt.savefig(os.path.join(model_dir, model_name + '_loss.jpg'))

    # plot accuracy
    plt.figure()
    plt.plot(history.history['accuracy'], label='train')
    plt.plot(history.history['val_accuracy'], label='val')
    plt.xlabel('epoch')
    plt.ylabel('accuracy')
    plt.legend()
    plt.title('Accuracy')
    plt.savefig(os.path.join(model_dir, model_name + '_accuracy.jpg'))

# load (best checkpoint of) model
print('loading best checkpoint... ', end='')
dnn = load_model(model_filepath)
print('done')
dnn.summary()

# evaluate on validation set (for model selection)
metric_values = dnn.evaluate(x=val_x, y=val_y, batch_size=256)
with open(os.path.join(model_dir, 'model_selection.txt'), 'a') as f:
    f.write(model_name + '\n')
    for n, v in zip(dnn.metrics_names, metric_values):
        print('validation {}: {}'.format(n, v))
        f.write('validation {}: {}\n'.format(n, v))
    f.write('\n\n')

In [None]:
#@title Evaluation of best DNN
!pip install edit_distance > /dev/null

from tensorflow.keras.metrics import Accuracy
from edit_distance import SequenceMatcher
from sklearn.metrics import confusion_matrix

###########################
# 5.1 Detailed evaluation #
###########################

def states2phones(y_states, phones, states):
    # work in one-hot encoding
    y_phones = np.zeros((y_states.shape[0], len(phones)))
    y_states = to_categorical(y_states, len(states))

    for i, p in enumerate(phones):
        # indeces of states belonging to phoneme p
        idx_states = [j for j in range(len(states)) if states[j].split(sep='_')[0] == p]

        # merge states (just sum columns in one-hot encoding!)
        y_phones[:, i] = np.sum(y_states[:, idx_states], axis=1)

    # go back to labels
    y_phones = np.argmax(y_phones, axis=1)

    return y_phones


def merge_consequent_states(y):
    cur_y = y[0]
    merged_y = [cur_y]
    for i in y:
        if i == cur_y:
            continue
        else:
            cur_y = i
            merged_y += [cur_y]
    return merged_y


def plot_confusion_matrix(y_true, y_pred):
    cm = confusion_matrix(y_true, y_pred, normalize='true')
    plt.imshow(cm, cmap='Blues')
    plt.xlabel('predictions')
    plt.ylabel('ground truth')


# predict
y_pred = np.argmax(dnn.predict(x=test_x, batch_size=256), axis=1)
y_true = np.argmax(test_y, axis=1)
accuracy = Accuracy()

# frame-by-frame at the state level
accuracy.update_state(y_true, y_pred)
print('Frame-by-frame accuracy at the state level: {:.2f}%'.format(accuracy.result().numpy()*100))
plt.figure()
plot_confusion_matrix(y_true, y_pred)
plt.title('Frame-by-frame confusion matrix at the state level')

# frame-by-frame at the phoneme level
y_pred_phones = states2phones(y_pred, phones, stateList)
y_true_phones = states2phones(y_true, phones, stateList)
accuracy.reset_states()
accuracy.update_state(y_true_phones, y_pred_phones)
print('Frame-by-frame accuracy at the phoneme level: {:.2f}%'.format(accuracy.result().numpy()*100))
plt.figure()
plot_confusion_matrix(y_true_phones, y_pred_phones)
plt.title('Frame-by-frame confusion matrix at the phoneme level')

# PER at the state level
N = 10000     # number of frames to consider (distance computation is expensive)
y_pred_merged = merge_consequent_states(y_pred[:N])
y_true_merged = merge_consequent_states(y_true[:N])
sm = SequenceMatcher(a=y_true_merged, b=y_pred_merged)
edit_distance = sm.distance()
print('PER at the state level: {:.2f}%'.format(edit_distance/N*100))

# PER at the phoneme level
y_pred_merged = merge_consequent_states(y_pred_phones[:N])
y_true_merged = merge_consequent_states(y_true_phones[:N])
sm = SequenceMatcher(a=y_true_merged, b=y_pred_merged)
edit_distance = sm.distance()
print('PER at the phoneme level: {:.2f}%'.format(edit_distance/N*100))

# posteriors for first utterance
utterance = testdata[0]
x, y = prepare_matrices([utterance], K, feature_type, dynamic_features=dynamic_features, scaler=scaler)
y_pred = dnn.predict(x=x, batch_size=256)
wordTrans = path2info(utterance['filename'])[2]
plt.figure()
pcolormesh(y_pred, 'Predicted posteriors (produced by DNN) - words: ' + wordTrans, ylabel='state')
plt.figure()
pcolormesh(y, 'Target posteriors (produced by forced alignment) - words: ' + wordTrans, ylabel='state')
plt.show()
