In [2]:
import collections

# data initialize
transition_dict = collections.defaultdict(dict)
transition_key = ['start','hot','cold','end']
transition_arr = [[0.0, 0.8, 0.2, 0.0],[0.0, 0.6, 0.3, 0.1],[0.0, 0.4, 0.5, 0.1],[0.0, 0.0, 0.0, 0.0]]
for i in range(len(transition_key)):
    for j in range(len(transition_key)):
        transition_dict[transition_key[j]][transition_key[i]]=transition_arr[i][j]

emission_dict = collections.defaultdict(dict)
emission_key = ['hot','cold']
emission_arr = [[0.0,0.0],[0.2,0.5],[0.4,0.4],[0.4,0.1]]

observation = [3,1,3]
for i in range(len(emission_arr)):
    for j in range(len(emission_key)):
        emission_dict[i][emission_key[j]]=emission_arr[i][j]

In [3]:
print(transition_dict['hot']['start'],emission_dict[3]['hot'],transition_dict['hot']['hot'],emission_dict[1]['hot'],transition_dict['cold']['hot'],emission_dict[3]['cold'])

0.8 0.4 0.6 0.2 0.3 0.1


In [4]:
def forward(start_keyword:str, transition_dict:collections.defaultdict(dict), emission_key:list, observation_list:list) -> (collections.defaultdict(dict), float):
    # dp[상태][관측값(T)]
    dp = collections.defaultdict(dict)
    print('Forward Algo')
    
    # start로부터 첫번째 dp를 결정짓기 위함 (initialize)
    for i in emission_key:
        dp[i][0] = transition_dict[i][start_keyword]*emission_dict[observation_list[0]][i]


    for observ in range(1,len(observation_list)):
        for to_emi in emission_key:
            dp[to_emi][observ]=0
            for from_emi in emission_key:
                # print(from_emi,to_emi)
                print('전방('+from_emi+'|'+str(observ-1)+') * '+'전이('+to_emi+'|'+from_emi+') * '+'관측('+str(observation_list[observ])+'|'+from_emi+') '+ 'Target : '+'후방('+to_emi+'|'+str(observ)+')')
                dp[to_emi][observ] = (dp[from_emi][observ-1]*transition_dict[to_emi][from_emi]*emission_dict[observation_list[observ]][to_emi]) + dp[to_emi][observ]

    forward_prob = 0
    for emi in emission_key:
        forward_prob = dp[emi][len(observation_list)-1] + forward_prob
    return dp, forward_prob

In [5]:
def backward(start_keyword:str, transition_dict:collections.defaultdict(dict), emission_key:list, observation_list:list) -> (collections.defaultdict(dict), float):
    # dp[상태][관측값(T)]
    dp = collections.defaultdict(dict)
    print('Backward Algo')
    for observ in range(len(observation_list)-1,-1,-1):
        for to_emi in emission_key:
            # END로는 무조건 끝이라는 1가지 경우의 수밖에 없으니, END로의 전이는 무조건 확률 1
            if observ == len(observation_list)-1:
                dp[to_emi][observ] = 1
                continue
            dp[to_emi][observ]=0
            for from_emi in emission_key:
                # print(from_emi,to_emi)
                print('후방('+from_emi+'|'+str(observ+1)+') * '+'전이('+to_emi+'|'+from_emi+') * '+'관측('+str(observation_list[observ+1])+'|'+from_emi+') '+ 'Target : '+'후방('+to_emi+'|'+str(observ)+')')
                dp[to_emi][observ] = (dp[from_emi][observ+1]*transition_dict[from_emi][to_emi]*emission_dict[observation_list[observ+1]][from_emi]) + dp[to_emi][observ]
    
    backward_prob = 0
    for emi in emission_key:
        backward_prob = dp[emi][0]*transition_dict[emi][start_keyword]*emission_dict[observation[0]][emi] + backward_prob

    return dp, backward_prob

