# Assignment 1

Using text http://www.gutenberg.org/files/2600/2600-0.txt
1. Make text lowercase and remove all punctuation except spaces and dots.
2. Tokenize text by BPE with vocab_size = 100
3. Train 3-gram language model with laplace smoothing $\delta=1$
4. Using beam search with k=10 generate sequences of length=10 conditioned on provided inputs. Treat dots as terminal tokens.
5. Calculate perplexity of the language model for the first sentence.

In [0]:
from google.colab import drive
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [0]:
import os
os.chdir('gdrive/My Drive/Colab Notebooks')

In [0]:
text = open('peace.txt', 'r').read()[2:]
len(text)

3227579

In [0]:
import string
import re

def preprocess_text(text):
    # TODO
    # make lowercase
    # replace all punctuation except dots with spaces
    text = re.sub('~|`|!|"|@|#|\$|%|&|\'|”|“|—|‘|’|\(|\)|\\|^|_|{|\||}|\*|\+|,|-|/|:|;|<|=|>|\?|\]|\[', ' ', text).lower()
    # collapse multiple spaces into one '   ' -> ' '
    text = re.sub('\s+', ' ', text)
    return text

text = preprocess_text(text)
assert len(text) == 3141169

In [0]:
text = text.split('.')
text = [x.strip() for x in text]

In [0]:
text[0]

'the project gutenberg ebook of war and peace by leo tolstoy this ebook is for the use of anyone anywhere at no cost and with almost no restrictions whatsoever'

In [0]:
from collections import Counter
import nltk
from sklearn.base import TransformerMixin
import copy


class BPE(TransformerMixin):
    def __init__(self, vocab_size=100):
        super(BPE, self).__init__()
        self.vocab_size = vocab_size
        # index to token
        self.itos = []
        # token to index
        self.stoi = {}
        
    def fit(self, text):
        """
        fit itos and stoi
        text: list of strings 
        """
        
        # tokenize text by symbols and fill in self.itos and self.stoi
        
        all_uniq_letters = list(set(''.join(text)))
        self.itos = all_uniq_letters
        
        d = {}
        for i in range(len(all_uniq_letters)):
          letter = all_uniq_letters[i]
          d[letter] = i
        self.stoi = d
        
        mas = []
        for i in range(len(text)):
          mas_dop = []
          for j in text[i]:
            mas_dop.append(self.stoi[j])
          mas.append(mas_dop)
        text = mas
        
        while len(self.itos) < self.vocab_size:
            # TODO
            # count bigram freqencies in the text
            bigrams = Counter()
            for i in range(len(text)):
              bigrams.update((x, y) for x, y in zip(*[text[i][j:] for j in range(2)]))
            
            top = bigrams.most_common(1)
            new_token = top[0][0]# most common bigram in the text
            new_id = len(self.itos)
            
            self.itos.append(new_token)
            self.stoi[new_token] = new_id
            
            # find occurences of the new_token in the text and replace them with new_id
            new_text = text
            for i, s in enumerate(text):
              k = 0
              for j, (q, p) in enumerate(zip(s[:-1], s[1:])):
                if (q, p) == new_token:
                  new_text[i][j-k] = new_id
                  del new_text[i][j-k+1]
                  k += 1
            text = new_text
            
        return self
    
    def transform(self, text):
        """
        convert text to a sequence of token ids
        text: list of strings
        """
        new_text = []
        for sent in text:
          ms_dop = []
          for symbol in sent:
            if symbol in self.itos:
              symbol = self.stoi[symbol]
              ms_dop.append(symbol)
          new_text.append(ms_dop)
        text = new_text
        
        for token_id, token in enumerate(self.itos):
          new_text_2 = text
          for i, s in enumerate(text):
            k = 0
            for j, (q, p) in enumerate(zip(s[:-1], s[1:])):
              if (q, p) == token:
                new_text_2[i][j-k] = token_id
                del new_text_2[i][j-k+1]
                k += 1
          text = new_text_2
          
        return text
    
    def decode_token(self, tok):
        """
        tok: int or tuple
        """
        
        def rec_func(token):
          if type(token) == int:
            content = self.itos[token]
            if type(content) == str:
              result = content
            else:
              result = rec_func(content)
          if type(token) == tuple:
            result = rec_func(token[0]) + rec_func(token[1])
          return result
        
        result = rec_func(tok)
        return result
            
    def decode(self, text):
        """
        convert token ids into text
        """
        return ''.join(map(self.decode_token, text))
        
        
vocab_size = 100
bpe = BPE(vocab_size)
tokenized_text = bpe.fit_transform(text)

In [0]:
all_uniq_letters_TEST = list(set(''.join(text)))

In [0]:
d_TEST = {}
for i in range(len(all_uniq_letters_TEST)):
  letter = all_uniq_letters_TEST[i]
  d_TEST[letter] = i

