## 简单HMM
给出序列和隐状态序列

In [1]:
import nltk
import sys
from nltk.corpus import brown

In [2]:
# 预处理词库
brown_tags_words = [ ]
for sent in brown.tagged_sents():
    # 先加开头
    brown_tags_words.append( ("START", "START") )
    # 把tag都省略成前两个字母，可不加
    brown_tags_words.extend([ (tag[:2], word) for (word, tag) in sent ])
    # 加个结尾
    brown_tags_words.append( ("END", "END") )

In [5]:
brown_tags_words[:20]

[('START', 'START'),
 ('AT', 'The'),
 ('NP', 'Fulton'),
 ('NN', 'County'),
 ('JJ', 'Grand'),
 ('NN', 'Jury'),
 ('VB', 'said'),
 ('NR', 'Friday'),
 ('AT', 'an'),
 ('NN', 'investigation'),
 ('IN', 'of'),
 ('NP', "Atlanta's"),
 ('JJ', 'recent'),
 ('NN', 'primary'),
 ('NN', 'election'),
 ('VB', 'produced'),
 ('``', '``'),
 ('AT', 'no'),
 ('NN', 'evidence'),
 ("''", "''")]

## NLTK中的CRF

In [6]:
# P(wi | ti) = count(wi, ti) / count(ti)

# conditional frequency distribution
cfd_tagwords = nltk.ConditionalFreqDist(brown_tags_words)
# conditional probability distribution
cpd_tagwords = nltk.ConditionalProbDist(cfd_tagwords, nltk.MLEProbDist)

In [7]:
# test
print("The probability of an adjective (JJ) being 'new' is", cpd_tagwords["JJ"].prob("new"))
print("The probability of a verb (VB) being 'duck' is", cpd_tagwords["VB"].prob("duck"))

The probability of an adjective (JJ) being 'new' is 0.01472344917632025
The probability of a verb (VB) being 'duck' is 6.042713350943527e-05


In [9]:
# P(ti | t{i-1}) = count(t{i-1}, ti) / count(t{i-1})

brown_tags = [tag for (tag, word) in brown_tags_words ]

# bigram的意思是 前后两个一组，联在一起
cfd_tags = nltk.ConditionalFreqDist(nltk.bigrams(brown_tags))
# P(ti | t{i-1})
cpd_tags = nltk.ConditionalProbDist(cfd_tags, nltk.MLEProbDist)

In [10]:
# test
print("If we have just seen 'DT', the probability of 'NN' is", cpd_tags["DT"].prob("NN"))
print( "If we have just seen 'VB', the probability of 'JJ' is", cpd_tags["VB"].prob("DT"))
print( "If we have just seen 'VB', the probability of 'NN' is", cpd_tags["VB"].prob("NN"))

If we have just seen 'DT', the probability of 'NN' is 0.5057722522030194
If we have just seen 'VB', the probability of 'JJ' is 0.016885067592065053
If we have just seen 'VB', the probability of 'NN' is 0.10970977711020183


In [11]:
# test
prob_tagsequence = cpd_tags["START"].prob("PP") * cpd_tagwords["PP"].prob("I") * \
    cpd_tags["PP"].prob("VB") * cpd_tagwords["VB"].prob("want") * \
    cpd_tags["VB"].prob("TO") * cpd_tagwords["TO"].prob("to") * \
    cpd_tags["TO"].prob("VB") * cpd_tagwords["VB"].prob("race") * \
    cpd_tags["VB"].prob("END")

print( "The probability of the tag sequence 'START PP VB TO VB END' for 'I want to race' is:", prob_tagsequence)

The probability of the tag sequence 'START PP VB TO VB END' for 'I want to race' is: 1.0817766461150474e-14


## Viterbi 的实现

如果我们手上有一句话，怎么知道最符合的tag是哪组呢？

In [12]:
# state集合
distinct_tags = set(brown_tags)

sentence = ["I", "want", "to", "race" ]
sentlen = len(sentence)

In [47]:
# 从start开始的初始转移概率
def start(tag):
    return cpd_tags["START"].prob(tag)

In [53]:
# 转移概率
# trans_p = {}
# for i in distinct_tags:
#     trans_p[i] = {j: cpd_tags[i].prob(j) for j in distinct_tags}
# print(trans_p)
def trans(lasttag, curtag):
    return cpd_tags[lasttag].prob(curtag)

In [49]:
# 释放概率
def emission(tag, word):
    return cpd_tagwords[tag].prob(word)

In [87]:
def viterbi_decode(sentence, states):
    # path[s] 以s结尾的path
    path = {s:[] for s in states}
    # 当前step的得分
    cur_score = {}
    
    # 从“START”到sentence[0]的得分
    for s in states:
        cur_score[s] = start(s) * emission(s, sentence[0])
        
    # 由n-1步计算结果，求第n步的所有可能state的得分
    for i in range(1, len(sentence)):
        word = sentence[i]
        last_score = cur_score
        cur_score = {}
        for cur_state in states:
            # 第n步cur_state的最高得分，及对应的前一个state
            max_score, last_state = max(((last_score[state]*trans(state, cur_state)*
                                          emission(cur_state, word), state) for state in states))
            cur_score[cur_state] = max_score
            path[cur_state].append(last_state)
            # 最后一个step，加上cur_state
            if i == len(sentence) - 1:
                path[cur_state].append(cur_state)
    
    # 以最大的'END'结束状态序列为结果
    max_p, prev_state = max(((cur_score[s]*trans(s, 'END'), s) for s in states))
    max_path = path[prev_state]

    # 根据最后一个时刻的max_p，选择path，未考虑'END'的情况
    # 对应路线，由path字典全纪录(Dynamic Programming)
    # max_p = 0
    # max_path = None
    # for s in states:
    #     if cur_score[s] > max_p:
    #         max_path = path[s]
    #         max_p = cur_score[s]

    return max_path, max_p

In [88]:
print(viterbi_decode(sentence, distinct_tags))  # 训练语料有限

(['PP', '``', 'IN', 'NN'], 5.71772824864617e-14)
