In [2]:
import nltk
import os
from sklearn.model_selection import ShuffleSplit, KFold, train_test_split
import operator
import numpy as np
from functools import reduce
from sklearn.metrics import roc_curve, auc 
from sklearn import metrics

In [3]:
fname = "./ptb.txt"
with open(fname, 'r', encoding="UTF-8-sig") as f:
    source_tagger = f.read()
# source_tagger.strip("\\ufeff")        #带签名

tagger_words = [nltk.tag.str2tuple(t) for t in source_tagger.split()]

In [4]:
fd = nltk.FreqDist(tagger_words)
fd

FreqDist({(',', ','): 22625, ('the', 'DT'): 19528, ('.', '.'): 18238, ('of', 'IN'): 10752, ('to', 'TO'): 10408, ('a', 'DT'): 9072, ('and', 'CC'): 7679, ('in', 'IN'): 7040, ("'s", 'POS'): 3862, ('for', 'IN'): 3764, ...})

介词在介词短语中是两个实质性结构成分中（介词+名词性词语）的一个，而连词在实质性结构中只起连接作用
 > 当“和”作介词的时候,“和”前后的成分不能互换,前面可加副词作状语,后面可以有停顿
 
 > 当“和”作连词的时候,“和”前后的成分可以互换,前面不能加副词性修饰成分,后面不能停顿.

In 作为介词In-prep, 连词In-conj:
#### 规则：
如果下一次词性标记的前两个字母位 NN， 即后面为名词 , IN为介词

In [5]:
tagger_words
tagger_words_IN = []
NN_dict = {'NN', 'NNP', 'NNPS', 'NNS'}
for index, (word, tag) in enumerate(tagger_words[:-1]):
    if tag == "IN":
        # 如果下一次词性标记的前两个字母位 NN， 即后面为名词 , IN为介词
        if tagger_words[index + 1][1][:2] == 'NN': 
            tagger_words_IN.append( (word, 'IN-prep') )
        # 否则IN为 连词
        else:
            tagger_words_IN.append( (word, 'IN-conj') )
        continue
    tagger_words_IN.append( (word, tag) )

In [6]:
def addStartEnd(tagger_words_list):
    tagger_words_SE = []
    NewSentence = True
    for index, (word, tag) in enumerate(tagger_words_list):
        if NewSentence == True:
            tagger_words_SE.append([])
            tagger_words_SE[-1].append( ("START", "START") )
            NewSentence = False
        tagger_words_SE[-1].append( (tag, word) )                # 注意这里是 （标签， 单词）

        if word == '.' and tag == '.':
            tagger_words_SE[-1].append( ("END", "END") )
            NewSentence = True
    return tagger_words_SE

tagger_words_sents = addStartEnd(tagger_words)
tagger_words_IN_sents = addStartEnd(tagger_words_IN)

In [7]:
# tagger_words_total = sum(tagger_words_sents, [])
tagger_words_total = reduce(operator.add, tagger_words_sents)
tagger_words_IN_total = reduce(operator.add, tagger_words_IN_sents)

词性**tag**是**隐含状态**，单词是**观测**

1. 首先训练得到HMM模型，即Training问题（不过这里的Training不必用BW算法，直接根据语料库，进行数量的统计和概率的计算就可以完成对转移矩阵和发射矩阵的训练)
2. 然后可利用模型，进行词性标注，即Recognition问题，使用**Viterbi算法**即可

某个POS发射出某个单词的概率，即观测(**发射**)概率矩阵B, 大小为$(w, t)$。只是简单粗暴地统计出单词和POS之间的个数关系即可。$$P(w_i|t_i)=\frac {count(w_i,t_i)}{count(t_i)}$$

还需要计算**状态转移矩阵A**, 大小为$(t, t)$，这个只要简单粗暴地计算出由某个状态转移到某个状态的概率即可。
$$P(t_i|t_{i−1})= \frac {count(t_{i−1},t_i)} {count(t_{i−1})}$$

In [8]:
def train_HMM(tagger_words):
    # conditional frequency distribution  
    cfd_tagwords = nltk.ConditionalFreqDist(tagger_words)
    # conditional probability distribution       发射概率
    cpd_tagwords = nltk.ConditionalProbDist(cfd_tagwords, nltk.MLEProbDist)

    treebank_tags = [ tag for (tag, word) in tagger_words ]
    # count(t{i-1} ti)
    # bigram的意思是 前后两个一组，联在一起
    cfd_tags= nltk.ConditionalFreqDist(nltk.bigrams(treebank_tags))
    # P(ti | t{i-1})                            转移概率   
    cpd_tags = nltk.ConditionalProbDist(cfd_tags, nltk.MLEProbDist)
    
    return cpd_tags, cpd_tagwords

