# Языковые модели

Языковые модели играют важную роль в системах распознавания речи, помогая создавать более грамотные и лексически корректные тексты. В данной работе мы будем изучать нграмные языковые модели, которые позволяют довольно легко оценить вероятность и правдоподобность текста.

В нграмной языковой модели, нграм - это последовательность из n слов в тексте. Например, в предложении "по-моему мы сэкономим уйму времени если я сойду с ума прямо сейчас", биграмами будут "по-моему мы", "мы сэкономим", "сэкономим уйму" итд. Языковые модели оценивают вероятность появления последовательности слов, исходя из статистики появления каждого из нграм в обучающей выборке.

Порядком (order) нграм языковой модели называют максимальную длину нграм, которую учитывает модель. 

Практическая работа разделена на 2 части: 
1. Построение нграмой языковой модели - основная часть, 10 баллов
1. Предсказание с помощью языковой модели - дополнительная часть, 6 балла



Полезные сслыки:
* arpa формат - https://cmusphinx.github.io/wiki/arpaformat/
* обучающие материалы - resources/lab2/lecture13_ngrams_with_SRILM.pdf
* обучающие материалы.2 - https://cjlise.github.io/machine-learning/N-Gram-Language-Model/

In [5]:
import numpy as np
from collections import defaultdict
from typing import List, Dict, Tuple

# 1. Построение нграмной языковой модели. (10 баллов)


Вероятность текста с помощью нграмной языковой модели можно вычислить по формуле: 
$$ P(w_1, w_2, .., w_n) = {\prod{{P_{i=0}^{n}(w_i| w_{i-order}, .., w_{i-1})}}} $$

В простом виде, при обучении нграмной языковой модели, чтобы рассчитать условную вероятность каждой нграмы, используется формула, основанная на количестве появлений нграмы в обучающей выборке. Формула выглядит следующим образом:
$$ P(w_i| w_{i-order}, .., w_{i-1}) = {{count(w_{i-order}, .., w_{i})} \over {count(w_{i-order},..., w_{i-1})}} $$

Поскольку униграмы не содержат в себе какого-дибо контекста, вероятность униграмы можно посчитать поделив кол-во этой слова на общее количество слов в обучающей выборке. 


In [None]:
import re
# в первую очередь нам понадобится подсчитать статистику по обучающей выборке 
def count_ngrams(train_text: List[str], order=3, bos=True, eos=True) -> Dict[Tuple[str], int]:
    ngrams = defaultdict(int)
    text = train_text
    if bos:
        text = [' BOS '+ i for i in text]
    if eos:
        text = [i + ' EOS ' for i in text]
    text = ' '.join(text)
    text = re.sub(r'[^\w\s-]', ' ', text.lower())
    text = [word.replace('bos', '<s>').replace('eos', '</s>') for word in text.split() if word and word != '-']
    for sorder in range(order+1):
        for i in range(len(text) - sorder + 1):
            w = tuple(text[i:i+sorder])
            if w:
                ngrams[w] += 1
    return dict(ngrams)

In [7]:
def test_count_ngrams():
    assert count_ngrams(['привет привет как дела'], order=1, bos=True, eos=True) == {
        ('<s>',): 1, 
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1, 
        ('</s>',): 1
    }
    assert count_ngrams(['привет привет как дела'], order=1, bos=False, eos=True) == {
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1, 
        ('</s>',): 1
    }
    assert count_ngrams(['привет привет как дела'], order=1, bos=False, eos=False) == {
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1
    }
    assert count_ngrams(['привет привет как дела'], order=2, bos=False, eos=False) == {
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1,
        ('привет', 'привет'): 1,
        ('привет', 'как'): 1,
        ('как', 'дела'): 1
    }    
    assert count_ngrams(['привет ' * 6], order=2, bos=False, eos=False) == {
        ('привет',): 6, 
        ('привет', 'привет'): 5
    }
    result = count_ngrams(['практическое сентября',
                           'второе практическое занятие пройдет в офлайне 32 сентября в 12 часов 32 минуты',
                           'в офлайне в 32 12'], order=5)
    assert result[('<s>',)] == 3
    assert result[('32',)] == 3
    assert result[('<s>', 'в', 'офлайне', 'в', '32')] == 1
    assert result[('офлайне', 'в', '32', '12', '</s>')] == 1
    print('Test 1a passed')
    
    
test_count_ngrams()  

Test 1a passed



Простой подход к вычислению вероятностей через количество нграм имеет существенный недостаток. Если в тексте встретится нграмма, которой не было в обучающей выборке, то вероятность всего текста будет равна нулю. 

Чтобы избежать данного недостатка, вводится специальное сглаживание - add-k сглаживание ([Additive, Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing)). Данная техника позволяет учитывать нграмы, не встретившиеся в обучающей выборке, и при этом не делает вероятность текста равной нулю.

Формула сглаживания Лапласа выглядит следующим образом:

$$ P(w_i| w_{i-order}, .., w_{i-1}) = {{count(w_{i-order}, .., w_{i}) + k} \over {count(w_{i-order},..., w_{i-1}) + k*V}} $$

