# 最大熵模型中文分词

In [1]:
import sys
import re
import logging

# Route log records of level INFO and above to stdout, formatted as
# "<time> | <level> : <message>".
logging.basicConfig(format='%(asctime)s | %(levelname)s : %(message)s', level=logging.INFO, stream=sys.stdout)
# Module-level logger used by the functions and cells further down.
logger = logging.getLogger(__name__)
# Emit a first message (presumably to check that the configuration works).
logger.info('Hello world!')

## 给句子中的每个字符打标签

In [2]:
def tag4_word(word):
    """
    Tag every character of a single word with the 4-tag scheme (B/M/E/S).

    A one-character word is tagged ``S``. For longer words the first
    character is tagged ``B``, the last ``E`` and everything in between ``M``.

    word: the word to tag.
    Returns the characters as concatenated ``<char>/<tag>`` pairs, or ``''``
    for an empty word.
    """
    size = len(word)
    if size == 0:
        return ''
    if size == 1:
        return word + '/S'

    # The first character opens the word and the last one closes it; the
    # middle slice is empty for two-character words.
    pieces = [word[0] + '/B']
    pieces.extend(ch + '/M' for ch in word[1:-1])
    pieces.append(word[-1] + '/E')
    return ''.join(pieces)

def tag4_sentence(sentence):
    """
    Tag every character of a whitespace-segmented sentence with tag4 labels.

    sentence: words separated by whitespace, e.g. "我 爱 伟大 的 中国 。"
    Returns the tagged words concatenated without separators, e.g.
    "我/S爱/S伟/B大/E的/S中/B国/E。/S".
    """
    # Raw string: '\s' inside a plain string literal is an invalid escape
    # sequence and triggers a warning on recent Python versions.
    # Empty pieces caused by leading/trailing whitespace are turned into ''
    # by tag4_word, so they disappear in the join.
    return ''.join(tag4_word(word) for word in re.split(r'\s+', sentence))

def detag4_sentence(sentence):
    """
    Decode a tag4-labelled sentence back into space-separated words.

    The word-ending tags (/S, /E) are turned into a single space and the
    word-internal tags (/B, /M) are removed.

    sentence: string of ``<char>/<tag>`` pairs.
    Returns the decoded text; every word, including the last, is followed by
    a space. An empty input yields ``''``.
    """
    if not len(sentence):
        return ''

    # The substitutions are applied one after another in this fixed order.
    substitutions = (('/S', ' '), ('/E', ' '), ('/B', ''), ('/M', ''))
    decoded = sentence
    for tag, replacement in substitutions:
        decoded = decoded.replace(tag, replacement)
    return decoded

In [3]:
def tag4_data(input_file, output_file):
    """
    Tag every sentence of an input file with tag4 labels and write the result.

    input_file: path of a text file holding one whitespace-segmented sentence
                per line.
    output_file: path of the file to (over)write; it receives one tagged
                 sentence per line.
    """
    # Read all lines up front so the input handle is closed before writing
    # starts and the total count is available for the progress message.
    with open(input_file, 'r') as fin:
        train_data = fin.readlines()
    train_data_row_num = len(train_data)

    # Opening in 'w' mode already empties the file; no explicit truncate needed.
    with open(output_file, 'w') as fout:
        for row, sentence in enumerate(train_data, start=1):
            tag4_sent = tag4_sentence(sentence)
            fout.write(tag4_sent + '\n')
            # Lazy %-style arguments: the message is only formatted if emitted.
            logger.info('写入tag4标签[%d/%d]. [sentence]=%s, [tag4-sentence]=%s',
                        row, train_data_row_num, sentence, tag4_sent)
    logger.info('成功将tag4标签写入文件 ')
                
# Run the tagging step: read the training file and write its tag4-labelled copy.
tag4_data('../data/pku_training.txt', '../data/pku_tag4_training.txt')

## 定义特征函数

In [4]:
def get_char(sentence, i, steps=3):
    """
    Return the character stored in slot i of a sentence made of fixed-width slots.

    sentence: string in which every character occupies `steps` positions
              (e.g. "<char>/<tag>" when steps=3, plain text when steps=1).
    i: zero-based slot index; it may lie outside the sentence.
    steps: width of one slot.
    Returns the first character of slot i, or '_' as padding when i is out of
    range.
    """
    slot_count = len(sentence) // steps
    if 0 <= i < slot_count:
        return sentence[i * steps]
    return '_'
    