In [0]:
mas_TEST = []
for i in range(len(text)):
  mas_dop = []
  for j in text[i]:
    mas_dop.append(d_TEST[j])
  mas_TEST.append(mas_dop)

In [0]:
#mas_TEST[0]

In [0]:
bigrams_TEST = Counter()
for i in range(len(text)):
  bigrams_TEST.update((x, y) for x, y in zip(*[text[i][j:] for j in range(2)]))

In [0]:
#bigrams_TEST

In [0]:
assert bpe.decode(tokenized_text[0]) == text[0]

In [0]:
import numpy as np
        
    
start_token = vocab_size
end_token = vocab_size + 1
        
    
class LM:
    def __init__(self, vocab_size, delta=1):
        self.delta = delta
        self.vocab_size = vocab_size + 2
        self.proba = Counter()# TODO create array for storing 3-gram counters
        
    def infer(self, a, b, tau=1):
        """
        return vector of probabilities of size self.vocab for 3-grams which start with (a,b) tokens
        a: first token id
        b: second token id
        tau: temperature
        """
        m = []
        for i in range(self.vocab_size):
          m.append(self.get_proba(a,b,i))
        result = m
        
        return result
        
    def get_proba(self, a, b, c, tau=1):
        """
        get probability of 3-gram (a,b,c)
        a: first token id
        b: second token id
        c: third token id
        tau: temperature
        """

        delta = 1
        smooth = (self.proba[(a, b, c)] + delta) ** (1/tau)
        
        mas  = []
        for i in range(self.vocab_size):
          mas.append((self.proba[(a, b, i)] + delta) ** (1/tau))
        
        result = smooth / sum(mas) # TODO approximate probability by counters
        
        return result
    
    def fit(self, text):
        """
        train language model on text
        text: list of lists
        """
        ms = []
        for t in text:
          s = [start_token] + t + [end_token]
          ms.append(s)
          
        trigrams = Counter()
        for i in range(len(ms)):
          trigrams.update((x, y, z) for x, y, z in zip(*[ms[i][j:] for j in range(3)]))
        
        self.proba = trigrams# TODO count 3-grams in the text
        
        return self
    
lm = LM(vocab_size, 1).fit(tokenized_text)

In [0]:
from math import log

def beam_search(input_seq, lm, max_len=10, k=5, tau=1):
    """
    generate sequence from language model *lm* conditioned on input_seq
    input_seq: sequence of token ids for conditioning
    lm: language model
    max_len: max generated sequence length
    k: size of beam
    tau: temperature
    """
    
    input_seq_new = [start_token] + input_seq
    x, y = input_seq_new[-2:]
    beam = [] # TODO store in beam tuples of current sequences and their log probabilities
    m = lm.infer(x, y, tau)
    for token, proba in sorted(enumerate(m), key=lambda x:x[1], reverse=True)[:k]:
      beam.append(([y, token], proba))
    
    for i in range(max_len):
        candidates = []
        candidates_proba = []
        for snt, snt_proba in beam:
            if snt[-1] == end_token: # TODO process terminal token
              continue
            else:
              a, b = snt[-2:]
              proba = lm.infer(a, b, tau) # probability vector of the next token
              best_k = sorted(enumerate(proba), key=lambda x:x[1], reverse=True)[:k] # top-k most probable tokens
              
              # TODO update candidates' sequences and corresponding probabilities
              mas_dop_1 = []
              mas_dop_2 = []
              for token, proba in best_k:
                mas_dop_1.append(snt+[token])
                mas_dop_2.append(snt_proba + log(proba))
              candidates += mas_dop_1
              candidates_proba += mas_dop_2
        
        beam = []# select top-k most probable sequences from candidates
        for idx, prob in sorted(enumerate(candidates_proba), key=lambda x:x[1], reverse=True):
            beam.append((candidates[idx], proba))
    result = []
    for i, prob in beam:
      result.append((i[1:], exp(prob)))
    return result
    

 Следующие блоки с примерами долго работали, так ничего и не удалось вывести

In [0]:
input1 = 'horse '
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
# TODO print decoded generated strings and their probabilities
for sent, proba in result:
  print('sent: ', bpe.decode(sent))
  print('probability: ', proba, '\n')

In [0]:
input1 = 'her'
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
# TODO print decoded generated strings and their probabilities
for sent, proba in result:
  print('sent: ', bpe.decode(sent))
  print('probability: ', proba, '\n')

In [0]:
input1 = 'what'
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=1)
# TODO print decoded generated strings and their probabilities
for sent, proba in result:
  print('sent: ', bpe.decode(sent))
  print('probability: ', proba, '\n')

In [0]:
input1 = 'gun '
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
# TODO print decoded generated strings and their probabilities
for sent, proba in result:
  print('sent: ', bpe.decode(sent))
  print('probability: ', proba, '\n')