In [1]:
import numpy as np

In [2]:
# Load the trained HMM parameters for Chinese word segmentation from a JSON file
# located next to the notebook, then expose each component as a notebook-level name.
import json
with open('./hmm-zh-segmentation.json', 'r', encoding='utf8') as f:
    data = json.load(f)


# Hidden-state labels (the recorded output shows ['B', 'M', 'E', 'S']).
states = data['states']
# Mapping: state label -> probability of starting a sequence in that state.
start_probability = data['start_probability']
# Nested mapping: source state -> {target state -> transition probability}.
transition_probability = data['transition_probability']
# Nested mapping keyed by state label; later converted to the emission matrix B.
emission_probability = data['emission_probability']
# Collection of observable symbols; its length is reported below.
vocabulary = data['vocabulary']
# Quick sanity dump of what was loaded (only the keys of the emission table are printed).
print(states)
print(start_probability)
print(transition_probability)
print(emission_probability.keys())
print('vocabulary cap: ', len(vocabulary))

['B', 'M', 'E', 'S']
{'B': 0.6345649207515482, 'S': 0.36543507924845176}
{'B': {'E': 0.8532242244876338, 'M': 0.14677577551236617}, 'E': {'B': 0.48537965431249463, 'S': 0.5146203456875054}, 'S': {'S': 0.42896261210773334, 'B': 0.5710373878922667}, 'M': {'M': 0.3456713007046277, 'E': 0.6543286992953723}}
dict_keys(['B', 'E', 'S', 'M'])
vocabulary cap:  4700


In [3]:
def generate_index_map(lables):
    """Build two lookup tables between positions and labels.

    Parameters
    ----------
    lables : iterable
        Labels in the order that defines their integer ids.

    Returns
    -------
    (dict, dict)
        ``id2label`` mapping position -> label, and ``label2id`` mapping
        label -> position. If a label occurs more than once, the last
        position wins in ``label2id``.
    """
    # Materialise the pairs once so a one-shot iterable is only consumed once.
    indexed = list(enumerate(lables))
    id2label = dict(indexed)
    label2id = {name: position for position, name in indexed}
    return id2label, label2id
 
# Build position <-> label lookup tables for the hidden states and for the vocabulary.
# These are notebook-level names; the label -> id maps are used later to turn the
# probability dictionaries into arrays and to encode input characters.
states_id2label, states_label2id = generate_index_map(states)
vocabulary_id2label, vocabulary_label2id = generate_index_map(vocabulary)
print(states_id2label, states_label2id)
# The vocabulary maps are left unprinted (the vocabulary size reported earlier is 4700).
# print(vocabulary_id2label, vocabulary_label2id)

{0: 'B', 1: 'M', 2: 'E', 3: 'S'} {'B': 0, 'M': 1, 'E': 2, 'S': 3}


In [4]:
def convert_map_to_vector(probs_map, col_label2id):
    """Convert a ``label -> probability`` mapping into a dense 1-D vector.

    Parameters
    ----------
    probs_map : dict
        Maps a label to its value.
    col_label2id : dict
        Maps each label to its position in the output vector.

    Returns
    -------
    numpy.ndarray
        Float vector of length ``len(col_label2id)``; positions whose label
        is absent from ``probs_map`` stay at 0.

    Raises
    ------
    KeyError
        If ``probs_map`` contains a label missing from ``col_label2id``.
    """
    vector = np.zeros(len(col_label2id), dtype=float)
    for label in probs_map:
        position = col_label2id[label]
        vector[position] = probs_map[label]
    return vector


def convert_map_to_matrix(probs_map, row_label2id, col_label2id):
    """Convert a nested ``row -> {col -> probability}`` mapping into a dense matrix.

    Parameters
    ----------
    probs_map : dict
        Maps a row label to a dict of ``column label -> value``.
    row_label2id : dict
        Maps each row label to its row index.
    col_label2id : dict
        Maps each column label to its column index.

    Returns
    -------
    numpy.ndarray
        Float matrix of shape ``(len(row_label2id), len(col_label2id))``;
        cells with no entry in ``probs_map`` stay at 0.

    Raises
    ------
    KeyError
        If a row or column label of ``probs_map`` is missing from the
        corresponding index map.
    """
    shape = (len(row_label2id), len(col_label2id))
    matrix = np.zeros(shape, dtype=float)
    for row_label, row_values in probs_map.items():
        for col_label, prob in row_values.items():
            r = row_label2id[row_label]
            c = col_label2id[col_label]
            matrix[r, c] = prob
    return matrix

# Turn the probability dictionaries into numpy arrays indexed by state / vocabulary id.
# The id -> label map is printed first so the rows and columns below can be read off.
print(states_id2label)
# A: state transition matrix, rows = source state, columns = target state.
A = convert_map_to_matrix(transition_probability, states_label2id, states_label2id)
print('状态转移矩阵 A: ', A)
# pi: initial state probability vector.
pi = convert_map_to_vector(start_probability, states_label2id)
print('初始状态概率向量 π: ', pi)
# B: observation (emission) probability matrix, rows = state, columns = vocabulary id.
B = convert_map_to_matrix(emission_probability, states_label2id, vocabulary_label2id)
print('观测概率矩阵 B: ', B)

{0: 'B', 1: 'M', 2: 'E', 3: 'S'}
状态转移矩阵 A:  [[0.         0.14677578 0.85322422 0.        ]
 [0.         0.3456713  0.6543287  0.        ]
 [0.48537965 0.         0.         0.51462035]
 [0.57103739 0.         0.         0.42896261]]
