# 作业一：实现N-gram语言模型平滑算法

### 预处理

首先创建一些预处理函数。

引入必要的模块，定义些类型别名。

In [1]:
import re
import itertools

from typing import List, Dict, Tuple

Sentence = List[str]
IntSentence = List[int]

Corpus = List[Sentence]
IntCorpus = List[IntSentence]

Gram = Tuple[int]

下面的函数用于将文本正则化并词元化。该函数会将所有英文文本转为小写，去除文本中所有的标点，简单起见将所有连续的数字用一个`N`代替，将形如`let's`的词组拆分为`let`和`'s`两个词。

In [2]:
_splitor_pattern = re.compile(r"[^a-zA-Z']+|(?=')")
_digit_pattern = re.compile(r"\d+")
def normaltokenize(corpus: List[str]) -> Corpus:
    """
    Normalizes and tokenizes the sentences in `corpus`. Turns the letters into
    lower case and removes all the non-alphadigit characters and splits the
    sentence into words and added BOS and EOS marks.

    Args:
        corpus - list of str

    Return:
        list of list of str where each inner list of str represents the word
          sequence in a sentence from the original sentence list
    """

    tokeneds = [ ["<s>"]
               + list(
                   filter(lambda tkn: len(tkn)>0,
                       _splitor_pattern.split(
                           _digit_pattern.sub("N", stc.lower()))))
               + ["</s>"]
                    for stc in corpus
               ]
    return tokeneds

接下来定义两个函数用来从训练语料中构建词表，并将句子中的单词从字符串表示转为整数索引表示。

In [3]:
def extract_vocabulary(corpus: Corpus) -> Dict[str, int]:
    """
    Extracts the vocabulary from `corpus` and returns it as a mapping from the
    word to index. The words will be sorted by the codepoint value.

    Args:
        corpus - list of list of str

    Return:
        dict like {str: int}
    """

    vocabulary = set(itertools.chain.from_iterable(corpus))
    vocabulary = dict(
            map(lambda itm: (itm[1], itm[0]),
                enumerate(
                    sorted(vocabulary))))
    return vocabulary

def words_to_indices(vocabulary: Dict[str, int], sentence: Sentence) -> IntSentence:
    """
    Convert sentence in words to sentence in word indices.

    Args:
        vocabulary - dict like {str: int}
        sentence - list of str

    Return:
        list of int
    """

    return list(map(lambda tkn: vocabulary.get(tkn, len(vocabulary)), sentence))

接下来读入训练数据，将数据预处理。

In [4]:
import functools

with open("data/news.2007.en.shuffled.deduped.train", encoding="utf-8") as f:
    texts = list(map(lambda l: l.strip(), f.readlines()))

print("Loaded training set.")

corpus = normaltokenize(texts)
vocabulary = extract_vocabulary(corpus)
corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            corpus))

print("Preprocessed training set.")

FileNotFoundError: [Errno 2] No such file or directory: 'data/news.2007.en.shuffled.deduped.train'

### 设计模型

参照公式

$$
P(w_k | W_{k-n+1}^{k-1}) = \begin{cases}
    d(W_{k-n+1}^k) * \dfrac{C(W_{k-n+1}^k)}{C(W_{k-n+1}^{k-1})} &  C(W_{k-n+1}^k) > 0 \\
    \alpha(W_{k-n+1}^{k-1}) * P(w_k | W_{k-n+2}^{k-1}) &  \text{否则} \\
\end{cases}
$$

实现N-gram语言模型及采用带Good-Turing折扣的Katz回退算法。

需要实现的功能包括：

1. 统计各词组（gram）在训练语料中的频数
2. 计算同频词组个数$N_r$
3. 计算$d(W_{k-n+1}^k)$
4. 计算$\alpha(W_{k-n+1}^{k-1})$
5. 根据公式计算回退概率
6. 计算概率对数与困惑度（PPL）


In [None]:
import math

