## 处理训练集和测试集

In [17]:
#在txt文件中随机抽取10%作为测试集，剩余90%作为训练集
import random
from random import randint
 
oldf = open('词性标注@人民日报199801.txt', 'r',encoding='utf-8')    #要被抽取的文件（原文件）
trainf = open('traindata.txt', 'w',encoding='utf-8')  #用于存放训练集
testf = open('testdata.txt', 'w',encoding='utf-8')   #用于存放测试集
ratio = 0.9  #训练集占比

lines = oldf.readlines()
for line in lines:
    if random.random()<ratio: #数据集分割比例
        trainf.write(line)  #训练数据集
    else:
        testf.write(line)  #测试数据集
        
oldf.close()
trainf.close()
testf.close()

In [18]:
import re
#处理原始文档得到词典
def string_process(x):    #处理字符串
    a=re.sub(r'\d{8}-\d{2}-\d{3}-\d{3}/m|[/a-z！。”“，、——\[\]（）：《》……A-Z？]', "", x)
#正则表达式去除时间和后面部分的字母和其他符号
    b=a.replace("  "," ")
    return b.rstrip()

In [19]:
#处理训练集得到只有分词结果的训练集数据
def file_process():
    s=""
    with open('traindata.txt','r',encoding='UTF-8') as f:  #处理原始训练集traindata.txt
        for line1 in f:
            a=string_process(line1)
            a=string_process(a)
            s+=a+"\n"
    f.close()
    with open("process_1.txt",'w',encoding='UTF-8') as w: #结果写入process_1.txt
        w.write(s)
 
    w.close()

In [20]:
#处理测试集，先得到只有分词结果的测试集数据
def test_file_process(): 
    s = ""
    with open('testdata.txt', 'r', encoding='UTF-8') as f:  #处理原始测试集testdata.txt
        for line1 in f:
            a = string_process(line1)
            a = string_process(a)
            s += a + "\n"
    f.close()
    with open("test_process_1.txt", 'w', encoding='UTF-8') as w:  #结果写入test_process_1.txt
        w.write(s)
 
    w.close()

In [21]:
#处理分词测试集，去掉空格以用于测试模型分词效果
def delete_space():
    f = open('test_process_1.txt','r', encoding='utf-8')
    f2 = open('test_endfile.txt', 'w', encoding='utf-8')
    for line in f.readlines():  #每行都去掉空格分隔，直接连成句，写入test_endfile.txt
        r = ''.join(line.split()) 
        r += "\n"
        f2.write(r)

In [131]:
#处理训练集和测试集
file_process()

Wall time: 1.94 s


In [23]:
test_file_process()
delete_space()

如上处理过后，最终得到的process_1.txt存放分好词的训练集结果，用于生成词典和进行训练；test_endfile.txt存放去掉空格的测试集结果，用于测试分词模型的效果

## 生成词典

In [24]:
#利用分好词的训练集，生成对应的词典库
def get_dic():
    words = set() #词典集合，利用集合自动去重的特性
    with open('process_1.txt','r',encoding='utf-8') as f:
        for line in f.readlines():  #对每行内容提取所有词语
            line = line.strip("\n")
            line = line.split()
            lineword = set(line)
            words = words.union(lineword)  #不断取并集，形成词典
    worddict = list(words)
    return worddict
dic = get_dic() #生成词典dic，用于FMM和BMM

## FMM分词

In [139]:
%%time
#FMM即为正向最大匹配，需要从左向右扫描寻找词的最大匹配，从所有词最大的长度开始在词库中进行匹配，直到匹配成功
def readfile():
    max_length=0
    for i in dic:
        max_length=max(max_length,len(i))#获得最大长度
        
    inf=open("test_endfile.txt",'r',encoding='utf-8')
    outf=open("FMM分词.txt",'w',encoding='utf-8') #储存分词结果
    
    lines=inf.readlines()
    for line in lines:#分别对每一行进行正向最大匹配处理，即从最长的长度开始逐次递减尝试直至匹配成功
        my_list=[]
        len_hang=len(line)
        while len_hang>0:
            tmp=line[0:max_length] #切割字符串
            while tmp not in dic:
                if len(tmp)==1: #长度为1的时候就直接退出
                    break;
                tmp=tmp[0:len(tmp)-1]
            my_list.append(tmp)
            line=line[len(tmp):]
            len_hang=len(line)
        for i in my_list:
            outf.write(i+"/")#用/来分隔分词结果
            
    outf.close()
    inf.close()
    
    
readfile()

Wall time: 45min 30s


## BMM分词

