# n元语言模型回退算法

本次作业要求补全本笔记中的n元语言模型的采用Good-Turing折扣的Katz回退算法。

### 预处理

首先创建一些预处理函数。

引入必要的模块，定义些类型别名。

In [3]:
import re
import itertools

from typing import List, Dict, Tuple

Sentence = List[str]
IntSentence = List[int]

Corpus = List[Sentence]
IntCorpus = List[IntSentence]

Gram = Tuple[int]

下面的函数用于将文本正则化并词元化。该函数会将所有英文文本转为小写，去除文本中所有的标点，简单起见将所有连续的数字用一个`N`代替，将形如`let's`的词组拆分为`let`和`'s`两个词。

In [4]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")
def normaltokenize(corpus: List[str]) -> Corpus:
    """
    Normalizes and tokenizes the sentences in `corpus`. Turns the letters into
    lower case and removes all the non-alphadigit characters and splits the
    sentence into words and added BOS and EOS marks.

    Args:
        corpus - list of str

    Return:
        list of list of str where each inner list of str represents the word
          sequence in a sentence from the original sentence list
    """

    tokeneds = [ ["<s>"]
               + list(
                   filter(lambda tkn: len(tkn)>0,
                       _splitor_pattern.split(
                           _digit_pattern.sub("N", stc.lower()))))
               + ["</s>"]
                    for stc in corpus
               ]
    return tokeneds

接下来定义两个函数用来从训练语料中构建词表，并将句子中的单词从字符串表示转为整数索引表示。

In [5]:
def extract_vocabulary(corpus: Corpus) -> Dict[str, int]:
    """
    Extracts the vocabulary from `corpus` and returns it as a mapping from the
    word to index. The words will be sorted by the codepoint value.

    Args:
        corpus - list of list of str

    Return:
        dict like {str: int}
    """

    vocabulary = set(itertools.chain.from_iterable(corpus))
    vocabulary = dict(
            map(lambda itm: (itm[1], itm[0]),
                enumerate(
                    sorted(vocabulary))))
    return vocabulary

def words_to_indices(vocabulary: Dict[str, int], sentence: Sentence) -> IntSentence:
    """
    Convert sentence in words to sentence in word indices.

    Args:
        vocabulary - dict like {str: int}
        sentence - list of str

    Return:
        list of int
    """

    return list(map(lambda tkn: vocabulary.get(tkn, len(vocabulary)), sentence))

接下来读入训练数据，将数据预处理。

In [9]:
import functools

with open("data/news.2007.en.shuffled.deduped.train", encoding='UTF-8') as f:
    texts = list(map(lambda l: l.strip(), f.readlines()))

print("Loaded training set.")

corpus = normaltokenize(texts)
vocabulary = extract_vocabulary(corpus)
corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            corpus))

print("Preprocessed training set.")

Loaded training set.
Preprocessed training set.


In [15]:
print(len(corpus[0]), len(corpus[1]))

print(corpus[0])
print(corpus[1])

36 32
[3581, 165570, 86217, 62968, 165570, 34971, 167440, 82721, 178887, 132056, 12791, 165570, 142662, 142568, 6618, 134164, 176515, 9880, 131488, 7782, 99960, 165570, 169869, 80313, 3582, 9880, 165570, 109285, 8702, 144231, 119677, 165570, 68036, 80313, 3582, 3580]
[3581, 165570, 178819, 62968, 165570, 58547, 180493, 41355, 82173, 135885, 51060, 166148, 179627, 169773, 36804, 165541, 83663, 106975, 76095, 118831, 63602, 135877, 41396, 120257, 55961, 37434, 135208, 165710, 79221, 81152, 5006, 3580]


### 设计模型

参照公式

$$
P_{\text{bo}}(w_k | W_{k-n+1}^{k-1}) = \begin{cases}
    d(W_{k-n+1}^k) \dfrac{C(W_{k-n+1}^k)}{C(W_{k-n+1}^{k-1})} &  C(W_{k-n+1}^k) > 0 \\
    \alpha(W_{k-n+1}^{k-1}) P_{\text{bo}}(w_k | W_{k-n+2}^{k-1}) &  \text{否则} \\
\end{cases}
$$

实现n元语言模型及采用Good-Turing折扣的Katz回退算法。

需要实现的功能包括：

1. 统计各词组（gram）在训练语料中的频数
2. 计算同频词组个数$N_r$
3. 计算$d(W_{k-n+1}^k)$
4. 计算$\alpha(W_{k-n+1}^{k-1})$
5. 根据公式计算回退概率
6. 计算概率对数与困惑度（PPL）

