In [1]:
# Passo 1: Usuário Especifica os seguintes parâmetros: 

# Número de regimes (M)
# Amostra (Y_1_n)
# Modelo observacional
# Ordem da cadeia oculta

# Passo 2: Estimar os parâmetros do modelo (P,Q,PI) via EM ou GD

# Passo 3: Algoritmo de Viterbi para encontar os valores dos regimes

# Passo 4: Passar como output os parâmetros estimados e os regimes encontrados


In [2]:
# A variável observável Y é discreta, assume valores num conjunto enumerável
# Temos N estados possíveis para a variável X oculta
# X é cadeia de markov de ordem 1 (generalizamos para ordem k?)
# Q é um dicionário mapeando a função de probabilidade de Y de acordo com cada estado da variável X

In [2]:
# Função de probabilidade da distribuição de Poisson
def poisson(x,lambda_param):

    import numpy as np
    import math

    return (np.exp(-lambda_param) * (lambda_param ** x))/math.factorial(x)

# Distribuição de Y no regime 1 (Poisson com lambda = 5)
def dist_Y_1(x):

    return poisson(x,lambda_param=5)

# Distribuição de Y no regime 2 (Poisson com lambda = 10)
def dist_Y_2(x):

    return poisson(x,lambda_param=10)

# Dicionário de distribuições de Y
Q = {1:dist_Y_1,
     2:dist_Y_2}

# Distribuição inicial da cadeia latente X
def pi(x):

    if x == 1 or x == 2:
        return 1/2
    
    else:
        return 0
    
# Matriz de transição
import numpy as np
mat_trans = np.array([[0.5,0.5],
                      [0.5,0.5]])  

# Alpha (Forward Variable)

In [3]:
# k
# estado: estado da cdeia de markov oculta (N é o número de estados)
# pi : distribuição inicial da cadeia não observável X
# mat_trans: matriz de transição NxN da cadeia não observável X
# sample: amostra da variável observável Y (ex: [1,5,10,7])

def alpha(k,
          estado,
          Q,
          mat_trans,
          pi,
          amostra):
    
    import numpy as np

    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]

    # Caso base da recursão (alpha_1(state))
    if k == 1:

        return Q[estado](amostra[1]) * pi(estado)
    
    # Regra geral
    else:
        
        # Inicializando soma
        soma = 0

        # Iterando para cada estado de 1 a N (0 a N-1)
        for estado_iter in np.arange(1,N+1):

            # Definindo forward variable para (n-1) considerando o estado da iteração atual
            alpha_ant = alpha(k=k-1,
                              estado=estado_iter,
                              Q=Q,
                              mat_trans=mat_trans,
                              pi=pi,
                              amostra=amostra)
            
            # Definindo parcela da soma
            parcela = alpha_ant * Q[estado](amostra[k-1]) * mat_trans[estado_iter-1,estado-1]

            # Acrescentando parcela na soma
            soma += parcela

        return soma


# Exemplo de chamada da função alpha

In [4]:
alpha(k=2,
      estado=1,
      Q=Q,
      mat_trans=mat_trans,
      pi=pi,
      amostra=[1,2,3,4,5])

0.0018212319939254747

# Usando forward variable para escrever verossimilhança

In [5]:
def verossimilhança(Q,
                    mat_trans,
                    pi,
                    amostra):
    
    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]

    # Obtendo tamanho da amostra n
    n = len(amostra)
    
    # Inicializando soma
    soma = 0

    # Iterando para cada estado de 1 até N
    for estado_iter in np.arange(1,N+1):
        
        # Incrementando soma
        soma += alpha(k=n,
                      estado=estado_iter,
                      Q=Q,
                      mat_trans=mat_trans,
                      pi=pi,
                      amostra=amostra)
                      
    return soma
    

# Exemplo de chamada da verossimilhança

In [6]:
verossimilhança(Q=Q,
                mat_trans=mat_trans,
                pi=pi,
                amostra=[1,2,3,4,5])

1.4340532207622332e-06

# Delta

In [7]:
def delta(t,
          estado,
          Q,
          mat_trans,
          pi,
          amostra):
    
    import numpy as np
    import pandas as pd
    
    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]
    
    # Caso base da recursão (t = 1)
    if t == 1:
        
        return pi(estado) * Q[estado](amostra[t-1])
    
    # Caso geral (t > 1)
    else:
        
        # Computando delta_(t-1) para todos os estados i de 1 a M
        deltas_ant = pd.Series({i:delta(t-1,
                                        estado=i,
                                        Q=Q,
                                        mat_trans=mat_trans,
                                        pi=pi,
                                        amostra=amostra)
                                        
                                for i in np.arange(1,N+1)})
        
        # Computando probabilidades de transição partindo do estado i
        probs_i = pd.Series(mat_trans[:,(estado-1)])
        probs_i.index = probs_i.index + 1

        # Produto entre probabilidades de transição e delta para cada estado i
        delta_x_probs = deltas_ant * probs_i
        
        # Computando valor máximo do produto definindo anteriormente
        max_delta_x_probs = delta_x_probs.idxmax()

        return max_delta_x_probs * Q[estado](amostra[t-1])

# Phi

