# 作业一：实现HMM中文分词和BPE英文分词
姓名： 黄奔皓
学号： 521030910073

## 任务一：HMM模型用于中文分词

任务一评分标准：
1. 共有8处TODO需要填写，每个TODO计1-2分，共9分，预计代码量30行；
2. 允许自行修改、编写代码完成，对于该情况，请补充注释以便于评分，否则结果不正确将导致较多的扣分；
3. 用于说明实验的文字和总结不额外计分，但不写会导致扣分。

注：本任务仅在短句子上进行效果测试，因此对概率的计算可直接进行连乘。在实践中，常先对概率取对数，将连乘变为加法来计算，以避免出现数值溢出的情况。

导入HMM参数，初始化所需的起始概率矩阵，转移概率矩阵，发射概率矩阵

In [20]:
import pickle
import numpy as np

In [21]:
with open("hmm_parameters.pkl", "rb") as f:
    hmm_parameters = pickle.load(f)

# 非断字（B）为第0行，断字（I）为第1行
# 发射概率矩阵中，词典大小为65536，以汉字的ord作为行key
start_probability = hmm_parameters["start_prob"]  # shape(2,)
trans_matrix = hmm_parameters["trans_mat"]  # shape(2, 2)
emission_matrix = hmm_parameters["emission_mat"]  # shape(2, 65536)

定义待处理的句子

In [22]:
# TODO: 将input_sentence中的xxx替换为你的姓名（1分）
input_sentence = "黄奔皓是一名优秀的学生"

实现viterbi算法，并以此进行中文分词

In [23]:
def viterbi(sent_orig, start_prob, trans_mat, emission_mat):
    """
    viterbi算法进行中文分词

    Args:
        sent_orig: str - 输入的句子
        start_prob: numpy.ndarray - 起始概率矩阵
        trans_mat: numpy.ndarray - 转移概率矩阵
        emission_mat: numpy.ndarray - 发射概率矩阵

    Return:
        str - 中文分词的结果
    """

    #  将汉字转为数字表示
    sent_ord = [ord(x) for x in sent_orig]

    # `dp`用来储存不同位置每种标注（B/I）的最大概率值
    dp = np.zeros((2, len(sent_ord)), dtype=float)

    # `path`用来储存最大概率对应的上步B/I选择
    #  例如 path[1][7] == 1 意味着第8个（从1开始计数）字符标注I对应的最大概率，其前一步的隐状态为1（I）
    #  例如 path[0][5] == 1 意味着第6个字符标注B对应的最大概率，其前一步的隐状态为1（I）
    #  例如 path[1][1] == 0 意味着第2个字符标注I对应的最大概率，其前一步的隐状态为0（B）
    path = np.zeros((2, len(sent_ord)), dtype=int)

    #  TODO: 第一个位置的最大概率值计算（1分）
    dp[0][0] = start_prob[0] * emission_mat[0][sent_ord[0]]
    dp[1][0] = start_prob[1] * emission_mat[1][sent_ord[1]]
    signs = [0,1]
    #  TODO: 其余位置的最大概率值计算（填充dp和path矩阵）（2分）
    for i in range(1, len(sent_ord)): # 词
        for label in signs: # 列
            value = []
            for pre_label in signs: # 上一列，找最大值
                value.append(dp[pre_label][i - 1] * trans_mat[pre_label][label] * emission_mat[label][sent_ord[i]])
            max_id = np.argmax(value)
            path[label][i] = max_id
            dp[label][i] = value[max_id]
        

    #  `labels`用来储存每个位置最有可能的隐状态
    labels = [0 for _ in range(len(sent_ord))]

    #  TODO: 计算labels每个位置上的值（填充labels矩阵）（1分）
    for p in range(len(labels)-1,0,-1):
        if p == len(labels) - 1:
            max_label = np.argmax(dp[:,p]) # 最后一项的label由概率直接选
            labels[p] = max_label
        else:
            previous_label = path[labels[p + 1]][p + 1]
            labels[p] = previous_label

    #  根据lalels生成切分好的字符串
    sent_split = []
    for idx, label in enumerate(labels):
        if label == 1:
            sent_split += [sent_ord[idx], ord("/")]
        else:
            sent_split += [sent_ord[idx]]
    sent_split_str = "".join([chr(x) for x in sent_split])

    return sent_split_str