In [140]:
%%time
#BMM即为逆向最大匹配，需要从右向左扫描寻找词的最大匹配，从所有词最大的长度开始在词库中进行匹配，直到匹配成功
def readfile():
    max_length=0
    for i in dic:
        max_length=max(max_length,len(i))#获得最大长度
        
    inf=open("test_endfile.txt",'r',encoding='utf-8')
    outf=open("BMM分词.txt",'w',encoding='utf-8') #储存分词结果
    
    lines=inf.readlines()
    for line in lines:#分别对每一行进行逆向最大匹配处理
        my_list = []
        len_hang=len(line)
        while len_hang>0:
            tmp=line[max(0,len_hang-max_length):len_hang]#取max防止超出范围
            while tmp not in dic:
                if len(tmp)==1:
                    break;
                tmp=tmp[1:len(tmp)]
            my_list.append(tmp)
            line=line[0:len_hang-len(tmp)]
            len_hang=len(line)
        while len(my_list):
            tt=my_list.pop()#这里类似栈的操作，因为是逆向进行的匹配所以输出顺序要反过来
            outf.write(tt+"/") #用/来分隔分词结果
            
    outf.close()
    inf.close()
    
    
readfile()

Wall time: 46min 50s


## 最短路径分词法

In [30]:
#构造词频词典
class WordDictModel:
    def __init__(self):
        self.word_dict = {}
        
    def update(self, filename): #更新建立词频词典，即每个词语对应有其出现次数
        f = open(filename, "r", encoding="utf-8")
        for line in f:
            words = line.split(" ")
            for word in words:
                if self.word_dict.get(word): 
                    self.word_dict[word] += 1 #如果已经出现过，次数+1
                else:
                    self.word_dict[word] = 1  #第一次出现

In [135]:
#利用词频词典，构造有向无环图（DAG图），并利用其进行分词
class DAG(WordDictModel):
    def build_dag(self, sentence): #构建有向无环图
        dag = {}
        for start in range(len(sentence)): #对每个字，为以其为开头的所有词语建立结点和连边（权值为词频）
            unique = [start + 1] #与start相连的结点在句子中下标
            tmp = [(start + 1, 1)] #边及其权重
            for stop in range(start+1, len(sentence)+1):
                fragment = sentence[start:stop]
                num = self.word_dict.get(fragment, 0)
                if num > 0 and (stop not in unique): #遇到新的词语，添加连边和权重
                    tmp.append((stop, num))
                    unique.append(stop)
            dag[start] = tmp
        return dag
    
    def predict(self, sentence):  #选择最优路径
        Len = len(sentence)
        route = [0] * Len
        dag = self.build_dag(sentence)  

        for i in range(0, Len):
            route[i] = max(dag[i], key=lambda x: x[1])[0] #每次都选择权值最大的边所连接的结点作为下一个结点
        return route
    
    def cut(self, sentence):  #按照最优路径来切分原句
        route = self.predict(sentence)
        next = 0
        word_list = []
        i = 0
        while i < len(sentence):
            next = route[i]
            word_list.append(sentence[i:next])
            i = next
        return word_list
    
    def test(self):
        inf=open("test_endfile.txt",'r',encoding='utf-8')
        outf=open("最短路径分词.txt",'w',encoding='utf-8')  #储存分词结果
        for line in inf.readlines():
            result = self.cut(line)
            for word in result:
                outf.write(word+"/")
        outf.close()
    

In [136]:
%%time
#定义DAG图对象，应用于训练集和测试集
dag_segger = DAG()
dag_segger.update("process_1.txt")
dag_segger.test()

Wall time: 14.3 s


## 最大概率分词

In [39]:
#最大概率分词法需要在词图上选择概率最大的分词路经作为最优结果

#首先生成词典，词典中包括词语本身和它出现的概率
import math
word_dict = {}
num_words = 0  #统计总词数
f = open("process_1.txt", "r", encoding="utf-8")
for line in f:
    words = line.split(" ")
    for word in words:
        num_words += 1  #计算总词数
        if word_dict.get(word):
            word_dict[word] += 1  #计算每个词的出现次数
        else:
            word_dict[word] = 1
for word in word_dict:  #所有词的出现次数统计完后，计算其出现概率=出现次数/总词数
    #为了防止句子过长导致概率相乘的值太小，在此对概率取对数并加符号，从而把相乘的最大值变成相加的最小值
     word_dict[word] = -math.log(word_dict[word] / num_words)

In [102]:

