In [2]:
#逆向最大匹配
class IMM(object):
    def __init__(self, dic_path):
        self.dictionary = set()
        self.maximum = 0
        #读取词典
        with open(dic_path, 'r', encoding='utf8') as f:
            for line in f:
                line = line.strip()
                if not line:
                    continue
                self.dictionary.add(line)
                if len(line) > self.maximum:
                    self.maximum = len(line)
    def cut(self, text):
        result = []
        index = len(text)
        while index > 0:
            word = None
            for size in range(self.maximum, 0, -1):
                if index - size < 0:
                    continue
                piece = text[(index - size):index]
                if piece in self.dictionary:
                    word = piece
                    result.append(word)
                    index -= size
                    break
            if word is None:
                index -= 1
        return result[::-1]

def main():
    text = "南京市长江大桥"
    
    tokenizer = IMM('./data/imm_dic.utf8')
    print(tokenizer.cut(text))

main()

['南京市', '长江大桥']


In [3]:
class HMM(object):
    def __init__(self):
        pass
    
    def try_load_model(self, trained):
        pass
    
    def train(self, path):
        pass
    
    def viterbi(self, text, states, start_p, trans_p, emit_p):
        pass
    
    def cut(self, text):
        pass

In [4]:
class HMM(object):
    def __init__(self):
        import os

        # 主要是用于存取算法中间结果，不用每次都训练模型
        self.model_file = './data/hmm_model.pkl'

        # 状态值集合
        self.state_list = ['B', 'M', 'E', 'S']
        # 参数加载,用于判断是否需要重新加载model_file
        self.load_para = False

    # 用于加载已计算的中间结果，当需要重新训练时，需初始化清空结果
    def try_load_model(self, trained):
        if trained:
            import pickle
            with open(self.model_file, 'rb') as f:
                self.A_dic = pickle.load(f)
                self.B_dic = pickle.load(f)
                self.Pi_dic = pickle.load(f)
                self.load_para = True

        else:
            # 状态转移概率（状态->状态的条件概率）
            self.A_dic = {}
            # 发射概率（状态->词语的条件概率）
            self.B_dic = {}
            # 状态的初始概率
            self.Pi_dic = {}
            self.load_para = False

    # 计算转移概率、发射概率以及初始概率
    def train(self, path):

        # 重置几个概率矩阵
        self.try_load_model(False)

        # 统计状态出现次数，求p(o)
        Count_dic = {}

        # 初始化参数
        def init_parameters():
            for state in self.state_list:
                self.A_dic[state] = {s: 0.0 for s in self.state_list}
                self.Pi_dic[state] = 0.0
                self.B_dic[state] = {}

                Count_dic[state] = 0

        def makeLabel(text):
            out_text = []
            if len(text) == 1:
                out_text.append('S')
            else:
                out_text += ['B'] + ['M'] * (len(text) - 2) + ['E']

            return out_text

        init_parameters()
        line_num = -1
        # 观察者集合，主要是字以及标点等
        words = set()
        with open(path, encoding='utf8') as f:
            for line in f:
                line_num += 1

                line = line.strip()
                if not line:
                    continue

                word_list = [i for i in line if i != ' ']
                words |= set(word_list)  # 更新字的集合

                linelist = line.split()

                line_state = []
                for w in linelist:
                    line_state.extend(makeLabel(w))
                
                assert len(word_list) == len(line_state)

                for k, v in enumerate(line_state):
                    Count_dic[v] += 1
                    if k == 0:
                        self.Pi_dic[v] += 1  # 每个句子的第一个字的状态，用于计算初始状态概率
                    else:
                        self.A_dic[line_state[k - 1]][v] += 1  # 计算转移概率
                        self.B_dic[line_state[k]][word_list[k]] = \
                            self.B_dic[line_state[k]].get(word_list[k], 0) + 1.0  # 计算发射概率
        
        self.Pi_dic = {k: v * 1.0 / line_num for k, v in self.Pi_dic.items()}
        self.A_dic = {k: {k1: v1 / Count_dic[k] for k1, v1 in v.items()}
                      for k, v in self.A_dic.items()}
        #加1平滑
        self.B_dic = {k: {k1: (v1 + 1) / Count_dic[k] for k1, v1 in v.items()}
                      for k, v in self.B_dic.items()}
        #序列化
        import pickle
        with open(self.model_file, 'wb') as f:
            pickle.dump(self.A_dic, f)
            pickle.dump(self.B_dic, f)
            pickle.dump(self.Pi_dic, f)

        return self

    def viterbi(self, text, states, start_p, trans_p, emit_p):
        V = [{}]
        path = {}
        for y in states:
            V[0][y] = start_p[y] * emit_p[y].get(text[0], 0)
            path[y] = [y]
        for t in range(1, len(text)):
            V.append({})
            newpath = {}
            
            #检验训练的发射概率矩阵中是否有该字
            neverSeen = text[t] not in emit_p['S'].keys() and \
                text[t] not in emit_p['M'].keys() and \
                text[t] not in emit_p['E'].keys() and \
                text[t] not in emit_p['B'].keys()
            for y in states:
                emitP = emit_p[y].get(text[t], 0) if not neverSeen else 1.0 #设置未知字单独成词
                (prob, state) = max(
                    [(V[t - 1][y0] * trans_p[y0].get(y, 0) *
                      emitP, y0)
                     for y0 in states if V[t - 1][y0] > 0])
                V[t][y] = prob
                newpath[y] = path[state] + [y]
            path = newpath
            
        if emit_p['M'].get(text[-1], 0)> emit_p['S'].get(text[-1], 0):
            (prob, state) = max([(V[len(text) - 1][y], y) for y in ('E','M')])
        else:
            (prob, state) = max([(V[len(text) - 1][y], y) for y in states])
        
        return (prob, path[state])

    def cut(self, text):
        import os
        if not self.load_para:
            self.try_load_model(os.path.exists(self.model_file))
        prob, pos_list = self.viterbi(text, self.state_list, self.Pi_dic, self.A_dic, self.B_dic)      
        begin, next = 0, 0    
        for i, char in enumerate(text):
            pos = pos_list[i]
            if pos == 'B':
                begin = i
            elif pos == 'E':
                yield text[begin: i+1]
                next = i+1
            elif pos == 'S':
                yield char
                next = i+1
        if next < len(text):
            yield text[next:]