# 实现维特比

In [9]:
distinct_tags = set([tag for (tag, word) in tagger_words_total ])
distinct_tags_IN = set([tag for (tag, word) in tagger_words_total ])
# 一共有 47个标签（IN不分开）
len(distinct_tags)

47

In [15]:
# 识别函数，参数列表： 转移概率，发射概率，原句，标签集合，已标记的标签
def recognize_HMM(cpd_tags, cpd_tagwords, sentence, distinct_tags, labeled_tags = [], DEBUG=False):
    viterbi = [ ]           # 维特比链
    backpointer = [ ]       # 回溯器
    
    first_viterbi = { }
    first_backpointer = { }
    for tag in distinct_tags:
        # don't record anything for the START tag
        # Y 是 第一个单词 的每一种可能的Tag
        # P(Y | "START") * P( "第一个单词" | "Y")
        if tag == "START": continue
        first_viterbi[ tag ] = cpd_tags["START"].prob(tag) * cpd_tagwords[tag].prob( sentence[0] )
        first_backpointer[ tag ] = "START"
    viterbi.append(first_viterbi)
    backpointer.append(first_backpointer)
    
    # 这里是 求 (START, END), 因为如果把 "END" 也算入，循环之后取出来的概率就是"END"的Tag（错误)，而不是"END"之前的那个Tag
    for wordindex in range(1, len(sentence) - 1):
        this_viterbi = { }
        this_backpointer = { }
        prev_viterbi = viterbi[-1]
        for tag in distinct_tags:
            # START没有卵用的，我们要忽略
            if tag == "START": continue
            # 如果现在这个tag是X，现在的单词是w，
            # 我们想找前一个tag Y，并且让最好的tag sequence以 Y X 结尾。
            # 也就是说
            # Y要能最大化：
            # prev_viterbi[ Y ] * P(X | Y) * P( w | X)
            best_previous = max(prev_viterbi.keys(),
                                key = lambda prevtag: \
                        prev_viterbi[ prevtag ] * cpd_tags[prevtag].prob(tag) * cpd_tagwords[tag].prob(sentence[wordindex]))
            this_viterbi[ tag ] = prev_viterbi[ best_previous ] * \
                        cpd_tags[ best_previous ].prob(tag) * cpd_tagwords[ tag].prob(sentence[wordindex])
            this_backpointer[ tag ] = best_previous
        # 每次遍历Tag集找完Y 我们把目前最好的 X = currbest存一下
        currbest = max(this_viterbi.keys(), key = lambda tag: this_viterbi[ tag ])
        if DEBUG:
            print( "Word", "'" + sentence[ wordindex] + "'", "current best two-tag sequence:", this_backpointer[currbest], currbest)
        # 完结
        # 全部存下来
        viterbi.append(this_viterbi)
        backpointer.append(this_backpointer)
    
    # 找所有以END结尾的tag sequence
    # prev_viterbi[ Y ] * P("END" | Y), Y是“END"之前的tag, 这里是发射概率
    prev_viterbi = viterbi[-1]
    best_previous = max(prev_viterbi.keys(),
                        key = lambda prevtag: prev_viterbi[ prevtag ] * cpd_tags[prevtag].prob("END"))
    prob_tagsequence = prev_viterbi[ best_previous ] * cpd_tags[ best_previous].prob("END")
    # 我们这会儿是倒着存的。因为好的在后面
    best_tagsequence = [ "END", best_previous ]
    # 同理 这里也有倒过来
    backpointer.reverse()
    # 回溯 最好的tag
    current_best_tag = best_previous
    for bp in backpointer[:-2]:
        best_tagsequence.append(bp[current_best_tag])
        current_best_tag = bp[current_best_tag]
    # 因为"START" "NNP" 中 "NNP" 总是能在第一个单词的 Y 中最大化
    best_tagsequence.append("START")
    best_tagsequence.reverse()

    if DEBUG:
        print( "The sentence was:", end = " ")
        for w in sentence: print( w, end = " ")
        print("\n")
        print( "The best tag sequence is:", end = " ")
        for t in best_tagsequence: print (t, end = " ")
        print("\n")
        print( "The labeled tag sequence is:", end = " ")
        for l in labeled_tags: print(l, end = " ")
        print("\n")
        print( "The probability of the best tag sequence is:", prob_tagsequence)
        
    return best_tagsequence

