In [298]:
"""
    训练完后的模型进行存储实际上就是存储发射概率，转移概率和初始状态概率
    在HMM模型训练时比较耗时的是计算发射概率，存储比较占空间的也是发射概率n*输出种类数，而转移概率置为n*n（n为状态的个数），初始状态概率为n*1，训练很快而且存储消耗很小
"""

class HMM(object):
    def __init__(self):
        import os
        #用于存取算法的中间结果，不用每次都要从新进行训练
        self.model_file = "./data/hmm_data.pkl"
        
        #状态集合
        self.state_list = ["B","M","E","S"] #B:词首  M:词中  E:词尾 S:单独成词
        
        #参数加载，用于判断时候需要从新加载model_file
        self.load_para = False
         

    def try_load_model(self,trained):
        """
            判断是直接使用中间结果进行分词还是从头进行训练，当选择从新训练时将变量清空
        """
        if trained==True:
            import pickle
            with open(self.model_file,'rb') as f:
                self.A_dic = pickle.load(f)
                self.B_dic = pickle.load(f)
                self.Pi_dic = pickle.load(f)
                self.load_para =True
        else:
            self.A_dic = {}   #状态转移矩阵清空
            self.B_dic = {}   #发射矩阵清空
            self.Pi_dic = {}  #状态的初始化该概率矩阵
            self.load_para = False
            
    def train(self,path):
        
        def init_parameters():
            """
                初始化参数
            """
            for state in self.state_list:
                self.A_dic[state] = {s:0.0 for s in self.state_list}  #初始化发射概率，格式：{'S': {'S': 0.0, 'E': 0.0, 'M': 0.0, 'B': 0.0}, 'E': {'S': 0.0, 'E': 0.0, 'M': 0.0, 'B': 0.0}, 'M': {'S': 0.0, 'E': 0.0, 'M': 0.0, 'B': 0.0}, 'B': {'S': 0.0, 'E': 0.0, 'M': 0.0, 'B': 0.0}}
                self.B_dic[state] = {}
                self.Pi_dic[state] = 0.0
                Count_dic[state] = 0
                
        def makeLabel(text):
            """
                标注各个词中各个字的构词未知(状态)
            """
            out_text = []
            if len(text) == 1:
                out_text.append('S')
            else:
                out_text += ['B'] + ["M"]*(len(text)-2) + ['E']
            return out_text
        
        
        self.try_load_model(False)
        
        #统计状态出现的次数,求p(o)
        Count_dic = {}
        
        init_parameters()
        line_num = -1
        
        #观察者集合，朱主要是字和标点等
        words = set()
        
        
        with open(path,encoding='utf8') as f:
            for line in f:
                line_num = line_num +1 
                line = line.strip()
                if not line:
                    continue
                
                
                word_list = [i for i in line if i!=" "]  #一行的各个字列表
                
                
                words |= set(word_list)  #全部的字列表
                
               
                line_list = line.split()   #一行中出现的词列表
                line_state = []    #一行中各个字对应的状态列表
                for w in line_list:
                    line_state.extend(makeLabel(w))
                    
                
                assert len(word_list) == len(line_state)
                
                
                for k,v in enumerate(line_state):
                    Count_dic[v] += 1 #在当前字代表的状态上加1
                    if k==0:
                        self.Pi_dic[v] += 1 #每个句子的第一个字的初始状态用于计算初始状态概率
                    else:
                        #计算转移概率,这里是进行计数
                        self.A_dic[line_state[k-1]][v] += 1 
                        #计算发射概率，这里是进行计数
                        self.B_dic[line_state[k]][word_list[k]] = self.B_dic[line_state[k]].get(word_list[k],0)+1.0
        #计算状态概率矩阵
        self.Pi_dic = {k:v*1.0/line_num for k,v in self.Pi_dic.items()}
        #计算转移概率矩阵，这里是计算概率
        self.A_dic = {k:{ki:vi/Count_dic[k] for ki,vi in v.items()} for k,v in self.A_dic.items()}
        #计算发射概率矩阵，这里是计算概率,使用了加1进行平滑
        self.B_dic = {k:{ki:(vi+1)/Count_dic[k] for ki,vi in v.items()} for k,v in self.B_dic.items()}
        
        #序列化
        import pickle
        with open(self.model_file,"wb") as f:
            pickle.dump(self.A_dic,f)
            pickle.dump(self.B_dic,f)
            pickle.dump(self.Pi_dic,f)
        
        return self
        
          
        
    def viterbi(self, text, states, start_p, trans_p, emit_p):
        V = [{}] #V[t]中存储的是在t个字的位置，当该字对应的隐藏状态是“S”,"M"等可以达到最大的可能性，已经计算了前面没一个都取对达成结果最有利的概率
        path = {}
        for y in states:
            V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
            path[y] = [y]

        for t in range(1, len(text)):
            
            V.append({})
            newpath = {}
 
            #检验训练的发射概率矩阵中是否有该字
            neverSeen = text[t] not in emit_p['S'].keys() and \
                text[t] not in emit_p['M'].keys() and \
                text[t] not in emit_p['E'].keys() and \
                text[t] not in emit_p['B'].keys()
            for y in states:
                emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 #设置未知字单独成词
                (prob, state) = max(
                    [(V[t - 1][y0] * trans_p[y0].get(y, 0) *
                      emitP, y0)
                     for y0 in states if V[t - 1][y0] > 0])
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            print(V)
            print(path)
            path = newpath
        
        #这里用来过滤掉一些不符合清理的情况
        if emit_p['M'].get(text[-1], 0) > emit_p['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E', 'M')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        """
            可以不进行上面的if-else直接进行这一部分，是最原汁原味的viterbi算法
        (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        """
        return (prob, path[state])

                
        
    def cut(self,text):
        import os
        if not self.load_para:
            self.try_load_model(os.path.exists(self.model_file))
        prob,pos_list = self.viterbi(text,self.state_list,self.Pi_dic,self.A_dic,self.B_dic)
        print(prob)
        print(pos_list)
        begin,next = 0,0
        for i,char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin:i+1]
                next = i+1
            elif pos=='S':
                yield char
                next = i + 1
        if next<len(text):
            yield text[next:]

