<a href="https://colab.research.google.com/github/chasubeen/ML_lab/blob/main/3_HMM_viterbi.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [1]:
import os
import re
import numpy as np

# **1. Preprocessing**

## **1-1. Load data**
- CoNLL-2000 file을 읽어와 word(x)와 tag(z) 처리
- 특수 품사(`remove_pos`) 제거

In [2]:
# Mount Google Drive (Colab only); the data paths used by this notebook live under /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


In [3]:
# Locations of the train/test text files on the mounted Google Drive.

train_path = "/content/drive/MyDrive/ML_lab/3_HMM/data/train.txt"
test_path = "/content/drive/MyDrive/ML_lab/3_HMM/data/test.txt"

In [4]:
# POS tags whose tokens are discarded by read_file while loading the data.
remove_pos = ['#', '$', "''", '(', ')', ',', '.', ':', '``', 'POS', 'SYM']

In [5]:
### Read file

def read_file(path, excluded_pos=None):
  """Read a three-column (word, POS, chunk) text file into flat word/tag lists.

  Every line is split on whitespace and only lines with exactly three columns
  are used. Tokens whose POS tag is in the exclusion collection are dropped,
  and the remaining words are lower-cased.

  Blank lines are skipped, not treated as boundaries, so the whole file comes
  back as one continuous token stream.

  Args:
    path: Path of the text file to read.
    excluded_pos: Optional collection of POS tags to drop. When None, the
      notebook-level `remove_pos` list is used (the original behaviour).

  Returns:
    A `(words, tags)` tuple of two equally long lists of strings.
  """
  if excluded_pos is None:
    excluded_pos = remove_pos
  excluded_pos = frozenset(excluded_pos)  # constant-time membership test per token

  words, tags = [], []

  # `with` guarantees the handle is closed; the explicit encoding keeps the
  # result independent of the platform's default locale.
  with open(path, 'r', encoding='utf-8') as handle:
    for line in handle:
      parts = line.split()  # blank lines yield [] and fail the length check
      if len(parts) != 3:
        continue

      word, pos, _chunk = parts
      if pos in excluded_pos:
        continue

      words.append(word.lower())
      tags.append(pos)

  return words, tags

In [6]:
# Load both splits; each call returns flat, parallel lists of words and POS tags.
train_words, train_tags = read_file(train_path)
test_words, test_tags = read_file(test_path)

## **1-2. Preprocess Data**

### **a) 특수 문자 제거**

In [7]:
def only_alphabet(words, tags):
  """Strip every character outside a-z from each word, dropping emptied tokens.

  Words and tags are processed pairwise; when a word has no a-z character left
  after stripping, both the word and its tag are discarded.

  Args:
    words: Sequence of word strings.
    tags: Sequence of tags parallel to `words`.

  Returns:
    A `(processed_words, processed_tags)` tuple of two equally long lists.
  """
  non_letters = re.compile('[^a-z]')

  # Clean first, then keep only the pairs whose cleaned word is non-empty.
  cleaned_pairs = ((non_letters.sub('', word), tag) for word, tag in zip(words, tags))
  kept_pairs = [(cleaned, tag) for cleaned, tag in cleaned_pairs if cleaned]

  processed_words = [cleaned for cleaned, _ in kept_pairs]
  processed_tags = [tag for _, tag in kept_pairs]
  return processed_words, processed_tags

In [8]:
# Keep only tokens that still contain a-z letters once all other characters are stripped.
train_words, train_tags = only_alphabet(train_words, train_tags)
test_words, test_tags = only_alphabet(test_words, test_tags)

### **b) POS Tag 통합 및 변경**

