<a href="https://colab.research.google.com/github/flying-bear/kompluxternaya/blob/master/assignment_1.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>


# Assignment 1

Using text http://www.gutenberg.org/files/2600/2600-0.txt
1. Make text lowercase and remove all punctuation except spaces and dots.
2. Tokenize text by BPE with vocab_size = 100
3. Train 3-gram language model with laplace smoothing $\delta=1$
4. Using beam search with k=10 generate sequences of length=10 conditioned on provided inputs. Treat dots as terminal tokens.
5. Calculate perplexity of the language model for the first sentence.

In [0]:
import nltk
import numpy as np
import re

from collections import Counter
from google.colab import drive
from sklearn.base import TransformerMixin

In [82]:
drive.mount('/content/gdrive')

Drive already mounted at /content/gdrive; to attempt to forcibly remount, call drive.mount("/content/gdrive", force_remount=True).


In [87]:
root_path = 'gdrive/My Drive/studies/HSE/prog/kompluxternaya'
with open(root_path+'/'+'peace.txt', 'r', encoding='utf-8') as f:
  text = f.read()[2:]
print(len(text))

3227579


In [0]:
def preprocess_text(text):
    # TODO
    # make lowercase
    text = text.lower()
    # replace all punctuation except dots with spaces
    pattern = '\]|!|"|#|\$|%|&|\'|\(|\)|\*|\+|,|-|/|:|;|<|=|>|\?|@|\[|\\|^|_|`|{|\||}|~|”|“|—|‘|’'
    text = re.sub(pattern, ' ', text)
    # collapse multiple spaces into one '   ' -> ' '
    text = re.sub('\s+', ' ', text)
    return text

text = preprocess_text(text)
assert len(text) == 3141169

In [0]:
text = text.split('.')
text = [x.strip() for x in text]

In [0]:
def list_to_bigrams(l, bigrams=Counter()):
  """
  transforms a list of lists into a counter of their pairs (tuples) across the lists
  l: a list of lists of integers or strings
  """
  for i in range(len(l)):
    bigrams.update((x, y) for x, y in zip(*[l[i][j:] for j in range(2)]))
  return bigrams

In [0]:
def update_token(text, token, token_id):
  """
  modifies text, all instances of the token (element pairs) in text are replaced by token_id
  text: list of lists of integers
  token: integer tuple
  token_id: the id of the given token, integer
  """
  text_new = [inner_list[:] for inner_list in text]
  for i, sent in enumerate(text):
    deletions = 0
    for j, (v, w) in enumerate(zip(sent[:-1], sent[1:])):
      if (v, w) == token:
        text_new[i][j-deletions] = token_id
        del text_new[i][j-deletions+1]
        deletions += 1
  return text_new

In [8]:
t = ['бля бля бл', ' ', 'бля']
test_itos = list(set(''.join(t))) # list letters
test_stoi = dict(zip(test_itos, range(len(test_itos)))) # dict of letters and their ids
t = [[test_stoi[x] for x in t[i]] for i in range(len(t))] # replace letters with thir ids (str to list)
test_vocab_size = 12
while len(test_itos) < test_vocab_size:
    bigrams = list_to_bigrams(t) # get text bigrams
    if bigrams.most_common(1):
      new_token = bigrams.most_common(1)[0][0] # find most common bigram
      new_id = len(test_itos)
      test_itos.append(new_token)
      test_stoi[new_token] = new_id
      # find occurences of the new_token in the text and replace them with new_id
      t = update_token(t, new_token, new_id)
    else:
      break
print(t)
print(test_itos)

[[10, 10, 4], [0], [7]]
[' ', 'б', 'я', 'л', (1, 3), (1, 3), (2, 0), (4, 2), (4, 2), (4, 2), (4, 6), (4, 6)]


In [9]:
def recursive_token_lookup(tok, itos): #tok int or tuple
  if type(tok) == int:
    content = itos[tok]
    if type(content) == str:
      return content
    else:
      return recursive_token_lookup(content, itos)
  elif type(tok) == tuple:
    return recursive_token_lookup(tok[0], itos) + recursive_token_lookup(tok[1], itos)
  
recursive_token_lookup(7, test_itos)

'бля'