def get_feature1(sentence, i, steps=3):
    """
    Feature template 1: the characters Cn (n = -2, -1, 0, 1, 2) around slot i.

    sentence, steps: passed through to get_char.
    i: index of the current character slot.
    Returns one string such as "C-2=_C-1=是C0=中C1=国C2=发"; slots outside the
    sentence contribute whatever get_char returns for them.
    """
    window = (('C-2=', -2), ('C-1=', -1), ('C0=', 0), ('C1=', 1), ('C2=', 2))
    return ''.join(
        prefix + get_char(sentence, i + offset, steps)
        for prefix, offset in window
    )

def feature_sentence(sentence, steps=3):
    """
    Build (feature, label) pairs for every character slot of a tagged sentence.

    sentence: tag4-labelled sentence, e.g. "是/S中/B国/E".
    steps: width of one character slot.
    Returns a list of tuples: the feature string from get_feature1 and the
    label read at offset 2 inside the slot (the tag letter of "<char>/<tag>").
    """
    slot_count = len(sentence) // steps
    candidates = (
        (get_feature1(sentence, pos, steps), sentence[pos * steps + 2])
        for pos in range(slot_count)
    )
    # Keep only samples whose feature string is non-empty.
    return [(feat, label) for feat, label in candidates if len(feat) > 0]

# Example call on a tagged sentence; as the last expression of a notebook cell
# its return value is displayed as the cell output.
feature_sentence('是/S中/B国/E发/B展/E历/B史/E上/S非/B常/E重/B要/E的/S', 3)

[('C-2=_C-1=_C0=是C1=中C2=国', 'S'),
 ('C-2=_C-1=是C0=中C1=国C2=发', 'B'),
 ('C-2=是C-1=中C0=国C1=发C2=展', 'E'),
 ('C-2=中C-1=国C0=发C1=展C2=历', 'B'),
 ('C-2=国C-1=发C0=展C1=历C2=史', 'E'),
 ('C-2=发C-1=展C0=历C1=史C2=上', 'B'),
 ('C-2=展C-1=历C0=史C1=上C2=非', 'E'),
 ('C-2=历C-1=史C0=上C1=非C2=常', 'S'),
 ('C-2=史C-1=上C0=非C1=常C2=重', 'B'),
 ('C-2=上C-1=非C0=常C1=重C2=要', 'E'),
 ('C-2=非C-1=常C0=重C1=要C2=的', 'B'),
 ('C-2=常C-1=重C0=要C1=的C2=_', 'E'),
 ('C-2=重C-1=要C0=的C1=_C2=_', 'S')]

In [5]:
def feature_data(input_file, output_file):
    """
    Extract features from a tag4-labelled file and write one sample per line.

    input_file: path of a file with one tag4-labelled sentence per line.
    output_file: path of the file to (over)write; every line has the form
                 "<feature> <label>".
    """
    steps = 3  # each character occupies "<char>/<tag>", i.e. 3 positions
    with open(input_file, 'r') as fin, open(output_file, 'w') as fout:
        for row in fin:
            # Extract the samples of this sentence; skip lines that yield none.
            features = feature_sentence(row.strip(), steps)
            if not features:
                continue
            feature_str = '\n'.join(feat + ' ' + label for feat, label in features)
            # Terminate the sentence's samples with a newline. Without it the
            # last sample of one sentence and the first sample of the next end
            # up on the same line.
            fout.write(feature_str + '\n')
            logger.info('写入特征. [feature]=%s', feature_str)
    logger.info('成功将特征写入文件')

# Run feature extraction: read the tag4-labelled training file and write the feature file.
feature_data('../data/pku_tag4_training.txt', '../data/pku_feature_training.txt')

## 训练最大熵模型

In [6]:
from nltk.classify import MaxentClassifier
import pickle

In [None]:
def parse_tag4_to_joint_feature(string):
    """
    Parse one "<feature> <label>" line into a (feature dict, label) tuple.

    The five context characters are read at fixed positions of the layout
    "C-2=?C-1=?C0=?C1=?C2=?"; the label is the last character of the line.
    """
    names = ('C-2', 'C-1', 'C0', 'C1', 'C2')
    positions = (4, 9, 13, 17, 21)
    context = {name: string[pos] for name, pos in zip(names, positions)}
    label = string[-1]
    return (context, label)

def parse_tag4_to_feature(string):
    """
    Parse a feature string into a dict keyed by context position.

    The five context characters are read at fixed positions of the layout
    "C-2=?C-1=?C0=?C1=?C2=?".
    """
    names = ('C-2', 'C-1', 'C0', 'C1', 'C2')
    positions = (4, 9, 13, 17, 21)
    return {name: string[pos] for name, pos in zip(names, positions)}

# Load the training samples: every line of the feature file is stripped and
# parsed into a (feature dict, label) tuple.
with open('../data/pku_feature_training.txt', 'r') as f:
    train_features = list(map(parse_tag4_to_joint_feature, map(str.strip, f)))
