# A HMM-GMM model (3 senones per HMM AND GMMs are modeled by a single Gaussian Distribution)

# Inspired by lab 2 from DT2119 course from KTH

In [1]:
import math
from math import e
import numpy
from numpy import linalg as LA
from numpy.linalg import inv
import numpy as np
import librosa
import os
import IPython.display as ipd


In [2]:
path = os.getcwd()
print(path)
test_path = os.getcwd() + "/train/animals"
folders = os.listdir(test_path)

recordings = {}

#Generates a dictionary with the mfccs for each recording
for folder in folders:
    filenames = os.listdir(test_path + "/" + folder)
    recordings[folder] = []
    for fn in filenames:
        filepath = test_path + "/" + folder + "/" + fn
        y, sr = librosa.load(filepath)
        wav, _ = librosa.effects.trim(y)
        mfcc = librosa.feature.mfcc(y=wav,sr=sr, n_mfcc=13)
        recordings[folder].append(mfcc.T)

/home/pedro/Desktop/Gaussian_Mixture_Models-EM/Sistema_Basico


# Create HMM class

<p> O processo de classificação de um recording funciona da seguinte maneira: Os HMMs correspondentes a cada palavra do sistema vão ser iterados com a sequência de MFCCs que vai servir de input à função log_multivariate_normal_density. O output desta função vai ser uma matriz com NxM em que N é o número de frames e M o número de estados da HMM. De notar que neste exemplo o GMM só tem uma gaussiana. Caso tivesse mais a única coisa que mudava era que o resultado era uma soma ponderada das gaussianas. </p>



In [3]:

def logsumexp(arr,axis=0):
    '''Computes the sum of arr assuming arr is in the log domain.
    Returns log(sum(exp(arr))) while minimizing the possibility of over/underflow
    '''

    arr = np.rollaxis(arr,axis)
    vmax = arr.max(axis = 0)
    if vmax.ndim > 0:
        vmax[~np.isfinite(vmax)] = 0
    elif not np.isfinite(vmax):
        vmax = 0
    with np.errstate(divide="ignore"):
        out = np.log(np.sum(np.exp(arr-vmax),axis=0))
        out += vmax
        return out