In [24]:
print("viterbi算法分词结果：", viterbi(input_sentence, start_probability, trans_matrix, emission_matrix))

viterbi算法分词结果： 黄奔/皓是/一名/优秀/的/学生/


实现前向算法，计算该句子的概率值

In [25]:
def compute_prob_by_forward(sent_orig, start_prob, trans_mat, emission_mat):
    """
    前向算法，计算输入中文句子的概率值

    Args:
        sent_orig: str - 输入的句子
        start_prob: numpy.ndarray - 起始概率矩阵
        trans_mat: numpy.ndarray - 转移概率矩阵
        emission_mat: numpy.ndarray - 发射概率矩阵

    Return:
        float - 概率值
    """

    #  将汉字转为数字表示
    sent_ord = [ord(x) for x in sent_orig]

    # `dp`用来储存不同位置每种隐状态（B/I）下，到该位置为止的句子的概率
    dp = np.zeros((2, len(sent_ord)), dtype=float)

    # TODO: 初始位置概率的计算（1分）
    dp[0][0] = start_prob[0] * emission_mat[0][sent_ord[0]]
    dp[1][0] = start_prob[1] * emission_mat[1][sent_ord[0]]

    # TODO: 先计算其余位置的概率（填充dp矩阵），然后return概率值（1分）
    signs = [0, 1]
    for i in range(1, len(sent_ord)): # 词
        for label in signs: # 列
            value = 0
            for pre_label in signs: # 上一列，找最大值
                value += (dp[pre_label][i - 1] * trans_mat[pre_label][label] * emission_mat[label][sent_ord[i]])
            dp[label][i] = value
    return sum([dp[i][len(sent_ord) - 1] for i in range(2)])

实现后向算法，计算该句子的概率值

In [26]:
def compute_prob_by_backward(sent_orig, start_prob, trans_mat, emission_mat):
    """
    后向算法，计算输入中文句子的概率值

    Args:
        sent_orig: str - 输入的句子
        start_prob: numpy.ndarray - 起始概率矩阵
        trans_mat: numpy.ndarray - 转移概率矩阵
        emission_mat: numpy.ndarray - 发射概率矩阵

    Return:
        float - 概率值
    """

    #  将汉字转为数字表示
    sent_ord = [ord(x) for x in sent_orig]

    # `dp`用来储存不同位置每种隐状态（B/I）下，从结尾到该位置为止的句子的概率
    dp = np.zeros((2, len(sent_ord)), dtype=float)

    # TODO: 终末位置概率的初始化（1分）
    dp[0][len(sent_ord) - 1] = 1
    dp[1][len(sent_ord) - 1] = 1

    # TODO: 先计算其余位置的概率（填充dp矩阵），然后return概率值（1分）
    signs = [0, 1]
    for i in range(len(sent_ord) - 1, 0, -1): # 词
        for label in signs: # 列
            value = 0
            for pre_label in signs: # 上一列，找最大值
                value += (dp[pre_label][i] * trans_mat[label][pre_label] * emission_mat[pre_label][sent_ord[i]])
            dp[label][i - 1] = value

    return sum([dp[i][0] * start_prob[i] * emission_mat[i][sent_ord[0]] for i in range(2)])

In [27]:
print("前向算法概率：", compute_prob_by_forward(input_sentence, start_probability, trans_matrix, emission_matrix))
print("后向算法概率：", compute_prob_by_backward(input_sentence, start_probability, trans_matrix, emission_matrix))

前向算法概率： 8.380526777122497e-36
后向算法概率： 8.380526777122501e-36


# 实验总结

在此次试验中，我实现了HMM的viterbi算法，用于中文分词；并实现了前向算法和后向算法，计算了某句子生成的概率值，复习回顾了林老师教授的知识，能做到掌握和应用。实验过程中，我的难点在于理解动态规划和ppt上转移方程的对应关系，这需要耐心细致。

## viterbi算法流程
1. 初始化句子第一个词不同label的概率
2. 对下一个词的所有可能标签(CD为例)，根据 $\max _{l e f t}(\text { left } \cdot P(\mathrm{CD} \mid \text { left_tag }) P(\mathrm{Jobs} \mid \mathrm{CD}))$ 进行计算。
3. 重复步骤2，直到最后一个单词
4. 在最后一个单词各可能label的概率选出最大的一个label，进而进行回溯。

