# Языковые модели

Языковые модели играют важную роль в системах распознавания речи, помогая создавать более грамотные и лексически корректные тексты. В данной работе мы будем изучать нграмные языковые модели, которые позволяют довольно легко оценить вероятность и правдоподобность текста.

В нграмной языковой модели, нграм - это последовательность из n слов в тексте. Например, в предложении "по-моему мы сэкономим уйму времени если я сойду с ума прямо сейчас", биграмами будут "по-моему мы", "мы сэкономим", "сэкономим уйму" итд. Языковые модели оценивают вероятность появления последовательности слов, исходя из статистики появления каждого из нграм в обучающей выборке.

Порядком (order) нграм языковой модели называют максимальную длину нграм, которую учитывает модель. 

Практическая работа разделена на 2 части: 
1. Построение нграмой языковой модели - основная часть, 10 баллов
1. Предсказание с помощью языковой модели - дополнительная часть, 6 балла



Полезные сслыки:
* arpa формат - https://cmusphinx.github.io/wiki/arpaformat/
* обучающие материалы - resources/lab2/lecture13_ngrams_with_SRILM.pdf
* обучающие материалы.2 - https://cjlise.github.io/machine-learning/N-Gram-Language-Model/

In [1]:
import numpy as np
from collections import defaultdict
from typing import List, Dict, Tuple

# 1. Построение нграмной языковой модели. (10 баллов)


Вероятность текста с помощью нграмной языковой модели можно вычислить по формуле: 
$$ P(w_1, w_2, .., w_n) = {\prod{{P_{i=0}^{n}(w_i| w_{i-order}, .., w_{i-1})}}} $$

В простом виде, при обучении нграмной языковой модели, чтобы рассчитать условную вероятность каждой нграмы, используется формула, основанная на количестве появлений нграмы в обучающей выборке. Формула выглядит следующим образом:
$$ P(w_i| w_{i-order}, .., w_{i-1}) = {{count(w_{i-order}, .., w_{i})} \over {count(w_{i-order},..., w_{i-1})}} $$

Поскольку униграмы не содержат в себе какого-дибо контекста, вероятность униграмы можно посчитать поделив кол-во этой слова на общее количество слов в обучающей выборке. 


In [2]:
# в первую очередь нам понадобится подсчитать статистику по обучающей выборке 
def count_ngrams(train_text: List[str], order=3, bos=True, eos=True) -> Dict[Tuple[str], int]:
    ngrams = defaultdict(int)
    # TODO реализуйте функцию, которая подсчитывает все 1-gram 2-gram ... order-gram ngram'ы в тексте
    for text in train_text:
        tokens = text.split()
        
        if bos:
            tokens = ['<s>'] + tokens
        if eos:
            tokens = tokens + ['</s>']
        
        for n in range(1, order + 1):
            for i in range(len(tokens) - n + 1):
                ngram = tuple(tokens[i:i + n])
                ngrams[ngram] += 1

    return dict(ngrams)

In [3]:
def test_count_ngrams():
    assert count_ngrams(['привет привет как дела'], order=1, bos=True, eos=True) == {
        ('<s>',): 1, 
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1, 
        ('</s>',): 1
    }
    assert count_ngrams(['привет привет как дела'], order=1, bos=False, eos=True) == {
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1, 
        ('</s>',): 1
    }
    assert count_ngrams(['привет привет как дела'], order=1, bos=False, eos=False) == {
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1
    }
    assert count_ngrams(['привет привет как дела'], order=2, bos=False, eos=False) == {
        ('привет',): 2, 
        ('как',): 1, 
        ('дела',): 1,
        ('привет', 'привет'): 1,
        ('привет', 'как'): 1,
        ('как', 'дела'): 1
    }    
    assert count_ngrams(['привет ' * 6], order=2, bos=False, eos=False) == {
        ('привет',): 6, 
        ('привет', 'привет'): 5
    }
    result = count_ngrams(['практическое сентября',
                           'второе практическое занятие пройдет в офлайне 32 сентября в 12 часов 32 минуты',
                           'в офлайне в 32 12'], order=5)
    assert result[('<s>',)] == 3
    assert result[('32',)] == 3
    assert result[('<s>', 'в', 'офлайне', 'в', '32')] == 1
    assert result[('офлайне', 'в', '32', '12', '</s>')] == 1
    print('Test 1a passed')
    
    
test_count_ngrams()  

Test 1a passed