Здесь V - количество слов в словаре, а k - гиперпараметр, который контролирует меру сглаживания. Как правило, значение k выбирается экспериментально, чтобы найти оптимальный баланс между учетом редких нграм и сохранением вероятности для часто встречающихся нграм.


In [10]:
# Fixed function to avoid unreachable code
def calculate_ngram_prob(ngram: Tuple[str], counts: Dict[Tuple[str], int], V=None, k=0) -> float:
    if V is None:
        V = len([i for i in counts.keys() if len(i) == 1])
    if len(ngram) == 1:
        total_tokens = sum([count for key, count in counts.items() if len(key) == 1])
        if total_tokens == 0 and k == 0:
            prob = 0.0
        else:
            prob = (counts.get(ngram, 0) + k) / (total_tokens + k * V)
    else:
        context = ngram[:-1]
        context_count = counts.get(context, 0)
        if context_count == 0 and k == 0:
            prob = 0.0
        else:
            prob = (counts.get(ngram, 0) + k) / (context_count + k * V)
    return prob

In [11]:
def test_calculate_ngram_prob():
    counts = count_ngrams(['практическое сентября',
                           'второе практическое занятие в офлайне 32 сентября в 12 часов 32 минуты',
                           'в офлайне в 32 12'], order=4)
    assert calculate_ngram_prob(('в', 'офлайне'), counts) == 0.5
    assert calculate_ngram_prob(('в', ), counts) == 4/25
    assert calculate_ngram_prob(('в', ), counts, k=0.5) == (4+0.5)/(25+0.5*12)
    assert calculate_ngram_prob(('в', 'офлайне', 'в', '32'), counts) == 1.0
    assert calculate_ngram_prob(('в', 'офлайне'), counts, k=1) == 0.1875
    assert calculate_ngram_prob(('в', 'офлайне'), counts, k=0.5) == 0.25
    assert calculate_ngram_prob(('в', 'онлайне'), counts, k=0) == 0.0
    assert calculate_ngram_prob(('в', 'онлайне'), counts, k=1) == 0.0625
    assert calculate_ngram_prob(('в', 'офлайне'), counts, k=0.5) == 0.25

    print("Test 1.b passed")
    

test_calculate_ngram_prob()  

Test 1.b passed


Основной метрикой язковых моделей является перплексия. 

Перплексия  — безразмерная величина, мера того, насколько хорошо распределение вероятностей предсказывает выборку. Низкий показатель перплексии указывает на то, что распределение вероятности хорошо предсказывает выборку.

$$ ppl = {P(w_1, w_2 ,..., w_N)^{- {1} \over {N}}} $$


In [None]:
class NgramLM:
    def __init__(self, order=3, bos=True, eos=True, k=1, predefined_vocab=None):
        self.order = order
        self.eos = eos
        self.bos = bos
        self.k = k
        self.vocab = predefined_vocab
        self.ngrams_count = None
        
    @property
    def V(self) -> int:
        return len(self.vocab)
    
    def fit(self, train_text: List[str]) -> None:
        self.ngrams_count = count_ngrams(train_text, order=self.order, bos=self.bos, eos=self.eos)
        self.vocab = [i for i in self.ngrams_count.keys() if len(i) == 1]

    def text_clean(self, text: List[str]) -> List[str]:
        text_str = ' '.join(text)
        text_str = re.sub(r'[^\w\s-]', ' ', text_str.lower())
        words = [word.replace('bos', '<s>').replace('eos', '</s>') for word in text_str.split() if word and word != '-']
        return words
                
    def predict_ngram_log_proba(self, ngram: Tuple[str]) -> float:
        prob = calculate_ngram_prob(ngram, self.ngrams_count, self.V, self.k)
        return np.log(prob)
           
    def predict_log_proba(self, words: List[str]) -> float:
        if self.bos:
            words = [' BOS '] + words
        if self.eos:
            words = words + [' EOS ']
        words = self.text_clean(words)
        logprob = 0
        for i in range(len(words)):
            if i == 0:
                ngram = tuple(words[i:i+1])
            else:
                ngram = tuple(words[max(0, i-self.order+1):i+1])
            logprob += self.predict_ngram_log_proba(ngram)
        return logprob
        
    def ppl(self, text: List[str]) -> float:
        logprob = self.predict_log_proba(text)
        if self.bos:
            text = [' BOS '] + text
        if self.eos:
            text = text + [' EOS ']
        processed_words = self.text_clean(text)
        num_words = len(processed_words)
        perplexity = np.exp(-logprob/num_words)
        return perplexity


