In [654]:
text = open('peace.txt', 'r', encoding = 'utf-8').read()[2:]
len(text)

3227579

In [655]:
import string
import re

def preprocess_text(text):
    # making lowercase
    text = text.lower()
    # replacing all punctuation except dots with spaces
    new_text = re.sub('[\!\?#$%\(\)\*\,\-\/\:\;\=\@\[\]\—\‘\’\“\”]', ' ', text)
    # collapsing multiple spaces into one '   ' -> ' '
    clean_text = re.sub('\s\s+', ' ', new_text)
    return clean_text

In [656]:
text = preprocess_text(text)
assert len(text) == 3141169

In [657]:
text = text.split('.')
text = [x.strip() for x in text]
text = list(filter(None, text))

In [658]:
from collections import Counter
import nltk
from sklearn.base import TransformerMixin
from itertools import islice

In [659]:
class BPE(TransformerMixin):
    def __init__(self, vocab_size=100):
        super(BPE, self).__init__()
        self.vocab_size = vocab_size
        # index to token
        self.itos = []
        # token to index
        self.stoi = {}
        
    def fit(self, text):
        """
        fit itos and stoi
        text: list of strings 
        """
        
        # tokenizing text by symbols
        self.itos = list(sorted(set(' '.join(text))))
        for indx, i in enumerate(self.itos):
            self.stoi[i] = indx
            
        encoded_text = []
        
        for line in text:
            arr_line = []
            
            for i in line:
                arr_line.append(self.stoi[i])
            encoded_text.append(arr_line)
            
        text = encoded_text
        
        to_append = self.vocab_size - len(self.itos)
        count = 0
        
        while len(self.itos) < self.vocab_size:
            # counting bigram freqencies in the text
            bigrams = Counter()
            for i in text:
                bigrams.update(zip(i, islice(i, 1, None)))
            
            for indx, i in enumerate(bigrams.most_common(to_append+1)):
                if indx == count:
                    new_token = bigrams.most_common(to_append+1)[indx][0]
                    break

            new_id = len(self.itos)
            
            self.itos.append(new_token)
            self.stoi[new_token] = new_id
    
            count += 1
            
        # finding occurences of the new_token in the text and replace them with new_id
        new_text = []

        for line in text:
            i = 0
            arr = []
            
            while i < len(line):
                if i == len(line) - 1:
                    arr.append(line[i])
                    
                elif (line[i], line[i + 1]) in self.itos:
                    tup = (line[i], line[i + 1])
                    arr.append(self.stoi[tup])
                    i += 1
                    
                else:
                    arr.append(line[i])
                    
                i += 1
                
            new_text.append(arr)
                    
        text = new_text
        return self
    
    def transform(self, text):
        """
        convert text to a sequence of token ids
        text: list of strings
        """
        encoded_text = []
        
        for line in text:
            arr_line = []
            
            for i in line:
                arr_line.append(self.stoi[i])
            encoded_text.append(arr_line)
            
        text = encoded_text    
        
        to_append = self.vocab_size - len(self.itos)
        count = 0
        
        while len(self.itos) < self.vocab_size:
            # counting bigram freqencies in the text
            bigrams = Counter()
            for i in text:
                bigrams.update(zip(i, islice(i, 1, None)))
            
            for indx, i in enumerate(bigrams.most_common(to_append+1)):
                if indx == count:
                    new_token = bigrams.most_common(to_append+1)[indx][0]
                    break

            new_id = len(self.itos)
            
            self.itos.append(new_token)
            self.stoi[new_token] = new_id
    
            count += 1
            
        # finding occurences of the new_token in the text and replace them with new_id
        new_text = []

        for line in text:
            i = 0
            arr = []
            
            while i < len(line):
                if i == len(line) - 1:
                    arr.append(line[i])
                    
                elif (line[i], line[i + 1]) in self.itos:
                    tup = (line[i], line[i + 1])
                    arr.append(self.stoi[tup])
                    i += 1
                    
                else:
                    arr.append(line[i])
                    
                i += 1
                
            new_text.append(arr)
                    
        text = new_text
        return text
    
    def decode_token(self, tok):
        """
        tok: int or tuple
        """
        
        def search(token):
            
            if isinstance(token, int) == True:
                t = self.itos[token]
            
                if isinstance(t, str) == True:
                    return t
            
                else:
                    return search(t)
                
            if isinstance(token, tuple) == True:
                return str(search(token[0]) + search(token[1]))
                    
        return search(tok)
            
    def decode(self, text):
        """
        convert token ids into text
        """
        return ''.join(map(self.decode_token, text))
        
        
