# 基于HMM的词性标注 (Part-Of-Speech tagging via HMM)

In [1]:
import numpy as np
from tqdm import tqdm
from scipy.special import logsumexp
from scipy.sparse import lil_matrix
import sys

In [2]:
class CorpusHelper:
    """
    Helper for reading and enumerating a POS-tagged corpus.

    The corpus file holds one ``token/tag`` pair per line. On construction the
    file is scanned once to build two bidirectional vocabularies
    (token <-> id and tag <-> id); ids start at 0 and are assigned in order of
    first appearance.
    """

    def __init__(self, path, sent_end_token="."):
        """
        param: path, path of the corpus file, type string
        param: sent_end_token, token that marks the end of a sentence, type string
        """
        self.path = path
        self.token2id = {}  # ids start at 0
        self.id2token = {}
        self.tag2id = {}    # ids start at 0
        self.id2tag = {}
        self.sent_end_token = sent_end_token
        self.prepare_dict()

    @staticmethod
    def _parse_line(line):
        """
        Split one raw corpus line into its token and tag.

        The split happens on the LAST "/" so that tokens which themselves
        contain a slash (e.g. "1/2/CD") are parsed correctly.

        param: line, raw line from the corpus file, type string
        return: (token, tag) tuple, or None for a blank line
        raises: ValueError if a non-blank line contains no "/" separator
        """
        stripped = line.strip()
        if not stripped:
            return None
        token, separator, pos_tag = stripped.rpartition("/")
        if not separator:
            raise ValueError(
                "Malformed corpus line (expected 'token/tag'): {!r}".format(line)
            )
        return token, pos_tag

    def read_lines(self):
        """
        Iterate over the corpus file, skipping blank lines.

        return: generator of (token, tag) tuples, type tuple(string, string)
        """
        with open(self.path, "r", encoding="utf-8") as f:
            for line in tqdm(f):
                pair = self._parse_line(line)
                if pair is not None:
                    yield pair

    def read_lines2id(self):
        """
        Iterate over the corpus file, converting each token and tag to its id.

        return: generator of (token_id, tag_id) tuples, type tuple(int, int)
        """
        for token, pos_tag in self.read_lines():
            yield self.token2id[token], self.tag2id[pos_tag]

    def is_end_tokenid(self, token_id):
        """
        Tell whether a token id is the id of the sentence-end token.

        param: token_id, id to check, type int
        return: True if it is the sentence-end token id, type bool
                (always False when the sentence-end token never occurs in the corpus)
        """
        end_id = self.token2id.get(self.sent_end_token)
        return end_id is not None and token_id == end_id

    def id_to_tags(self, ids):
        """
        Convert a sequence of tag ids into the corresponding tags.

        param: ids, tag ids to convert, type list[int]
        return: tag sequence, type list[string]
        """
        return [self.id2tag[tag_id] for tag_id in ids]

    def id_to_tokens(self, ids):
        """
        Convert a sequence of token ids into the corresponding tokens.

        param: ids, token ids to convert, type list[int]
        return: token sequence, type list[string]
        """
        return [self.id2token[token_id] for token_id in ids]

    def _update_dict(self, symbol2id, id2symbol, symbol):
        """
        Register a new symbol in a pair of forward/backward dictionaries.

        The new id is the current size of the forward dictionary, so the
        caller must only pass symbols that are not registered yet.

        param: symbol2id, symbol -> id mapping, type dict
        param: id2symbol, id -> symbol mapping, type dict
        param: symbol, symbol to add, type string
        """
        new_id = len(symbol2id)
        symbol2id[symbol] = new_id
        id2symbol[new_id] = symbol

    def prepare_dict(self):
        """
        Scan the corpus once and fill the token and tag dictionaries.
        """
        print("Start constructing dictionaries...")
        for token, pos_tag in self.read_lines():
            if token not in self.token2id:
                self._update_dict(self.token2id, self.id2token, token)

            if pos_tag not in self.tag2id:
                self._update_dict(self.tag2id, self.id2tag, pos_tag)
        print("Finished construction.")
        