'''Created matrices will have 4 states.
A first,middle, last one and a transition one'''
class WordHMM:
    def __init__(self,word, data, n_states = 3, n_mfccs = 13):
        
        self.word = word
        self.data = data
        self.n_states = n_states
        self.n_mfccs = n_mfccs
        

        #Transition Probabilities
        self.startprob = np.zeros((n_states + 1, 1)) #Existe mais um estado inicial que e o de fim
        self.transmat = np.zeros((n_states + 1, n_states + 1))

        #Emission probabilities - There are 13 emissions  for each state.
        self.means = np.zeros((n_states, n_mfccs))  #Esta bem. A media  do estado extra nao conta
        self.covars=  np.zeros((n_states, n_mfccs)) #Esta bem a media do estado extra nao conta
        self.init_emissions()
        self.init_transitions()
        
    def get_word(self):
        return self.word
    
    def get_transmat(self):
        return self.transmat
    
    def get_emission_means(self):
        return self.means
    
    def get_emission_covars(self):
        return self.emission
    
        
    '''
        This function needs:
        -A matrix with the likelihood of a given frame per second.
        -An array with the initial probabilities.
        -A transition matrix.

    '''
    def forward_algorithm(self,loglh_senone,log_startprob,log_transmat):
        
        alpha = np.zeros(loglh_senone.shape)
        
        #forward initialization
        alpha[0]= log_startprob.T + loglh_senone[0]
        
        for n in range(1,len(alpha)):
            for i in range(alpha.shape[1]):
                alpha[n,i] = logsumexp(alpha[n -1] + log_transmat[:,i]) + loglh_senone[n,i]
                
        return alpha, logsumexp(alpha[len(alpha) -1])
        
        
        
        
        
        
        
        
    '''
    Initializes transition matrix (n_states+1Xn_states+1) with uniform distribution and...
    Initializes start probabilities
    '''
    def init_transitions(self):
        self.startprob[0] = 1
    
        for l in range(0,len(self.transmat) - 1):
            self.transmat[l][l] = 0.5
            self.transmat[l][l+1] = 0.5   
                        

        #Iterativo6
        for x in range(0,45):
            #inicializa matriz de transicao
            new_trans = np.zeros((self.n_states + 1, self.n_states + 1))

            #INSERE ARRAYS VAZIOS PARA AS EMISSOes
            emission_frames = []
            for _ in range(0,self.n_states):
                emission_frames.append([])

            #REALIZA VITERBI TRAINING PARA TODOS OS RECORDINGS     
            for recording in self.data:
                loglh_senone = self.log_multivariate_normal_density(recording.T)
                viterbi_loglik, viterbi_path = self.viterbi(loglh_senone,np.log(self.startprob[:-1]),np.log(self.transmat[:-1,:-1]))

                #Transicao: INCREMENTA MATRIZ DE TRANSICAO
                for x in range(0,len(viterbi_path) -1 ):
                    new_trans[viterbi_path[x],viterbi_path[x+1]] +=1

                #EMISSAO: ADICIONA FRAMES A CADA ESTADO ALIGNEMENT
                for x in range(0,len(viterbi_path)):
                    emission_frames[viterbi_path[x]].append(recording[x])


            #SOMA VALORES POR ROW
            sum_across_rows = [sum(e) for e in new_trans]

            #CONVERTE MATRIZ DE TRANSICAO PARA PROBABILIDADES
            for l in range(0,len(new_trans) -1):
                for c in range(0,len(new_trans)):
                    new_trans[l,c] = new_trans[l,c] / sum_across_rows[l]


            for x in range(0,len(emission_frames)):
                self.means[x] = np.mean(emission_frames[x],axis=0)
                self.covars[x] = np.diag(np.cov(np.array(emission_frames[x]).T))

            self.transmat = new_trans
            #print(new_trans)

        
                
    def init_emissions(self):
        
        #Lista de listas(com numero igual ao estados) a partir da qual vai ser calculada a media e o desvio padrao
        frames_by_state = []
        
        #insert empty arrays
        for _ in range(0,self.n_states):
            frames_by_state.append([])
                    
        #Para cada gravacao
        for recording in self.data:
            separated_frames = np.array_split(np.array(recording),3)
            
            #Para cada lista de frames 
            for x in range(0,len(separated_frames)):                
                for frame in separated_frames[x]:
                    frames_by_state[x].append(frame)
                    
        #Calcula a media e a matriz diagonal de covariancia de cada emissao de cada estado
        for x in range(0,len(frames_by_state)):
            self.means[x] = np.mean(frames_by_state[x],axis=0)
            self.covars[x] = np.diag(np.cov(np.array(frames_by_state[x]).T))
            

        
        
    
    '''
    This function needs:
        -Sequence of MFCC frames.
        -Means matrix with the mean value for each MFCC for each senone.
        -Diagonal Covariance matrix with the covariance value for each MFCC in each senone.
    '''
    def log_multivariate_normal_density(self,frames):
        frames = frames.T
        covars = self.covars
        means = self.means
        n_samples,n_dim = frames.shape
        
        
        return -0.5 * (n_dim * np.log(2 * np.pi) + np.sum(np.log(covars), 1)
                      + np.sum((means ** 2) / covars, 1)
                      - 2 * np.dot(frames, (means / covars).T)
                      + np.dot(frames ** 2, (1.0 / covars).T))
    

    def viterbiBacktrack(self,B, lastIdx):
        """Does backtracking retrieving the viterbi path given the most probable
            previous indices in each timestep.

        Args:
            B: NxM array where N are the timesteps and M are the states and each
                element contains the most probable state in the previous timestep.
            lastIdx: index of the most probable state in timestep N
        Returns:
            A vector of N-1 elements with the viterbi path
        """
        viterbi_path = [lastIdx]
        for i in reversed(range(1, B.shape[0])):
            viterbi_path.append(B[i, viterbi_path[-1]])
        viterbi_path.reverse()
        return np.array(viterbi_path)

    
    def viterbi(self,log_emlik, log_startprob, log_transmat):
        """Viterbi path.

        Args:
            log_emlik: NxM array of emission log likelihoods, N frames, M states
            log_startprob: log probability to start in state i
            log_transmat: transition log probability from state i to j

        Output:
            viterbi_loglik: log likelihood of the best path
            viterbi_path: best path
        """
        B = np.zeros(log_emlik.shape, dtype = int)
        V = np.zeros(log_emlik.shape)
        V[0] = log_startprob.flatten() + log_emlik[0]

        for n in range(1, log_emlik.shape[0]):
            for j in range(log_emlik.shape[1]):
                V[n][j] = np.max(V[n - 1,:] + log_transmat[:,j]) + log_emlik[n, j]
                B[n][j] = np.argmax(V[n - 1,:] + log_transmat[:,j])

        # Backtrack to take viteri path
        viterbi_path = self.viterbiBacktrack(B, np.argmax(V[ log_emlik.shape[0] - 1]))

        #print(viterbi_path)
        return np.max(V[ log_emlik.shape[0] - 1]), viterbi_path


    
    def guess_word(self,mfccs):
        #print("o shape dos mfccs e: " + str(mfccs.shape))
        #Retorna uma matrix com a loglikelihood de cada frame do mfcc pertencer a um senone.
        loglh_senone = self.log_multivariate_normal_density(mfccs)
        
        forward_prob,obs_seq_log_prob = self.forward_algorithm(loglh_senone,np.log(self.startprob[:-1]),np.log(self.transmat[:-1,:-1]))
        
        viterbi_loglik, viterbi_path = self.viterbi(loglh_senone,np.log(self.startprob[:-1]),np.log(self.transmat[:-1,:-1]))
        
        return obs_seq_log_prob
    
    