In [16]:
def test_lm():
    train_data = ["по-моему мы сэкономим уйму времени если я сойду с ума прямо сейчас",
                  "если я сойду с ума прямо сейчас по-моему мы сэкономим уйму времени",
                  "мы сэкономим уйму времени если я сейчас сойду с ума по-моему"]
    global lm
    lm = NgramLM(order=2)
    lm.fit(train_data)
    assert lm.V == 14
    assert np.isclose(lm.predict_log_proba(['мы']), lm.predict_log_proba(["если"]))
    assert lm.predict_log_proba(["по-моему"]) > lm.predict_log_proba(["если"]) 
    
    gt = ((3+1)/(41 + 14) * 1/(3+14))**(-1/2)
    ppl = lm.ppl([''])
    assert  np.isclose(ppl, gt), f"{ppl=} {gt=}"
    
    gt = ((3+1)/(41 + 14) * 1/(3+14) * 1/(14)) ** (-1/3)
    ppl = lm.ppl(['ЧТО'])
    assert  np.isclose(ppl, gt), f"{ppl=} {gt=}"
    
    test_data = ["по-моему если я прямо сейчас сойду с ума мы сэкономим уйму времени"]
    ppl = lm.ppl(test_data)
    assert round(ppl, 2) == 7.33, f"{ppl}"
    print("Test 1.c passed")
test_lm()

Test 1.c passed


# 2. Предсказания с помощью языковой модели (6 балла)

In [17]:
with open('text_lab_2_1.txt') as f:
    text = f.read()

In [18]:
llm = NgramLM(order=2)
llm.fit([text])

In [19]:
def predict_next_word(lm: NgramLM, prefix: str, topk=4, mode='vocab'):
    
    prefix_words = lm.text_clean([prefix])
    last_word = prefix_words[-1] if prefix_words else ""  
    candidates = []
    
    if mode == 'vocab':
        for word in lm.vocab:
            if word in ['<s>', '</s>'] or word == last_word:
                continue
            ngram = prefix_words[lm.order-1:] + [word]
            log_prob = lm.predict_ngram_log_proba(tuple(ngram)[-lm.order:])
            candidates.append((word, log_prob))
    elif mode == 'ngrams':
        for ngram in lm.ngrams_count.keys():
            if ngram[0] in ['<s>', '</s>', 'BOS', 'EOS']:
                continue
            if len(ngram) >= len(prefix_words):
                ngram = tuple(prefix_words) + ngram
            else:
                ngram = prefix_words[:-len(ngram)] + [ngram]
            log_prob = lm.predict_ngram_log_proba(tuple(ngram)[-lm.order:])
            if ngram[-1] != last_word:
                candidates.append((ngram[-1], log_prob))
    
    candidates.sort(key=lambda x: x[1], reverse=True)
    
    seen = set()
    result = []
    for word, _ in candidates:
        if word not in seen:
            seen.add(word)
            result.append(word)
        if len(result) == topk:
            break        
    return result

In [20]:
result_vocab = predict_next_word(llm, 'Машинное', topk=3, mode='vocab')
print(f"Предсказания (vocab): {result_vocab}")

result_ngrams = predict_next_word(llm, 'Машинное', topk=3, mode='ngrams')
print(f"Предсказания (ngrams): {result_ngrams}")


Предсказания (vocab): [('<s>',), ('машинное',), ('обучение',)]
Предсказания (ngrams): ['обучение', 'сети', 'как']


In [21]:
text = 'это'
for i in range(5):
    result_ngrams = predict_next_word(llm, text, topk=1, mode='ngrams')
    if isinstance(result_ngrams[0], tuple):
        result_ngrams = result_ngrams[0][-1]
    else:
        result_ngrams = result_ngrams[0]
    text = f"{text} {result_ngrams}"
print(text)


это обучение сети машинное машинное машинное


In [22]:
text = 'машинное обучение'
for i in range(5):
    result_ngrams = predict_next_word(llm, text, topk=1, mode='vocab')
    if isinstance(result_ngrams[0], tuple):
        result_ngrams = result_ngrams[0][-1]
    else:
        result_ngrams = result_ngrams[0]
    text = f"{text} {result_ngrams}"
print(text)


машинное обучение <s> <s> <s> <s> <s>


Попробуйте обучить ngram языковую модель на нескольких стихотворениях. Не забудьте трансформировать стихотворение в удобный для ngram модели формат (как сделать так, чтобы модель моделировала рифму?). 
Попробуйте сгенерировать продолжение для стихотворения с помощью такой языковой модели. 

In [None]:
with open("/mnt/asr/data/punctuator_data/ru3/processed/RusPoems_train_v2.txt") as f:
    text = f.read()

In [25]:
llm = NgramLM(order=2)
llm.fit([text])

In [26]:
predict_next_word(llm, 'Подруга', topk=3, mode='ngrams')

['breaklabel', 'и', 'страдать']

In [27]:
text = 'Привет'
for i in range(10):
    result_ngrams = predict_next_word(llm, text, topk=3, mode='ngrams')[-1]
    if isinstance(result_ngrams, tuple):
        result_ngrams = result_ngrams[0]
    text = f"{text} {result_ngrams}"
print(text)


Привет страдать мой мальчик мальчик мальчик мальчик мальчик мальчик мальчик мальчик


In [28]:
text = 'Привет'
for i in range(10):
    result_ngrams = predict_next_word(llm, text.split(' ')[-1], topk=3, mode='ngrams')[-1]
    if isinstance(result_ngrams, tuple):
        result_ngrams = result_ngrams[-1]
    text = f"{text} {result_ngrams}"
print(text)

Привет страдать мой страдать мой страдать мой страдать мой страдать мой
