In [8]:
from abc import ABC
# import tkr

In [9]:
def setup_estimator(self):
    Nr = {}
    for _, freq in self.ngram_freq.items():
        if freq in Nr:
            Nr[freq] += 1
        else:
            Nr[freq] = 1

    Zr = {}
    Zr[1] = Nr[1]

    Nr_items = sorted(Nr.items())
    # print(Nr_items)

    for i, thing in enumerate(Nr_items[1:-1], start=1):
        Zr[thing[0]] = (thing[1] * 2) / (Nr_items[i+1][0] - Nr_items[i-1][0])

    # print(sorted(Zr.items()))
    X = [[log(i[0])] for i in Zr.items()]
    y = [log(i[1]) for i in Zr.items()]

    md = LinearRegression()
    md.fit(X, y)

    self.Zr = Zr
    self.md = md

    # For each value in Zr, compute Turing Estimate directly, and also using the model
    # Compute the variance using the known Zr values
    swap_value = 0

    estimates = []

    for r in range(1, max(Zr)):
        try:
            turing_estimate = (r + 1) * (Nr[r+1]) / Nr[r]

            estimates.append(turing_estimate)

            pred_logs = md.predict([[log(r+1)], [log(r)]])
            pred_estimate = (r + 1) * (exp(pred_logs[0])) / exp(pred_logs[1])

            variance = ((r + 1) ** 2) * (Nr[r+1] / (Nr[r] ** 2)) * (1 + ((Nr[r+1] / (Nr[r]))))

            if abs(turing_estimate - pred_estimate) < GT_CONFIDENCE * (variance ** 0.5):
                swap_value = r
                break

        except KeyError:
            swap_value = r
            break

    # print(swap_value)

    for r in range(swap_value, max(Zr)):
        pred_logs = md.predict([[log(r+1)], [log(r)]])
        pred_estimate = (r + 1) * (exp(pred_logs[0])) / exp(pred_logs[1])

        estimates.append(pred_estimate)

    # self.swap_value = swap_value
    # print(len(estimates))
    # print(Nr_items)
    self.estimates = estimates

In [10]:
class LM_Base(ABC):
    def __init__(self, N, text, tokenizer_function=tkr.tokenize_string):
        self.N = N
        self.tokenizer_function = tokenizer_function
        self.document = self.tokenizer_function(text)

        self.train_doc = None
        self.test_doc = None

    def fit(self):
        """
        Given the document, compute the ngram_frequencies

        """
        # self.document = self.tokenizer_function(text)
        # For each token in the document, add N of the tokens to a tuple

        if self.train_doc is not None:
            # Use train_doc, else use the entire document.
            self.document = self.train_doc


        ngrams = []
        for sentence in self.document:

            sentence.insert(0, '<s>')
            sentence.append('</s>')

            for i in range(len(sentence)):
                n_window = sentence[i:i+self.N]
                if len(n_window) == self.N:
                    ngrams.append(tuple(n_window))

        ngram_freq = {}
        for ngram in ngrams:
            if ngram in ngram_freq:
                ngram_freq[ngram] += 1
            else:
                ngram_freq[ngram] = 1

        self.ngrams = ngrams
        self.ngram_freq = ngram_freq

    @abstractmethod
    def ngram_estimator(self, ngram):
        pass

    @abstractmethod
    def setup_estimator(self):
        pass



    def train_test_split(self, test_size=1000, seed=None):
        if self.document is None:
            raise Exception('Model not fitted yet')
        else:
            if seed is not None:
                random.seed(seed)
            random.shuffle(self.document)

            self.train_doc = self.document[:-test_size]
            self.test_doc = self.document[-test_size:]

        return self.train_doc, self.test_doc

    def perplexity(self, sentence):
        if self.document is None:
            raise Exception('Model not fitted yet')

        if type(sentence) == str:  # Tokenize and add beginning, end of sentence tags.
            sents = self.tokenizer_function(sentence)
            for st in sents:
                st.insert(0, '<s>')
                st.append('</s>')

        elif type(sentence) == list:  # If sentence is already tokenized.
            sents = sentence

        n = len(sents[0])

        pred_score = self.predict(sentence)
        return pred_score ** (-1/n)


    def predict(self, sentence):
        if type(sentence) == str:  # Tokenize and add beginning, end of sentence tags.
            sents = self.tokenizer_function(sentence)
            for st in sents:
                st.insert(0, '<s>')
                st.append('</s>')

        elif type(sentence) == list:  # If sentence is already tokenized.
            sents = sentence

        else:
            raise ValueError("Sentence must be a string or a list of tokens.")

        sent_ngrams = []
        for sentence in sents:
            for i in range(len(sentence)):
                n_window = sentence[i:i+self.N]
                if len(n_window) == self.N:
                    sent_ngrams.append(tuple(n_window))

        sent_prob = 1
        for ngram in sent_ngrams:
            sent_prob *= self.ngram_estimator(ngram)
            # print(sent_prob, self.ngram_freq.get(ngram, 0))

        return sent_prob

    def generate(self, sentence, k:int):
        if type(sentence) == str:  # Tokenize and add beginning, end of sentence tags.
            sents = self.tokenizer_function(sentence)
            for st in sents:
                st.insert(0, '<s>')
                st.append('</s>')

        elif type(sentence) == list:  # If sentence is already tokenized.
            sents = sentence

        else:
            raise ValueError("Sentence must be a string or a list of tokens.")

        last_sent = sents[-1]
        last_window = last_sent[- self.N :-1]

        possible_ngrams = []
        for ngram in self.ngrams:
            for i, token in enumerate(last_window):
                if ngram[i] != token:
                    break
            else:
                if ngram not in possible_ngrams:
                    possible_ngrams.append(ngram)

        outputs = {ng: self.predict([' '.join(ng)]) / self.predict(' '.join(last_sent[- self.N:-1])) for ng in possible_ngrams}
        # for ng in possible_ngrams:
        #    print(ng, self.predict([' '.join(ng)]), self.predict(' '.join(last_sent[- self.N:-1])))
        # print(sorted(outputs.items(), key=lambda x: x[1], reverse=True))
        return sorted(outputs.items(), key=lambda x: x[1], reverse=True)[:k]