In [9]:
def modify_tags(tags):
  """Collapse fine-grained POS tags into coarse classes.

  Adjective, noun, pronoun, adverb and verb tag families are merged into
  'ADJ', 'N', 'PN', 'AD' and 'V' respectively; every other tag is kept as is.

  Args:
    tags: Sequence of POS tag strings.

  Returns:
    A new list with one (possibly merged) tag per input tag.
  """
  families = {
    'ADJ': ('JJ', 'JJR', 'JJS'),
    'N': ('NN', 'NNS', 'NNP', 'NNPS'),
    'PN': ('PRP', 'PRP$', 'WDT', 'WP', 'WP$'),
    'AD': ('RB', 'RBR', 'RBS', 'WRB'),
    'V': ('VB', 'VBD', 'VBG', 'VBN', 'VBP', 'VBZ'),
  }
  # Invert the table: fine-grained tag -> coarse class.
  coarse_of = {fine: coarse for coarse, members in families.items() for fine in members}

  # Tags without an entry fall through unchanged.
  return [coarse_of.get(tag, tag) for tag in tags]

In [10]:
# Replace the fine-grained tags of both splits with the merged classes from modify_tags.
train_tags = modify_tags(train_tags)
test_tags = modify_tags(test_tags)

### **c) make dictionary**

In [11]:
def make_dict(tags, words):
  """Build index lookup tables for tags and for words.

  Indices are assigned in sorted order of the distinct values. Iterating a
  `set` of strings directly gives an order that can change between interpreter
  runs, which would make the indices irreproducible across kernel restarts.

  Args:
    tags: Iterable of tag strings.
    words: Iterable of word strings.

  Returns:
    A `(tag_to_index, vocab_to_index)` tuple of dicts mapping each distinct
    value to a unique index in `range(number of distinct values)`.
  """
  tag_to_index = {tag: idx for idx, tag in enumerate(sorted(set(tags)))}
  vocab_to_index = {word: idx for idx, word in enumerate(sorted(set(words)))}

  return tag_to_index, vocab_to_index

In [12]:
train_tag_dict, train_word_dict = make_dict(train_tags, train_words)

# The model's parameter arrays are indexed with the TRAINING mappings, so the
# test split must be encoded with those same mappings. Building separate
# dictionaries from the test data would assign unrelated indices to the same
# word/tag and make every test-time lookup meaningless.
test_tag_dict, test_word_dict = train_tag_dict, train_word_dict

# Drop test tokens whose word or tag never occurred in training. This is done
# pairwise here so that words and tags stay aligned when each list is later
# encoded on its own.
_known_pairs = [
  (word, tag) for word, tag in zip(test_words, test_tags)
  if word in train_word_dict and tag in train_tag_dict
]
test_words = [word for word, _ in _known_pairs]
test_tags = [tag for _, tag in _known_pairs]

### **d) index 변환**

In [13]:
### () to index

def to_index(objs, obj_dict):
  """Translate items into their indices, skipping items absent from the mapping.

  Args:
    objs: Iterable of items (words or tags).
    obj_dict: Mapping from item to index.

  Returns:
    A list of indices in input order. Items that are not keys of `obj_dict`
    are silently left out, so the result can be shorter than `objs`.
  """
  indices = []
  for item in objs:
    if item in obj_dict:
      indices.append(obj_dict[item])
  return indices

In [14]:
# Encode the tag lists of both splits as integer indices.
train_tag_idx = to_index(train_tags, train_tag_dict)
test_tag_idx = to_index(test_tags, test_tag_dict)

In [15]:
# Encode the word lists of both splits as integer indices.
# to_index skips any word that is missing from the dictionary it is given.
train_data_idx = to_index(train_words, train_word_dict)
test_data_idx = to_index(test_words, test_word_dict)

## **1-3. 최종 데이터 확인**

In [16]:
# Sanity check: show the first five training tags and, after a blank line, the matching words.
print(train_tags[:5])
print()
print(train_words[:5])

['N', 'IN', 'DT', 'N', 'V']

['confidence', 'in', 'the', 'pound', 'is']


# **2. Modeling**