In [3]:
class HMMPOSTagger:
    """
    HMM part-of-speech tagger: model definition, training and decoding.

    HMM parameters:
        pi -- initial state (tag) probability vector, shape (n_tags,)
        A  -- state transition probability matrix, shape (n_tags, n_tags)
        B  -- emission probability matrix, shape (n_tags, n_tokens)
    """

    def __init__(self, corpus_helper, eps=None):
        """
        param: corpus_helper, corpus helper instance, type CorpusHelper
        param: eps, tiny value added before taking logs (smoothing), type float;
               defaults to the machine epsilon of float
        """
        self.corpus_helper = corpus_helper
        self.n_tokens = len(corpus_helper.token2id)
        self.n_tags = len(corpus_helper.tag2id)
        # The builtin float/int are used as dtypes: the np.float / np.int
        # aliases were removed from NumPy.
        self.pi = np.zeros(self.n_tags, dtype=float)
        self.A = np.zeros((self.n_tags, self.n_tags), dtype=float)
        self.B = lil_matrix((self.n_tags, self.n_tokens), dtype=float)
        self.eps = np.finfo(float).eps if eps is None else eps

    @staticmethod
    def _normalize_rows(counts):
        """
        Turn counts into probabilities along the last axis.

        Rows whose total is zero (e.g. a tag that is never followed by another
        tag inside a sentence) are left as all zeros instead of producing NaN
        through a 0/0 division.

        param: counts, count vector or matrix, type array-like
        return: normalised copy, type np.ndarray
        """
        counts = np.asarray(counts, dtype=float)
        totals = counts.sum(axis=-1, keepdims=True)
        safe_totals = np.where(totals > 0, totals, 1.0)
        return counts / safe_totals

    def train(self):
        """
        Train the model by counting over the corpus of self.corpus_helper.

        Counts are accumulated in fresh arrays, so calling train() again
        re-estimates the parameters from scratch. After training pi, A and B
        are dense np.ndarray objects.
        """
        helper = self.corpus_helper
        pi_counts = np.zeros(self.n_tags, dtype=float)
        transition_counts = np.zeros((self.n_tags, self.n_tags), dtype=float)
        emission_counts = np.zeros((self.n_tags, self.n_tokens), dtype=float)

        # Previous tag id; None means the current token starts a new sentence.
        last_tag_id = None
        for token_id, tag_id in helper.read_lines2id():

            # The emission statistics are updated for every token.
            emission_counts[tag_id, token_id] += 1

            if last_tag_id is None:
                # Sentence start: update the initial-state statistics.
                pi_counts[tag_id] += 1
            else:
                # Otherwise: update the transition statistics.
                transition_counts[last_tag_id, tag_id] += 1

            # Forget the previous tag once the sentence-end token is seen.
            last_tag_id = None if helper.is_end_tokenid(token_id) else tag_id

        # Convert counts to probabilities.
        self.pi = self._normalize_rows(pi_counts)
        self.A = self._normalize_rows(transition_counts)
        self.B = self._normalize_rows(emission_counts)

        print("训练结束")
        print("pi:{}".format(self.pi))
        print("A[0,:]:\n{}".format(self.A[0]))

    def _log(self, p):
        """
        Smoothed log: log(p + eps), so that zero probabilities stay finite.
        """
        return np.log(p + self.eps)

    def decode(self, sentence):
        """
        Find the best tag sequence for a sentence with the Viterbi algorithm.

        Note: this toy implementation does no tokenisation (tokens must be
        separated by whitespace) and does not handle out-of-vocabulary words.

        param: sentence, whitespace-separated input sentence, type string
        return: tag sequence, type list[string] (empty list for empty input)
        raises: KeyError if a token is not in the training vocabulary
        """
        # split() without argument tolerates repeated / leading / trailing spaces.
        tokens = sentence.split() if sentence else []
        if not tokens:
            print("请输入句子")
            return []

        token2id = self.corpus_helper.token2id
        unknown = [token for token in tokens if token not in token2id]
        if unknown:
            raise KeyError(
                "tokens not in the training vocabulary: {}".format(unknown)
            )
        token_ids = [token2id[token] for token in tokens]
        n_tags, n_steps = self.n_tags, len(token_ids)

        log_pi = self._log(np.asarray(self.pi, dtype=float))
        log_A = self._log(np.asarray(self.A, dtype=float))
        # B is a sparse matrix until train() has been called.
        if hasattr(self.B, "toarray"):
            B = self.B.toarray()
        else:
            B = np.asarray(self.B, dtype=float)

        # dp[i, t]: best log-score of any path ending in tag i at position t.
        # traces[i, t]: the tag at position t-1 on that best path.
        dp = np.empty((n_tags, n_steps), dtype=float)
        traces = np.zeros((n_tags, n_steps), dtype=int)

        # Initialise with the first token.
        dp[:, 0] = log_pi + self._log(B[:, token_ids[0]])

        # Dynamic programming from the second token on.
        for t in range(1, n_steps):
            log_emit = self._log(B[:, token_ids[t]])
            # scores[k, i] = dp[k, t-1] + log A[k, i] + log B[i, token]
            scores = dp[:, t - 1][:, np.newaxis] + log_A + log_emit[np.newaxis, :]
            # argmax returns the first maximum, i.e. the lowest previous tag on ties.
            traces[:, t] = scores.argmax(axis=0)
            dp[:, t] = scores.max(axis=0)

        # Final tag of the best path, then backtrack through the traces.
        best_tag = int(np.argmax(dp[:, -1]))
        decoded = [best_tag]
        for t in range(n_steps - 1, 0, -1):
            best_tag = int(traces[best_tag, t])
            decoded.append(best_tag)
        decoded.reverse()

        return self.corpus_helper.id_to_tags(decoded)