In [9]:
def phi(t,
        estado,
        Q,
        mat_trans,
        pi,
        amostra):
    
    import numpy as np
    import pandas as pd
    
    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]
    
    # Deltas 
    deltas_ant = pd.Series({i:delta(t-1,
                                    estado=i,
                                    Q=Q,
                                    mat_trans=mat_trans,
                                    pi=pi,
                                    amostra=amostra)
                        
                            for i in np.arange(1,N+1)})
    
    # Computando probabilidades de transição partindo do estado i para i = 1,...,M (coluna estado)
    probs_i = pd.Series(mat_trans[:,(estado-1)])
    probs_i.index = probs_i.index + 1

    # Produto entre probabilidades de transição e delta para cada estado i
    delta_x_probs = deltas_ant * probs_i

    # Obtendo estado que maximiza produto anterior
    return delta_x_probs.idxmax()

# Algoritmo de Viterbi

In [10]:
def viterbi(Q,
            mat_trans,
            pi,
            amostra):
    
    import numpy as np
    import pandas as pd

    # Tamanho da amostra (n)
    n = len(amostra)

    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]

    # Inicializando dicionário para armazenar estados inferidos
    x_hat = {}

    # Inicializando algoritmo com estado inferido no instante n (última observação da amostra)
    x_hat[n] = pd.Series({estado:delta(t=n,
                                       estado=estado,
                                       Q=Q,
                                       mat_trans=mat_trans,
                                       pi=pi,
                                       amostra=amostra,
                                    ) 

                        for estado in np.arange(1,N+1)}).idxmax()

    # Iterando para t de n-1 a 1 (do "final para o início" da amostra)
    for t in np.arange(n-1,0,-1):
        
        # Obtendo estado inferido no instante t
        x_hat[t] = phi(t+1,
                       estado=x_hat[t+1],
                       Q=Q,
                       mat_trans=mat_trans,
                       pi=pi,
                       amostra=amostra) 
        
    # Criando série com estados inferidos
    x_hat = pd.Series(x_hat).sort_index()
    
    return x_hat
    

# Testando Viterbi

In [41]:
amostra = [5] * 5 + [10] * 10

viterbi(Q=Q,
        mat_trans=mat_trans,
        pi=pi,
        amostra=amostra)

1     1
2     1
3     1
4     1
5     1
6     2
7     2
8     2
9     2
10    2
11    2
12    2
13    2
14    2
15    2
dtype: int64

# Beta (Backward Variable)

In [12]:
# k 
# estado: estado da cadeia de markov oculta (N é o número de estados)
# pi : distribuição inicial da cadeia não observável X
# mat_trans: matriz de transição NxN da cadeia não observável X
# sample: amostra da variável observável Y (ex: [1,5,10,7])

def beta(k,
         estado,
         Q,
         mat_trans,
         pi,
         amostra):
    
    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]

    # Caso base da recursão (beta_k(estado))
    if k == len(amostra):
        return 1
    
    # Caso geral
    else:

        # Inicializa soma
        soma = 0

        # Iterando sobre os estados (1 a N)
        for estado_iter in np.arange(1,N+1):

            # Definindo parcela atual da soma
            parcela = mat_trans[(estado-1),(estado_iter-1)] * Q[estado_iter](amostra[k]) * beta(k=k+1,
                                                                                                estado=estado_iter,
                                                                                                Q=Q,
                                                                                                mat_trans=mat_trans,
                                                                                                pi=pi,
                                                                                                amostra=amostra)
            
            # Incrementando soma
            soma += parcela
        
        return soma

# Exemplo de chamada da função beta

In [13]:
beta(k=2,
     estado=1,
     Q=Q,
     mat_trans=mat_trans,
     pi=pi,
     amostra=[1,2,3,4,5])

0.0007667432172178277

# Função gamma

In [14]:
def gamma(k,
          estado,
          Q,
          mat_trans,
          pi,
          amostra):
    
    import numpy as np
    import pandas as pd

    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]
    
    # Obtendo valores das alpha e beta variables
    alphas = pd.Series({estado_iter:alpha(k=k,
                                          estado=estado_iter,
                                          Q=Q,
                                          mat_trans=mat_trans,
                                          pi=pi,
                                          amostra=amostra) for estado_iter in np.arange(1,N+1)})
      
    betas = pd.Series({estado_iter:beta(k=k,
                                        estado=estado_iter,
                                        Q=Q,
                                        mat_trans=mat_trans,
                                        pi=pi,
                                        amostra=amostra) for estado_iter in np.arange(1,N+1)})
      
    # Numerador
    num = alphas[estado] * betas[estado]

    # Denominador
    den = (alphas * betas).sum()

    return num/den

# Inferência marginal por estado

In [31]:
def inf_states_marginal(Q=Q,
                        mat_trans=mat_trans,
                        pi=pi,
                        amostra=amostra):
    
    import numpy as np
    import pandas as pd
    
    # Obtendo número de estados (dimensão N da matriz de transição)
    N = np.shape(mat_trans)[0]

    # Tamanho da amostra (n)
    n = len(amostra)

    # Dicionário com estados inferidos
    dic_states = {ind:pd.Series({estado:gamma(k=ind,
                                              estado=estado,
                                              Q=Q,
                                              mat_trans=mat_trans,
                                              pi=pi,
                                              amostra=amostra)
                                
                                # Iterando por estado (1 a N)
                                for estado in np.arange(1,N+1)}).idxmax()

                # Iterando para cada valor na amostra (1 a n)              
                for ind in np.arange(1,n+1)}

    # Série com estados inferidos
    inf_states = pd.Series(dic_states)

    return inf_states

# Testando inferência marginal

In [40]:
amostra = [5] * 5 + [10] * 5

inf_states_marginal(Q=Q,
                    mat_trans=mat_trans,
                    pi=pi,
                    amostra=amostra)

1     1
2     1
3     1
4     1
5     1
6     2
7     2
8     2
9     2
10    2
dtype: int64