In [0]:
class BPE(TransformerMixin):
    def __init__(self, vocab_size=100):
        super(BPE, self).__init__()
        self.vocab_size = vocab_size
        # index to token
        self.itos = []
        # token to index
        self.stoi = {}

    def fit(self, text):
        """
        fits 
        - self.itos (a list of strings (symbols) and integer tuples (most frequent bigrams while vocabulary does not exceed vocab_size
                      each symbol/bigram is indexed by its place in the list) )
        - self.stoi (a dict of symbols and bigrams to thir indicies in the self.stoi list)
        - text (symbols replaced by thir ids, bigrams of ids replaced by their ids
        text: list of strings
        """
        # tokenize text by symbols and fill in self.itos and self.stoi
        self.itos = list(set(''.join(text))) # list letters
        self.stoi = dict(zip(self.itos, range(len(self.itos)))) # dict of letters and their ids
        text = [[self.stoi[x] for x in text[i]] for i in range(len(text))] # replace letters with thir ids (str to list)
        
        while len(self.itos) < self.vocab_size:
            bigrams = list_to_bigrams(text) # get text bigrams
            if bigrams.most_common(1):
              new_token = bigrams.most_common(1)[0][0] # find most common bigram
              new_id = len(self.itos)
              self.itos.append(new_token)
              self.stoi[new_token] = new_id
              # find occurences of the new_token in the text and replace them with new_id
              text = update_token(text, new_token, new_id)
            else:
              break
        return self
    
    def transform(self, text):
        """
        convert text to a sequence of symbol ids then replaces bigrams of ids with their indicies in self.stoi
        text: list of strings
        """
        text_in_vocabulary = [[symbol for symbol in sent if symbol in self.itos] for sent in text] # exclude out of vocabulary symbols
        text = [[self.stoi[letter] for letter in sent] for sent in text_in_vocabulary] # tokenize text by symbols using self.stoi
        for token_id, token in enumerate(self.itos): # find occurences of a complex token in the text and replace them with token_id
            text = update_token(text, token, token_id)    
        return text
    
    def decode_token(self, tok):
        """
        returns a text coded by the tok
        tok: either an int - id, or a tuple - pair of ids)
        """
        def recursive_token_lookup(token): #token int or tuple
          if type(token) == int:
            content = self.itos[token]
            if type(content) == str:
              return content # only returns strings
            else:
              return recursive_token_lookup(content) # continue lookup on the tuple that was found
          elif type(token) == tuple:
            return recursive_token_lookup(token[0]) + recursive_token_lookup(token[1]) # concatenate string results
        return recursive_token_lookup(tok)
            
    def decode(self, text):
        """
        convert token ids into text
        text: tokenized text, list of lists of integers
        """
        return ''.join(map(self.decode_token, text))
        
        
vocab_size = 100
bpe = BPE(vocab_size)
tokenized_text = bpe.fit_transform(text)

In [11]:
test_text = ['бля бля бл', ' ', 'бля']
test_vocab_size = 12
test_bpe = BPE(test_vocab_size)
test_tokenized_text = test_bpe.fit_transform(test_text)
test_tokenized_text
test_bpe.decode(test_tokenized_text[0])


'бля бля бл'

In [0]:
assert bpe.decode(tokenized_text[0]) == text[0]

In [0]:
def list_to_trigrams(l, trigrams=Counter()):
  """
  transforms a list of lists into a counter of their pairs (tuples) across the lists
  l: a list of lists of integers or strings
  """
  for i in range(len(l)):
    trigrams.update((x, y, z) for x, y, z in zip(*[l[i][j:] for j in range(3)]))
  return trigrams

In [0]:
def smooth(count, delta=1, tau=1):
  return (count + delta) ** (1 / tau)

In [70]:
test_counts = list_to_trigrams(test_tokenized_text)
unsmoothed = [test_counts[(0, 1, x)] for x in range(len(set([item for sublist in test_l for item in sublist])))]
smoothed = [smooth(test_counts[(0, 1, x)]) for x in range(len(set([item for sublist in test_l for item in sublist])))]
print('non-smooth:')
print(unsmoothed)
print('____')
print('smooth:')
print(smoothed)

non-smooth:
[0, 0, 2, 20]
____
smooth:
[1.0, 1.0, 3.0, 21.0]