In [238]:
hmm = HMM()
hmm.train("./data/trainCorpus.txt_utf8.txt")
text = "这是一个非常棒的方案!"
res = hmm.cut(text)
print(str(list(res)))

### 这里是viterbi算法调试模式迭过程中V输出的结果，以及最终的概率和隐含状态连

In [301]:

hmm = HMM()
hmm.train("./data/trainCorpus.txt_utf8.txt")
start_p,trans_p,emit_p,state = hmm.Pi_dic,hmm.A_dic,hmm.B_dic,hmm.state_list
text = "这是一个非常棒的方案!"
hmm.viterbi(text,state,start_p,trans_p,emit_p)

[{'B': 0.003291232115235236, 'S': 0.0012044407157278893, 'E': 0.0, 'M': 0.0}, {'B': 1.3735188283184088e-07, 'S': 7.866572112633136e-06, 'E': 1.4243040542188446e-05, 'M': 1.2667977122628068e-07}]
{'B': ['B'], 'S': ['S'], 'E': ['E'], 'M': ['M']}
[{'B': 0.003291232115235236, 'S': 0.0012044407157278893, 'E': 0.0, 'M': 0.0}, {'B': 1.3735188283184088e-07, 'S': 7.866572112633136e-06, 'E': 1.4243040542188446e-05, 'M': 1.2667977122628068e-07}, {'B': 9.159459714932618e-08, 'S': 7.431913756579592e-08, 'E': 4.0602335642896605e-10, 'M': 4.1915951333893927e-10}]
{'B': ['S', 'B'], 'S': ['S', 'S'], 'E': ['B', 'E'], 'M': ['B', 'M']}
[{'B': 0.003291232115235236, 'S': 0.0012044407157278893, 'E': 0.0, 'M': 0.0}, {'B': 1.3735188283184088e-07, 'S': 7.866572112633136e-06, 'E': 1.4243040542188446e-05, 'M': 1.2667977122628068e-07}, {'B': 9.159459714932618e-08, 'S': 7.431913756579592e-08, 'E': 4.0602335642896605e-10, 'M': 4.1915951333893927e-10}, {'B': 2.6625621852931926e-11, 'S': 1.6321590727295764e-10, 'E': 5

(6.804555363172183e-29,
 ['B', 'E', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'E', 'S'])

1.6733155446748817e-30
['B', 'E', 'B', 'E', 'B', 'E', 'S', 'S', 'B', 'M', 'E']
['这是', '一个', '非常', '棒', '的', '方案!']