train_features

[({'C-1': '_', 'C-2': '_', 'C0': '迈', 'C1': '向', 'C2': '充'}, 'B'),
 ({'C-1': '迈', 'C-2': '_', 'C0': '向', 'C1': '充', 'C2': '满'}, 'E'),
 ({'C-1': '向', 'C-2': '迈', 'C0': '充', 'C1': '满', 'C2': '希'}, 'B'),
 ({'C-1': '充', 'C-2': '向', 'C0': '满', 'C1': '希', 'C2': '望'}, 'E'),
 ({'C-1': '满', 'C-2': '充', 'C0': '希', 'C1': '望', 'C2': '的'}, 'B'),
 ({'C-1': '希', 'C-2': '满', 'C0': '望', 'C1': '的', 'C2': '新'}, 'E'),
 ({'C-1': '望', 'C-2': '希', 'C0': '的', 'C1': '新', 'C2': '世'}, 'S'),
 ({'C-1': '的', 'C-2': '望', 'C0': '新', 'C1': '世', 'C2': '纪'}, 'S'),
 ({'C-1': '新', 'C-2': '的', 'C0': '世', 'C1': '纪', 'C2': '—'}, 'B'),
 ({'C-1': '世', 'C-2': '新', 'C0': '纪', 'C1': '—', 'C2': '—'}, 'E'),
 ({'C-1': '纪', 'C-2': '世', 'C0': '—', 'C1': '—', 'C2': '一'}, 'B'),
 ({'C-1': '—', 'C-2': '纪', 'C0': '—', 'C1': '一', 'C2': '九'}, 'E'),
 ({'C-1': '—', 'C-2': '—', 'C0': '一', 'C1': '九', 'C2': '九'}, 'B'),
 ({'C-1': '一', 'C-2': '—', 'C0': '九', 'C1': '九', 'C2': '八'}, 'M'),
 ({'C-1': '九', 'C-2': '一', 'C0': '九', 'C1': '八', 'C2': '年'}, '

In [None]:
# Train the maximum-entropy model on the (feature dict, label) samples.
# No algorithm or iteration options are passed, so the library defaults apply.
maxent_cls = MaxentClassifier.train(train_features)

# Persist the trained model to local disk with pickle.
with open('../data/maxent_cls.model', 'wb') as f:
    pickle.dump(maxent_cls, f)
logger.info('成功将最大熵模型持久化到本地')

In [None]:
# Spot check: classify a single hand-written feature dict with the trained model.
maxent_cls.classify({'C-1': '_', 'C-2': '汉', 'C0': '语', 'C1': '然', 'C2': '语'})

## 利用最大熵模型分词

In [None]:
def tokenize_by_maxent(cls, sentence):
    """
    Segment a raw sentence into words with a character-tagging classifier.

    cls: classification model exposing ``classify(feature_dict)``; the labels
         'S', 'B', 'M' and 'E' are interpreted, any other label is ignored.
    sentence: the unsegmented sentence.
    Returns the list of words in sentence order.
    """
    # Label every character from its context window. steps=1 because the raw
    # sentence has one position per character (no "/<tag>" suffix).
    labels = [
        cls.classify(parse_tag4_to_feature(get_feature1(sentence, i, 1)))
        for i in range(len(sentence))
    ]

    # Assemble words from the per-character labels.
    tokenized_words = []
    for cha, label in zip(sentence, labels):
        if label in ('S', 'B'):
            # Single-character word or beginning of a word: open a new word.
            tokenized_words.append(cha)
        elif label in ('M', 'E'):
            if tokenized_words:
                tokenized_words[-1] += cha
            else:
                # The classifier may label the very first character M/E. There
                # is no open word to extend, so start one instead of raising
                # IndexError on the empty list.
                tokenized_words.append(cha)
        # Characters with any other label are skipped.
    return tokenized_words

# Segment a sample sentence with the trained model and show the words joined by '/'.
'/'.join(tokenize_by_maxent(maxent_cls, '迈向充满希望的新世纪'))

In [None]:
# Load the test set, one stripped sentence per entry.
test_data = []
with open('../data/pku_test.txt', 'r') as f:
    test_data = [row.strip() for row in f.readlines()]

# Segment every test sentence and write the results to a file.
with open('../data/pku_test_tokenized.txt', 'w+') as f:
    for sentence in test_data:
        tokenized_words = ' '.join(tokenize_by_maxent(maxent_cls, sentence))
        # One segmented sentence per line; without the newline all sentences
        # would be concatenated onto a single line.
        f.write(tokenized_words + '\n')
        logger.info('将分词结果写入文件. [words]=%s', tokenized_words)
logger.info('完成写入分词结果')