In [0]:
start_token = vocab_size
end_token = vocab_size + 1
        
    
class LM:
    def __init__(self, vocab_size, delta=1):
        self.delta = delta
        self.vocab_size = vocab_size + 2
        self.counts = Counter() # TODO create array for storing 3-gram counts (empty 3-gram counter)

    def fit(self, text):
        """
        train language model on text
        text: list of lists
        """
        text = [[start_token]+sent+[end_token] for sent in text]
        self.counts = list_to_trigrams(text, self.counts) # count 3-grams in the text  
        return self
        
    def get_proba(self, a, b, c, tau=1):
        """
        get Laplace smoothed probability of 3-gram (a,b,c)
        a: first token id
        b: second token id
        c: third token id
        tau: temperature
        """
        trigram = (a, b, c)
        counts_abx = [smooth(self.counts[(a, b, x)]) for x in range(self.vocab_size)] # smooth counts for every possible trigram starting with given bigram
        result = smooth(self.counts[trigram]) / sum(counts_abx) # approximate probability by counters
        return result
            
    def infer(self, a, b, tau=1):
        """
        return vector of probabilities of size self.vocab_size for 3-grams which start with (a,b) tokens
        a: first token id
        b: second token id
        tau: temperature
        """
        result = [self.get_proba(a,b,x, tau) for x in range(self.vocab_size)]
        return result
    
lm = LM(vocab_size, 1).fit(tokenized_text)

In [76]:
l = [[0,1,2,3], [3,1,2,0,4], [4,1,2,3,2,3,1,2,3,2]]
test_lm = LM(len(set([item for sublist in l for item in sublist])), 1).fit(l)
test_lm.infer(1,2)

[0.18181818181818182,
 0.09090909090909091,
 0.09090909090909091,
 0.36363636363636365,
 0.09090909090909091,
 0.09090909090909091,
 0.09090909090909091]

In [80]:
test_l = test_tokenized_text
real_test_lm = LM(len(set([item for sublist in test_l for item in sublist])), 1).fit(test_l)
real_test_lm.infer(0,1)

[0.125, 0.125, 0.125, 0.375, 0.125, 0.125]

In [78]:
lm.infer(39,3)[10:20]

[7.927699381639449e-05,
 0.009671793245600127,
 7.927699381639449e-05,
 7.927699381639449e-05,
 0.00015855398763278897,
 7.927699381639449e-05,
 7.927699381639449e-05,
 7.927699381639449e-05,
 7.927699381639449e-05,
 0.1277152370382115]

In [0]:
def enum_sort_tuple(arr):
  """
  makes list of tuples of ids and their values in the decreasing order of the values from the list input
  arr: flat list
  """
  return sorted(enumerate(arr), key=lambda x:x[1], reverse=True)

In [51]:
enum_sort_tuple([1, 2, 3, 1, 31, 0])

[(4, 31), (2, 3), (1, 2), (0, 1), (3, 1), (5, 0)]

In [52]:
def beam_search(input_seq, lm, max_len=10, k=5, tau=1):
    """
    generate prediction list from language model *lm* conditioned on input_seq
    input_seq: list of token ids for conditioning
    lm: language model
    max_len: max generated list length
    k: size of beam
    tau: temperature
    """
    input_seq = [start_token] + input_seq
    beam = [] # store in beam tuples of current sequences and their log probabilities
    
    for i in range(max_len):
        candidates = []
        candidates_proba = []
        for snt, snt_proba in beam:
            if snt[-1] == end_token:# TODO process terminal token 
              continue
            else:
              a, b = snt[-2:]
              proba = lm.infer(a, b, tau) # probability vector of the next token
              best_k = # top-k most probable tokens
                # TODO update candidates' sequences and corresponding probabilities
                
        beam = # select top-k most probable sequences from candidates
    return beam
    

SyntaxError: ignored

In [0]:
input1 = 'horse '
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
# TODO print decoded generated strings and their probabilities
    

In [0]:
input1 = 'her'
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
# TODO print decoded generated strings and their probabilities

In [0]:
input1 = 'what'
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=1)
# TODO print decoded generated strings and their probabilities

In [0]:
input1 = 'gun '
input1 = bpe.transform([input1])[0]
result = beam_search(input1, lm, max_len=10, k=10, tau=0.1)
# TODO print decoded generated strings and their probabilities

In [0]:
def perplexity(snt, lm):
    """
    snt: sequence of token ids
    lm: language model
    """
    result = #TODO perplexity for the sentence
    return result

perplexity(tokenized_text[0], lm)