NameError: name 'tkr' is not defined

In [None]:
class LM_Interpolation(LM_Base):
    def __init__(self, N, text, tokenizer_function=tkr.tokenize_string):
        super().__init__(N, text, tokenizer_function)

        self.all_estimators = None
        self.lambdas = None

        unigrams = []
        for sentence in self.document:
            for i in range(len(sentence)):
                n_window = sentence[i:i+1]
                if len(n_window) == 1:
                    unigrams.append(tuple(n_window))

        unigram_freq = {}
        for unigram in unigrams:
            if unigram not in unigram_freq:
                unigram_freq[unigram] = 1
            else:
                unigram_freq[unigram] += 1

        freq_of_freqs = {}
        for n, c in unigram_freq.items():
            if c not in freq_of_freqs:
                freq_of_freqs[c] = 1
            else:
                freq_of_freqs[c] += 1

        held_out_vocabulary = []
        for ng in unigram_freq:
            if unigram_freq[ng] == 1 and random.randint(1, 10) > 7:
                held_out_vocabulary.append(ng[0])

        for i in range(len(self.document)):
            for j in range(len(self.document[i])):
                if self.document[i][j] in held_out_vocabulary:
                    self.document[i][j] = '<UNK>'

        self.held_out_vocabulary = held_out_vocabulary

    def ngram_estimator(self, ngram):
        if self.ngrams is None:
            raise Exception('Model not fitted yet.')

        cleaned_ngram = []
        for token in ngram:
            if token in self.held_out_vocabulary:
                cleaned_ngram.append('<UNK>')
            if self.all_estimators[1].get((token, )) is None:
                cleaned_ngram.append('<UNK>')
            else:
                cleaned_ngram.append(token)
        cleaned_ngram = tuple(cleaned_ngram)

        if self.N != 3:
            raise NotImplementedError("Interpolation model only implemented for trigrams.")

        try:
            p3 = self.all_estimators[3].get(cleaned_ngram, 0) / self.all_estimators[2].get(cleaned_ngram[:-1], 0)
        except ZeroDivisionError:
            p3 = 0

        try:
            p2 = self.all_estimators[2].get(cleaned_ngram[1:],0) / self.all_estimators[1].get(cleaned_ngram[1:-1], 0)
        except ZeroDivisionError:
            p2 = 0

        try:
            p1 = self.all_estimators[1].get(cleaned_ngram[-1:], 0) / sum([v for k,v in self.all_estimators[1].items()])
        except ZeroDivisionError:
            p1 = 0

        p = (self.lambdas[1] * p1) + (self.lambdas[2] * p2) + (self.lambdas[3] * p3)

        # print(p)

        return (self.lambdas[1] * p1) + (self.lambdas[2] * p2) + (self.lambdas[3] * p3)


    def setup_estimator(self):
        if self.ngrams is None:
            raise Exception('Model not fitted yet.')

        if self.train_doc is not None:
            self.document = self.train_doc

        self.all_estimators = {}
        self.all_estimators[self.N] = self.ngram_freq
        # Count for all smaller windows
        for window in range(1, self.N):
            ngrams = []
            for sentence in self.document:
                for i in range(len(sentence)):
                    n_window = sentence[i:i+window]
                    if len(n_window) == window:
                        ngrams.append(tuple(n_window))

            ngram_freq = {}
            for ngram in ngrams:
                if ngram in ngram_freq:
                    ngram_freq[ngram] += 1
                else:
                    ngram_freq[ngram] = 1

            self.all_estimators[window] = ngram_freq

        if self.N != 3:
            raise NotImplementedError("Interpolation works for N=3, not others.")

        lambdas = {i:0 for i in range(1, self.N+1)}
        normalizing_factor = 0

        for ng in self.ngram_freq:
            try:
                l3_checker = (self.all_estimators[3][ng] - 1) / (self.all_estimators[2][ng[:2]] - 1)
            except ZeroDivisionError:
                l3_checker = 0

            try:
                l2_checker = (self.all_estimators[2][ng[1:]] - 1) / (self.all_estimators[1][ng[1:2]] - 1)
            except ZeroDivisionError:
                l2_checker = 0

            try:
                l1_checker = (self.all_estimators[1][ng[-1:]] - 1) / (len(self.ngrams) - 1)
            except ZeroDivisionError:
                l1_checker = 0


            max_check = max([l1_checker, l2_checker, l3_checker])

            if max_check == l3_checker:
                lambdas[3] += self.ngram_freq[ng]

            elif max_check == l2_checker:
                lambdas[2] += self.ngram_freq[ng]

            else:
                lambdas[1] += self.ngram_freq[ng]

            normalizing_factor += self.ngram_freq[ng]

        lambdas = {k: lambdas[k] / normalizing_factor for k in lambdas}

        self.lambdas = lambdas

NameError: name 'LM_Base' is not defined