In [1]:
import pixiedust
import sys, os
from math import log, pow, e
from IPython.display import HTML, display
import tabulate
from collections import defaultdict


Pixiedust database opened successfully


In [2]:
# Hidden Markov Model 
class HMM(object):
    """
    start_prob: 시작확률 p(?|start)
    trans_prob: 전이확률 p(t2|t1), etc...
    emit_prob: 방출확률 p(o|t1), etc...
    """
    def __init__(self, start_prob=None, trans_prob=None, emit_prob=None):
        self.start_prob = start_prob
        self.trans_prob = trans_prob
        self.emit_prob = emit_prob
        
        
    """
    특정 상태집합으로 부터 전이 가능한 상태집합 구하는 함수
    from_states: 특정 상태집합
    return: 입력상태집합으로 부터 전이 가능한 상태집합
    """
    def _get_to_states(self, from_states):
        to_states = set([])
        for state in from_states:
            to_states = to_states.union(list(self.trans_prob[state].keys()))
        return to_states
    
    
    """
    Forward algorithm으로 Likelihood 계산
    return: Probability table
    """
    def forward(self, seq):
        seq_len = len(seq)
        a = [{}]
        
        # 시작확률로 부터 테이블 값 계산
        for state in self.start_prob:
            a[0][state] = self.start_prob[state] * self.emit_prob[state][seq[0]]
        
        # 나머지 시퀀스에 대해 확률 계산
        to_states = self._get_to_states(list(self.start_prob.keys()))
        for i in range(1, seq_len):
            a.append({})
            for to_state in to_states:
                prob = 0
                # 전이확률 * 방출확률 계산하여 테이블 저장
                for from_state in list(a[i - 1].keys()):
                    prob += a[i - 1][from_state] * self.trans_prob[from_state][to_state]
                a[i][to_state] = prob * self.emit_prob[to_state][seq[i]]
            # _get_to_states통해 매 시퀀스마다 다음 시퀀스에 대해 전이가능한 상태집합을 계산
            if i != seq_len - 1:
                to_states = self._get_to_states(to_states)
        return a

    
    """
    Viterbi algorithm으로 observation sequence에 대한 most likely state sequence 계산
    seq: observation sequence array
    return: most likely state sequence
    """
    def viterbi(self, seq):
        seq_len = len(seq)
        v = {}
        max_paths = []
        # 확률 곱은 모두 log를 통해 정수 합 계산으로 변형
        for to_state in self.start_prob:
            # 시작확률 또는 방출확률이 없는 경우 lim{x->+0}(log(x))값을 -sys.maxsize로 계산
            if self.start_prob[to_state] == 0 or self.emit_prob[to_state][seq[0]] == 0:
                v[to_state] = -sys.maxsize
            else:
                v[to_state] = log(self.start_prob[to_state]) + log(self.emit_prob[to_state][seq[0]])
        
        # Viterbi value 계산
        for i in range(1, seq_len):
            cur_path = {}
            temp_v = {}
            to_states = self._get_to_states(list(v.keys()))
            for to_state in to_states:
                max_prob = -sys.maxsize
                max_state = None
                for from_state in list(v.keys()):
                    prob = v[from_state] + log(self.trans_prob[from_state][to_state])
                    if max_prob < prob:
                        max_prob = prob
                        max_state = from_state
                # 방출확률이 없는 경우 최저확률로 계산
                if self.emit_prob[to_state][seq[i]] == 0:
                    temp_v[to_state] = max_prob - sys.maxsize
                else:    
                    temp_v[to_state] = max_prob + log(self.emit_prob[to_state][seq[i]])
                cur_path[to_state] = max_state
            v = temp_v
            max_paths.append(cur_path)
        
        max_state = None
        max_prob = -sys.maxsize
        
        # 마지막 상태부터 backtrace
        for last_state in list(v.keys()):
            if pow(e, v[last_state]) > max_prob:
                max_prob = pow(e, v[last_state])
                max_state = last_state
                
        # Viterbi path가 없는 경우
        if max_state is None:
            # print("viterbi path is none")
            return []
        
        viterbi_path = [max_state]
        for i in range(seq_len - 1, 0, -1):
            max_state = max_paths[i - 1][max_state]
            viterbi_path.insert(0, max_state)
            
        return viterbi_path