In [4]:
# Build the token/tag vocabularies from the training corpus and report them.
corpus_helper = CorpusHelper("./traindata.txt")
print(
    "Number of tags: {}\nNumber of tokens: {}".format(
        len(corpus_helper.tag2id),
        len(corpus_helper.token2id),
    )
)
print(corpus_helper.tag2id)

# Estimate the supervised HMM parameters (pi, A, B) by counting over the corpus.
tagger = HMMPOSTagger(corpus_helper)
tagger.train()

198796it [00:00, 1181194.15it/s]
0it [00:00, ?it/s]

Start constructing dictionaries...
Finished construction.
Number of tags: 54
Number of tokens: 18978
{'NNP': 0, ',': 1, 'VBG': 2, 'TO': 3, 'VB': 4, 'NN': 5, 'IN': 6, 'JJ': 7, 'VBD': 8, 'NNS': 9, 'CD': 10, 'CC': 11, 'PRP': 12, 'MD': 13, 'DT': 14, '.': 15, 'VBZ': 16, 'VBN': 17, 'WDT': 18, 'VBP': 19, 'POS': 20, 'RB': 21, '$': 22, 'PRP$': 23, ':': 24, 'JJR': 25, '``': 26, "''": 27, 'WP': 28, 'JJS': 29, 'WRB': 30, 'RBR': 31, 'NNPS': 32, 'RP': 33, 'WP$': 34, 'EX': 35, '(': 36, ')': 37, 'PDT': 38, 'RBS': 39, 'FW': 40, 'UH': 41, 'SYM': 42, 'LS': 43, '#': 44, 'VBG|NN': 45, 'JJ|NN': 46, 'RB|IN': 47, 'NNS|NN': 48, 'VBN|JJ': 49, 'VB|NN': 50, 'RBR|JJR': 51, 'NN|NNS': 52, 'JJ|RB': 53}


198796it [00:01, 131180.38it/s]

训练结束
pi:[1.81324111e-01 0.00000000e+00 1.00049407e-02 3.33498024e-03
 3.95256917e-03 3.68083004e-02 1.11660079e-01 3.66847826e-02
 6.17588933e-04 3.81669960e-02 8.76976285e-03 5.18774704e-02
 6.02766798e-02 2.47035573e-04 2.17267787e-01 0.00000000e+00
 1.48221344e-03 6.05237154e-03 8.64624506e-04 2.47035573e-04
 0.00000000e+00 4.73073123e-02 0.00000000e+00 7.16403162e-03
 1.72924901e-03 2.09980237e-03 7.53458498e-02 6.36116601e-02
 2.59387352e-03 1.85276680e-03 5.92885375e-03 1.97628458e-03
 2.84090909e-03 0.00000000e+00 0.00000000e+00 2.71739130e-03
 5.92885375e-03 5.92885375e-03 9.88142292e-04 3.70553360e-04
 1.23517787e-04 0.00000000e+00 0.00000000e+00 1.85276680e-03
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00 0.00000000e+00 0.00000000e+00
 0.00000000e+00 0.00000000e+00]