## 前向算法
前向算法和viterbi算法的流程比，省去了回溯的过程，且状态转移方程变为求某个单词的所有可能label的数值和。掌握viterbi算法后，前向算法并不难实现。

## 后向算法
后向算法从句子末端的单词开始进行计算，最后与前向算法所得结果数值上基本一致。

## 任务二：BPE算法用于英文分词

任务二评分标准：

1. 共有7处TODO需要填写，每个TODO计1-2分，共9分，预计代码量50行；
2. 允许自行修改、编写代码完成，对于该情况，请补充注释以便于评分，否则结果不正确将导致较多的扣分；
3. 用于说明实验的文字和总结不额外计分，但不写会导致扣分。

构建空格分词器，将语料中的句子以空格切分成单词，然后将单词拆分成字母加`</w>`的形式。例如`apple`将变为`a p p l e </w>`。

In [28]:
import re
import functools

In [29]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")


def white_space_tokenize(corpus):
    """
    先正则化（字母转小写、数字转为N、除去标点符号），然后以空格分词语料中的句子，例如：
    输入 corpus=["I am happy.", "I have 10 apples!"]，
    得到 [["i", "am", "happy"], ["i", "have", "N", "apples"]]

    Args:
        corpus: List[str] - 待处理的语料

    Return:
        List[List[str]] - 二维List，内部的List由每个句子的单词str构成
    """

    tokeneds = [list(filter(lambda tkn: len(tkn) > 0, _splitor_pattern.split(_digit_pattern.sub("N", stc.lower())))) for stc in corpus]

    return tokeneds

编写相应函数构建BPE算法需要用到的初始状态词典

In [30]:
def build_bpe_vocab(corpus):
    """
    将语料进行white_space_tokenize处理后，将单词每个字母以空格隔开、结尾加上</w>后，构建带频数的字典，例如：
    输入 corpus=["I am happy.", "I have 10 apples!"]，
    得到
    {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
     }

    Args:
        corpus: List[str] - 待处理的语料

    Return:
        Dict[str, int] - "单词分词状态->频数"的词典
    """

    tokenized_corpus = white_space_tokenize(corpus)

    bpe_vocab = dict()

    # TODO: 完成函数体（1分）
    for sentence in tokenized_corpus:
        for item in sentence:
            item = " ".join(item) + ' </w>'
            if item in bpe_vocab:
                bpe_vocab[item] += 1
            else:
                bpe_vocab[item] = 1

    return bpe_vocab

编写所需的其他函数

In [31]:
def get_bigram_freq(bpe_vocab):
    """
    统计"单词分词状态->频数"的词典中，各bigram的频次（假设该词典中，各个unigram以空格间隔），例如：
    输入 bpe_vocab=
    {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
    得到
    {
        ('i', '</w>'): 2,
        ('a', 'm'): 1,
        ('m', '</w>'): 1,
        ('h', 'a'): 2,
        ('a', 'p'): 2,
        ('p', 'p'): 2,
        ('p', 'y'): 1,
        ('y', '</w>'): 1,
        ('a', 'v'): 1,
        ('v', 'e'): 1,
        ('e', '</w>'): 1,
        ('N', '</w>'): 1,
        ('p', 'l'): 1,
        ('l', 'e'): 1,
        ('e', 's'): 1,
        ('s', '</w>'): 1
    }

    Args:
        bpe_vocab: Dict[str, int] - "单词分词状态->频数"的词典

    Return:
        Dict[Tuple(str, str), int] - "bigram->频数"的词典
    """

    bigram_freq = dict()

    # TODO: 完成函数体（1分）
    for key in bpe_vocab.keys():
        lst = key.split(' ')
        for i in range(len(lst) - 1):
            t = (lst[i],lst[i + 1])
            if t in bigram_freq.keys():
                bigram_freq[t] += bpe_vocab[key]
            else:
                bigram_freq[t] = bpe_vocab[key]

    return bigram_freq