def max_pro(sentence):
    dp = [9999] * len(sentence)  # 到每个字为止的最大概率
    root = [0] * len(sentence)  # 每个字所在词的起始下标
    max_len = 0
    
    for i in word_dict.keys():
        max_len=max(max_len,len(i))##获得最大长度

    for i in range(len(sentence)):
        findflag = 0
        for j in range(i, i+max_len):
            if j < len(sentence):  #对每个字，尝试以其为开头的所有词
                word = sentence[i: j+1]
                if word in word_dict.keys():
                    findflag = 1
                    temp_pro = word_dict[word]
                    if i > 0:
                        temp_pro += dp[i-1]
                    if temp_pro < dp[j]:  #选择保留概率最大的词
                        dp[j] = temp_pro
                        root[j] = i
            else:
                break
        if (findflag == 0) and (dp[i] == 9999):  #如果以这个字开头的所有词都不在词典中，并且这个字也不在其他词中，则需要单独处理
            dp[i] = dp[i - 1] + 20  
            root[i] = i  #单字为词，并且代价较大
            
    # 输出结果
    result = []
    word_tail = len(sentence) - 1
    while word_tail >= 0:  #从词尾开始向前不断寻找最优解
        result.append(sentence[root[word_tail]: word_tail + 1])
        word_tail = root[word_tail] - 1
    result.reverse()
    return dp, root, result

In [137]:
%%time
inf=open("test_endfile.txt",'r',encoding='utf-8')
outf=open("最大概率分词.txt",'w',encoding='utf-8') #储存分词结果

for line in inf.readlines():
    dp, root, result = max_pro(line)
    for word in result:
        outf.write(word+"/")
        
outf.close()
inf.close()

Wall time: 48.8 s


## 基于HMM的分词法

In [105]:
import pandas as pd
import codecs
from numpy import *
import numpy as np
import sys
import re
STATES = ['B', 'M', 'E', 'S']
array_A = {}    #状态转移概率矩阵
array_B = {}    #发射概率矩阵
array_E = {}    #测试集存在的字符，但在训练集中不存在，发射概率矩阵
array_Pi = {}   #初始状态分布
word_set = set()    #训练数据集中所有字的集合
count_dic = {}  #‘B,M,E,S’每个状态在训练集中出现的次数
line_num = 0    #训练集语句数量

In [106]:
#初始化所有概率矩阵
def Init_Array():
    for state0 in STATES:
        array_A[state0] = {}
        for state1 in STATES:
            array_A[state0][state1] = 0.0
    for state in STATES:
        array_Pi[state] = 0.0
        array_B[state] = {}
        array_E = {}
        count_dic[state] = 0

#对训练集获取状态标签
def get_tag(word):
    tag = []
    if len(word) == 1:
        tag = ['S']
    elif len(word) == 2:
        tag = ['B', 'E']
    else:
        num = len(word) - 2
        tag.append('B')
        tag.extend(['M'] * num)
        tag.append('E')
    return tag


#将参数估计的概率取对数，对概率0取无穷小-3.14e+100
def Prob_Array():
    for key in array_Pi:
        if array_Pi[key] == 0:
            array_Pi[key] = -3.14e+100
        else:
            array_Pi[key] = log(array_Pi[key] / line_num)
    for key0 in array_A:
        for key1 in array_A[key0]:
            if array_A[key0][key1] == 0.0:
                array_A[key0][key1] = -3.14e+100
            else:
                array_A[key0][key1] = log(array_A[key0][key1] / count_dic[key0])
    for key in array_B:
        for word in array_B[key]:
            if array_B[key][word] == 0.0:
                array_B[key][word] = -3.14e+100
            else:
                array_B[key][word] = log(array_B[key][word] /count_dic[key])

In [107]:
#将字典转换成数组
def Dic_Array(array_b):
    tmp = np.empty((4,len(array_b['B'])))
    for i in range(4):
        for j in range(len(array_b['B'])):
            tmp[i][j] = array_b[STATES[i]][list(word_set)[j]]
    return tmp

#判断一个字最大发射概率的状态
def dist_tag():
    array_E['B']['begin'] = 0
    array_E['M']['begin'] = -3.14e+100
    array_E['E']['begin'] = -3.14e+100
    array_E['S']['begin'] = -3.14e+100
    array_E['B']['end'] = -3.14e+100
    array_E['M']['end'] = -3.14e+100
    array_E['E']['end'] = 0
    array_E['S']['end'] = -3.14e+100

def dist_word(word0,word1,word2,array_b):
    if dist_tag(word0,array_b) == 'S':
        array_E['B'][word1] = 0
        array_E['M'][word1] = -3.14e+100
        array_E['E'][word1] = -3.14e+100
        array_E['S'][word1] = -3.14e+100
    return