A[0,:]:
[3.79116341e-01 1.41891194e-01 1.29038918e-03 8.25849076e-03
 9.80695778e-04 5.28027253e-02 4.27376897e-02 8.72303087e-03
 6.72550841e-02 2.33302364e-02 1.9975




In [5]:
# Tag one example sentence with the trained HMM and show it next to its tags.
sent = "I am a good programmer ."
pos_tags = tagger.decode(sent)
print(sent, ' '.join(pos_tags), sep="\n")

I am a good programmer .
PRP VBP DT JJ NN .


In [6]:
# Unsupervised: fit a discrete-emission HMM with 6 hidden states on the stream
# of token ids, ignoring the gold tags. The whole corpus is fed as one sequence.
from hmmlearn import hmm

# Each observation here is a single symbol id, i.e. a categorical emission.
# In hmmlearn >= 0.3 MultinomialHMM models count vectors instead, and the
# categorical model is CategoricalHMM (available since 0.2.8). Fall back to
# MultinomialHMM on older releases, where it still is the categorical model.
_discrete_hmm_cls = getattr(hmm, "CategoricalHMM", hmm.MultinomialHMM)
model = _discrete_hmm_cls(n_components=6)
long_sent = [token_id for token_id, _ in corpus_helper.read_lines2id()]
model.fit(np.array(long_sent).reshape(-1, 1))

198796it [00:00, 1148816.28it/s]


MultinomialHMM(algorithm='viterbi', init_params='ste', n_components=6,
               n_iter=10, params='ste',
               random_state=RandomState(MT19937) at 0x7F5E908FE8D0,
               startprob_prior=1.0, tol=0.01, transmat_prior=1.0,
               verbose=False)

In [7]:
# Show the learned parameters: start, transition and emission probabilities.
for learned in (model.startprob_, model.transmat_, model.emissionprob_):
    print(learned)
print()

# Decode two example sentences with the unsupervised model; decode() is fed a
# column vector of token ids and its return value is printed as-is.
sents = ["I am a good programmer .", "You have a new book ."]
for sent in sents:
    token_ids = [corpus_helper.token2id[w] for w in sent.split(" ")]
    observations = np.array(token_ids).reshape(-1, 1)
    print("=" * 25)
    print(sent)
    print(model.decode(observations))
    print("=" * 25)


[9.94816506e-01 5.90719636e-10 5.70470715e-05 5.06236832e-03
 4.45699806e-14 6.40782416e-05]
[[0.16541853 0.1562406  0.17235831 0.18549638 0.16903852 0.15144767]
 [0.17065134 0.15239057 0.18182866 0.19202188 0.15776145 0.14534611]
 [0.17404666 0.1509628  0.17516193 0.18857971 0.15463097 0.15661792]
 [0.18019045 0.17238159 0.16452027 0.14960147 0.16869493 0.16461128]
 [0.18333828 0.17992215 0.18174878 0.1535702  0.15451066 0.14690993]
 [0.17038877 0.19275633 0.18047987 0.14129436 0.16840071 0.14667995]]
[[1.80216536e-04 9.96836816e-02 3.25359625e-04 ... 7.07893565e-08
  1.23378509e-06 6.74999528e-06]
 [7.48413415e-06 9.37375727e-02 7.39038138e-05 ... 8.06400763e-06
  7.71788397e-06 7.05876440e-07]
 [2.67694359e-05 5.12280412e-02 1.93237799e-04 ... 2.44528069e-06
  8.79107102e-06 7.29237967e-06]
 [5.04934498e-05 1.64695585e-02 3.88238498e-04 ... 4.79079806e-06
  5.13306864e-07 8.72847493e-06]
 [2.89775012e-06 2.16572308e-04 1.91525390e-04 ... 7.48727345e-06
  3.53728744e-06 2.27453982e-0