In [32]:
def refresh_bpe_vocab_by_merging_bigram(bigram, old_bpe_vocab):
    """
    在"单词分词状态->频数"的词典中，合并指定的bigram（即去掉对应的相邻unigram之间的空格），最后返回新的词典，例如：
    输入 bigram=('i', '</w>')，old_bpe_vocab=
    {
        'i </w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }
    得到
    {
        'i</w>': 2,
        'a m </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }

    Args:
        old_bpe_vocab: Dict[str, int] - 初始"单词分词状态->频数"的词典

    Return:
        Dict[str, int] - 合并后的"单词分词状态->频数"的词典
    """    

    # TODO: 完成函数体（1分）
    new_bpe_vocab = dict()
    for key in old_bpe_vocab.keys():
        lst = key.split(' ')
        record = []
        for i in range(len(lst) - 1):
            if lst[i] == bigram[0]:
                if lst[i + 1] == bigram[1]:
                    record.append(i)
        new_key = ''
        for k in range(len(lst)):
            if k in record:
                new_key += lst[k]
            else:
                if k == len(lst) - 1:
                    new_key += (lst[k])
                else:
                    new_key += (lst[k] + ' ')
        new_bpe_vocab[new_key] = old_bpe_vocab[key]

    return new_bpe_vocab

print(refresh_bpe_vocab_by_merging_bigram(bigram=('i', 'b'), old_bpe_vocab=
    {
        'i ba </w>': 2, # 出现ba是有可能的，b,a bi-gram合并但是单个b也可以保留，这个例子如果直接替换不太对
        'a m i </w>': 1,
        'h a p p y </w>': 1,
        'h a v e </w>': 1,
        'N </w>': 1,
        'a p p l e s </w>': 1
    }))

In [33]:
def sort_by_len(token):
    '''
    used for sort key
    '''
    if '</w>' in token[0]:
        return len(token[0]) - 3 # '</w>' should be treated as length 1
    else:
        return len(token[0])

def get_bpe_tokens(bpe_vocab):
    """
    根据"单词分词状态->频数"的词典，返回所得到的BPE分词列表，并将该列表按照分词长度降序排序返回，例如：
    输入 bpe_vocab=
    {
        'i</w>': 2,
        'a m </w>': 1,
        'ha pp y </w>': 1,
        'ha v e </w>': 1,
        'N </w>': 1,
        'a pp l e s </w>': 1
    }
    得到
    [
        ('i</w>', 2),
        ('ha', 2),
        ('pp', 2),
        ('a', 2),
        ('m', 1),
        ('</w>', 5),
        ('y', 1),
        ('v', 1),
        ('e', 2),
        ('N', 1),
        ('l', 1),
        ('s', 1)
     ]

    Args:
        bpe_vocab: Dict[str, int] - "单词分词状态->频数"的词典

    Return:
        List[Tuple(str, int)] - BPE分词和对应频数组成的List
    """

    # TODO: 完成函数体（2分）
    tmp_dict = dict()
    for key in bpe_vocab.keys():
        lst = key.split(' ')
        for token in lst:
            if token in tmp_dict.keys():
                tmp_dict[token] += bpe_vocab[key]
            else:
                tmp_dict[token] = bpe_vocab[key]
    bpe_tokens = []
    for key in tmp_dict.keys():
        bpe_tokens.append((key,tmp_dict[key]))
    bpe_tokens.sort(key=sort_by_len,reverse=True)
    
    
    return bpe_tokens


In [34]:
def print_bpe_tokenize(word, bpe_tokens):
    """
    根据按长度降序的BPE分词列表，将所给单词进行BPE分词，最后打印结果。
    
    思想是，对于一个待BPE分词的单词，按照长度顺序从列表中寻找BPE分词进行子串匹配，
    若成功匹配，则对该子串左右的剩余部分递归地进行下一轮匹配，直到剩余部分长度为0，
    或者剩余部分无法匹配（该部分整体由"<unknown>"代替）
    
    例1：
    输入 word="supermarket", bpe_tokens=[
        ("su", 20),
        ("are", 10),
        ("per", 30),
    ]
    最终打印 "su per <unknown>"

    例2：
    输入 word="shanghai", bpe_tokens=[
        ("hai", 1),
        ("sh", 1),
        ("an", 1),
        ("</w>", 1),
        ("g", 1)
    ]
    最终打印 "sh an g hai </w>"

    Args:
        word: str - 待分词的单词str
        bpe_tokens: List[Tuple(str, int)] - BPE分词和对应频数组成的List
    """

    # TODO: 请尝试使用递归函数定义该分词过程（2分）
    def bpe_tokenize(sub_word):
        if len(sub_word) == 0:
            return ''
        for token in bpe_tokens:
            token = token[0]
            i = sub_word.find(token)
            if i == -1:
                continue
            else:
                left = sub_word[:i]
                right = sub_word[i + len(token):]
                left_res = bpe_tokenize(left)
                right_res = bpe_tokenize(right)
                if left_res != '':
                    left_res = left_res + ' '
                if right_res != '':
                    right_res = ' ' + right_res 
                res = left_res + token  + right_res
                return res
        return "<unknown>"

    res = bpe_tokenize(word + "</w>")
    print(res)
    return res

