In [2]:
from sacremoses import MosesTokenizer
from collections import deque, defaultdict
import math

In [3]:
# Locations of the training and evaluation corpora, relative to the notebook.
train_file_path = "../data/wiki-en-train.word"
test_file_path = "../data/wiki-en-test.word"

# Load each corpus as a list of strings, one entry per physical line of the
# file. `splitlines()[0]` keeps only the text before the first line boundary,
# which drops the trailing newline.
with open(train_file_path, "r", encoding="utf-8") as f:
    train_file = [raw_line.splitlines()[0] for raw_line in f]

with open(test_file_path, "r", encoding="utf-8") as f:
    test_file = [raw_line.splitlines()[0] for raw_line in f]

In [19]:
class Ngram:
    """
    Word-level n-gram language model with absolute-discount smoothing.

    Sentences are tokenized with the Moses tokenizer and padded with
    ``max(n - 1, 1)`` ``<s>`` markers on the left and the same number of
    ``</s>`` markers on the right.  Every n-gram is stored as a pair
    ``(context, word)`` where ``context`` is the first ``n - 1`` tokens joined
    by a single space (the empty string when ``n == 1``) and ``word`` is the
    last token.

    Attributes:
        n: Order of the model.
        counts: ``counts[context][word]`` -> occurrences in the training data.
        counts_inv: ``counts_inv[word][context]`` -> the same counts, indexed
            by word first.
        prob: ``prob[context][word]`` -> smoothed probability; only pairs seen
            during training are present.
        context_counts: Initialised to ``None`` and never written by this
            class; kept for compatibility.
        mt: The ``MosesTokenizer`` used to split text into tokens.
    """

    def __init__(self, n):
        """
        Create an untrained model of order ``n``.

        Raises:
            TypeError: if ``n`` is not an ``int``.
            ValueError: if ``n`` is smaller than 1.
        """
        if not isinstance(n, int):
            raise TypeError(f"n must be an int, got {type(n).__name__}")
        if n < 1:
            raise ValueError(f"n must be >= 1, got {n}")
        self.n = n
        self._trained = False
        self.counts = None
        self.context_counts = None
        self.counts_inv = None
        self.prob = None

        self.mt = MosesTokenizer(lang='en')

    def __call__(self, text: str):
        """
        Score ``text`` with :meth:`mle_with_entropy`.

        If the model has not been trained yet, a message is printed and
        ``None`` is returned instead of a score.
        """
        if not self._trained:
            print("Call train before predicting")
            return None
        return self.mle_with_entropy(text)

    def _padded_tokens(self, text):
        """Tokenize ``text`` and add the sentence boundary markers."""
        # A unigram model still receives one marker on each side.
        pad = max(self.n - 1, 1)
        return ["<s>"] * pad + list(self.mt.tokenize(text)) + ["</s>"] * pad

    def _iter_ngrams(self, tokens):
        """Yield ``(context, word)`` for every n-gram window over ``tokens``."""
        order = self.n
        for start in range(len(tokens) - order + 1):
            last = start + order - 1
            yield " ".join(tokens[start:last]), tokens[last]

    def _probability(self, context, word):
        """
        Look up the smoothed probability of ``word`` after ``context``.

        Returns 0 for pairs that are not in the table.  The lookup uses
        ``.get`` on purpose: indexing the nested ``defaultdict`` directly
        would insert every unseen context/word into the trained model.
        """
        if self.prob is None:
            raise RuntimeError("Call train before predicting")
        word_probs = self.prob.get(context)
        if word_probs is None:
            return 0
        return word_probs.get(word, 0)

    def train(self, corpus):
        """
        Train Ngram with the corpus.

        Args:
            corpus: Iterable of sentences (strings).  An empty corpus is
                accepted and yields a model in which every n-gram has
                probability 0.

        Replaces ``counts``, ``counts_inv`` and ``prob`` and marks the model
        as trained.
        """
        counts = defaultdict(lambda: defaultdict(int))
        counts_inv = defaultdict(lambda: defaultdict(int))

        for line in corpus:
            for context, word in self._iter_ngrams(self._padded_tokens(line)):
                counts[context][word] += 1
                counts_inv[word][context] += 1

        # Assigned after the loop so the attributes are set even when the
        # corpus is empty.
        self.counts = counts
        self.counts_inv = counts_inv

        self.prob = self.kneser_ney_smoothing()
        self._trained = True

    def kneser_ney_smoothing(self, d=0.75):
        """
        Build the smoothed probability table from ``counts``/``counts_inv``.

        For every (context, word) pair seen in training::

            p = max(count - d, 0) / total(context) + lambda(context) * p_cont(word)

        where ``lambda(context) = d * distinct_words(context) / total(context)``
        and ``p_cont(word)`` is the number of distinct contexts ``word`` was
        seen after, divided by the number of distinct (context, word) pairs.

        The continuation term is taken from counts of the same order and
        there is no back-off to a lower-order model, so pairs that never
        occurred in training are simply absent from the table.

        Args:
            d: Discount subtracted from every observed count.

        Returns:
            Nested ``defaultdict`` mapping ``context -> word -> probability``.
        """
        prob = defaultdict(lambda: defaultdict(int))
        num_ngram_types = sum(len(contexts) for contexts in self.counts_inv.values())
        for context, word_count in self.counts.items():
            context_count = sum(word_count.values())
            # Weight of the continuation term; constant for the whole context.
            lmd = d * len(word_count) / context_count
            for word, count in word_count.items():
                p_cont = len(self.counts_inv[word]) / num_ngram_types
                prob[context][word] = max(count - d, 0) / context_count + lmd * p_cont
        return prob

    def mle(self, text: str, return_tokens=False) -> float:
        """
        Probability of ``text`` under the trained model.

        The per-n-gram probabilities are multiplied in log space.  As soon as
        one n-gram has probability 0 the result is ``0``.

        Args:
            text: Sentence to score.
            return_tokens: When true, return ``(probability, tokens)`` where
                ``tokens`` is the padded token list that was scored.

        Raises:
            RuntimeError: if the model has no probability table yet.
        """
        tokens = self._padded_tokens(text)
        log_prob = 0
        for context, word in self._iter_ngrams(tokens):
            p = self._probability(context, word)
            if p == 0:
                return (0, tokens) if return_tokens else 0
            log_prob += math.log(p)
        likelihood = math.exp(log_prob)
        return (likelihood, tokens) if return_tokens else likelihood

    def entropy(self, prob, N) -> float:
        """
        Return ``-ln(prob) / N``.

        ``math.log`` raises ``ValueError`` when ``prob`` is not positive (for
        example the ``0`` that :meth:`mle` returns for unseen n-grams), and
        ``N == 0`` raises ``ZeroDivisionError``.
        """
        return - math.log(prob) / N

    def mle_with_entropy(self, text: str) -> float:
        """
        Negative natural-log probability of ``text`` per padded token.

        NOTE: n-grams with probability 0 (not seen in training) are skipped,
        i.e. they add nothing to the total, while the divisor is the full
        padded token count.  A sentence made mostly of unseen n-grams
        therefore gets a *low* score, so values are not directly comparable
        between models of different order.

        Raises:
            RuntimeError: if the model has no probability table yet.
        """
        tokens = self._padded_tokens(text)
        log_prob = 0
        for context, word in self._iter_ngrams(tokens):
            p = self._probability(context, word)
            if p == 0:
                continue
            log_prob += math.log(p)
        return - log_prob / len(tokens)

In [26]:
# Train one model per order on the training corpus, score every test
# sentence with it, and report the average of the per-sentence scores.
for n in (2, 3, 5):
    ngram = Ngram(n)
    ngram.train(train_file)
    # Calling the model returns the per-token score of a single sentence.
    entropy_list = list(map(ngram, test_file))
    mean_entropy = sum(entropy_list) / len(entropy_list)
    print(f"The mean of entropy when {n}-gram:", mean_entropy)

The mean of entropy when 2-gram: 1.3907674549577471
The mean of entropy when 3-gram: 0.3258129953501907
The mean of entropy when 5-gram: 0.16083540294889437
