# Генерация текста: н-граммы

В этой тетрадке мы научимся делать простую модель генерации текста на основе н-грамм и встречаемости в корпусе.

In [3]:
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
import random
%matplotlib inline

In [4]:
random.seed(1)
np.random.seed(1)

Будем пробовать генерировать шутки. Для обучения будем использовать [датасет с постами reddit](https://kaggle.com/datasets/thedevastator/one-million-reddit-jokes).

In [3]:
path = # YOUR_PATH
data = pd.read_csv(path)

In [4]:
data.head()

Unnamed: 0,type,id,subreddit.id,subreddit.name,subreddit.nsfw,created_utc,permalink,domain,url,selftext,title,score
0,post,ftbp1i,2qh72,jokes,False,1585785543,https://old.reddit.com/r/Jokes/comments/ftbp1i...,self.jokes,,My corona is covered with foreskin so it is no...,I am soooo glad I'm not circumcised!,2
1,post,ftboup,2qh72,jokes,False,1585785522,https://old.reddit.com/r/Jokes/comments/ftboup...,self.jokes,,It's called Google Sheets.,Did you know Google now has a platform for rec...,9
2,post,ftbopj,2qh72,jokes,False,1585785508,https://old.reddit.com/r/Jokes/comments/ftbopj...,self.jokes,,The vacuum doesn't snore after sex.\r\n\r\n&am...,What is the difference between my wife and my ...,15
3,post,ftbnxh,2qh72,jokes,False,1585785428,https://old.reddit.com/r/Jokes/comments/ftbnxh...,self.jokes,,[removed],My last joke for now.,9
4,post,ftbjpg,2qh72,jokes,False,1585785009,https://old.reddit.com/r/Jokes/comments/ftbjpg...,self.jokes,,[removed],The Nintendo 64 turns 18 this week...,134


Так как наша задача генерации требует только текста, оставим только некоторые столбцы.

In [5]:
columns = ['selftext']
data = data[columns]

## Обработка данных
###1. Чистка датасета

__Подсказка:__ Часто пропуски они обозначаются как nan, но иногда можно заметить иные способы.

In [None]:
# <YOUR CODE HERE> #

Надо тексты привести к нижнему регистру. Пунктуацию можно оставить, так как она влияет на смысл предложения и на встречаемость.
Кроме этого можно избавиться от совсем коротких шуток, так как скорее всего это просто ответы на фразы.

In [8]:
from string import punctuation
import re

In [2]:
def clean_text(text: str) -> list:
     '''
     Делит текст на слова и пунктуацию и приводит все к нижнему регистру
     :param text: строка
     :returns: список слов и знаков препинания
     '''
     # <YOUR CODE HERE> #
     return

In [10]:
data['words'] = data['selftext'].apply(clean_text)
data['lens'] = data['words'].apply(len)
data = data[data.lens > 3]

In [11]:
words = data['words'].tolist()

In [12]:
words[:2]

[['my',
  'corona',
  'is',
  'covered',
  'with',
  'foreskin',
  'so',
  'it',
  'is',
  'not',
  'exposed',
  'to',
  'viruses',
  '.'],
 ["it's", 'called', 'google', 'sheets', '.']]

## N-grams
Для начала попробуем создать самую простую модель, основанную на встречаемости н-граммы в корпусе.

In [20]:
from collections import defaultdict, Counter

In [None]:
# добавляем токены начала и конца
BOS, EOS = '[bos]', '[eos]'

class NGramLanguageModel:
    def __init__(self, lines, n):
        assert n >= 1
        self.n = n
        counts = self.ngram_counts(lines, self.n)

        # перевести количества в вероятности
        self.probs = defaultdict(Counter)
        # probs[(word1, word2)][word3] = P(word3 | word1, word2)
        # <YOUR CODE HERE> #

    def get_possible_next_tokens(self, prefix):
        """
        :param prefix: строка запроса
        :returns: словарь с возможными продолжениями заданного префикса
        """
        prefix = prefix.split()
        prefix = prefix[max(0, len(prefix) - self.n + 1):]
        prefix = [ BOS ] * (self.n - 1 - len(prefix)) + prefix
        return self.probs[tuple(prefix)]

    @staticmethod
    def ngram_counts(lines: list, n: int) -> dict:
        '''
        Создаёт словарь, где каждому префиксу (n-1 слово) присваивается словарь,
        в котором ключи - слова, а значения - количество н-грамм в текстах
        :param lines: список списков
        :param n: количество слов в н-грамме
        :returns: словарь, в котором для каждого в префикса известно количество
        н-грамм с каждым словом
        '''
        dictionary = defaultdict(Counter)
        # dictionary[(word1, word2)][word3] = count((word1, word2, word3))
        # <YOUR CODE HERE> #
        return dictionary

# Проверим работу функции ngram_counts
dummy_lines = sorted(words, key=len)[:100]
dummy_counts = NGramLanguageModel.ngram_counts(dummy_lines, n=3)
assert set(map(len, dummy_counts.keys())) == {2}, "please only count {n-1}-grams"
assert len(dummy_counts[(BOS, BOS)]) == 66
assert dummy_counts[BOS, 'a']['melon'] == 1

# Проверим работу модели
dummy_lm = NGramLanguageModel(dummy_lines, n=3)
p_initial = dummy_lm.get_possible_next_tokens('')
assert p_initial.most_common(1)[0][0] == 'a'

1. Попробуем составить предложение используя жадный метод.

In [17]:
def get_next_word(lm: NGramLanguageModel, prefix: str) -> str:
    '''
    :param lm: language model
    :param prefix: строка префикса
    :returns: следующее, наиболее вероятное, слово для данного префикса
    '''
    # <YOUR CODE HERE> #
    return # next word

In [None]:
lm = NGramLanguageModel(words, n=3)
prefix = 'get'
repeat = 20
for _ in range(repeat):
    word = get_next_word(lm, prefix)
    prefix += ' ' + word
    if prefix.endswith(EOS):
        break

print(prefix)

In [19]:
prefix = ''
word = get_next_word(lm, prefix)
while word != EOS:
    prefix += f' {word}'
    word = get_next_word(lm, prefix)

print(prefix + f'{word} ')

2. Выбор наиболее вероятного слова не показал хороших результатов. Давайте попробуем семплировать методом top-k: выбираем k наиболее встречаемых вариантов и из них выбираем один случайным образом.

In [21]:
def get_next_word(lm: NGramLanguageModel, prefix: str, k:int) -> str:
    '''
    :param lm: language model
    :param prefix: строка префикса
    :param k: количество слов в top-k
    :returns: следующее, наиболее вероятное, слово для данного префикса
    '''
    # <YOUR CODE HERE> #
    return # next word

In [None]:
lm = NGramLanguageModel(words, n=3)
prefix = 'get'
repeat = 20
for i in range(repeat):
    word = get_next_word(lm, prefix, 5)
    prefix += ' ' + word
    if prefix.endswith(EOS):
        break

print(prefix)

In [None]:
prefix = ''
word = get_next_word(lm, '', 5)
while word != EOS:
    prefix += f'{word} '
    word = get_next_word(lm, prefix, 5)

print(prefix + f'{word} ')

3. Для сравнения можно сделать beam search. Напоминаем, что он на каждом шаге выбирает k наилучших вариантов - те, с которыми наибольшая вероятность всего предложения. Кроме этого надо не забыть, что мы переводим вероятности в логарифмы: таким образом считается сумма логарифмов вероятностей для каждой н-граммы, из которого состоит предложение.
Чтобы не пересчитывать каждый раз корпус, давайте будем считать вероятность последней н-граммы.

Попробуйте написать свой beam search, для n-gramm, где n=3 и для k=2.

\begin{align}
        \frac{1}{L^\alpha}\sum_{t'=1}^LlogP(y^{t'}|y_{t'-n}, ..., y_{t'}, <bos>)
    \end{align}



