# 02 N grams

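In English, "I want" is often followed by "to", and "I want to" is usually followed by a verb. Suppose we are given $(n-1)$ words $w_1,\dotsc,w_{n-1}$ and want to predict the next word $w_n$. Different candidate words $w_n$ occur with different conditional probabilities: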

$$\mathbb P(w_n| w_1,\dotsc,w_{n-1})$$

For instance, the example above suggests that $\mathbb P({\rm to|\ I \ want})$ is relatively large, whereas a probability such as $\mathbb P({\rm apple|\ I\ want\ to})$ is small.

A model that takes $w_1,\dotsc,w_{n-1}$ as input and, for any word $w_n$, outputs the probability $\mathbb P(w_n| w_1,\dotsc,w_{n-1})$ that it is the next word is called a language model.

<br>

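Given a dataset of text, we can count the occurrences of the sequence $(w_1,\dotsc,w_{n-1})$ and of the sequence $(w_1,\dotsc,w_{n-1},w_n)$, and use their ratio as an approximation of the probability: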

$$\mathbb P(w_n| w_1,\dotsc,w_{n-1}) \approx \frac{N(w_1,\dotsc,w_{n-1},w_{n})}{N(w_1,\dotsc,w_{n-1})}$$
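The count-ratio estimate can be sketched on a toy corpus as follows. The corpus and the helper name `count_seq` are made up for illustration and are not part of the notebook's own code.

```python
from typing import List, Sequence

# Toy corpus, already split into words (an assumption for illustration).
corpus: List[List[str]] = [
    "i want to eat".split(),
    "i want to sleep".split(),
    "i want an apple".split(),
]

def count_seq(corpus: List[List[str]], seq: Sequence[str]) -> int:
    """Count how often `seq` occurs as a contiguous run of words in the corpus."""
    seq = list(seq)
    k = len(seq)
    return sum(
        sent[i:i + k] == seq
        for sent in corpus
        for i in range(len(sent) - k + 1)
    )

context = ["i", "want"]
# N(i, want, to) / N(i, want)
p_to = count_seq(corpus, context + ["to"]) / count_seq(corpus, context)
print(p_to)  # 2 of the 3 occurrences of "i want" are followed by "to"
```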


## N grams

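The number of possible word combinations is enormous, so no dataset can contain every combination often enough. Estimating directly with $N(w_1,\dotsc,w_n) / N(w_1,\dotsc,w_{n-1})$ is therefore unreliable.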

Instead, we can truncate the context and approximate using only the most recent few words. For example, the bigram model:

$$\mathbb P(w_n| w_1,\dotsc,w_{n-1})\approx \mathbb P(w_n| w_{n-1}) \approx  \frac{N(w_{n-1},w_n)}{N(w_{n-1})}$$

For example, the word that follows "I want to" (most likely a verb) depends mainly on "to". The counts $N(w_{n-1},w_n), N(w_{n-1})$ are then far larger than $N(w_1,\dotsc,w_n), N(w_1,\dotsc,w_{n-1})$, so the estimate is more reliable.
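A minimal sketch of the unsmoothed bigram estimate, using `collections.Counter` on a made-up corpus. The function name `bigram_prob` is hypothetical.

```python
from collections import Counter
from typing import List

sentences: List[List[str]] = [
    "i want to eat".split(),
    "i want to sleep".split(),
    "you want to eat".split(),
]

unigram_counts: Counter = Counter()
bigram_counts: Counter = Counter()
for sent in sentences:
    unigram_counts.update(sent)                 # N(w)
    bigram_counts.update(zip(sent, sent[1:]))   # N(w_prev, w)

def bigram_prob(prev: str, word: str) -> float:
    """Unsmoothed estimate N(prev, word) / N(prev); 0 if `prev` was never seen."""
    if unigram_counts[prev] == 0:
        return 0.0
    return bigram_counts[(prev, word)] / unigram_counts[prev]

print(bigram_prob("to", "eat"))    # "to" occurs 3 times, "to eat" twice
print(bigram_prob("to", "sleep"))  # "to sleep" occurs once
```

Only the previous word is used as context, so "i want to" and "you want to" contribute to the same counts.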

Similarly, there is the unigram model $\mathbb P(w_n| w_1,\dotsc,w_{n-1})\approx \mathbb P(w_n)$, the trigram model $\mathbb P(w_n| w_1,\dotsc,w_{n-1})\approx \mathbb P(w_n| w_{n-1},w_{n-2}) \approx \frac{N(w_{n-2},w_{n-1},w_n)}{N(w_{n-2},w_{n-1})}$, and so on.

A tuple of consecutive words such as $(w_{n-1},w_n)$ in the bigram model is called a gram.

<br>



### Unknown words

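If the test set contains a word that was never seen, assigning it zero probability causes severe errors. We can **first fix a vocabulary** $V$ of common words and **treat every word not in** $V$ **as a single "new word"** (unknown word, unk).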

The vocabulary can be chosen as the words whose number of occurrences in the training set exceeds some threshold.

Take a $46$ MB English Wikipedia dataset as an example: of its $120000$ distinct words, only a little over $7000$ occur more than $100$ times.
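Building such a vocabulary and masking out-of-vocabulary words could look like the sketch below. The helper names `build_vocab` and `mask_unknown`, the `<unk>` spelling, and the choice of "at least `min_count` occurrences" as the cutoff are assumptions for illustration.

```python
from collections import Counter
from typing import List, Set

UNK = "<unk>"  # assumed spelling of the unknown-word token

def build_vocab(tokens: List[str], min_count: int) -> Set[str]:
    """Keep the words that occur at least `min_count` times, plus the unk token."""
    counts = Counter(tokens)
    vocab = {w for w, c in counts.items() if c >= min_count}
    vocab.add(UNK)
    return vocab

def mask_unknown(tokens: List[str], vocab: Set[str]) -> List[str]:
    """Replace every word outside the vocabulary with the unk token."""
    return [w if w in vocab else UNK for w in tokens]

train = "the cat sat on the mat the cat ran".split()
vocab = build_vocab(train, min_count=2)
print(sorted(vocab))                                # ['<unk>', 'cat', 'the']
print(mask_unknown("the dog sat".split(), vocab))   # ['the', '<unk>', '<unk>']
```

Note that "sat" is masked even though it appears in the training text, because it falls below the frequency cutoff.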

<br>

### Smoothing

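We can apply Laplace smoothing (also called Laplacian or Bayes smoothing). Let $|V|$ be the total number of words in the vocabulary. Taking the bigram model as an example,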

$$\hat {\mathbb P}(w_n| w_{n-1}) = \frac{N(w_{n-1},w_n)+1}{N(w_{n-1})+|V|}$$

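We add $1$ to the numerator and $|V|$ to the denominator. Since $\sum_{w_n\in V} N(w_{n-1},w_n) = N(w_{n-1})$, the probabilities of the $|V|$ possible next words still sum to one: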

$$\sum_{w_n\in V}\hat {\mathbb P}(w_n| w_{n-1})= \frac{\sum_{w_n\in V}\left(N(w_{n-1},w_n)+1\right)}{N(w_{n-1})+|V|}=1$$
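The normalization can be checked numerically with a small sketch. The toy token stream and the name `laplace_prob` are illustrative assumptions. The context count is taken over positions that are followed by a word, so that it equals the sum of the bigram counts.

```python
from collections import Counter
from typing import List

vocab: List[str] = ["i", "want", "to", "eat", "sleep"]
tokens = "i want to eat i want to sleep".split()

# N(w_prev): count each word only where it is followed by another word,
# so that N(w_prev) equals the sum over w of N(w_prev, w).
context_counts = Counter(tokens[:-1])
bigram_counts = Counter(zip(tokens, tokens[1:]))

def laplace_prob(prev: str, word: str) -> float:
    """Add-one smoothed bigram probability."""
    return (bigram_counts[(prev, word)] + 1) / (context_counts[prev] + len(vocab))

print(laplace_prob("to", "eat"))  # seen once after "to": (1 + 1) / (2 + 5)
print(laplace_prob("to", "i"))    # never seen after "to": (0 + 1) / (2 + 5)
total = sum(laplace_prob("to", w) for w in vocab)
print(total)                      # equals 1 up to floating-point rounding
```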

<br>

Once the unknown word is introduced, the denominator must also account for one extra entry for it, so the formula becomes:

$$\hat {\mathbb P}(w_n| w_{n-1}) = \frac{N(w_{n-1},w_n)+1}{N(w_{n-1})+|V|+1}$$

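During training, grams that contain unknown words can be skipped, because smoothing already guarantees that an unknown word receives a nonzero probability.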
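A small sketch of this variant, assuming a hand-picked vocabulary that does not itself contain the unk token (the names `UNK` and `smoothed_prob` are hypothetical). Bigrams containing an out-of-vocabulary word are skipped when counting, and the unknown word still receives nonzero probability at query time.

```python
from collections import Counter

UNK = "<unk>"                          # assumed unknown-word token
vocab = {"i", "want", "to", "eat"}     # known words; UNK is counted separately

train = ["i want to eat".split(), "i want to dance".split()]  # "dance" is unknown

context_counts: Counter = Counter()
bigram_counts: Counter = Counter()
for sent in train:
    for prev, word in zip(sent, sent[1:]):
        # skip bigrams that contain an unknown word
        if prev in vocab and word in vocab:
            bigram_counts[(prev, word)] += 1
            context_counts[prev] += 1

def smoothed_prob(prev: str, word: str) -> float:
    """Add-one smoothing with one extra slot in the denominator for UNK."""
    if word not in vocab:
        word = UNK
    return (bigram_counts[(prev, word)] + 1) / (context_counts[prev] + len(vocab) + 1)

print(smoothed_prob("to", "eat"))     # (1 + 1) / (1 + 4 + 1)
print(smoothed_prob("to", "dance"))   # unknown word: (0 + 1) / (1 + 4 + 1)
total = sum(smoothed_prob("to", w) for w in list(vocab) + [UNK])
print(total)                          # equals 1 up to floating-point rounding
```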

In [1]:
from collections import defaultdict
from copy import deepcopy
from functools import reduce
from typing import List, Union, Optional

from tqdm import tqdm
import numpy as np
from nltk import sent_tokenize

class SizedDefaultDict(defaultdict):
    def __init__(self, *args, **kwargs):
        """
        A defaultdict subclass with an extra attribute `_boxsize` that stores 
        the total count accumulated over its entries.
        """
        super().__init__(*args, **kwargs)
        self._boxsize = 0

class Ngrams:
    def __init__(self, 
            data: List[Union[List[str], str]] = [], 
            n: int = 2, 
            vocab: Optional[dict] = None,
            bos_token: str = '<s>',
            eos_token: str = '<e>',
            unk_token: str = '<unk>',
            smoothk: float = 1., 
            need_reg: bool = True,
            ignore_unk: bool = True,
        ) -> None:
        """
        Construct an ngram model on some data.

        Parameters
        -------
        data: List[Union[List[str], str]]
            A corpus. Each item is either an article (a str, split into sentences 
            with `sent_tokenize`) or a list of sentence strings.
            No need to provide bos/eos token manually.
        n: int
            Length N for N-gram model.
        vocab: Optional[dict]
            Prior vocabulary dictionary.
            If None, use all the vocabulary on the training dataset.
        bos_token: str
            Beginning-of-sentence token.
        eos_token: str
            Ending-of-sentence token.
        unk_token: str
            Unknown word token.
        smoothk: float
            Bayes smoothing constant. Defaults to 1 (Laplacian smoothing).
        need_reg: bool
            Whether to normalize the dataset 
            (remove punctuation and convert to lower case).
        ignore_unk: bool
            If True, ignore ngrams with unknown words in the training dataset.
            If False, unknown words in training dataset are treated as unk_token.
        """
        self.data = data
        self.n = n
        self.smoothk = smoothk

        def reg(s):
            def strreg(s):
                return ''.join(filter(
                    lambda x: 96 < ord(x) < 123 or x == ' ' or 47 < ord(x) < 58, s.lower()))

            if isinstance(s, str):
                return strreg(s)
            return [strreg(_) for _ in s]

        self.reg = reg

        self.prob = reduce(lambda x, y: SizedDefaultDict(lambda: deepcopy(x)), [0] * (n+1))
        self.bos_token = bos_token
        self.eos_token = eos_token
        self.unk_token = unk_token
        self.ignore_unk = ignore_unk

        self.vocab = vocab
        if self.vocab is None:
            # if no vocabulary is provided, initialize it with the whole dataset
            vocab = set([self.bos_token, self.eos_token, self.unk_token])
        else:
            # write bos/eos/unk tokens to the vocabulary list
            for token in (self.bos_token, self.eos_token, self.unk_token):
                self.vocab[token] = 0

        for article in tqdm(data):
            if isinstance(article, str):
                article = sent_tokenize(article)
            
            for sents in article:
                # train with each sentence, normalizing it first if requested
                split_sent = (self.reg(sents) if need_reg else sents).split()

                # add all words to the vocabulary list if not given
                if self.vocab is None:
                    vocab.update(split_sent)

                # pad each sentence with bos and eos tokens
                words = [self.bos_token] * (n-1) + split_sent
                words.append(self.eos_token)

                if not ignore_unk:
                    words = self._mask_unknown_word(words)

                for i in range(max(0, len(words) - n + 1)):
                    if (not ignore_unk) or self._is_known_ngram(words[i:i+n]):
                        box = reduce(lambda x, y: x[y], words[i:i+n-1], self.prob)
                        box[words[i+n-1]] += 1

                        # accumulate the count of the context (w1,...,w{n-1}) here, 
                        # so the denominator need not be recomputed at query time
                        box._boxsize += 1

        if self.vocab is None:
            self.vocab = vocab

    def _is_known_word(self,
            word: Union[List[str], str]
        ) -> Union[List[bool], bool]:
        """
        Check whether a single word or each word in a list is in the vocabulary.
        When `self.vocab` is None, returns True by default.
        """
        if isinstance(word, str):
            if self.vocab is None:
                return True
            # `in` works whether the vocabulary is a dict (user-provided) or a set (built from data)
            return word in self.vocab
        return [self._is_known_word(w) for w in word]

    def _is_known_ngram(self, 
            ngram: List[str]
        ) -> bool:
        """
        Check whether each word in the ngram is in the vocabulary.
        When `self.vocab` is None, returns True by default.
        """
        if self.vocab is None:
            return True
        # `in` works whether the vocabulary is a dict (user-provided) or a set (built from data)
        return all(word in self.vocab for word in ngram)

    def _mask_unknown_word(self,
            word: Union[List[str], str]
        ) -> Union[List[str], str]:
        """
        Convert a single word, or each word in a list, to the unk token 
        if it is not in the vocabulary.
        """
        if isinstance(word, str):
            # `in` works whether the vocabulary is a dict (user-provided) or a set (built from data)
            if self.vocab is not None and word not in self.vocab:
                return self.unk_token
            return word
        return [self._mask_unknown_word(w) for w in word]

    def get(self, *args, **kwargs) -> Union[float, SizedDefaultDict]:
        """
        Get the probability of some ngram or the frequency dict of the ngram. 
        See details at __getitem__.
        """
        return self.__getitem__(*args, **kwargs)
    
    def __getitem__(self, 
            pos: Union[List[str], str], 
            need_reg: bool = False, 
            need_lower: bool = True
        ) -> Union[float, SizedDefaultDict]:
        """
        Get the probability of kgram. If k == n, return the probability.
        If k < n: return the frequency dict of the kgram.

        Parameters
        -------
        pos: Union[List[str], str]
            The kgram in the form of spaced str or listed strs.    
        need_reg: bool
            Whether to normalize the kgram (strip punctuation, lower-case).
            Only applied when `pos` is given as a str.
        need_lower: bool
            Whether to convert the kgram to lower case.
        """

        if isinstance(pos, str):
            if need_reg: pos = self.reg(pos)
            pos = pos.split()
        
        if need_lower:
            pos = [_.lower() for _ in pos]
            
        if len(pos) < self.n:
            return reduce(lambda x, y: x[y], pos, self.prob)
        
        box = reduce(lambda x, y: x[y], pos[:-1], self.prob)

        return (box[pos[-1]] + self.smoothk) /\
                    (box._boxsize + len(self.vocab) * self.smoothk)

    def perplexity(self, 
            sentences: Union[List[str], str], 
            need_reg: bool = False, 
            need_lower: bool = True, 
            verbose: bool = False, 
            avg: bool = True
        ) -> Union[np.ndarray, float]:
        """
        Compute the perplexity of a sentence or multiple sentences. The perplexity 
        is geometrically averaged on multiple sentences.

        Parameters
        -------
        sentences: Union[List[str], str]
            Sentences to compute perplexity on. No need to provide bos/eos token manually.
        need_reg: bool
            Whether to normalize each sentence (strip punctuation, lower-case).
        need_lower: bool
            Whether to convert each sentence to lower case.
        verbose: bool
            If True, display a tqdm bar.        
        avg: bool
            If False, return all the perplexities on the multiple sentences instead of 
            just the average.
        """
        only_one = isinstance(sentences, str)
        if only_one:
            sentences = [sentences]

        result = []
        verbose = (lambda x: x) if not verbose else tqdm
        n = self.n
        for words in verbose(sentences):
            if need_reg: words = self.reg(words)
            words = [self.bos_token] * (n-1) + words.split()
            words.append(self.eos_token)

            if not self.ignore_unk:
                words = self._mask_unknown_word(words)

            # one ngram per predicted token (the words of the sentence plus eos)
            num_ngrams = len(words) - n + 1
            p = -np.log([self.__getitem__(words[i:i+n], need_reg = need_reg, need_lower = need_lower)
                        for i in range(num_ngrams)]).sum()

            # compute the (geometric) mean over the predicted tokens;
            # the (n-1) bos padding tokens are not predicted and are not counted
            p /= num_ngrams
            result.append(np.exp(p))

        if only_one: result = result[0]
    
        result = np.array(result)
        if avg: result = np.exp(np.nanmean(np.log(result)))
        return result

    def gen(self, 
            start: Union[List[str], str], 
            length: int = 10
        ) -> Union[List[str], str]:
        """
        Generate sentences with given starts. Sentences are truncated 
        when encountering eos_token in generation.
        
        Parameters
        --------
        start: Union[List[str], str]
            A single start or multiple starts (for multiple sentence generation).
            No need to provide bos/eos token manually.
        length: int
            Maximum generation length.
        """
        only_one = isinstance(start, str)
        if only_one:
            start = [start]

        sentences = []
        n = self.n
        for words in start:
            sentence = [self.bos_token] * (n-1) + self.reg(words).split()
            buffer_ = sentence[1-n:].copy()
            for _ in range(length):
                box = reduce(lambda x, y: x[y], buffer_, self.prob) if self.n > 1 else self.prob
                if box._boxsize == 0:
                    break
                
                # we do not consider the unknown words / unseen ngrams here
                prob = (np.array(list(box.values())) + 1) / (box._boxsize + len(box))
                next_word = np.random.choice(list(box.keys()), p = prob)

                # pop the first word from and add the new word to the buffer
                buffer_ = buffer_[1:]
                buffer_.append(next_word)
                sentence.append(next_word)
            sentences.append(' '.join(' '.join(sentence).replace(
                        self.bos_token,'').replace(self.eos_token,'').strip().split()))
            
        if only_one:
            sentences = sentences[0]
        return sentences

## Perplexity

Suppose that, given $w_1,\dotsc,w_{n-1}$, the language model predicts $w_n$ as the next word. The perplexity of the resulting sentence is defined as

$${\rm PP}=\sqrt[n]{\frac{1}{\mathbb P(w_1,\dotsc,w_{n-1},w_n)}}$$

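To some extent, the lower the perplexity the better: it indicates that the generated sentence has a higher probability of actually occurring.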
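As a sketch, the perplexity can be computed from the per-word conditional probabilities, whose product is the sentence probability by the chain rule. Working with logarithms avoids underflow on long sentences. The function below is a standalone illustration with made-up probabilities, separate from the `Ngrams.perplexity` method in the code cell.

```python
import math
from typing import Sequence

def perplexity(token_probs: Sequence[float]) -> float:
    """n-th root of 1 / (product of the n per-word probabilities), via logs."""
    n = len(token_probs)
    log_p = sum(math.log(p) for p in token_probs)
    return math.exp(-log_p / n)

# A model that is always unsure between 4 equally likely words
# has a perplexity of about 4.
print(perplexity([0.25, 0.25, 0.25, 0.25]))
# Higher per-word probabilities give a lower perplexity.
print(perplexity([0.5, 0.5, 0.5]))
```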

Taking the trigram model as an example, the probability of a sentence is

$$\mathbb P(w_1,\dotsc,w_{n-1},w_n) = \mathbb P(w_n|w_{n-1},w_{n-2})\cdot \mathbb P(w_{n-1}|w_{n-2},w_{n-3})
\cdot \dotsm \cdot \mathbb P(w_3|w_2,w_1) \cdot \mathbb P(w_2|w_1)\cdot \mathbb P(w_1)$$

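Note the initial terms at the far right of the expression above: they condition on fewer than two preceding words.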

### Padding

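We can pad the beginning of each sentence with enough copies of a special token that marks the start, "bos" (beginning of sentence) or "\<s\>". Taking the trigram model as an example, if **two** "bos" tokens are prepended to every sentence, then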

$$\begin{aligned}\mathbb P(w_1,\dotsc,w_{n-1},w_n) &= \mathbb P(w_n|w_{n-1},w_{n-2})\cdot \mathbb P(w_{n-1}|w_{n-2},w_{n-3})
\cdot \dotsm \cdot \mathbb P(w_3|w_2,w_1) \cdot \mathbb P(w_2|w_1,{\rm bos})\cdot \mathbb P(w_1|{\rm bos}, {\rm bos})
\\ &= \prod_{k=1}^n \mathbb P(w_k|w_{k-1},w_{k-2})\end{aligned}$$

where $w_0=w_{-1}={\rm bos}$. The uniform form makes the code easier to write.

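In addition, a special token "eos" (end of sentence) or "\</s\>" can be appended to the end of each sentence, so that the model also assigns a probability to the sentence ending there.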
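The padding scheme can be sketched as follows. The token spellings `<s>` and `<e>` mirror the defaults of the `Ngrams` class in the code cell, and the helper name `padded_ngrams` is hypothetical.

```python
from typing import List, Tuple

BOS, EOS = "<s>", "<e>"  # same spellings as the Ngrams class defaults

def padded_ngrams(sentence: str, n: int) -> List[Tuple[str, ...]]:
    """Pad with (n-1) bos tokens and one eos token, then list all n-grams."""
    words = [BOS] * (n - 1) + sentence.split() + [EOS]
    return [tuple(words[i:i + n]) for i in range(len(words) - n + 1)]

for gram in padded_ngrams("i want to", n=3):
    print(gram)
# ('<s>', '<s>', 'i')
# ('<s>', 'i', 'want')
# ('i', 'want', 'to')
# ('want', 'to', '<e>')
```

Every word of the sentence, and the eos token, appears exactly once as the last element of a gram, so the sentence probability is a product of one conditional probability per gram.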