class NGramModel:
    def __init__(self, vocab_size: int, n: int = 4):
        """
        Constructs `n`-gram model with a `vocab_size`-size vocabulary.

        Args:
            vocab_size - int
            n - int
        """

        # 字典的大小
        self.vocab_size: int = vocab_size

        # 表示该语言模型n-gram的取值
        self.n: int = n

        # 记录语料库中，某个gram的出现的频次
        # 例如 self.frequencies[1][(12, 23)] 应等于 (12, 23) 这个 2-gram 在语料库中出现的次数
        # 注：第一个下标表示长度，调用时下标要减一
        self.frequencies: List[Dict[Gram, int]] = [{} for _ in range(n)]

        # 缓存语言模型提供的概率值，即对于某个gram而言 P(`gram[-1]`|`gram[:-1]`)的值
        # 例如 self.disfrequencies[2][(12, 23, 77)] 应等于 P(`77`|`12, 23`) 的值
        # 注：第一个下标表示长度，调用时下标要减一
        self.disfrequencies: List[Dict[Gram, float]] = [{} for _ in range(n)]

        # 记录number of count，即在前缀是某个(N-1)-gram的情况下，语料库中出现次数是r次的N-gram的数量
        # 例如 self.ncounts[((12, 23)][7] 应等于 语料库中以(12, 23)为前缀的3-gram中，出现次数为7次的种类数
        self.ncounts= [{} for _ in range(n)]

        # 说明文档中所提到的折扣阈值θ
        self.discount_threshold:int = 7

        # 对于某个gram而言的插值系数lambda，以 (lambda, 1-lambda) 形式保存
        # 例如 self._d[(122, 323)] 应等于 (0.1, 0.9)，（如果lambda=0.1的话）
        # 注：这里虽然写作d，但其实应该是公式里的lambda
        self._d: Dict[Gram, Tuple[float, float]] = {}

        # 缓存对于某个gram而言的回退系数α
        # 例如 self._alpha[2][(122, 323)] 应等于α(`122, 323`)
        # 注意：调用时，第一个下标虽然表示长度，但不需要减一
        self._alpha: List[Dict[Gram, float]] = [{} for _ in range(n)]

        # 当计算结果为0时，应取该值，避免潜在的除零错误
        self.eps = 1e-10

        self.grams = [{}]

        self.frq = 0

    def learn(self, corpus: IntCorpus):
        """
        Learns the parameters of the n-gram model.

        Args:
            corpus - list of list of int
        """

        for stc in corpus:
            for i in range(1, len(stc)+1):
                for j in range(min(i, self.n)):
                    # TODO: count the frequencies of the grams
                    tp = tuple(stc[i-1-j:i])
                    self.frequencies[j][tp] = self.frequencies[j].get(tp, 0) + 1

        for i in range(1, self.n):
            self.grams.append(dict(itertools.groupby(
                    sorted(
                        sorted(
                            map(lambda itm: (itm[0][:-1], itm[1], itm[0]),
                                 self.frequencies[i].items()),
                            key=(lambda itm: itm[1])),
                        key=(lambda itm: itm[0])
                        ), key=lambda itm: itm[0])))

        for i, x in enumerate(self.frequencies):
            for _, num in x.items():
                self.ncounts[i][num] = self.ncounts[i].get(num, 0) + 1

        self.frq = float(sum([item[1] for item in self.frequencies[0].items()]))

    def d(self, gram: Gram) -> float:
        """
        Calculates the interpolation coefficient.

        Args:
            gram - tuple of int

        Return:
            float
        """
        n = len(gram) - 1
        if gram not in self._d:
            # 对于某个gram而言的插值系数lambda，返回元组 (lambda, 1-lambda)
            # TODO: calculates the value of $d'$
            numerator1 = self.ncounts[n][1]
            numerator2 = (self.discount_threshold + 1) * self.ncounts[n][self.discount_threshold + 1]
            denominator = numerator1 - numerator2
            self._d[gram] = (numerator1/denominator, - numerator2/denominator)
        return self._d[gram]

    def alpha(self, gram: Gram) -> float:
        """
        Calculates the back-off weight alpha(`gram`)

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)
        if gram not in self._alpha[n]:
            if gram in self.frequencies[n-1]:
                # 按照定义计算分子分母的 1 - sum (...) 的值即可，
                # 提示：式中的P(xxx|xxx)调用下面的"__getitem__"函数即可，无需在此单独展开。此函数调用方式为 self[gram]
                # 记得考虑分母为0应该怎么处理
                # TODO: calculates the value of $\alpha$
                numerator = 1 - sum(self[x[2]] for x in self.grams[n].get(gram, []))
                numerator = numerator if numerator else self.eps
                denominator = 1 - sum(self[x[2]] for x in self.grams[n-1].get(gram[1:], []))
                denominator = denominator if denominator else self.eps
                self._alpha[n][gram] = numerator / denominator
            else:
                self._alpha[n][gram] = 1.
        return self._alpha[n][gram]

    def __getitem__(self, gram: Gram) -> float:
        """
        Calculates smoothed conditional probability P(`gram[-1]`|`gram[:-1]`).

        Args:
            gram - tuple of int

        Return:
            float
        """

        n = len(gram)-1
        if gram not in self.disfrequencies[n]:
            if n > 0:
                # 按照P的公式，分类讨论count(gram)>0和count(gram)=0的情况
                # TODO: calculates the smoothed probability value according to the formulae
                count = self.frequencies[n].get(gram, 0)
                if count > 0:
                    if count > self.discount_threshold:
                        d = 1
                    else:
                        lambda_plus, lambda_minus = self.d(gram)
                        r = self.frequencies[n][gram]
                        N_r1 = self.ncounts[n].get(r+1, 0)
                        N_r1 = N_r1 if N_r1 else self.eps
                        N_r = self.ncounts[n].get(r, 0)
                        N_r = N_r if N_r else self.eps
                        d = lambda_plus * (r + 1) * N_r1 / r / N_r + lambda_minus

                    self.disfrequencies[n][gram] = d * self.frequencies[n].get(gram, self.eps) / self.frequencies[n-1].get(gram[:-1], self.eps)
                else:
                    self.disfrequencies[n][gram] = self.alpha(gram[:-1]) * self[gram[1:]]
            else:
                self.disfrequencies[n][gram] = self.frequencies[n].get(gram, self.eps) / self.frq
        return self.disfrequencies[n][gram]

    def log_prob(self, sentence: IntSentence) -> float:
        """
        Calculates the log probability of the given sentence. Assumes that the
        first token is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """

        log_prob = 0
        sentence = tuple(sentence)
        for i in range(2, len(sentence)):
            # 注意这是对数运算的情况
            # 这里i的取值会把<s>和</s>考虑进来，可以自行修改使得不考虑这两个词
            # TODO: calculates the log probability
            log_prob += math.log(self[sentence[max(1, i-self.n): i]])
        return log_prob

    def ppl(self, sentence: IntSentence) -> float:
        """
        Calculates the PPL of the given sentence. Assumes that the first token
        is always "<s>".

        Args:
            sentence: list of int

        Return:
            float
        """
        # 和我们课件中的PPW是同一个东西，利用上面的log_prob加以计算
        # TODO: calculates the PPL
        return 1 / pow(math.exp(self.log_prob(sentence)), 1 / (len(sentence) - 2))

### 训练与测试

现在数据与模型均已齐备，可以训练并测试了。

训练模型：

In [None]:
import pickle as pkl

model = NGramModel(len(vocabulary))
model.learn(corpus)
with open("model.pkl", "wb") as f:
    pkl.dump(vocabulary, f)
    pkl.dump(model, f)

print("Dumped model.")

Dumped model.


在测试集上测试计算困惑度：

In [None]:
with open("model.pkl", "rb") as f:
    vocabulary = pkl.load(f)
    model = pkl.load(f)
print("Loaded model.")

with open("data/news.2007.en.shuffled.deduped.test", encoding="utf-8") as f:
    test_set = list(map(lambda l: l.strip(), f.readlines()))
test_corpus = normaltokenize(test_set)
test_corpus = list(
        map(functools.partial(words_to_indices, vocabulary),
            test_corpus))
ppls = []
for t in test_corpus:
    ppls.append(model.ppl(t))
    print(ppls[-1])
print("Avg: ", sum(ppls)/len(ppls))

Loaded model.
41758.445503209725
819.1666149385499
284.4607938559403
81.01344582474758
105650.84533930507
64.02850793248672
21.085407002643866
185.41336489199156
824.9038152916542
70.74988919909318
287.50507199565027
109.88706089222921
5149.674473527617
1687.0411596550928
385.4693705649921
418.6058197700788
327429.50704225356
50615.642043398606
86.81172590440617
425.32242094930945
706.2501807720853
3916.798948946527
335.280509450962
326.60389244156363
86.43227357675555
73.25387975545632
367.3440475763735
27.97731576486108
382.47065036736865
215.76100722084104
250.24494617899
643.151773214768
89.72478492963967
169.35700001888958
854.1236833136531
84.0859437539743
632.3826031094769
517.796460985909
336.2301241227322
13234.038700198142
156.45780994755333
727.9335583367653
122.98729435474569
169.85986512441485
1283.0432920781466
1329.0107616248656
307.3947456436026
979.8574069785449
191.16551949541588
322.52075432451596
Avg:  11303.90237207942