model = {}
for word in folders:
    model[word] = WordHMM(word, recordings[word])    




# Test: Perform ASR on a test recording:

In [4]:
#Given a file path, extracts the mfccs of that file.
def extract_mfccs(filepath):
    y, sr = librosa.load(filepath)
    wav, _ = librosa.effects.trim(y)
    mfccs = librosa.feature.mfcc(y=wav,sr=sr, n_mfcc=13)
    return mfccs

test_path = os.getcwd() + "/test"

for file in os.listdir(test_path):
    mfccs = extract_mfccs(test_path + "/" + file)
    print("FICHEIRO:" + str(file))
    for word in folders:
        res = model[word].guess_word(mfccs)
        print("estou a iterar com o modelo: " + str(word) + " res: " + str(res))
        print("------------------------------------")

print(folders)

FICHEIRO:cao1.wav
estou a iterar com o modelo: vaca res: -2858.5047184355058
------------------------------------
estou a iterar com o modelo: cão res: -2626.5068309451017
------------------------------------
estou a iterar com o modelo: grilo res: -3223.858245963852
------------------------------------
estou a iterar com o modelo: leao res: -3068.8665028454006
------------------------------------
estou a iterar com o modelo: sapo res: -3242.9699714552853
------------------------------------
estou a iterar com o modelo: gato res: -2728.8015344464366
------------------------------------
FICHEIRO:gato4.wav
estou a iterar com o modelo: vaca res: -3351.0627414131227
------------------------------------
estou a iterar com o modelo: cão res: -3166.7213636095908
------------------------------------




estou a iterar com o modelo: grilo res: -3570.937005674933
------------------------------------
estou a iterar com o modelo: leao res: -3590.651926295017
------------------------------------
estou a iterar com o modelo: sapo res: -3806.1212841408096
------------------------------------
estou a iterar com o modelo: gato res: -3043.615130306499
------------------------------------
['vaca', 'cão', 'grilo', 'leao', 'sapo', 'gato']