初始状态概率向量 π:  [0.63456492 0.         0.         0.36543508]
观测概率矩阵 B:  [[1.70874158e-06 0.00000000e+00 4.61360227e-05 ... 0.00000000e+00
  2.17010181e-03 2.22136405e-05]
 [0.00000000e+00 0.00000000e+00 2.28527899e-05 ... 0.00000000e+00
  4.72290992e-04 8.37935631e-05]
 [8.54370790e-06 1.70874158e-06 3.24660900e-05 ... 0.00000000e+00
  1.58400344e-03 1.19611911e-05]
 [0.00000000e+00 0.00000000e+00 1.71519722e-05 ... 3.81154938e-06
  1.46744651e-04 2.85866203e-05]]


In [5]:
def viterbi(obs_idxs, A, B, pi, state_labels=None):
    """
    Decode the most likely hidden-state sequence with the Viterbi algorithm.

    Scores are accumulated as log-probabilities rather than raw products, so
    long observation sequences do not underflow to 0.0 (which would make every
    argmax collapse to state 0).

    Parameters
    ----------
    obs_idxs : sequence of int
        Observation sequence, each entry a column index into ``B``.
    A : numpy.ndarray
        State transition matrix; ``A[i, j]`` is the probability of moving
        from state ``i`` to state ``j``.
    B : numpy.ndarray
        Emission matrix; ``B[s, o]`` is the probability of state ``s``
        emitting observation ``o``.
    pi : numpy.ndarray
        Initial state probability vector.
    state_labels : sequence, optional
        Labels used to translate state indices in the returned path. When
        omitted, the notebook-level ``states`` list is used.

    Returns
    -------
    path_idx : list of int
        Most likely state index for every observation (empty for empty input).
    path : list
        The same path expressed with ``state_labels``.

    Internally
    ----------
    V : numpy.ndarray
        V[s][t] = maximum log-probability of an observation sequence ending
                  at time 't' with final state 's'
    prev : numpy.ndarray
        Contains a pointer to the previous state at t-1 that maximizes
        V[state][t]

    V corresponds to delta and prev to psi in the usual textbook notation.
    """
    if state_labels is None:
        state_labels = states  # fall back to the notebook-level label list

    obs_idxs = list(obs_idxs)
    T = len(obs_idxs)
    if T == 0:
        # Nothing to decode; the original crashed here on a negative array dimension.
        return [], []

    # Zero probabilities become -inf, which is exactly the ordering we want;
    # silence the divide-by-zero warning numpy would emit for log(0).
    with np.errstate(divide='ignore'):
        log_A = np.log(np.asarray(A, dtype=float))
        log_B = np.log(np.asarray(B, dtype=float))
        log_pi = np.log(np.asarray(pi, dtype=float))

    N = log_A.shape[0]
    prev = np.zeros((T - 1, N), dtype=int)

    # DP matrix containing the max log-likelihood of each state at a given time
    V = np.empty((N, T))
    V[:, 0] = log_pi + log_B[:, obs_idxs[0]]

    for t in range(1, T):
        # scores[i, n]: best score of being in state i at t-1, moving to n and
        # emitting the observation at t. The emission term is included before
        # the argmax so tie-breaking matches the product formulation.
        scores = V[:, t - 1][:, None] + log_A + log_B[:, obs_idxs[t]][None, :]
        prev[t - 1] = np.argmax(scores, axis=0)
        V[:, t] = np.max(scores, axis=0)

    # Backtrack from the best final state through the stored pointers.
    state_ptr = int(np.argmax(V[:, -1]))  # end state
    reversed_path = [state_ptr]
    for ptrs in prev[::-1]:
        state_ptr = int(ptrs[state_ptr])
        reversed_path.append(state_ptr)

    # A real list (not a one-shot iterator), so the caller can still use
    # path_idx after the labels have been built from it.
    path_idx = reversed_path[::-1]

    return path_idx, [state_labels[x] for x in path_idx]

In [6]:
def segmentation(observations):
    """Split a character sequence into words using the loaded HMM.

    Each character is mapped to its vocabulary id, the most likely state
    sequence is decoded with ``viterbi`` (using the notebook-level ``A``,
    ``B`` and ``pi``), and a word boundary is placed after every character
    whose state is 'E' or 'S'.

    Parameters
    ----------
    observations : str or sequence of str
        The text to segment, one character per element.

    Returns
    -------
    list of str
        The words in order; an empty list for empty input.

    Raises
    ------
    KeyError
        If a character is not present in ``vocabulary_label2id``.
    """
    # Empty input has nothing to decode; return early instead of letting the
    # decoder fail on a zero-length sequence.
    if len(observations) == 0:
        return []

    obs_seq = [vocabulary_label2id[o] for o in observations]
    _, path = viterbi(obs_seq, A, B, pi)

    words = []
    word = []
    for char, state in zip(observations, path):
        word.append(char)
        # 'E' and 'S' both close the word being accumulated.
        if state in ('E', 'S'):
            words.append(''.join(word))
            word = []
    # Flush a trailing word whose last state did not close it.
    if word:
        words.append(''.join(word))
    return words

In [7]:
# Demo: segment a short sentence; the recorded result is shown in the cell output.
segmentation('小明是个好学生')

['小明', '是', '个', '好', '学生']

In [8]:
# Demo: segment a four-character input.
segmentation('清华大学')

['清华', '大学']

In [9]:
# Demo: an input with repeated characters ('行' occurs three times, '一' twice).
segmentation('干一行行一行')

['干一', '行行', '一行']

In [10]:
# Demo: variant of the previous input with '要' in place of the middle '行'.
segmentation('干一行要行一行')

['干一行', '要行', '一行']

In [11]:
# Demo: input whose characters can be grouped into words in more than one way.
segmentation('作战处女干部')

['作战', '处女', '干部']

In [12]:
# Demo: the previous input with '的' inserted after '作战处'.
segmentation('作战处的女干部')

['作战处', '的', '女', '干部']