### HMM과 품사 태깅

간단한 품사 태깅 모형을 만들어서 원리를 알아본다.

In [None]:
# 다음 라이브러리를 설치한다.
# !pip install pomegranate

In [None]:
# 라이브러리를 불러온다.
import numpy as np
import pandas as pd
import nltk
from pomegranate import State, HiddenMarkovModel, DiscreteDistribution

In [None]:
# 다음 데이터를 내려 받는다.
# nltk.download('averaged_perceptron_tagger')

In [None]:
pd.set_option("precision",2)

#### 1. 자연어 데이터: 

In [None]:
# 태깅된 학습 데이터.
# 태그는 단 3가지만 사용한다: N = 명사, M = 조동사, V = 동사.
my_tagged_sentences =[
   [ ("John","N"), ("Paul", "N"), ("can", "M"), ("see","V"), ("Sarah", "N")],
   [ ("Sarah", "N"), ("will", "M"), ("meet", "V"), ("Mary", "N")],
   [ ("Will", "N"), ("can", "M"), ("meet", "V"), ("Mary","N")],
   [ ("Mary", "N"), ("can", "M"), ("see", "V"), ("John", "N")]    
]

In [None]:
# 문장들을 출력해 본다.
for a_line in my_tagged_sentences:
    text = ""
    for a_word, a_tag in a_line:        
        text += f"{a_word}({a_tag}) "   
    print(text)

In [None]:
# 정규화 전처리.
temp = []
for a_line in my_tagged_sentences:
    temp.append( [ (a_word.lower(), a_tag) for a_word, a_tag in a_line] )
my_tagged_sentences = temp                             # 대체.
my_tagged_sentences

In [None]:
# Sentence와 Tag분리.
my_sentences = []
my_taggs = []
for a_line in my_tagged_sentences:
    sentences = []
    taggs = []
    for a_word, a_tag in a_line:
        sentences.append(a_word)
        taggs.append(a_tag)
    my_sentences.append(sentences)
    my_taggs.append(taggs)

In [None]:
# 출력해 본다.
my_sentences

In [None]:
# 출력해 본다.
my_taggs

#### 2. 모델 파라미터 산정:

2.1. 상태의 초기 확률 (Initial Probabilities):

In [None]:
# 태그의 도수 집계.
my_dict = {}
for a_line in my_taggs:
    for a_tag in a_line:
        if a_tag not in my_dict:
            my_dict[a_tag] = 1
        else:
            my_dict[a_tag] += 1
state_freqs = pd.Series(my_dict)
state_probs = state_freqs/state_freqs.sum()
state_probs

2.2. 전이 확률 (Transition Probabilities):

In [None]:
# Bigram을 바탕으로 집계한다.
my_dict = {}
for a_line in my_taggs:
    for i in range(len(a_line)-1):
        tag_1 = a_line[i]                         # Bigram의 앞에 있는 tag (tag_1).
        tag_2 = a_line[i+1]                       # Bigram의 뒤에 있는 tag (tag_2).
        if tag_1 not in my_dict.keys():
            my_dict[tag_1] =  {tag_2: 1}         # word_1은 새로운 key. 딕셔너리에 처음 입력.
        else:
            if tag_2 not in my_dict[tag_1]:
                my_dict[tag_1][tag_2] = 1
            else:
                my_dict[tag_1][tag_2] += 1
my_dict

In [None]:
# 전이 횟수 계산.
transition_freqs = pd.DataFrame(my_dict).fillna(value=0).astype(int)
transition_freqs

In [None]:
# 전이 확률 계산.
transition_prob = transition_freqs/transition_freqs.sum(axis=0).values.reshape(1,-1)
transition_prob

2.3. 출력 확률 (Emission Probabilities):

In [None]:
my_dict = {}
for a_line in my_tagged_sentences:
    for a_word, a_tag in a_line:
        if a_word in my_dict:                 # 단어가 이미 딕셔너리의 키로 들어가 있는 경우.
            if a_tag in my_dict[a_word]:      # 태그도 이미 서브 딕셔너리의 기록되어 있는 경우.
                my_dict[a_word][a_tag] +=1    
            else:
                my_dict[a_word][a_tag] = 1
        else:
            my_dict[a_word] = {a_tag: 1}

In [None]:
# 출력 횟수 계산.
emission_freqs = pd.DataFrame(my_dict).fillna(value=0).T
emission_freqs

In [None]:
# 출력 확률 계산.
emission_prob = (emission_freqs/emission_freqs.sum(axis=0).values.reshape(1,-11))
emission_prob

#### 3. 모델 생성:

In [None]:
# 모델 생성.
my_hmm = HiddenMarkovModel(name="my_pos_tagger")

In [None]:
# 출력확률 P(O | S) 설정.
my_states = {}
for a_col in emission_prob.columns:         # 컬럼 이름 = 상태 S.
    emissions = DiscreteDistribution(dict(emission_prob[a_col]))  # <= P(O|S) 를 딕셔너리 행태로 가져와서 사용한다!
    state = State(emissions, name=a_col)    # 상태 생성.
    my_states[a_col] = state
    my_hmm.add_states(state)                # 모델에 상태 추가.

In [None]:
# 초기 확률 설정.
for a_tag_name, a_state in zip(my_states.keys(), my_states.values()):         
    my_hmm.add_transition(my_hmm.start, a_state, state_probs[a_tag_name])

In [None]:
# 전이 확률 P(S'|S) 설정.
for a_col in transition_prob.columns:
    for a_row in transition_prob.index:
        a_prob = transition_prob.loc[a_row,a_col]
        my_hmm.add_transition(my_states[a_col], my_states[a_row], a_prob) 

In [None]:
# 모델 최종 완성.
my_hmm.bake()

In [None]:
# 모델의 구조 출력.
print("Edge counts: ",my_hmm.edge_count()) 
print("Node counts: ",my_hmm.node_count()) 

#### 4. 디코딩 (Inference):

In [None]:
# 다음과 같은 문자열이 관찰되었다고 전제한다.
# 학습 데이터에 출현하는 단어들만을 사용해서 문장을 만들어 보았다.
my_sentence = "John Paul will meet Mary"
#my_sentence = "Will will meet John Paul"
#my_sentence = "Will will see Paul"

In [None]:
# 데이터 전처리.
my_observations = [ a_word.lower() for a_word in my_sentence.split() ]

In [None]:
# Viterbi 알고리즘으로 가장 우도 (likelihood)가 높은 경로를 계산한다.
viterbi_likelihood, viterbi_path = my_hmm.viterbi(my_observations)

In [None]:
# 가장 유력한 tag 시퀀스 예측.
tag_pred = []
for s in viterbi_path[1:]:
    tag_pred.append(s[1].name)

In [None]:
# POS tagging 실시.
my_result = " ".join( [ f"{a_word}({a_tag})" for a_word, a_tag in zip(my_sentence.split(), tag_pred) ] )

In [None]:
# 결과 출력.
print("POS tagging 결과 : {}".format(my_result))
print("우도 함수의 값   : {:0.6f}".format(np.exp(viterbi_likelihood)))

In [None]:
# NLTK의 태깅 결과와 비교한다.
res = nltk.pos_tag(my_observations)
print(" ".join( [ f"{a_word}({a_tag})" for a_word, a_tag in res ] ))