In [3]:
"""
Construct Bigram table by corpus
""" 
# 품사 Count
pos_count = defaultdict(lambda:0, {})
# 형태소/품사 Count
morph_pos_count = defaultdict(lambda:0, {})
# 품사접합 Count (t1|t2)
bigram_pos_count = defaultdict(lambda:0, {})
# 문장 개수
sentence_count = 0

# corpus로부터 Count table 계산
o = open('train0.txt', encoding='euc-kr', newline='\r\n')
last_pos = None
for line in o.readlines():
    line = line.strip()
    if len(line) == 0:
        last_pos = None
        sentence_count += 1
        continue
    (word, morphs) = line.split('\t')
    for morph_pos in morphs.split('+'):
        split_index = morph_pos.rfind('/')
        (morph, pos) = (morph_pos[:split_index], morph_pos[split_index + 1:])
        pos_count[pos] += 1
        morph_pos_count[morph_pos] += 1
        if last_pos is not None:
            bigram_pos_count[f'{pos}|{last_pos}'] += 1
        else:
            bigram_pos_count[f'{pos}|$'] += 1
        last_pos = pos
o.close()


In [6]:
"""
테스트할 문장을 HMM의 Input에 맞게 Parsing
"""

# 문장별 형태소 분석 후보 로드
o = open('result.txt', 'r+', encoding='euc-kr', newline='\r\n')

# 문장 배열
sentences = []
# 한 어절에 대한 형태소분석 후보 배열
word_morph_cases = []
# 어절 배열
sentence_word_morph_cases = []
# Parsing 상태 값 (1: 새 어절, 2: 형태소후보)
state = 1

for line in o.readlines():
    line = line.strip()
    # 새 문장 시작
    if len(line) == 0: 
        state = 1
        if len(word_morph_cases) > 0:
            sentence_word_morph_cases.append(word_morph_cases)
            word_morph_cases = []
        if len(sentence_word_morph_cases) > 0:
            sentences.append(sentence_word_morph_cases)
            sentence_word_morph_cases = []
        continue

    # 형태소 후보 행
    if state == 2: 
        # 공백이 없으면 어절시작
        if line.find(' ') == -1:
            state = 1
        else:
            morphs = line.split(' ')[1].split('+')
            word_morph_cases.append(tuple([tuple(morph.split('/')) for morph in morphs]))

    # 새 어절 시작
    if state == 1 and len(line) > 0:
        state = 2
        # 앞의 어절에 대한 형태소 후보 끝
        if len(word_morph_cases) > 0:
            sentence_word_morph_cases.append(word_morph_cases)
            word_morph_cases = []

if len(word_morph_cases) > 0:
    sentence_word_morph_cases.append(word_morph_cases)
if len(sentence_word_morph_cases) > 0:
    sentences.append(sentence_word_morph_cases)
o.close()

In [10]:
#%%pixie_debugger
"""
어절의 방출확률 계산

word_morph_case: 어절의 형태소 정보. 다음과 같은 튜플 형태
ex) 오/VX+았/EP+니/EF+?/SF == (('오','VX'),('았','EP'),('니','EF'),('?','SF'))
일 때 방출확률은 p(오|VX)*p(EP|VX)*p(았|EP)*p(EF|EP)*p(니|EF)*p(SF|EF)*p(?|SF)

return: 어절의 방출확률 (0~1)
"""
def get_emit_prob(word_morph_case):
    # 확률은 log 합으로 계산하기에 초기 값은 0
    prob = 0
    for i in range(1, len(word_morph_case)):
        from_pos = word_morph_case[i - 1][1]
        to_pos = word_morph_case[i][1]
        bigram_key = f'{to_pos}|{from_pos}'
        if bigram_pos_count[bigram_key] == 0:
            return 0
        prob += log(bigram_pos_count[bigram_key] / pos_count[from_pos])
    for i in range(0, len(word_morph_case)):
        morph_pos_key = word_morph_case[i][0] + '/' + word_morph_case[i][1]
        if morph_pos_count[morph_pos_key] == 0:
            return 0
        prob += log(morph_pos_count[morph_pos_key] / pos_count[word_morph_case[i][1]])
    return pow(e, prob)