In [None]:
lm = NGramLanguageModel(words, n=3)
prefix = 'he' # YOUR IDEA
prefixes = [prefix, prefix]
probs = [0, 0]
k = 2

# <YOUR CODE HERE> #

--------------

## Решение

## Обработка данных
###1. Чистка датасета

Для начала надо избавиться от пустых строк, или же нанов. Часто они обозначаются как nan, но иногда можно заметить иные способы.

In [9]:
data['selftext'].value_counts()[:10]

[removed]                          232919
[deleted]                          188442
\[removed\]                           272
To get to the other side.             125
Dr. Dre                               111
A stick.                               83
None.                                  81
A stick                                76
He worked it out with a pencil.        74
Then it hit me.                        72
Name: selftext, dtype: int64

Можно заметить, что наиболее частым классом являются _removed_ или _deleted_.

In [10]:
print('Размер данных до чистки', data.shape)
data = data[~data.isin(['[removed]', '[deleted]', '\[removed\]', 'removed', 'deleted'])]
data = data.dropna()
print('Размер данных после чистки', data.shape)

Размер данных до чистки (999998, 1)
Размер данных после чистки (573887, 1)


Надо тексты привести к нижнему регистру и убрать пунктуацию.
Кроме этого можно избавиться от совсем коротких шуток, так как скорее всего это просто ответы на фразы.