开始读取数据集并训练BPE分词器

In [35]:
with open("data/news.2007.en.shuffled.deduped.train", encoding="utf-8") as f:
    training_corpus = list(map(lambda l: l.strip(), f.readlines()[:1000]))

print("Loaded training corpus.")

Loaded training corpus.


In [36]:
def sort_by_freq(item):
    '''
    sort bi-gram freq
    '''
    return item[1]

from tqdm import tqdm
training_iter_num = 300
training_bpe_vocab = build_bpe_vocab(training_corpus)
for i in tqdm(range(training_iter_num),total=training_iter_num):
    # TODO: 完成训练循环内的代码逻辑（2分）
    bigram_freq = get_bigram_freq(training_bpe_vocab)

    bigram_freq_items = list(bigram_freq.items())
    bigram_freq_items.sort(key = sort_by_freq,reverse = True)
    most_freq_item = bigram_freq_items[0]
    training_bpe_vocab = refresh_bpe_vocab_by_merging_bigram(bigram=most_freq_item[0],old_bpe_vocab=training_bpe_vocab)
training_bpe_tokens = get_bpe_tokens(training_bpe_vocab)

100%|██████████| 300/300 [00:05<00:00, 50.49it/s]


测试BPE分词器的分词效果

In [37]:
test_word = "naturallanguageprocessing"

print("naturallanguageprocessing 的分词结果为：")
print_bpe_tokenize(test_word, training_bpe_tokens)
for item in training_bpe_tokens:
    if item[0] == 'w':
        print(item)

naturallanguageprocessing 的分词结果为：


'n a tur all an g u ag e pro c es s ing</w>'

In [None]:
training_iter_nums = [i for i in range(500,10000,500)]
res = []
for training_iter_num in training_iter_nums:
    training_bpe_vocab = build_bpe_vocab(training_corpus)
    for i in tqdm(range(training_iter_num),total=training_iter_num):
        # TODO: 完成训练循环内的代码逻辑（2分）
        bigram_freq = get_bigram_freq(training_bpe_vocab)
    
        bigram_freq_items = list(bigram_freq.items())
        bigram_freq_items.sort(key = sort_by_freq,reverse = True)
        most_freq_item = bigram_freq_items[0]
        training_bpe_vocab = refresh_bpe_vocab_by_merging_bigram(bigram=most_freq_item[0],old_bpe_vocab=training_bpe_vocab)
    training_bpe_tokens = get_bpe_tokens(training_bpe_vocab)
    res.append(print_bpe_tokenize(test_word, training_bpe_tokens))
np.save("res.npy",res)

100%|██████████| 500/500 [00:09<00:00, 54.38it/s]
100%|██████████| 1000/1000 [00:16<00:00, 60.81it/s]
100%|██████████| 1500/1500 [00:22<00:00, 65.58it/s]
100%|██████████| 2000/2000 [00:29<00:00, 68.18it/s]
100%|██████████| 2500/2500 [00:35<00:00, 70.20it/s]
 23%|██▎       | 687/3000 [00:17<00:35, 64.86it/s]

# 实验总结
在该实验中，我初步完成了BPE分词。其算法流程如下：
1. 初始词汇表：将语料中的单词和其频数提取出来
2. 迭代：
（1）从词汇表中计算排序得到频数最高的bi-gram，将之捏合为新的token
（2）将原词汇表中的这个bi-gram替换为新的token，得到新的词汇表

3. 终止：迭代达到预先设定的最大次数时停止

经过实验发现，分词的效果和迭代次数有很大关系. 迭代次数并非越多越好：
* 300iter: n a tur all an g u ag e pro c es s ing</w>，迭代次数仍不足，分词中仍出现大量单字母。
* 5000iter: n a tur al langu ag e pro c es sing</w>，迭代次数适中，分词结果较好。
* 10000iter: <unknown> t ur <unknown> ang <unknown> e <unknown> o c es s ing</w>，迭代次数过多，合并的bi-gram过多，导致出现 <unknown>

因此，想要获得好的效果，用一个合适的training iteration num来训练BPE分词器非常重要。