先用一句话测试

In [21]:
tagger_words_array = np.array(tagger_words_sents)           # 一定要array

(cpd_tags, cpd_tagwords) = train_HMM(tagger_words_total)
Sample = np.random.choice(tagger_words_array, size = 1)[0]
Sample_sentence = [word for (tag, word) in Sample]
Sample_tags = [tag for (tag, word) in Sample]
_ = recognize_HMM(cpd_tags, cpd_tagwords, Sample_sentence, distinct_tags, Sample_tags, DEBUG=True)

Word 'In' current best two-tag sequence: NNP IN
Word 'an' current best two-tag sequence: IN DT
Word 'internal' current best two-tag sequence: DT JJ
Word 'memo' current best two-tag sequence: JJ NN
Word ',' current best two-tag sequence: NN ,
Word 'Alex' current best two-tag sequence: , NNP
Word 'Kroll' current best two-tag sequence: NNP NNP
Word ',' current best two-tag sequence: NNP ,
Word 'the' current best two-tag sequence: , DT
Word 'agency' current best two-tag sequence: DT NN
Word ''s' current best two-tag sequence: NN POS
Word 'chairman' current best two-tag sequence: POS NN
Word ',' current best two-tag sequence: NN ,
Word 'said' current best two-tag sequence: , VBD
Word 'Mr.' current best two-tag sequence: VBD NNP
Word 'Klein' current best two-tag sequence: NNP NNP
Word 'decided' current best two-tag sequence: NNP VBD
Word 'to' current best two-tag sequence: VBD TO
Word 'remove' current best two-tag sequence: TO VB
Word 'himself' current best two-tag sequence: VB PRP
Word 'to'

| | 观察结果T | 观察结果F |
| --- | --- | --- |
| 实际结果T | True Positive | False Positive |
| 实际结果F | False Negative | True Negative|

定义准确率$precision$，召回率$recall$:
$$precision = \frac{tp} {tp + fp}$$
$$recall = \frac{tp} {tp + fn}$$

但是这里的47个tag集合并不是二分类问题，所以我使用 ***sklearn.metrics.precision_score()*** 来计算准确率，这里的多标签需要一个参数：
> **average** : string, [None, ‘binary’ (default), ‘micro’, ‘macro’, ‘samples’, ‘weighted’]

> This parameter is required for multiclass/multilabel targets. If None, the 
scores for each class are returned. Otherwise, this
determines the type of averaging performed on the data:

另外一种计算方法假设标准答案标注的个数是$N$，你的模型给出结果和标准答案相同的分词个数是$c$，你的模型给出的其他单词个数是$c$，也就是标错的个数，那么：
$$precision = \frac{c} {c + e}$$
$$recall = \frac{c} {N}$$
$$F_1 = \frac {2 * P * R}{P + R}$$

训练并且测试**不带IN-prep,IN-conj**的标记集，分四次交叉验证，训练集：测试集$ = 9 : 1$

In [18]:
# for train_index, test_index in ss.split(tagger_words_sents):
#     print('train_index', train_index, 'test_index', test_index)
# #     train_set = tagger_words_sents[list(train_index)]
kf = KFold(n_splits=10, random_state=0)     
ss = ShuffleSplit(n_splits=4, random_state=0, test_size=0.1)

for train_index, test_index in ss.split(tagger_words_array):
    train_set, test_set = tagger_words_array[train_index], tagger_words_array[test_index] 
    train_total = reduce(operator.add, train_set)
    cpd_tags, cpd_tagwords = train_HMM(train_total)
    true_tags, pred_tags = [], []
    for test in test_set:
        sentence = [ word for (tag, word) in test ] 
        labeled_tag = [ tag for (tag, word) in test]
        pred_tagsequence = recognize_HMM(cpd_tags, cpd_tagwords, sentence, distinct_tags)   
        pred_tags.extend(pred_tagsequence)
        true_tags.extend(labeled_tag)
        

    print("Precision_score:", metrics.precision_score(true_tags, pred_tags, average='macro'))
    print("Recell_score:", metrics.recall_score(true_tags, pred_tags, average='macro'))

  _warn_prf(average, modifier, msg_start, len(result))


Precision_score: 0.8307542190003039
Recell_score: 0.4990927959270452
Precision_score: 0.8201363382987837
Recell_score: 0.4605536037371405
Precision_score: 0.8605353133423356
Recell_score: 0.5212756230317562
Precision_score: 0.8394670065801401
Recell_score: 0.5029218544492381


