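# Back-off Algorithm for n-gram Language Models

This assignment asks you to complete the Katz back-off algorithm with Good-Turing discounting for the n-gram language model in this notebook.

### Preprocessing

First, define some preprocessing functions.

Import the required modules and define a few type aliases.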

In [12]:
import re
import itertools
import os
os.environ["KMP_DUPLICATE_LIB_OK"]="TRUE"

from typing import List, Dict, Tuple

Sentence = List[str]
IntSentence = List[int]

Corpus = List[Sentence]
IntCorpus = List[IntSentence]

# A gram is a tuple of word indices of any length.
Gram = Tuple[int, ...]

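The following function normalizes and tokenizes the text. It converts all English text to lower case, removes all punctuation, replaces each run of consecutive digits with a single `N` for simplicity, and splits contractions such as `let's` into the two tokens `let` and `'s`.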

In [13]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")
def normaltokenize(corpus: List[str]) -> Corpus:
    """
    Normalizes and tokenizes the sentences in `corpus`. Converts letters to
    lower case, replaces each run of digits with "N", drops every other
    character that is not a letter or an apostrophe, splits each sentence into
    words, and adds BOS ("<s>") and EOS ("</s>") marks.

    Args:
        corpus - list of str

    Return:
        list of list of str where each inner list of str represents the word
          sequence in a sentence from the original sentence list
    """

    tokeneds = [ ["<s>"]
               + list(
                   filter(lambda tkn: len(tkn)>0,
                       _splitor_pattern.split(
                           _digit_pattern.sub("N", stc.lower()))))
               + ["</s>"]
                    for stc in corpus
               ]
    return tokeneds
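
As a quick check of the rules described above, the following sketch re-creates the two regular expressions and applies them to one made-up sentence. The helper name `tokenize_one` and the sample text are assumptions for illustration only and are not part of the notebook. Splitting on the zero-width match `(?=')` relies on `re.split` handling empty matches, which requires Python 3.7 or later.

```python
import re

# Same patterns as in the notebook cell above.
splitter = re.compile(r"[^a-zA-Z']+|(?=')")
digits = re.compile(r"\d+")

def tokenize_one(sentence: str) -> list:
    """Hypothetical single-sentence version of `normaltokenize`."""
    # Lower-case first, then replace each run of digits with "N".
    text = digits.sub("N", sentence.lower())
    # Split on non-letter runs and in front of every apostrophe,
    # dropping the empty strings that the split leaves behind.
    words = [tkn for tkn in splitter.split(text) if tkn]
    return ["<s>"] + words + ["</s>"]

tokens = tokenize_one("Let's meet in 2007, OK?")
print(tokens)
# ['<s>', 'let', "'s", 'meet', 'in', 'N', 'ok', '</s>']
```

Note that the digit placeholder `N` stays upper case because the substitution runs after lower-casing.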

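Next, define two functions: one builds the vocabulary from the training corpus, and the other converts the words of a sentence from their string form to integer indices.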

In [14]:
def extract_vocabulary(corpus: Corpus) -> Dict[str, int]:
    """
    Extracts the vocabulary from `corpus` and returns it as a mapping from the
    word to index. The words will be sorted by the codepoint value.

    Args:
        corpus - list of list of str

    Return:
        dict like {str: int}
    """

    vocabulary = set(itertools.chain.from_iterable(corpus))
    vocabulary = dict(
            map(lambda itm: (itm[1], itm[0]),
                enumerate(
                    sorted(vocabulary))))
    return vocabulary

def words_to_indices(vocabulary: Dict[str, int], sentence: Sentence) -> IntSentence:
    """
    Convert sentence in words to sentence in word indices.

    Args:
        vocabulary - dict like {str: int}
        sentence - list of str

    Return:
        list of int
    """

    return list(map(lambda tkn: vocabulary.get(tkn, len(vocabulary)), sentence))
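
The sketch below shows the effect of these two steps on a tiny made-up corpus, in particular that every out-of-vocabulary word is mapped to the single index `len(vocabulary)`. The names `build_vocab` and `to_indices` are hypothetical compact stand-ins, not the notebook's own functions.

```python
from typing import Dict, List

def build_vocab(corpus: List[List[str]]) -> Dict[str, int]:
    """Hypothetical compact equivalent of `extract_vocabulary`."""
    words = sorted({w for sentence in corpus for w in sentence})
    return {w: i for i, w in enumerate(words)}

def to_indices(vocab: Dict[str, int], sentence: List[str]) -> List[int]:
    """Unknown words all share the index len(vocab)."""
    return [vocab.get(w, len(vocab)) for w in sentence]

toy_corpus = [["<s>", "the", "cat", "</s>"], ["<s>", "the", "dog", "</s>"]]
vocab = build_vocab(toy_corpus)
indices = to_indices(vocab, ["<s>", "the", "bird", "</s>"])
print(vocab)    # {'</s>': 0, '<s>': 1, 'cat': 2, 'dog': 3, 'the': 4}
print(indices)  # [1, 4, 5, 0]
```

Here `bird` never occurs in the toy corpus, so it receives index 5, one past the last vocabulary entry.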

Next, load the training data and preprocess it.

In [15]:
import functools

with open("data/news.2007.en.shuffled.deduped.train", encoding="utf-8") as f:
    texts = list(map(lambda l: l.strip(), f.readlines()))

print("Loaded training set.")

corpus = normaltokenize(texts)
vocabulary = extract_vocabulary(corpus)
corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            corpus))

print("Preprocessed training set.")

Loaded training set.
Preprocessed training set.


### Designing the Model

The back-off probability is defined by the following formula:

$$
P_{\text{bo}}(w_k | W_{k-n+1}^{k-1}) = \begin{cases}
    d(W_{k-n+1}^k) \dfrac{C(W_{k-n+1}^k)}{C(W_{k-n+1}^{k-1})} &  C(W_{k-n+1}^k) > 0 \\
    \alpha(W_{k-n+1}^{k-1}) P_{\text{bo}}(w_k | W_{k-n+2}^{k-1}) &  \text{otherwise} \\
\end{cases}
$$

Based on this formula, implement the n-gram language model with Katz back-off using Good-Turing discounting.
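
The back-off weight $\alpha$ in the second case is what keeps $P_{\text{bo}}$ normalized: it spreads the probability mass freed by discounting over the words that were not seen after the history. The following is a minimal sketch with made-up numbers for a three-word vocabulary; `backoff_weight` and all probabilities in it are illustrative assumptions, not values from the assignment.

```python
def backoff_weight(seen_probs, lower_probs):
    """alpha = (1 - sum of discounted probs of the seen words)
             / (1 - sum of lower-order probs of the same words)."""
    left_over = 1.0 - sum(seen_probs.values())
    unseen_lower = 1.0 - sum(lower_probs[w] for w in seen_probs)
    return left_over / unseen_lower

# Made-up unigram distribution over a three-word vocabulary.
unigram = {"a": 0.5, "b": 0.3, "c": 0.2}
# Made-up discounted bigram probabilities for the words seen after "a".
seen_after_a = {"b": 0.6, "c": 0.1}

alpha_a = backoff_weight(seen_after_a, unigram)

# Back-off distribution P_bo(. | "a"): seen words keep their discounted
# probability, unseen words receive alpha times the unigram probability.
p_bo = {w: seen_after_a.get(w, alpha_a * unigram[w]) for w in unigram}
total = sum(p_bo.values())
print(alpha_a, total)  # approximately 0.6 and 1.0
```

With these numbers the discounting leaves a mass of 0.3, the only unseen word `a` has unigram probability 0.5, and so `alpha_a` comes out near 0.6 and the distribution sums to one up to rounding.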

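The following features need to be implemented:

1. Count the frequency of each gram in the training corpus
2. Compute the count of counts $N_r$, i.e. the number of grams that share the same frequency $r$
3. Compute $d(W_{k-n+1}^k)$
4. Compute $\alpha(W_{k-n+1}^{k-1})$
5. Compute the back-off probability according to the formula
6. Compute the log probability and the perplexity (PPL)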
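
Steps 1 and 2 can be sketched with `collections.Counter` on a single made-up index sequence. The function name `count_ngrams` and the toy data are assumptions for illustration; the model class in this notebook keeps its own dictionaries instead.

```python
from collections import Counter

def count_ngrams(sentence, max_n):
    """Returns one Counter per order; counts[k] holds the (k+1)-grams."""
    counts = [Counter() for _ in range(max_n)]
    for end in range(1, len(sentence) + 1):
        # Every gram of length 1..max_n that ends at position `end`.
        for k in range(min(end, max_n)):
            counts[k][tuple(sentence[end - k - 1:end])] += 1
    return counts

toy = [0, 1, 2, 1, 2, 3]  # made-up word indices
counts = count_ngrams(toy, max_n=2)

# Count of counts for bigrams: N_r = number of distinct bigrams seen r times.
n_r = Counter(counts[1].values())
print(counts[1])
print(n_r)
```

In this toy sequence the bigram `(1, 2)` occurs twice and three other bigrams occur once, so $N_1 = 3$ and $N_2 = 1$.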

For how to compute $d$ and $\alpha$, see the algorithm description in the assignment handout and the [`ngram-discount(7)` manual page](http://www.speech.sri.com/projects/srilm/manpages/ngram-discount.7.html) of [SRILM](http://www.speech.sri.com/projects/srilm/).
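
As I read the Good-Turing section of that manual page, the discount for a gram seen $r$ times is $d = (r^*/r - A)/(1 - A)$ with $r^* = (r+1) N_{r+1}/N_r$ and $A = (k+1) N_{k+1}/N_1$, where $k$ is the threshold above which counts are left untouched. The sketch below evaluates this on made-up counts of counts; the function name, the parameter name `gtmax`, and the numbers are assumptions for illustration.

```python
def good_turing_discount(r, n_r, gtmax):
    """Discount coefficient for a gram seen r times.

    n_r maps a count r to the number of distinct grams seen r times.
    """
    if r > gtmax:
        return 1.0  # large counts are left undiscounted
    a = (gtmax + 1) * n_r[gtmax + 1] / n_r[1]
    r_star = (r + 1) * n_r[r + 1] / n_r[r]  # Good-Turing adjusted count
    return (r_star / r - a) / (1 - a)

# Made-up counts of counts, for illustration only.
n_r = {1: 100, 2: 40, 3: 20, 4: 10}
gtmax = 3

d1 = good_turing_discount(1, n_r, gtmax)
d9 = good_turing_discount(9, n_r, gtmax)

# Sanity check: the total count mass removed from grams with r <= gtmax
# should equal N_1 under this formula.
removed = sum(n_r[r] * r * (1 - good_turing_discount(r, n_r, gtmax))
              for r in range(1, gtmax + 1))
print(d1, d9, removed)
```

The last check follows algebraically from the formula: the removed mass telescopes to $N_1 - (k+1) N_{k+1}$ divided by $1 - A$, which is $N_1$.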

In [16]:
import math

class NGramModel:
    def __init__(self, vocab_size: int, n: int = 4):
        """
        Constructs `n`-gram model with a `vocab_size`-size vocabulary.

        Args:
            vocab_size - int
            n - int
        """

        self.vocab_size: int = vocab_size
        self.n: int = n

        self.frequencies: List[Dict[Gram, int]]\
            = [{} for _ in range(n)]
        self.disfrequencies: List[Dict[Gram, int]]\
            = [{} for _ in range(n)]

        self.ncounts: Dict[ Gram
                          , Dict[int, int]
                          ] = {}
        self.Nr : Dict[int, int] = {}
        self.discount_threshold:int = 7
        self._d: Dict[Gram, float] = {}
        self._alpha: List[Dict[Gram, float]]\
            = [{} for _ in range(n)]

        self.eps = 1e-10
        self.sum = 0



    def learn(self, corpus: IntCorpus):
        """
        Learns the parameters of the n-gram model.

        Args:
            corpus - list of list of int
        """

        for stc in corpus:
            for i in range(1, len(stc)+1):
                for j in range(min(i, self.n)):
                    # Count the gram of length j+1 that ends at position i.
                    gram = tuple(stc[i-j-1:i])
                    self.frequencies[j][gram] = self.frequencies[j].get(gram, 0) + 1

        for order in range(1, self.n):
            # Pair every gram's history (all words but the last) with the
            # gram's frequency. Sorting the pairs makes equal (history,
            # frequency) pairs adjacent, so `groupby` yields one group per
            # pair, and the group size is the number of distinct grams with
            # that history and that frequency.
            pairs = sorted(
                    map(lambda itm: (itm[0][:-1], itm[1]),
                        self.frequencies[order].items()))

            # Per-history count of counts: ncounts[history][r] = N_r.
            for (history, fre), group in itertools.groupby(pairs):
                self.ncounts.setdefault(history, {})[fre] = sum(1 for _ in group)

        # Total number of unigram tokens, used by `__getitem__` to turn
        # unigram counts into probabilities.
        self.sum = float(sum(self.frequencies[0].values()))

        # Count of counts N_r: the number of distinct grams that occur exactly
        # r times. Note that grams of all orders are pooled into one table.
        for i in range(self.n):
            for fre in self.frequencies[i].values():
                self.Nr[fre] = self.Nr.get(fre, 0) + 1

        # `frequencies[k]` maps each (k+1)-gram to its frequency.
        # `ncounts[history][r]` is the number of distinct grams that extend
        # `history` by one word and occur exactly r times (N_r).
        return self.frequencies, self.ncounts
            
        

    def d(self, gram: Gram) -> float:
        """
        Calculates the Good-Turing discount coefficient d(`gram`).

        Args:
            gram - tuple of int

        Return:
            float
        """

        if gram not in self._d:
            # r is the training count C(W_{k-n+1}^k) of the gram itself.
            r = self.frequencies[len(gram)-1].get(gram, 0)
            assert r > 0
            k = self.discount_threshold
            if r > k:
                # Counts above the threshold are considered reliable and are
                # left undiscounted.
                self._d[gram] = 1.
            else:
                # lab = 1/(1-A) with A = (k+1) N_{k+1} / N_1, so the value
                # below equals ((r+1) N_{r+1} / (r N_r) - A) / (1 - A).
                lab = self.Nr[1] / (self.Nr[1] - (k+1) * self.Nr[k+1])
                N_r = self.Nr[r]
                N_r_1 = self.Nr[r+1]
                self._d[gram] = (lab * (r+1) * N_r_1) / (r*N_r) + (1-lab)

        # The assignment template stored a pair of values per gram. Its
        # intended meaning was unclear to me, so a single coefficient is
        # stored instead.
        return self._d[gram]

    def alpha(self, gram: Gram) -> float:
        """
        Calculates the back-off weight alpha(`gram`)

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram) 
        if gram not in self._alpha[n]:
            if gram in self.frequencies[n-1]: # `gram` is the history W_{k-n+1}^{k-1}
                # Sum P_bo over every word w_k that was seen after this history.
                numerator = 0
                denominator = 0

                for gram_n in self.frequencies[n]:
                    if gram == gram_n[:-1]:  # gram_n is the history extended by w_k
                        numerator += self[gram_n]  # P_bo(w_k | W_{k-n+1}^{k-1})
                        denominator += self[gram_n[1:]]  # P_bo(w_k | W_{k-n+2}^{k-1})

                # alpha is the probability mass left over by discounting,
                # divided by the lower-order mass of the unseen words.
                numerator = 1 - numerator
                denominator = 1 - denominator
                
                # assert numerator > 0 and denominator > 0
                self._alpha[n][gram] = numerator/denominator
            else:
                self._alpha[n][gram] = 1.
        return self._alpha[n][gram]

    def __getitem__(self, gram: Gram) -> float:
        """
        Calculates smoothed conditional probability P(`gram[-1]`|`gram[:-1]`).

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)-1


        if gram not in self.disfrequencies[n]:
            if n>0:
                # Apply the back-off formula: discounted relative frequency
                # for seen grams, alpha-weighted lower-order estimate otherwise.
                if self.frequencies[n].get(gram, 0) > self.eps: ## C(W^{k}_{k-n+1}) > 0
                    self.disfrequencies[n][gram] = self.d(gram) * self.frequencies[n][gram] / self.frequencies[n-1][gram[:-1]]
                else:
                    self.disfrequencies[n][gram] = self.alpha(gram[:-1]) * self.__getitem__(gram[1:]) 
            else:
                self.disfrequencies[n][gram] = self.frequencies[n].get(gram, self.eps) / self.sum
                
        # assert self.disfrequencies[n][gram] > 0
        return self.disfrequencies[n][gram]

    def log_prob(self, sentence: IntSentence) -> float:
        """
        Calculates the log probability of the given sentence. Assumes that the
        first token is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        log_prob = 0.
        for i in range(2, len(sentence)+1):
            # The gram ends at position i and spans at most n tokens; starting
            # at i = 2 skips the bare "<s>" unigram.
            j = min(i, self.n)
            gram = tuple(sentence[i-j:i])
            # Accumulate the base-2 log of each conditional probability.
            log_prob += math.log2(self[gram])
        return log_prob

    def ppl(self, sentence: IntSentence) -> float:
        """
        Calculates the PPL of the given sentence. Assumes that the first token
        is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """
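        PPL = 1
        for i in range(2,len(sentence)+1):
            j = min(i, self.n)
            gram = tuple(sentence[i-j:i])
            # Multiply the inverse probabilities of all predicted tokens. For
            # long sentences this product can overflow to `inf`.
            PPL *= (1 / self[gram])
        # Debug output: the raw product and the exponent
        # 1/(number of predicted tokens).
        print(PPL,1/float(len(sentence)-1))

        # PPL is the geometric mean of the inverse probabilities.
        PPL = math.pow(PPL, 1/float(len(sentence)-1))
        return PPL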
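
Perplexity can also be computed in log space, which avoids the large intermediate product. The following is a minimal sketch on made-up probabilities; the standalone function `perplexity` is a hypothetical helper and not a method of `NGramModel`.

```python
import math

def perplexity(probs):
    """Perplexity of a token sequence from its per-token probabilities."""
    log2_sum = sum(math.log2(p) for p in probs)
    return 2.0 ** (-log2_sum / len(probs))

# Made-up conditional probabilities for four predicted tokens.
probs = [0.25, 0.5, 0.125, 0.25]
ppl = perplexity(probs)

# A long, improbable sequence: the direct product of 1/p overflows to inf,
# while the log-space version stays finite.
long_probs = [1e-5] * 100
direct = 1.0
for p in long_probs:
    direct *= 1 / p
long_ppl = perplexity(long_probs)
print(ppl, direct, long_ppl)
```

For the four-token example the log probabilities sum to -8 bits, so the perplexity is $2^{8/4} = 4$. For the long sequence the direct product becomes `inf`, whereas the log-space result is close to $10^5$.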

### Training and Testing

With the data and the model in place, we can now train and test.

Train the model:

In [17]:
import pickle as pkl

model = NGramModel(len(vocabulary))
model.learn(corpus)
with open("model.pkl", "wb") as f:
    pkl.dump(vocabulary, f)
    pkl.dump(model, f)

print("Dumped model.")

Dumped model.
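
The cell above writes two objects into one file with two consecutive `pickle.dump` calls, so they have to be read back with two `pickle.load` calls in the same order. The sketch below demonstrates this with an in-memory buffer and small stand-in objects, both of which are assumptions for illustration.

```python
import io
import pickle

buffer = io.BytesIO()  # in-memory stand-in for "model.pkl"
pickle.dump({"<s>": 0, "</s>": 1}, buffer)  # first object (stand-in vocabulary)
pickle.dump({"n": 4}, buffer)               # second object (stand-in model)

buffer.seek(0)
loaded_vocab = pickle.load(buffer)  # objects come back in dump order
loaded_model = pickle.load(buffer)
print(loaded_vocab, loaded_model)
```

Each `pickle.load` consumes exactly one pickled object and leaves the stream positioned at the start of the next one.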


Compute the perplexity on the test set:

In [18]:

#with open("model.pkl", "rb") as f:
#    vocabulary = pkl.load(f)
#    model = pkl.load(f)
#print("Loaded model.")

with open("data/news.2007.en.shuffled.deduped.test", encoding="utf-8") as f:
    test_set = list(map(lambda l: l.strip(), f.readlines()))
test_corpus = normaltokenize(test_set)
test_corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            test_corpus))
ppls = []
for t in test_corpus:
    ppls.append(model.ppl(t))
    print(ppls[-1])
print("Avg: ", sum(ppls)/len(ppls))

6.587287847661497e+34 0.125
22508.073785375276
5.999742452490332e+19 0.14285714285714285
669.0329624144102
1.0942025201104746e+27 0.09090909090909091
287.1440303760335
1.496178395269224e+64 0.030303030303030304
88.0433311831805
2.1492655051770874e+31 0.14285714285714285
29925.501581854347
9.482145093731398e+21 0.07692307692307693
49.03783371751199
1.6093235170018954e+35 0.03333333333333333
14.912648403472929
2.246515784658003e+57 0.038461538461538464
160.6302323186319
2.0749654887986508e+89 0.03333333333333333
948.9289491259567
8.760193318442967e+45 0.04
68.81776241881947
8.168739512745794e+21 0.1111111111111111
272.07202820749205
8.265477899154348e+43 0.047619047619047616
123.39525923634163
1.212542289319791e+59 0.0625
4928.684923860772
1.4347888773107825e+70 0.045454545454545456
1545.0584343284559
2.5662332513794994e+63 0.041666666666666664
438.5852332782527
4.3079844789677725e+35 0.07142857142857142
350.9985278769381
172489293.29471415 0.5
13133.517932934577
1.1678045076348033e+79 0