In [10]:

from sys import byteorder
from array import array
from struct import pack

import pyaudio
import wave

import asyncio
import time

THRESHOLD = 500
CHUNK_SIZE = 1024
FORMAT = pyaudio.paInt16
RATE = 44100

def is_silent(snd_data):
    "Returns 'True' if below the 'silent' threshold"
    return max(snd_data) < THRESHOLD

def normalize(snd_data):
    "Average the volume out"
    MAXIMUM = 16384
    times = float(MAXIMUM)/max(abs(i) for i in snd_data)

    r = array('h')
    for i in snd_data:
        r.append(int(i*times))
    return r

def trim(snd_data):
    "Trim the blank spots at the start and end"
    def _trim(snd_data):
        snd_started = False
        r = array('h')

        for i in snd_data:
            if not snd_started and abs(i)>THRESHOLD:
                snd_started = True
                r.append(i)

            elif snd_started:
                r.append(i)
        return r

    # Trim to the left
    snd_data = _trim(snd_data)

    # Trim to the right
    snd_data.reverse()
    snd_data = _trim(snd_data)
    snd_data.reverse()
    return snd_data

def add_silence(snd_data, seconds):
    "Add silence to the start and end of 'snd_data' of length 'seconds' (float)"
    silence = [0] * int(seconds * RATE)
    r = array('h', silence)
    r.extend(snd_data)
    r.extend(silence)
    return r

def record():
    """
    Record a word or words from the microphone and 
    return the data as an array of signed shorts.

    Normalizes the audio, trims silence from the 
    start and end, and pads with 0.5 seconds of 
    blank sound to make sure VLC et al can play 
    it without getting chopped off.
    """
    p = pyaudio.PyAudio()
    stream = p.open(format=FORMAT, channels=1, rate=RATE,
        input=True, output=True,
        frames_per_buffer=CHUNK_SIZE)

    num_silent = 0
    snd_started = False

    r = array('h')
    

    while 1:
        # little endian, signed short
        snd_data = array('h', stream.read(CHUNK_SIZE))
        if byteorder == 'big':
            snd_data.byteswap()
        r.extend(snd_data)
        
        silent = is_silent(snd_data)

        if silent and snd_started:
            num_silent += 1
        elif not silent and not snd_started:
            snd_started = True

        if snd_started and num_silent > 30:
            break

    sample_width = p.get_sample_size(FORMAT)
    #print("a sample_width e: " + str(sample_width))
    stream.stop_stream()
    stream.close()
    p.terminate()

    r = normalize(r)
    r = trim(r)
    r = add_silence(r, 0.5)
    return sample_width, r

def record_to_file(path):
    "Records from the microphone and outputs the resulting data to 'path'"
    sample_width, data = record()
    data = pack('<' + ('h'*len(data)), *data)

    wf = wave.open(path, 'wb')
    wf.setnchannels(1)
    wf.setsampwidth(sample_width)
    wf.setframerate(RATE)
    wf.writeframes(data)
    wf.close()
    

print("please speak a word into the microphone")
record_to_file('demo.wav')
#print("done - result written to demo.wav")
mfccs = extract_mfccs(os.getcwd() + "/demo.wav")


maxval = None
maxtag = None


#itera cada modelo
for word in folders:
    res = model[word].guess_word(mfccs)
    print("Iterated word: " + str(word) + " Score: " + str(res))
    if(maxval == None or res > maxval):
        maxval = res
        maxtag = word
        
print("You said: " + str(maxtag))


please speak a word into the microphone
Iterated word: vaca Score: -3000.3099007128303
Iterated word: cão Score: -2972.729534087238
Iterated word: grilo Score: -3012.9355184671826
Iterated word: leao Score: -2894.3246225996363
Iterated word: sapo Score: -3345.537491551016
Iterated word: gato Score: -2929.070126893186
You said: leao