同样训练并且测试**带IN-prep,IN-conj**的标记集

In [20]:
def test_HMM(tagger_words_array, distinct_tags):
    ss = ShuffleSplit(n_splits=4, random_state=0, test_size=0.1)

    for train_index, test_index in ss.split(tagger_words_array):
        train_set, test_set = tagger_words_array[train_index], tagger_words_array[test_index] 
        train_total = reduce(operator.add, train_set)
        cpd_tags, cpd_tagwords = train_HMM(train_total)
        true_tags, pred_tags = [], []
        for test in test_set:
            sentence = [ word for (tag, word) in test ] 
            labeled_tag = [ tag for (tag, word) in test]
            pred_tagsequence = recognize_HMM(cpd_tags, cpd_tagwords, sentence, distinct_tags)   
            pred_tags.extend(pred_tagsequence)
            true_tags.extend(labeled_tag)
            
        precision = metrics.precision_score(true_tags, pred_tags, average='macro')
        recall = metrics.recall_score(true_tags, pred_tags, average='macro')
        F_score = (2 * precision * recall) / ( precision + recall )
        print("Precision_score:", precision)
        print("Recell_score:", recall)
        print("F1_score:", F_score)
        print("\n")        
        
tagger_words_IN_array = np.array(tagger_words_IN_sents)           # 一定要array
test_HMM(tagger_words_IN_array, distinct_tags_IN)

  _warn_prf(average, modifier, msg_start, len(result))


Precision_score: 0.749343022298273
Recell_score: 0.3370602896190849
F1_score: 0.4649723971738213
Precision_score: 0.7399626601775345
Recell_score: 0.32607049293336027
F1_score: 0.4526688286424614
Precision_score: 0.7605907477905459
Recell_score: 0.36391609211102033
F1_score: 0.4922890689681586
Precision_score: 0.7285994937792246
Recell_score: 0.34363141151554255
F1_score: 0.4670070061225022


### 结论：
区分IN-conj和IN-prep之后，准确率和召回率都**下降**了。两个指标都大代表效果越好。
1. 可能因为IN的一些词语既有**conj连词**又有**prep介词**的词性。
2. 我区分**IN-conj连词**和**IN-prep介词**的手动规则太简单了，就只根据后面是不是名词来区分。
3. 训练时分开，最后测试得出结果时将**conj连词**和**prep介词**合并为IN的标记，再和初始标记对比,下面实现下

In [22]:
ss = ShuffleSplit(n_splits=4, random_state=0, test_size=0.1)

for train_index, test_index in ss.split(tagger_words_IN_array):
    # 训练IN分开， 测试IN合并
    train_set, test_set = tagger_words_IN_array[train_index], tagger_words_array[test_index] 
    train_total = reduce(operator.add, train_set)
    cpd_tags, cpd_tagwords = train_HMM(train_total)
    true_tags, pred_tags = [], []
    for test in test_set:
        sentence = [ word for (tag, word) in test ] 
        labeled_tag = [ tag for (tag, word) in test]
        pred_tagsequence = recognize_HMM(cpd_tags, cpd_tagwords, sentence, distinct_tags)   
        pred_tags.extend(pred_tagsequence)
        true_tags.extend(labeled_tag)
        
    pred_tags = [tag if tag != 'IN-prep' and tag != 'IN-conj' else 'IN' for tag in pred_tags]
    print("Precision_score:", metrics.precision_score(true_tags, pred_tags, average='macro'))
    print("Recell_score:", metrics.recall_score(true_tags, pred_tags, average='macro'))
    print("\n")

  _warn_prf(average, modifier, msg_start, len(result))


Precision_score: 0.7652864908578106
Recell_score: 0.34423178514289526


Precision_score: 0.7557065465642907
Recell_score: 0.3330081629957722


Precision_score: 0.7771253292642534
Recell_score: 0.37182731150473813


Precision_score: 0.7441016106681443
Recell_score: 0.35094271814353284




结果依然不如最初没有把IN分开的情况

## 引用
[^1] [维特比算法实现](https://codingcat.cn/article/16#Recognition%E9%97%AE%E9%A2%98%E7%9A%84%E8%A7%A3%E6%B3%95)

[^2] [准确率和召回率](https://zhuanlan.zhihu.com/p/24322275)

[^3] [sklearn.metrics.precision_score()的参数问题](https://stackoverflow.com/questions/52269187/facing-valueerror-target-is-multiclass-but-average-binary)