$d$与$\alpha$如何计算可以参考作业文件中的算法说明以及[SRILM](http://www.speech.sri.com/projects/srilm/)的[`ngram-discount(7)`手册页](http://www.speech.sri.com/projects/srilm/manpages/ngram-discount.7.html)。

In [None]:
import math

class NGramModel:
    def __init__(self, vocab_size: int, n: int = 4):
        """
        Constructs `n`-gram model with a `vocab_size`-size vocabulary.

        Args:
            vocab_size - int
            n - int
        """

        self.vocab_size: int = vocab_size
        self.n: int = n

        self.frequencies: List[Dict[Gram, int]]\
            = [{} for _ in range(n)]
        self.disfrequencies: List[Dict[Gram, int]]\
            = [{} for _ in range(n)]

        self.ncounts: Dict[ Gram
                          , Dict[int, int]
                          ] = {}

        self.discount_threshold:int = 7
        self._d: Dict[Gram, Tuple[float, float]] = {}
        self._alpha: List[Dict[Gram, float]]\
            = [{} for _ in range(n)]

        self.eps = 1e-10
    
    def learn(self, corpus: IntCorpus):
        """
        Learns the parameters of the n-gram model.

        Args:
            corpus - list of list of int
        """
        
        # self.n: lenth of n-gram
        for stc in corpus:
            for i in range(1, len(stc)+1):  # start from point i, 0th not counted
                for j in range(min(i, self.n)):
                    # TODO: count the frequencies of the grams
                    # len of gram = j
                    if (i+j-1)<=len(stc) :
                        tmp_gram = stc[i:i+j]
                        tmp_gram = tuple(tmp_gram)
                        self.frequencies[j][tmp_gram] = self.frequencies[j][tmp_gram] + 1

        for i in range(1, self.n):
            grams = itertools.groupby(
                    sorted(
                        sorted(
                            map(lambda itm: (itm[0][:-1], itm[1]),
                                 self.frequencies[i].items()),
                               key=(lambda itm: itm[1])),
                        key=(lambda itm: itm[0])))
            # TODO: calculates the value of $N_r$
            

    def d(self, gram: Gram) -> float:
        """
        Calculates the interpolation coefficient.

        Args:
            gram - tuple of int

        Return:
            float
        """

        if gram not in self._d:
            # TODO: calculates the value of $d'$
            pass
            self._d[gram] = (numerator1/denominator, - numerator2/denominator)
        return self._d[gram]

    def alpha(self, gram: Gram) -> float:
        """
        Calculates the back-off weight alpha(`gram`)

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)
        if gram not in self._alpha[n]:
            if gram in self.frequencies[n-1]:
                # TODO: calculates the value of $\alpha$
                pass
                self._alpha[n][gram] = numerator/denominator
            else:
                self._alpha[n][gram] = 1.
        return self._alpha[n][gram]

    def __getitem__(self, gram: Gram) -> float:
        """
        Calculates smoothed conditional probability P(`gram[-1]`|`gram[:-1]`).

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)-1
        if gram not in self.disfrequencies[n]:
            if n>0:
                # TODO: calculates the smoothed probability value according to the formulae
                pass
            else:
                self.disfrequencies[n][gram] = self.frequencies[n].get(gram, self.eps)/float(len(self.frequencies[0]))
        return self.disfrequencies[n][gram]

    def log_prob(self, sentence: IntSentence) -> float:
        """
        Calculates the log probability of the given sentence. Assumes that the
        first token is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        log_prob = 0.
        for i in range(2, len(sentence)+1):
            # TODO: calculates the log probability
            pass
        return log_prob

    def ppl(self, sentence: IntSentence) -> float:
        """
        Calculates the PPL of the given sentence. Assumes that the first token
        is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        # TODO: calculates the PPL
        pass

### 训练与测试

现在数据与模型均已齐备，可以训练并测试了。

训练模型：

In [None]:
import pickle as pkl

model = NGramModel(len(vocabulary))
model.learn(corpus)
with open("model.pkl", "wb") as f:
    pkl.dump(vocabulary, f)
    pkl.dump(model, f)

print("Dumped model.")

在测试集上测试计算困惑度：

In [None]:
with open("model.pkl", "rb") as f:
    vocabulary = pkl.load(f)
    model = pkl.load(f)
print("Loaded model.")

with open("data/news.2007.en.shuffled.deduped.test") as f:
    test_set = list(map(lambda l: l.strip(), f.readlines()))
test_corpus = normaltokenize(test_set)
test_corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            test_corpus))
ppls = []
for t in test_corpus:
    ppls.append(model.ppl(t))
    print(ppls[-1])
print("Avg: ", sum(ppls)/len(ppls))