In [17]:
class HMM_viterbi:
  """First-order hidden Markov model with count-based fitting and Viterbi decoding.

  Attributes set by `fit`:
    pi: (num_states,) initial-state probabilities.
    A:  (num_states, num_states) transition probabilities, row-normalised so
        that A[i, j] = P(next state = j | current state = i).
    B:  (num_states, word_size) emission probabilities, row-normalised so that
        B[j, w] = P(word = w | state = j).
  """

  def __init__(self, num_states):
    self.num_states = num_states  # number of hidden states (POS tags)

  def fit(self, X, Z, word_size, smoothing=1.0):
    """Estimate pi, A and B from labelled sequences with additive smoothing.

    Args:
      X: Sequence of observation sequences; each one holds word indices in
        `range(word_size)`.
      Z: Sequence of state sequences parallel to `X`; each one holds state
        indices in `range(num_states)`.
      word_size: Number of distinct observation symbols (columns of B).
      smoothing: Value added to every count before normalising.
    """
    state_seqs = [np.asarray(z, dtype=int) for z in Z]
    word_seqs = [np.asarray(x, dtype=int) for x in X]

    ## pi: how often each state starts a sequence
    self.pi = np.zeros(self.num_states)
    num_starts = 0
    for z in state_seqs:
      if len(z) == 0:
        continue  # an empty sequence has no first state to count
      self.pi[z[0]] += 1
      num_starts += 1
    self.pi = (self.pi + smoothing) / (num_starts + smoothing * self.num_states)

    ## A: counts of consecutive state pairs (current -> next)
    self.A = np.zeros((self.num_states, self.num_states))
    for z in state_seqs:
      # np.add.at accumulates repeated index pairs, unlike fancy-index `+=`.
      np.add.at(self.A, (z[:-1], z[1:]), 1)
    self.A = (self.A + smoothing) / (self.A.sum(axis=1, keepdims=True) + smoothing * self.num_states)

    ## B: counts of (state, word) co-occurrences
    self.B = np.zeros((self.num_states, word_size))
    for x, z in zip(word_seqs, state_seqs):
      paired = min(len(x), len(z))  # same truncation as zipping the two sequences
      np.add.at(self.B, (z[:paired], x[:paired]), 1)
    self.B = (self.B + smoothing) / (self.B.sum(axis=1, keepdims=True) + smoothing * word_size)

  def calculate_likelihood(self, V, t, z_t, x_t=None):
    """Score reaching state `z_t` at step `t` from every possible previous state.

    Args:
      V: Table of log-scores indexed as V[step, state].
      t: Current step; V[t-1] supplies the previous-step scores.
      z_t: Index of the current state.
      x_t: Index of the word observed at step `t`. When omitted, `t` itself is
        used as the word index (the original behaviour), which is only
        meaningful if the caller's word index happens to equal the step.

    Returns:
      A (num_states,) array with one log-score per previous state.
    """
    word = t if x_t is None else x_t
    return V[t-1] + np.log(self.A[:, z_t]) + np.log(self.B[z_t, word])

  def update_state(self, V, t, z_t=None):
    """Return the best previous state for a transition into a given state.

    Args:
      V: Table of log-scores indexed as V[step, state].
      t: Current step; V[t-1] supplies the previous-step scores.
      z_t: Index of the state being entered. When omitted, `t` itself is used
        as the state index (the original behaviour).

    Returns:
      Index of the previous state with the highest score.
    """
    target = t if z_t is None else z_t
    return np.argmax(V[t-1] + np.log(self.A[:, target]))

  def get_latent_path(self, X, num_words):
    """Viterbi decoding: most probable state sequence for the first `num_words` observations.

    Args:
      X: Sequence of word indices.
      num_words: Number of leading observations of `X` to decode.

    Returns:
      An int array of length `num_words` with the decoded state indices
      (empty when `num_words` is not positive).
    """
    if num_words <= 0:
      return np.zeros(0, dtype=int)

    observed = np.asarray(X[:num_words], dtype=int)

    # Logs are taken once up front instead of on every step of the recursion.
    log_A = np.log(self.A)                    # log_A[i, j]: previous i -> current j
    log_emit = np.log(self.B[:, observed])    # log_emit[j, t]: state j emits word at step t

    V = np.zeros((num_words, self.num_states))
    state_idx = np.zeros((num_words, self.num_states), dtype=int)

    # Initialization
    V[0] = np.log(self.pi) + log_emit[:, 0]

    # Recursion
    for t in range(1, num_words):
      # scores[i, j] = best score ending in i at t-1, plus the i -> j transition.
      # A is used as-is (not transposed) because its rows are the previous state.
      scores = V[t - 1][:, np.newaxis] + log_A
      state_idx[t] = np.argmax(scores, axis=0)
      V[t] = np.max(scores, axis=0) + log_emit[:, t]

    # Traceback
    z = np.zeros(num_words, dtype=int)
    z[-1] = np.argmax(V[-1])
    for t in range(num_words - 2, -1, -1):
      z[t] = state_idx[t + 1, z[t + 1]]

    return z

  def predict(self, X):
    """Decode the most probable state sequence for the whole of `X`."""
    num_words = len(X)
    return self.get_latent_path(X, num_words)