In [11]:
from string import punctuation
import re

In [12]:
def clean_text(text):
     text = text.lower()
     new_text = []
     for word in text.split():
        if word.endswith(tuple(punctuation)):
            new_text.append(word[:-1])
            new_text.append(word[-1])
        else:
            new_text.append(word)
     return new_text

In [13]:
data['words'] = data['selftext'].apply(clean_text)
data['lens'] = data['words'].apply(len)
data = data[data.lens > 3]

In [16]:
words = data['words'].tolist()

In [17]:
words[:2]

[['my',
  'corona',
  'is',
  'covered',
  'with',
  'foreskin',
  'so',
  'it',
  'is',
  'not',
  'exposed',
  'to',
  'viruses',
  '.'],
 ["it's", 'called', 'google', 'sheets', '.']]

## N-grams
Для начала попробуем создать самую простую модель, основанную на встречаемости н-граммы в корпусе.

In [None]:
from collections import defaultdict, Counter

In [None]:
# добавляем токены начала и конца
BOS, EOS, UNK = '[bos]', '[eos]', '[unk]'

def ngram_counts(lines, n):
    dictionary = defaultdict(Counter)
    for line in lines:
        new_line = [BOS] * (n-1) + line + [EOS]
        for i in range(n-1, len(new_line)):
            prefix = tuple(new_line[i-n+1:i])
            word = new_line[i]
            dictionary[prefix][word] += 1
    return dictionary

dummy_lines = sorted(words, key=len)[:100]
dummy_counts = ngram_counts(dummy_lines, n=3)
assert set(map(len, dummy_counts.keys())) == {2}, "please only count {n-1}-grams"
assert len(dummy_counts[(BOS, BOS)]) == 66
assert dummy_counts[BOS, 'a']['melon'] == 1

In [None]:
# добавляем токены начала и конца
BOS, EOS = '[bos]', '[eos]'

class NGramLanguageModel:
    def __init__(self, lines, n):
        assert n >= 1
        self.n = n
        counts = self.ngram_counts(lines, self.n)

        # перевести количества в вероятности
        self.probs = defaultdict(Counter)
        # probs[(word1, word2)][word3] = P(word3 | word1, word2)

        for key, value in counts.items():
            sum_of_prefix = sum(value.values())
            for word, cnts in value.items():
                self.probs[key][word] = cnts / sum_of_prefix

    def get_possible_next_tokens(self, prefix):
        """
        :param prefix: строка запроса
        :returns: словарь с возможными продолжениями заданного префикса
        """
        prefix = prefix.split()
        prefix = prefix[max(0, len(prefix) - self.n + 1):]
        prefix = [ BOS ] * (self.n - 1 - len(prefix)) + prefix
        return self.probs[tuple(prefix)]

    @staticmethod
    def ngram_counts(lines, n):
        dictionary = defaultdict(Counter)
        for line in lines:
            new_line = [BOS] * (n-1) + line + [EOS]
            for i in range(n-1, len(new_line)):
                prefix = tuple(new_line[i-n+1:i])
                word = new_line[i]
                dictionary[prefix][word] += 1
        return dictionary