Простой подход к вычислению вероятностей через количество нграм имеет существенный недостаток. Если в тексте встретится нграмма, которой не было в обучающей выборке, то вероятность всего текста будет равна нулю. 

Чтобы избежать данного недостатка, вводится специальное сглаживание - add-k сглаживание ([Additive, Laplace smoothing](https://en.wikipedia.org/wiki/Additive_smoothing)). Данная техника позволяет учитывать нграмы, не встретившиеся в обучающей выборке, и при этом не делает вероятность текста равной нулю.

Формула сглаживания Лапласа выглядит следующим образом:

$$ P(w_i| w_{i-order}, .., w_{i-1}) = {{count(w_{i-order}, .., w_{i}) + k} \over {count(w_{i-order},..., w_{i-1}) + k*V}} $$

Здесь V - количество слов в словаре, а k - гиперпараметр, который контролирует меру сглаживания. Как правило, значение k выбирается экспериментально, чтобы найти оптимальный баланс между учетом редких нграм и сохранением вероятности для часто встречающихся нграм.


In [4]:
# функция подсчета вероятности через количество со сглаживанием Лапласа
def calculate_ngram_prob(ngram: Tuple[str], counts: Dict[Tuple[str], int], V=None, k=0) -> float:
    # подсчитывет ngram со сглаживанием Лапласа
    # TODO
    if V is None:
        V = len([unigram for unigram in counts.keys() if len(unigram) == 1])
    
    if len(ngram) == 1:
        total_unigrams = sum(count for gram, count in counts.items() if len(gram) == 1)
        prob = (counts.get(ngram, 0) + k) / (total_unigrams + k * V)
        return prob
    
    context = ngram[:-1]
    context_count = counts.get(context, 0)

    prob = (counts.get(ngram, 0) + k) / (context_count + k * V)
    return prob

In [5]:
def test_calculate_ngram_prob():
    counts = count_ngrams(['практическое сентября',
                           'второе практическое занятие в офлайне 32 сентября в 12 часов 32 минуты',
                           'в офлайне в 32 12'], order=4)
    assert calculate_ngram_prob(('в', 'офлайне'), counts) == 0.5
    assert calculate_ngram_prob(('в', ), counts) == 4/25
    assert calculate_ngram_prob(('в', ), counts, k=0.5) == (4+0.5)/(25+0.5*12)
    assert calculate_ngram_prob(('в', 'офлайне', 'в', '32'), counts) == 1.0
    assert calculate_ngram_prob(('в', 'офлайне'), counts, k=1) == 0.1875
    assert calculate_ngram_prob(('в', 'офлайне'), counts, k=0.5) == 0.25
    assert calculate_ngram_prob(('в', 'онлайне'), counts, k=0) == 0.0
    assert calculate_ngram_prob(('в', 'онлайне'), counts, k=1) == 0.0625
    assert calculate_ngram_prob(('в', 'офлайне'), counts, k=0.5) == 0.25

    print("Test 1.b passed")
    

test_calculate_ngram_prob()  

Test 1.b passed


Основной метрикой язковых моделей является перплексия. 

Перплексия  — безразмерная величина, мера того, насколько хорошо распределение вероятностей предсказывает выборку. Низкий показатель перплексии указывает на то, что распределение вероятности хорошо предсказывает выборку.

$$ ppl = {P(w_1, w_2 ,..., w_N)^{- {1} \over {N}}} $$


In [6]:
# Языковая модель 
class NgramLM:
    def __init__(self, order=3, bos=True, eos=True, k=1, predefined_vocab=None):
        self.order = order
        self.eos = eos
        self.bos = bos
        self.k = k
        self.vocab = predefined_vocab
        self.ngrams_count = None
        
    @property
    def V(self) -> int:
        return len(self.vocab)
    
    def fit(self, train_text: List[str]) -> None:
        # TODO
        # Подсчет vocab и ngrams_count по обучающей выборке
        self.ngrams_count = count_ngrams(train_text, order=self.order, bos=self.bos, eos=self.eos)
        self.vocab = [i for i in self.ngrams_count.keys() if len(i) == 1]
                
    
    def predict_ngram_log_proba(self, ngram: Tuple[str]) -> float:
        # TODO 
        # считаем логарифм вероятности конкретной нграмы
        prob = calculate_ngram_prob(ngram, self.ngrams_count, V=self.V, k=self.k)
        return np.log(prob)

    def text_preprocess(self, text: List[str]) -> List[str]:
        text_str = ' '.join(text)
        # text_str = re.sub(r'[^\w\s-]', ' ', text_str.lower(), flags=re.UNICODE)
        words = [word for word in text_str.split() if word and word != '-']
        return words
    
    def predict_log_proba(self, words: List[str]) -> float:
        if self.bos:
            words = ['<s>'] + words
        if self.eos:
            words = words + ['</s>']
        logprob = 0
        # TODO 
        # применяем chain rule, чтобы посчитать логарифм вероятности всей строки
        words = self.text_preprocess(words)
        for i in range(len(words)):
            if i == 0:
                ngram = tuple(words[i:i+1])
            else:
                ngram = tuple(words[max(0, i-self.order+1):i+1])
            logprob += self.predict_ngram_log_proba(ngram)
        return logprob
        
    def ppl(self, text: List[str]) -> float:
        #TODO 
        # подсчет перплексии
        # Для того, чтобы ваш код был численно стабильным, 
        #    не считайте формулу напрямую, а воспользуйтесь переходом к логарифмам вероятностей
        # 
        log_prob = self.predict_log_proba(text)
        if self.bos:
            text = ['<s>'] + text
        if self.eos:
            text = text + ['</s>']
        
        words = self.text_preprocess(text)
        N = len(words)
        perplexity = np.exp(-log_prob / N)
        return perplexity

In [7]:
def test_lm():
    train_data = ["по-моему мы сэкономим уйму времени если я сойду с ума прямо сейчас",
                  "если я сойду с ума прямо сейчас по-моему мы сэкономим уйму времени",
                  "мы сэкономим уйму времени если я сейчас сойду с ума по-моему"]
    global lm
    lm = NgramLM(order=2)
    lm.fit(train_data)
    assert lm.V == 14
    assert np.isclose(lm.predict_log_proba(['мы']), lm.predict_log_proba(["если"]))
    assert lm.predict_log_proba(["по-моему"]) > lm.predict_log_proba(["если"]) 
    
    gt = ((3+1)/(41 + 14) * 1/(3+14))**(-1/2)
    ppl = lm.ppl([''])
    assert  np.isclose(ppl, gt), f"{ppl=} {gt=}"
    
    gt = ((3+1)/(41 + 14) * 1/(3+14) * 1/(14)) ** (-1/3)
    ppl = lm.ppl(['ЧТО'])
    assert  np.isclose(ppl, gt), f"{ppl=} {gt=}"
    
    test_data = ["по-моему если я прямо сейчас сойду с ума мы сэкономим уйму времени"]
    ppl = lm.ppl(test_data)
    assert round(ppl, 2) == 7.33, f"{ppl}"
    print("Test passed!")
    
test_lm()

Test passed!


# 2. Предсказания с помощью языковой модели (6 балла)

In [8]:
from datasets import load_dataset
import re
import string

ds = load_dataset("zloelias/lenta-ru")

  from .autonotebook import tqdm as notebook_tqdm


In [9]:
ds

DatasetDict({
    train: Dataset({
        features: ['title', 'text', 'topic', 'labels'],
        num_rows: 185972
    })
    test: Dataset({
        features: ['title', 'text', 'topic', 'labels'],
        num_rows: 20664
    })
})

In [10]:
texts = ds["train"]["text"][:100]

In [11]:
punctuation_without_ = string.punctuation.replace('-', '')
translator = str.maketrans('', '', string.punctuation)    

train = [text.lower().translate(translator) for text in texts]

In [12]:
model = NgramLM(order=2)
model.fit(train)

In [13]:
def predict_next_word(lm: NgramLM, prefix, topk=4):
    if isinstance(prefix, str):
        prefix = prefix.split()
    
    candidates = []
    for word_tuple in lm.vocab:
        word = word_tuple[0]
        
        if word in ['<s>', '</s>']:
            continue
            
        full_sequence = prefix + [word]
        log_prob = lm.predict_log_proba(full_sequence)
        
        candidates.append((word, log_prob))
    
    candidates.sort(key=lambda x: x[1], reverse=True)
    return candidates[:topk]

In [14]:
print(predict_next_word(lm=model, prefix='два', topk=3))

[('года', np.float64(-29.31441989773503)), ('тура', np.float64(-30.39982732210797)), ('год', np.float64(-30.400721605144252))]


In [15]:
def complete_sentence(lm: NgramLM, prefix: str, num_words: int = 5):
    current_sentence = prefix.split()
    
    for _ in range(num_words):
        candidates = predict_next_word(lm, current_sentence, topk=1)
        
        if not candidates:
            break
            
        next_word = candidates[0][0]
        current_sentence.append(next_word)
    
    return ' '.join(current_sentence)

In [16]:
prefix = "два"
completed = complete_sentence(model, prefix, num_words=3)
print(f"Исходное: '{prefix}'")
print(f"Дополненное: '{completed}'")

Исходное: 'два'
Дополненное: 'два года в россии'


Попробуйте обучить ngram языковую модель на нескольких стихотворениях. Не забудьте трансформировать стихотворение в удобный для ngram модели формат (как сделать так, чтобы модель моделировала рифму?). 
Попробуйте сгенерировать продолжение для стихотворения с помощью такой языковой модели. 

In [17]:
ds = load_dataset("Pclanglais/Onegin")

In [18]:
ds

DatasetDict({
    train: Dataset({
        features: ['work', 'author', 'date', 'chapter', 'stanza', 'verse', 'text'],
        num_rows: 977
    })
})

In [19]:
ds["train"]["text"]

Column(['«Мой дядя самыхъ честныхъ правилъ,', '«Когда не вшутку занемогъ,', '«Онъ уважать себя заставилъ,', '«И лучше выдумать не могъ;', '«Его примѣръ другимъ наука:'])

In [20]:
def prepare_poetry_data(dataset, max_lines=None):
    poems = []
    
    for example in dataset['train']:
        if max_lines and len(poems) >= max_lines:
            break
            
        text = example.get('text', '')
        if text:
            lines = text.split('\n')
            for line in lines:
                line = line.strip()
                if line and len(line.split()) > 1:
                    line = line.lower()
                    poems.append(line)

    punctuation_without_ = string.punctuation + '«' + '»'
    translator = str.maketrans('', '', punctuation_without_)    

    poems = [text.lower().translate(translator) for text in poems]
    return poems

poetry_data = prepare_poetry_data(ds, max_lines=200)

print(f"Загружено {len(poetry_data)} строк стихотворений")
print("Примеры строк:")
for i in range(min(5, len(poetry_data))):
    print(f"- {poetry_data[i]}")

Загружено 200 строк стихотворений
Примеры строк:
- мой дядя самыхъ честныхъ правилъ
- когда не вшутку занемогъ
- онъ уважать себя заставилъ
- и лучше выдумать не могъ
- его примѣръ другимъ наука


In [22]:
poetry_model = NgramLM(order=2, k=0.1)
poetry_model.fit(poetry_data)

def generate_quatrain(lm: NgramLM, first_line: str):
    lines = [first_line]
    
    for i in range(3):  # генерируем еще 3 строки
        # берем последние 2 слова предыдущей строки как начало новой
        prev_words = lines[-1].split()[-2:] if lines[-1].split() else []
        current_line = prev_words.copy()
        
        while len(current_line) < 4:
            candidates = predict_next_word(lm, current_line, topk=3)
            if candidates:
                current_line.append(candidates[0][0])
            else:
                break
        
        lines.append(" ".join(current_line))
    
    return lines

# Тестирование
start_lines = [
    "любви надежды тихой славы",
    "мороз и солнце день чудесный", 
    "я помню чудное мгновенье"
]

for i, start_line in enumerate(start_lines):
    print(f"\n--- Четверостишие {i+1} ---")
    print(f"Начало: '{start_line}'")
    
    quatrain = generate_quatrain(poetry_model, start_line)
    
    for j, line in enumerate(quatrain):
        print(f"{j+1}: {line}")


--- Четверостишие 1 ---
Начало: 'любви надежды тихой славы'
1: любви надежды тихой славы
2: тихой славы забавлять забавлять
3: забавлять забавлять забавлять забавлять
4: забавлять забавлять забавлять забавлять

--- Четверостишие 2 ---
Начало: 'мороз и солнце день чудесный'
1: мороз и солнце день чудесный
2: день чудесный забавлять забавлять
3: забавлять забавлять забавлять забавлять
4: забавлять забавлять забавлять забавлять

--- Четверостишие 3 ---
Начало: 'я помню чудное мгновенье'
1: я помню чудное мгновенье
2: чудное мгновенье забавлять забавлять
3: забавлять забавлять забавлять забавлять
4: забавлять забавлять забавлять забавлять