In [5]:
hmm = HMM()
hmm.train('./data/trainCorpus.txt_utf8')  # 用的人民日报的分词语料

text = '这是一个非常棒的方案！'
res = hmm.cut(text)
print(text)
print(str(list(res)))

这是一个非常棒的方案！
['这是', '一个', '非常', '棒', '的', '方案', '！']


In [7]:
#jieba分词示例
def get_content(path):
    
    with open(path, 'r', encoding='UTF-8', errors='ignore') as f:
        content = ''
        for l in f:
            l = l.strip()
            content += l
        return content


def get_TF(words, topK=10):
    
    tf_dic = {}
    for w in words:
        tf_dic[w] = tf_dic.get(w, 0) + 1
    return sorted(tf_dic.items(), key = lambda x: x[1], reverse=True)[:topK]

def stop_words(path):
    with open(path, encoding='UTF-8') as f:
        return [l.strip() for l in f]
stop_words('./data/stop_words.utf8')



['"',
 '.',
 '。',
 ',',
 '、',
 '！',
 '？',
 '：',
 '；',
 '`',
 '﹑',
 '•',
 '＂',
 '^',
 '…',
 '‘',
 '’',
 '“',
 '”',
 '〝',
 '〞',
 '~',
 '\\',
 '∕',
 '|',
 '¦',
 '‖',
 '—',
 '(',
 ')',
 '〈',
 '〉',
 '﹞',
 '﹝',
 '「',
 '」',
 '‹',
 '›',
 '〖',
 '〗',
 '】',
 '【',
 '»',
 '«',
 '』',
 '『',
 '〕',
 '〔',
 '》',
 '《',
 '}',
 '{',
 ']',
 '[',
 '﹐',
 '¸',
 '﹕',
 '︰',
 '﹔',
 ';',
 '！',
 '¡',
 '？',
 '¿',
 '﹖',
 '﹌',
 '﹏',
 '﹋',
 '＇',
 '´',
 'ˊ',
 'ˋ',
 '-',
 '―',
 '﹫',
 '@',
 '︳',
 '︴',
 '_',
 '¯',
 '＿',
 '￣',
 '﹢',
 '+',
 '﹦',
 '=',
 '﹤',
 '‐',
 '<',
 '\xad',
 '˜',
 '~',
 '﹟',
 '#',
 '﹩',
 '$',
 '﹠',
 '&',
 '﹪',
 '%',
 '﹡',
 '*',
 '﹨',
 '\\',
 '﹍',
 '﹉',
 '﹎',
 '﹊',
 'ˇ',
 '︵',
 '︶',
 '︷',
 '︸',
 '︹',
 '︿',
 '﹀',
 '︺',
 '︽',
 '︾',
 '_',
 'ˉ',
 '﹁',
 '﹂',
 '﹃',
 '﹄',
 '︻',
 '︼',
 '的',
 '了',
 'the',
 'a',
 'an',
 'that',
 'those',
 'this',
 'that',
 '$',
 '0',
 '1',
 '2',
 '3',
 '4',
 '5',
 '6',
 '7',
 '8',
 '9',
 '?',
 '_',
 '“',
 '”',
 '、',
 '。',
 '《',
 '》',
 '一',
 '一些',
 '一何',
 '一切',
 '一则',
 '一方面',
 '一旦',


In [8]:
#分词
def main():
    import glob
    import random
    import jieba
    
    files = glob.glob('./data/news/C000013/*.txt')
    corpus = [get_content(x) for x in files[:5]]
    
    
    sample_inx = random.randint(0, len(corpus))
    sample_inx = 3
    
    import jieba.posseg as psg
    
    split_words = [x for x in jieba.cut(corpus[sample_inx]) if x not in stop_words('./data/stop_words.utf8')]
    print('样本之一：'+corpus[sample_inx])
    print('样本分词效果：'+'/ '.join(split_words))
    print('样本的topK（10）词：'+str(get_TF(split_words)))
main()

样本之一：йԱ24˵2005ףйرĳβۼѳ60ְҵϾХڵվеġְҵʾҵҵְҵᡱ˵й༱ְҵж¹ÿ귢200ǧжֱӾʧϰԪְҵ󡢷ʽϸߡʧӰӡ24չ2005յȫ30ʡֱϽУءۡġְ̨ҵ12212гβ9173ռ7511%Х˵κúйҪĳβҳβ̡ȥ걨ĳβ̽ӳʱ䲻£ƽ409꣬С20ꡣХʾִϡලҵˮƽߡ豸ְҵصԭҪԭЩҵƹȱλȱάְǿҵʶְĺϷȨ治ܵõЧıϡ˵ΪҵְҵӣҰȫලֺܾлȫܹ24ھѡ56ҹҼְҵʾҵϣЩҵΪƹְҵξ飬ʹҵְҵͶ߽
样本分词效果：й/ Ա/ 24/ ˵/ 2005/ ף/ й/ ر/ ĳ/ β/ ۼ/ ѳ/ 60/ ְ/ ҵ/ Ͼ/ Х/ ڵ/ վ/ е/ ġ/ ְ/ ҵ/ ʾ/ ҵ/ ҵ/ ְ/ ҵ/ ᡱ/ ˵/ й/ ༱/ ְ/ ҵ/ ж/ ¹/ ÿ/ 귢/ 200/ ǧ/ ж/ ֱ/ Ӿ/ ʧ/ ϰ/ Ԫ/ ְ/ ҵ/ 󡢷/ ʽ/ ϸ/ ߡ/ ʧ/ Ӱ/ ӡ/ 24/ չ/ 2005/ յ/ ȫ/ 30/ ʡ/ ֱ/ Ͻ/ У/ ء/ ۡ/ ġ/ ̨/ ְ/ ҵ/ 12212/ г/ β/ 9173/ ռ/ 7511%/ Х/ ˵/ κ/ ú/ й/ Ҫ/ ĳ/ β/ ҳ/ β/ ̡/ ȥ/ 걨/ ĳ/ β/ ̽/ ӳ/ ʱ/ 䲻/ £/ ƽ/ 409/ ꣬/ С/ 20/ ꡣ/ Х/ ʾ/ ִ/ ϡ/ ල/ ҵ/ ˮ/ ƽ/ ߡ/ 豸/ ְ/ ҵ/ ص/ ԭ/ Ҫ/ ԭ/ Щ/ ҵ/ ƹ/ / ȱ/ λ/ ȱ/ ά/ ְ/ ǿ/ ҵ/ ʶ/ ְ/ ĺ/ Ϸ/ Ȩ/ 治/ ܵ/ õ/ Ч/ ı/ ϡ/ ˵/ Ϊ/ ҵ/ ְ/ ҵ/ ӣ/ Ұ/ ȫ/ ල/ ܾ/ ֺ/ л/ ȫ/ ܹ/ 24/ ھ/ ѡ/ 56/ ҹ/ Ҽ/ ְ/ ҵ/ ʾ/ ҵ/ ϣ/ Щ/ ҵ/ Ϊ/ ƹ/ ְ/ ҵ/ ξ/ 飬/ ʹ/ ҵ/ ְ/ ҵ/ Ͷ/ ߽
样本的topK（10）词：[('ҵ', 20), ('ְ', 13), ('β', 5), ('й', 4), ('˵', 4), ('24', 3), ('ĳ', 3), ('Х', 3), ('ʾ', 3), ('ȫ', 3)]


In [9]:
#jieba词性标注示例

def main():
    import glob
    import random
    import jieba
    import jieba.posseg as psg
    
    files = glob.glob('./data/news/C000013/*.txt')
    corpus = [get_content(x) for x in files]
    
    sample_inx = random.randint(0, len(corpus))
    sample_inx = 3
    
    split_words = [w for w, t in psg.cut(corpus[sample_inx]) 
                   if w not in stop_words('./data/stop_words.utf8')
                  and t.startswith('n')]
    print('样本之一：'+corpus[sample_inx])
    print('样本分词效果：'+'/ '.join(split_words))
    print('样本的topK（10）词：'+str(get_TF(split_words)))
main()

样本之一：йԱ24˵2005ףйرĳβۼѳ60ְҵϾХڵվеġְҵʾҵҵְҵᡱ˵й༱ְҵж¹ÿ귢200ǧжֱӾʧϰԪְҵ󡢷ʽϸߡʧӰӡ24չ2005յȫ30ʡֱϽУءۡġְ̨ҵ12212гβ9173ռ7511%Х˵κúйҪĳβҳβ̡ȥ걨ĳβ̽ӳʱ䲻£ƽ409꣬С20ꡣХʾִϡලҵˮƽߡ豸ְҵصԭҪԭЩҵƹȱλȱάְǿҵʶְĺϷȨ治ܵõЧıϡ˵ΪҵְҵӣҰȫලֺܾлȫܹ24ھѡ56ҹҼְҵʾҵϣЩҵΪƹְҵξ飬ʹҵְҵͶ߽
样本分词效果：
样本的topK（10）词：[]