# Проверим работу функции ngram_counts
dummy_lines = sorted(words, key=len)[:100]
dummy_counts = NGramLanguageModel.ngram_counts(dummy_lines, n=3)
assert set(map(len, dummy_counts.keys())) == {2}, "please only count {n-1}-grams"
assert len(dummy_counts[(BOS, BOS)]) == 66
assert dummy_counts[BOS, 'a']['melon'] == 1

# Проверим работу модели
dummy_lm = NGramLanguageModel(dummy_lines, n=3)
p_initial = dummy_lm.get_possible_next_tokens('')
assert p_initial.most_common(1)[0][0] == 'a'

1. Жадный метод.

In [25]:
def get_next_word(lm, prefix):
    return lm.get_possible_next_tokens(prefix).most_common(1)[0][0]

In [None]:
lm = NGramLanguageModel(words, n=3)
prefix = 'get'
repeat = 20
for _ in range(repeat):
    word = get_next_word(lm, prefix)
    prefix += ' ' + word
    if prefix.endswith(EOS):
        break

print(prefix)

In [None]:
prefix = ''
word = get_next_word(lm, prefix)
while word != EOS:
    prefix += f' {word}'
    word = get_next_word(lm, prefix)

print(prefix + f'{word} ')

2. Top-k.

In [None]:
def get_next_word(lm, prefix, k):
    next_words = lm.get_possible_next_tokens(prefix).most_common(k)
    index = random.randint(0, min(k, len(next_words))-1)
    return next_words[index][0]

In [None]:
lm = NGramLanguageModel(words, n=3)
prefix = 'get'
repeat = 20
for _ in range(repeat):
    word = get_next_word(lm, prefix)
    prefix += ' ' + word
    if prefix.endswith(EOS):
        break

print(prefix)

In [None]:
prefix = ''
word = get_next_word(lm, prefix)
while word != EOS:
    prefix += f' {word}'
    word = get_next_word(lm, prefix)

print(prefix + f'{word} ')

3. Beam search

In [None]:
lm = NGramLanguageModel(words, n=3)
prefix = 'he' # YOUR IDEA
prefixes = [prefix, prefix]
probs = [0, 0]
k = 2

step = 1
while (not prefixes[0].endswith(EOS)) and (not prefixes[1].endswith(EOS)) and (step != 20):
    print('step', step)
    print(prefixes, probs, sep='\n')
    step += 1
    possible_words1 = lm.get_possible_next_tokens(prefixes[0]).most_common(k)
    probs1 = []
    for word in possible_words1:
        probs1.append((word[0], 1/(step)*(probs[0]+np.log(word[1])), probs[0]+np.log(word[1])))
    possible_words2 = lm.get_possible_next_tokens(prefixes[1]).most_common(k)
    probs2 = []
    for word in possible_words2:
        probs2.append((word[0], 1/(step)*(probs[1]+np.log(word[1])), probs[0]+np.log(word[1])))
    choice = []
    probs1 = sorted(probs1, key=lambda x: x[1], reverse=True)
    probs2 = sorted(probs2, key=lambda x: x[1], reverse=True)
    probs_new = []
    while len(choice) != k and len(probs1) != 0 and len(probs2) != 0:
        if probs1[0][1] > probs2[0][1]:
            choice.append(prefixes[0] + f' {probs1[0][0]}')
            probs_new.append(probs1[0][2])
            probs1 = probs1[1:]
            possible_words1 = probs1[1:]
        else:
            choice.append(prefixes[1] + f' {probs2[0][0]}')
            probs_new.append(probs2[0][2])
            probs2 = probs2[1:]
            possible_words2 = probs2[1:]
    prefixes = choice
    probs = probs_new