In [6]:
import collections
from typing import Tuple

def decoding(start_keyword:str, transition_dict:collections.defaultdict(dict), emission_key:list, observation_list:list) -> Tuple[list, list]:
    # dp[상태][관측값(T)]
    dp = list()
    print('Viterbi Algo')
    
    # start로부터 첫번째 dp를 결정짓기 위함 (initialize)
    tmp = list()
    for i in emission_key:
        dp.append([transition_dict[i][start_keyword]*emission_dict[observation_list[0]][i]])

    for observ in range(1,len(observation_list)):
        for to_emi in emission_key:
            tmp = list()
            for from_emi in emission_key:
                tmp.append(dp[emission_key.index(from_emi)][observ-1]*transition_dict[to_emi][from_emi]*emission_dict[observation_list[observ]][to_emi])
            dp[emission_key.index(to_emi)].append(max(tmp))

    hidden_states_list = list()    
    for i in zip(*dp):
        hidden_states_list.append((emission_key[i.index(max(i))],max(i)))
    return dp, hidden_states_list

In [7]:
result_for, forward_prob = forward('start',transition_dict,emission_key,observation)
result_back, backward_prob = backward('start',transition_dict,emission_key,observation)
viterbi, hidden_seq = decoding('start',transition_dict,emission_key,observation)

Forward Algo
전방(hot|0) * 전이(hot|hot) * 관측(1|hot) Target : 후방(hot|1)
전방(cold|0) * 전이(hot|cold) * 관측(1|cold) Target : 후방(hot|1)
전방(hot|0) * 전이(cold|hot) * 관측(1|hot) Target : 후방(cold|1)
전방(cold|0) * 전이(cold|cold) * 관측(1|cold) Target : 후방(cold|1)
전방(hot|1) * 전이(hot|hot) * 관측(3|hot) Target : 후방(hot|2)
전방(cold|1) * 전이(hot|cold) * 관측(3|cold) Target : 후방(hot|2)
전방(hot|1) * 전이(cold|hot) * 관측(3|hot) Target : 후방(cold|2)
전방(cold|1) * 전이(cold|cold) * 관측(3|cold) Target : 후방(cold|2)
Backward Algo
후방(hot|2) * 전이(hot|hot) * 관측(3|hot) Target : 후방(hot|1)
후방(cold|2) * 전이(hot|cold) * 관측(3|cold) Target : 후방(hot|1)
후방(hot|2) * 전이(cold|hot) * 관측(3|hot) Target : 후방(cold|1)
후방(cold|2) * 전이(cold|cold) * 관측(3|cold) Target : 후방(cold|1)
후방(hot|1) * 전이(hot|hot) * 관측(1|hot) Target : 후방(hot|0)
후방(cold|1) * 전이(hot|cold) * 관측(1|cold) Target : 후방(hot|0)
후방(hot|1) * 전이(cold|hot) * 관측(1|hot) Target : 후방(cold|0)
후방(cold|1) * 전이(cold|cold) * 관측(1|cold) Target : 후방(cold|0)
Viterbi Algo


In [8]:
print('3,1,3 개의 순서로 아이스크림을 먹을 확률(forward) :',forward_prob)
print('3,1,3 개의 순서로 아이스크림을 먹을 확률(backward) :',backward_prob)
print('3,1,3 개의 순서로 아이스크림을 먹을때 hidden states SEQ (viterbi decoding) :',hidden_seq)

3,1,3 개의 순서로 아이스크림을 먹을 확률(forward) : 0.021930000000000005
3,1,3 개의 순서로 아이스크림을 먹을 확률(backward) : 0.02193000000000001
3,1,3 개의 순서로 아이스크림을 먹을때 hidden states SEQ (viterbi decoding) : [('hot', 0.32000000000000006), ('cold', 0.04800000000000001), ('hot', 0.009216000000000002)]