o = open('output.txt', mode='w', encoding='euc-kr')
# 문장별로 HMM 모델을 통해 가장 적합한 형태소 조합 결정
# 단 어절자체가 상태 및 방출값이 되므로 별도의 방출값 집합은 갖지 않는 것으로 취급하여 계산한다.
# 여기서는 모든 어절 상태에 관한 기본 방출가능 값은 0으로 (ex. emit_props[어절][0] = prob) 취급
for sentence in sentences:
    # HMM모델 Input 확률값들 계산
    start_probs = {}
    # map의 not exists key에 대한 default value를 empty map으로 하기위해 defaultdict 사용.
    trans_probs =  defaultdict(lambda:{}, {})
    emit_props = defaultdict(lambda:{}, {})
    
    # 첫번째 전이에 해당하는 시작확률 및 방출확률 계산
    for word_morph_case in sentence[0]:
        # P(pos|$)
        prob = bigram_pos_count[word_morph_case[0][1] + '|$'] / sentence_count
        start_probs[word_morph_case] = prob
        emit_props[word_morph_case][0] = get_emit_prob(word_morph_case)
        
    # display(HTML(tabulate.tabulate([start_probs], tablefmt='html', headers='keys')))
    # 두번째부터 나머지 어절에 관한 전이 및 방출확률 계산
    seq = []
    for i in range(1, len(sentence)):
        sentence_prev_word_morph_cases = sentence[i - 1]
        sentence_word_morph_cases = sentence[i]
        for prev_word_morph_case in sentence_prev_word_morph_cases:
            for word_morph_case in sentence_word_morph_cases:
                from_morph = prev_word_morph_case[-1][1]
                to_morph = word_morph_case[0][1]
                trans_key = f'{to_morph}|{from_morph}'
                prob = bigram_pos_count[trans_key] / pos_count[from_morph]
                trans_probs[prev_word_morph_case][word_morph_case] = prob
                emit_props[word_morph_case][0] = get_emit_prob(word_morph_case)
                
    # print(start_probs)
    # print(trans_probs)
    # print(emit_props)
    
    # HMM 모델에 확률 입력
    model = HMM(start_probs, trans_probs, emit_props)
    
    # Viterbi algorithm로 most likely state sequence 출력
    # 모든 어절의 방출가능 값을 0하나로 삼고 확률을 계산했으므로 Observation 시퀀스는 0으로 전달.
    state_seq = model.viterbi([0 for i in range(0, len(sentence))])
    print(state_seq)
    
    
    tagged_sentence = ''
    for state in state_seq:
        word = []
        for morph_pos in state:
            word.append(f'{morph_pos[0]}/{morph_pos[1]}')
        tagged_sentence += '+'.join(word) + ' '
    o.write(tagged_sentence + os.linesep)
o.close()

[(('우리', 'NP'), ('집', 'NNG'), ('에', 'JKB')), (('왜', 'MAG'),), (('오', 'VV'), ('았', 'EP'), ('니', 'EF'), ('?', 'SF'))]
[(('너', 'NP'), ('를', 'JKO')), (('사랑', 'NNG'), ('하', 'VV'), ('어', 'EF'), ('!', 'SF'))]
[(('안녕', 'NNG'), ('하', 'XSV'), ('세', 'EC'), ('요', 'JX'))]


In [19]:
"""
Simple example. 
날씨에 따라 먹는 아이스크림 개수 예제
"""

start_prob = {
    'hot' : 0.8,
    'cold' : 0.2
}

trans_prob = {
    'hot': { 'hot' : 0.6, 'cold' : 0.4 },
    'cold': { 'hot' : 0.4, 'cold' : 0.6 }
}

emit_prob = {
    'hot': { '1' : 0.2, '2' : 0.4, '3' : 0.4 },
    'cold': { '1' : 0.5, '2' : 0.4, '3' : 0.1 }
}

model = HMM(start_prob, trans_prob, emit_prob)
sequence = ['3', '3', '1', '2', '2', '1','3','2','2','3']

# Likelihood 계산결과 observation 순서 별 table 값
display(HTML(tabulate.tabulate(model.forward(sequence), tablefmt='html', headers='keys', showindex=True)))
#print(model.forward(sequence))

# 최적상태열 추정결과
print(model.viterbi(sequence)) 

Unnamed: 0,hot,cold
0,0.32,0.02
1,0.08,0.014
2,0.01072,0.0202
3,0.0058048,0.0065632
4,0.00244326,0.00250394
5,0.000493507,0.00123983
6,0.000316815,9.41303e-05
7,9.10964e-05,7.32817e-05
8,3.35882e-05,3.2163e-05
9,1.32073e-05,3.27331e-06


['hot', 'hot', 'cold', 'cold', 'cold', 'cold', 'hot', 'hot', 'hot', 'hot']