# **3. Training**

In [18]:
# Model dimensions: one hidden state per training tag, one emission symbol per training word.
num_states = len(train_tag_dict)
vocab_size = len(train_word_dict)

In [19]:
# Create the (still unfitted) HMM with one hidden state per tag.
model = HMM_viterbi(num_states = num_states)

In [20]:
# Prepare the training data in the list-of-sequences form expected by fit().
# NOTE(review): the whole corpus is wrapped as ONE sequence here, so fit() sees
# a single first tag when estimating pi; confirm this is intended.

train_sequences = [train_data_idx]
train_tags_sequences = [train_tag_idx]

In [21]:
smoothing = 1.0  # additive smoothing value passed to fit()
model.fit(train_sequences, train_tags_sequences, vocab_size, smoothing)

# **4. Testing**

In [22]:
# likelihood 계산 및 시각화
def calculate_likelihood_sequence(model, X, Z):
  """Compute the Viterbi (best-path) log-probability of each observation sequence.

  For every sequence the max-product recursion is run in log space, and the
  score of the best final state is reported, i.e. the log of the joint
  probability of the observations and their most probable state path.

  Args:
    model: Fitted model exposing `pi`, `A` (A[i, j] = P(j | i)) and `B`.
    X: Sequence of observation sequences (word indices).
    Z: Gold state sequences. Accepted for interface compatibility only; the
      score does not depend on them.

  Returns:
    A list with one log-probability per sequence in `X`. An empty sequence
    scores 0.0, the log-probability of observing nothing.
  """
  log_pi = np.log(model.pi)
  log_A = np.log(model.A)  # hoisted: identical for every step and sequence

  likelihoods = []
  for x in X:
    if len(x) == 0:
      likelihoods.append(0.0)
      continue

    # Initialization
    best = log_pi + np.log(model.B[:, x[0]])

    # Recursion: only the previous step's scores are needed, so no full table is kept.
    for t in range(1, len(x)):
      # Rows of log_A are the previous state, so it is used without transposing.
      best = np.max(best[:, np.newaxis] + log_A, axis=0) + np.log(model.B[:, x[t]])

    # Best final state. Summing the per-state log-scores would multiply the
    # probabilities of unrelated paths, which is not a likelihood.
    likelihoods.append(np.max(best))

  return likelihoods

In [23]:
# Prepare the test data: the whole test split is wrapped as a single sequence.
test_sequences = [test_data_idx]
test_tags_sequences = [test_tag_idx]

In [24]:
# Compute one score per sequence for each split (no training iterations are involved here).
train_likelihoods = calculate_likelihood_sequence(model, train_sequences, train_tags_sequences)
test_likelihoods = calculate_likelihood_sequence(model, test_sequences, test_tags_sequences)

In [26]:
import matplotlib.pyplot as plt