In [108]:
#Viterbi算法求测试集的最优状态序列，之后再按照状态序列进行分词
def Viterbi(sentence,array_pi,array_a,array_b):
    tab = [{}]  #动态规划表
    path = {}

    if sentence[0] not in array_b['B']:
        for state in STATES:
            if state == 'S':
                array_b[state][sentence[0]] = 0
            else:
                array_b[state][sentence[0]] = -3.14e+100

    for state in STATES:
        tab[0][state] = array_pi[state] + array_b[state][sentence[0]]
        #tab[t][state]表示时刻t到达state状态的所有路径中，概率最大路径的概率值
        path[state] = [state]
    for i in range(1,len(sentence)):
        tab.append({})
        new_path = {}
        for state in STATES:
            if state == 'B':
                array_b[state]['begin'] = 0
            else:
                array_b[state]['begin'] = -3.14e+100
        for state in STATES:
            if state == 'E':
                array_b[state]['end'] = 0
            else:
                array_b[state]['end'] = -3.14e+100
        for state0 in STATES:
            items = []
            for state1 in STATES:
                if sentence[i] not in array_b[state0]:  #所有在测试集出现但没有在训练集中出现的字符
                    if sentence[i-1] not in array_b[state0]:
                        prob = tab[i - 1][state1] + array_a[state1][state0] + array_b[state0]['end']
                    else:
                        prob = tab[i - 1][state1] + array_a[state1][state0] + array_b[state0]['begin']
                else:
                    prob = tab[i-1][state1] + array_a[state1][state0] + array_b[state0][sentence[i]]    #计算每个字符对应STATES的概率
                items.append((prob,state1))
            best = max(items)   #bset:(prob,state)保留概率最大的
            tab[i][state0] = best[0]
            new_path[state0] = path[best[1]] + [state0]
        path = new_path

    prob, state = max([(tab[len(sentence) - 1][state], state) for state in STATES])
    return path[state]

In [109]:
#根据状态序列进行分词
def tag_seg(sentence,tag):
    word_list = []
    start = -1
    started = False

    if len(tag) != len(sentence):
        return None

    if len(tag) == 1:
        word_list.append(sentence[0])   #语句只有一个字，直接输出

    else:
        if tag[-1] == 'B' or tag[-1] == 'M':    #最后一个字状态不是'S'或'E'则修改
            if tag[-2] == 'B' or tag[-2] == 'M':
                tag[-1] = 'S'
            else:
                tag[-1] = 'E'


        for i in range(len(tag)):
            if tag[i] == 'S':
                if started:
                    started = False
                    word_list.append(sentence[start:i])
                word_list.append(sentence[i])
            elif tag[i] == 'B':
                if started:
                    word_list.append(sentence[start:i])
                start = i
                started = True
            elif tag[i] == 'E':
                started = False
                word = sentence[start:i + 1]
                word_list.append(word)
            elif tag[i] == 'M':
                continue

    return word_list


In [138]:
%%time
trainset = open('process_1.txt', encoding='utf-8')     #读取训练集

Init_Array()

for line in trainset:  #对每行分别处理，添加状态序列、词典、状态转移概率和发射概率
    line = line.strip()
    line_num += 1

    word_list = []
    for k in range(len(line)):
        if line[k] == ' ':continue
        word_list.append(line[k])
    if len(word_list) == 0:
        continue
    word_set = word_set | set(word_list)    #训练集所有字的集合

    line = line.split()
    line_state = []     #这句话的状态序列

    for i in line:
        line_state.extend(get_tag(i)) #获得并添加状态序列
    array_Pi[line_state[0]] += 1  # array_Pi用于计算初始状态分布概率

    for j in range(len(line_state)-1):
        array_A[line_state[j]][line_state[j+1]] += 1  #array_A计算状态转移概率

    for p in range(len(line_state)):
        count_dic[line_state[p]] += 1  # 记录每一个状态的出现次数
        
        for state in STATES:
            if word_list[p] not in array_B[state]:
                array_B[state][word_list[p]] = 0.0  #保证每个字都在STATES的字典中
        array_B[line_state[p]][word_list[p]] += 1  # array_B用于计算发射概率

Prob_Array()    #对概率取对数保证精度


testset = open('test_endfile.txt', encoding='utf-8')       #读取测试集
outputfile = open('HMM分词.txt', mode='w', encoding='utf-8')
output = ''

for line in testset:
    line = line.strip()
    if len(line) == 0:
        continue
    tag = Viterbi(line, array_Pi, array_A, array_B)
    seg = tag_seg(line, tag)
    list = ''
    for i in range(len(seg)):
        list = list + seg[i] + '/'
    output = output + list + '\n'
    outputfile.write(list + '\n')
testset.close()
outputfile.close()

Wall time: 18.1 s