vocab_size = 100
bpe = BPE(vocab_size)
tokenized_text = bpe.fit_transform(text)

In [660]:
assert bpe.decode(tokenized_text[0]) == text[0]

In [661]:
import numpy as np
        
    
start_token = vocab_size
end_token = vocab_size + 1
        
    
class LM:
    def __init__(self, vocab_size, delta=1):
        self.delta = delta
        self.vocab_size = vocab_size + 2
        self.proba = Counter()
        
    def infer(self, a, b, tau=1):
        """
        return vector of probabilities of size self.vocab for 3-grams which start with (a,b) tokens
        a: first token id
        b: second token id
        tau: temperature
        """
        result = []
        
        for token in range(self.vocab_size):
            result.append(self.get_proba(a, b, token, tau))
            
        return np.array(result)
        
    def get_proba(self, a, b, c, tau=1):
        """
        get probability of 3-gram (a,b,c)
        a: first token id
        b: second token id
        c: third token id
        tau: temperature
        """
        number = []
        delta = 1
        
        for token in range(self.vocab_size):
            smooth = (self.proba[(a, b, token)] + delta) ** (1 / tau)
            number.append(smooth)
            
        result = ((self.proba[(a, b, c)] + delta) ** (1 / tau)) / sum(number)
        
        return result
    
    def fit(self, text):
        """
        train language model on text
        text: list of lists
        """
        new = []
        for line in text:
            new.append([start_token] + line + [end_token])
        
        three = []
        for line in new:
            for i in range(len(line) - 2):
                three.append((line[i], line[i + 1], line[i + 2]))
        
        self.proba = Counter(three)
        
        return self
    
lm = LM(vocab_size, 1).fit(tokenized_text)

In [725]:
def trigrams(t):
    
    new = [start_token] + t + [end_token]
    
    three = []
    
    for i in range(len(new) - 2):
        three.append((new[i], new[i + 1], new[i + 2]))
            
    proba = Counter(three)
    return proba

In [730]:
from math import exp

def perplexity(snt, lm):
    """
    snt: sequence of token ids
    lm: language model
    """
    proba = trigrams(snt)
    p = 0
    
    for t in proba:
        p += np.log(lm.get_proba(*t))
    
    result = exp(p) ** (-1/n)
    return result

perplexity(tokenized_text[0], lm)

3.741592066940445e+40

In [745]:
#бим серч не рабочий

In [743]:
def beam_search(input_seq, lm, max_len=10, k=5, tau=1):
    """
    generate sequence from language model *lm* conditioned on input_seq
    input_seq: sequence of token ids for conditioning
    lm: language model
    max_len: max generated sequence length
    k: size of beam
    tau: temperature
    """
    
    beam = [(input_seq, 1)]
    
    for i in range(max_len):
        candidates = []
        candidates_proba = []
        
        for snt, snt_proba in beam:
            if snt == end_token:
                continue
                
            else:    
                proba = lm.infer(snt[-2], snt[-1], tau)
                best_k = sorted(enumerate(proba), key=lambda x:x[1], reverse=True)[:k]
                
                for token in best_k:
                    candidates.append(snt + [token])
                
                for token, proba in enumerate(best_k):
                    candidates_proba.append(snt_proba + np.log(proba))
                
        beam = [(candidates[i], candidates_proba[i]) for i in range(k)]
        
    return beam

In [744]:
input1 = 'horse '
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
for i in result:
    print(i[0])
    print(bpe.decode(i[0]), i[1])



[19, 93, 30, 58, (34, 0.8547266510634597), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745), (0, 0.00980392156862745)]


TypeError: can only concatenate str (not "NoneType") to str