# Генерація тексту

У цій роботі ми використаємо мовну модель для генерації тексту. Її бажано виконувати після перегляду [лекції 4.2](https://youtu.be/yAUCnoKW2QI)

У класичної мовної моделі є два взаємопов'язані визначення:

1. Оцінити ймовірність вхідного тексту.
2. Маючи певний префікс на вході, видати ймовірностий розподіл наступного слова.

Для генерації тексту нам ідеально підходить друге визначення.

## Початок роботи

Будь ласка, заповніть поля `EMAIL`, `NAME` та `GROUP` нижче:

In [1]:
#############################################№№№№№№№№###########################
# FILL-IN:
#-----------------------------------------------------------------------
EMAIL = "maksym.mysak.knm.2018@lpnu.ua"    # наприклад, oleksiy.syvokon@lpnu.ua
NAME = "Максим Мисак"            # наприклад, "Олексій Сивоконь"
GROUP = "КН-409"  # підставте вашу групу, залиште КН-400, якщо жодна не підходить
#####################################№№№№№№№№###################################

import requests
import json

def report(stage, answer):
    if answer is ...:
        raise ValueError("Please, implement a solution")

    payload = {"email": EMAIL, "name": NAME, "group": GROUP}
    payload["stage"] = str(stage)
    payload["answer"] = str(answer)
    payload["lab"] = "lab4"
    
    r = requests.post("http://134.209.248.229:8082/report", json=payload)
    if not r.ok:
        print("Проблема з сервером :( Спробуйте пізніше або напишіть викладачеві")
    
    return answer

assert EMAIL, "Заповніть поле EMAIL"
assert NAME, "Заповніть поле NAME"
report("Ready", "YES!")

'YES!'

### Завантаження моделі

Тренування мовної моделі з нуля займає багато часу: від кількох годин до кількох днів для маленьких та середніх моделей й до кількох місяців чи навіть років для великих (звичайно, за рахунок .

Для цієї роботи я попередньо натренував невеличку LSTM модель. Тренувальними даними були речення українською мовою, набрані з інтернету навмання. Текст був приведений до нижнього регістру. 

In [2]:
!wget http://134.209.248.229:8081/oscar-epoch3.pt

'wget' is not recognized as an internal or external command,
operable program or batch file.


In [None]:
import torch

In [None]:
model_state = torch.load("oscar-epoch3.pt", map_location=torch.device('cpu'))

In [None]:
import math
import random
import torch
import torch.nn as nn
import torch.nn.functional as F

class LstmLM(nn.Module):

    def __init__(self, vocab_size, dim_embed, dim_hidden, num_layers, dropout=0.5, tie_weights=False):
        super().__init__()
        self.vocab_size = vocab_size
        self.dropout = nn.Dropout(dropout)
        self.embed = nn.Embedding(vocab_size, dim_embed)
        self.rnn = nn.LSTM(dim_embed, dim_hidden, num_layers)
        self.linear = nn.Linear(dim_hidden, vocab_size)

        if tie_weights:
            if dim_hidden != dim_embed:
                raise ValueError('When using the tied flag, dim_hidden must be equal to dim_embed')
            self.linear.weight = self.embed.weight

        self.init_weights()

        self.dim_hidden = dim_hidden
        self.num_layers = num_layers

    def init_weights(self):
        initrange = 0.1
        nn.init.uniform_(self.embed.weight, -initrange, initrange)
        nn.init.zeros_(self.linear.weight)
        nn.init.uniform_(self.linear.weight, -initrange, initrange)

    def forward(self, input, hidden):
        emb = self.dropout(self.embed(input))
        output, hidden = self.rnn(emb, hidden)
        output = self.dropout(output)
        decoded = self.linear(output)
        decoded = decoded.view(-1, self.vocab_size)
        return F.log_softmax(decoded, dim=1), hidden

    def init_hidden(self, bsz=1):
        weight = next(self.parameters())
        return (weight.new_zeros(self.num_layers, bsz, self.dim_hidden),
                weight.new_zeros(self.num_layers, bsz, self.dim_hidden))

In [None]:
model = LstmLM(vocab_size=32000,
               dim_embed=512,
               dim_hidden=512,
               num_layers=3,
               dropout=0.3,
               tie_weights=True)
model.load_state_dict(model_state)
model.eval()

### Словник

Завантажимо словник, з яким тренувалася модель:

In [None]:
!wget http://134.209.248.229:8081/bpe.dict

Клас `Vocab` допоможе зручно виконувати такі часті операції:

1. `Vocab.word2idx()` - отримати індекс слова
2. `Vocab.idx2word()` - отримати слово за індексом

In [None]:
class Vocab(object):
    def __init__(self):
        self.word2idx = {}
        self.idx2word = []

    @classmethod
    def from_file(cls, path):
        d = Vocab()
        with open(path) as f:
            for word in f:
                d.add_word(word.rstrip("\n"))
        return d

    def add_word(self, word):
        if word not in self.word2idx:
            self.idx2word.append(word)
            self.word2idx[word] = len(self.idx2word) - 1
        return self.word2idx[word]

    def __len__(self):
        return len(self.idx2word)

vocab = Vocab.from_file("bpe.dict")

In [None]:
# Перевірка
vocab.word2idx["the"]

In [None]:
vocab.idx2word[22586]

## Перевірка моделі -- один крок ітерації

Ми генеруватимемо текст в циклі токен за токеном, зліва направо.Але для початку розберемо, як виглядає один крок такого циклу. 

Рекурентні мережі зберігають свою "пам'ять" або "стан" у векторі (у випадку з LSTM, двох векторах). Цей стан містить в собі інформацію про вже побачені токени. 

Оскільки на першому кроці ми ще бачили жодного токену, ініціалізуємо "пам'ять" нулями та збережемо її у змінній `hidden`:


In [None]:
hidden = model.init_hidden()
hidden

На кожному кроці на вхід моделі подаємо наступний токен речення. Починаємо зі спеціального токену `<BOS>` ("begin of sentence"):
 

In [None]:
# Знайдемо індекс токена в словнику
index = vocab.word2idx["<BOS>"]
index

In [None]:
# Формуємо вхідний батч. Майже завжди для більшої ефективності
# нейронні мережі очікують на вхід кілька незалежних речень 
# (або зображень у випадку з комп'ютерним зором)
#
# В нашій роботі ми завжди працюємо лише з одним реченням,
# тож розмір батча дорівнює одинці. Але все одно маємо
# оформити вхід як матрицю:
input_ = torch.LongTensor([[index]])
input_

In [None]:
# Нарешті робимо крок LSTM
# Вхід: поточний стан (пам'ять) та вхідний токен
# Вихід: розподіл по словнику (передбачення наступного токена) та оновлений стан
output, hidden = model(input_, hidden)

Перевіримо ймовірностний розподіл, який видала модель:

In [None]:
# Розмірність збігається з розміром словника
output.shape

In [None]:
# Модель повертає логарифми ймовірностей
# Якщо ми проекспоненціонуємо їх, отримаємо "звичайні" ймовірності
probs = output.squeeze().exp()

# Сума ймовірностей має дорівнювати 1.0
probs.sum()

In [None]:
# Кожному слову в словнику відповідає своя ймовірність бути побаченим
# після заданого префікса. Префіксом у нас зараз був лише одни токен <BOS>

# Яка ймовірність, що речення почнеться зі слова я?
probs[vocab.word2idx["я"]]

Тепер, коли маємо ймовірностний розподіл по словнику, можемо обрати слово, яке вважатимемо згенерованим. Тут можливі кілька стратегій, які ми розглянемо в наступних розділах.

## Greedy decoding

Найпростіший (але й не дуже цікавий) спосіб -- це завжди обирати токен з найбільшою ймовірністю:

In [None]:
next_token_id = probs.argmax()
next_token_id

In [None]:
probs[100]

In [None]:
vocab.idx2word[next_token_id]  # Наш перший згенерований токен

In [None]:
report("First generated token", vocab.idx2word[next_token_id])

Зберемо наш код докупи та додамо цикл. В циклі ми продовжуватимемо генерувати текст токен за токеном, поки не настане одна з двох умов:
1. Модель видала спецальний токен `<EOS>` (end of sentence)
2. Довжина згенерованого тексту перевищила певний поріг `max_len`

У хорошої моделі в більшості випадків має спрацьовувати перша умова зупнки. Проте іноді модель може впасти в безкінчений цикл. Щоб цьому запобігти, маємо другу умову.

In [None]:
@torch.no_grad()
def greedy_decode(model, vocab, max_len=50):
    result = []
    EOS_TOKEN = "<EOS>"
    hidden = model.init_hidden()
    start_index = vocab.word2idx["<BOS>"]
    input_ = torch.LongTensor([[start_index]])

    while len(result) < max_len:

        # Передбачення ймовірностней наступного токена
        output, hidden = model(input_, hidden)
        probs = output.squeeze().exp()
        
        # Обираємо токен, що має найбільшу ймовірність
        token_index = probs.argmax()

        # Обраний токен стає наступним вхідним токеном для моделі
        input_.fill_(token_index)
        
        # Додаємо обраний токен в згенерований текст
        token = vocab.idx2word[token_index]
        if token == EOS_TOKEN:
            break
        result.append(token)
        
    return "".join(result)


greedy_decode(model, vocab)

In [None]:
report("greedy decode", greedy_decode(model, vocab))

### Примітка: Byte-pair encoding (BPE)

Наша модель використовує subword токенізацію, а саме byte-pair encoding (BPE). В сучасному NLP це найрозповсюдженіший спосіб токенізації. Детально можете подивитися в [цьому відео](https://www.youtube.com/watch?v=tOMjTCO0htA).

Для наших цілей зараз важливо, що BPE заміняє пробіли на спеціальні Unicode-символи "▁" (зверніть увагу, це не звичайний символ підкреслення "_"). Щоб отримати чистий текст, треба виконати наступну заміну:

In [None]:
def bpe_decode(s):
    result = s.replace("▁", " ")
    if result.startswith(" "):
        result = result[1:]
    return result

bpe_decode(greedy_decode(model, vocab))

## Generic decoding function

Обирати слово з найбільшою ймовірністю -- не найкращий варіант для генерації тексту хоча б тому, що він завжди детерміновано призводить до однієї послідовності. Нижче ми подивимося на цікавіші альтернативи.

Цикл генерації залишиться той самий, що і в `greedy_decode()`. Відрізнятися буде лише один рядок -- той, в якому ми приймали рішення, яке слово обрати. Для зручності, винесемо цей рядок в окрему функцію. Ця функція прийматиме на вхід ймовірностний розподіл по словнику і повертає обраний токен. 

In [None]:
def greedy_choice(probs):
    return probs.argmax()

Функцію генерації також трохи переробимо.

По-перше, додамо параметр `sample_fn` -- це має бути функція, яка обирає слово з ймовірностного розподілу, наприклад, `greedy_choice`.

По-друге, для зручності виконуватимемо BPE декодінг в середині функції генерації.

In [None]:
@torch.no_grad()
def generate(model, vocab, sample_fn, max_len=50, bpe_decoded=True):
    result = []
    EOS_TOKEN = "<EOS>"
    hidden = model.init_hidden()
    start_index = vocab.word2idx["<BOS>"]
    input_ = torch.LongTensor([[start_index]])
    
    while len(result) < max_len:

        # Передбачення ймовірностней наступного токена
        output, hidden = model(input_, hidden)
        probs = output.squeeze().exp()
        
        # Обираємо наступний токен
        token_index = sample_fn(probs)     # <---------------- цей рядок змінено

        # Обраний токен стає наступним вхідним токеном для моделі
        input_.fill_(token_index)
        
        # Додаємо обраний токен в згенерований текст
        token = vocab.idx2word[token_index]
        if token == EOS_TOKEN:
            break
        result.append(token)


    result_str = "".join(result)            # <---------------- ці рядки змінено
    if bpe_decoded:
        result_str = bpe_decode(result_str)

    return result_str


# Перевірка:
generate(model, vocab, greedy_choice)

## Simple sampling

Перший альтернатива -- це sampling. Тут ми обираємо наступний токен випадково, але з урахуванням ймовірностей.

In [None]:
def simple_sample(probs):
    return torch.multinomial(probs, num_samples=1)[0]

# Згенеруємо 10 речень
for i in range(1, 11):
    result = generate(model, vocab, simple_sample, allow_unk=False)
    print(f"#{i}: {result}")
    print()

## Поліпшення семплінгу

### Заборона `<UNK>`

Іноді наша модель генерує `<UNK>` токени. Такі токени важливі в деяких випадках.

Наприклад, візьмемо модель машинного перекладу, яка перекладає з української на англійську. На вхід моделі приходить речення:

```
Мене звати Олексій Сивоконь.
```

Якщо в словнику моделі немає слова "Сивоконь", то модель може згенерувати токен `<UNK>`. Це сигнал, що певні слова вона не може перекласти. В такому випадку вихід моделі буде виглядати якось так:

```
My name is Oleksiy <UNK>.
```

В такому випадку, як правило, запускають додатковий postprocessing модуль, який, скажімо, копіює невідомі слова з вхідного тексту:

```
My name is Oleksiy Сивоконь.
```

Не ідеально, але краще ніж нічого.

Трохи розумніша система могла б робити транслітерацію.

Отже, `<UNK>` токени важливі в машинному перекладі. Однак вони недоцільні у вільній генерації тексту. Тому ми просто заборонимо їх генерацію, призначивши їм нульову ймовірність.

## Sampling with temperature

Ми також можемо впливати на генерацію параметром температури softmax.

Більші значення температури призводять до того, що різниця між ймовірностями токенів зменшується, тобто розподіл стає більш рівномірним. На практиці це означає, що менш ймовірні варіанти обиратимуться частіше і згенерований текст може бути цікавішим. Однак якщо продовжувати піднімати температуру, то текст спочатку втратить зв'язність, далі почнуть розпадатися слова та граматичність.

Менші значення температури змінюють розподіл таким чином, що основна ймовірніста маса припадає на невелику кількість топових токенів. При температурі 0 вся ймовірність дістанеться одному токену й семплінг перетвориться на greedy decoding.

Додамо в функцію `generate()` параметр `temperature` та згенеруємо тексти з різною температурою:

In [None]:
@torch.no_grad()
def generate(model, vocab, sample_fn, max_len=50, min_len=0, allow_unk=True, temperature=1.0):
    result = []
    hidden = model.init_hidden(1)
    # input_ = torch.LongTensor([[random.randint(0, len(vocab))]])
    input_ = torch.LongTensor([[vocab.word2idx["<BOS>"]]])
    
    # Generate continuation
    for _ in range(max_len):
        output, hidden = model(input_, hidden)
        probs = output[-1].squeeze().div(temperature).exp()   # TODO: is this correct?
        
        if not allow_unk:
            probs[vocab.word2idx["<UNK>"]] = 0.
            
        if len(result) < min_len:
            probs[vocab.word2idx["<EOS>"]] = 0.
        
        token_index = sample_fn(probs)
        input_.fill_(token_index)
        
        token = vocab.idx2word[token_index]
        if token == "<EOS>":
            break
        result.append(token)
        
    return bpe_decode("".join(result))

In [None]:
for temperature in (0.1, 0.3, 0.5, 0.8, 1.0, 1.5, 2.0, 3.0, 5.0):
    print(f"Sampling with temperature={temperature}")
    result = generate(model, vocab, simple_sample, temperature=temperature, allow_unk=False)
    print(result)
    print()

In [None]:
# Яке значення `temperature` здається вам оптимальною?
report("best_temperature", ...)

## Top-k sampling

In [None]:
def top_k_sampling(probs, k):
    topk = probs.topk(k)
    index = torch.multinomial(topk.values, num_samples=1)[0]
#     print(f"Top {k} words take {topk.values.sum():%} probability mass")
    return topk.indices[index]

In [None]:
k = 15
sample_fn = lambda probs: top_k_sampling(probs, k=k)
for i in range(1, 10):
    result = generate(model, vocab, sample_fn, allow_unk=False)
    print(f"#{i}: {result}")
    print()

## Nucleus (top-p) sampling

In [None]:
def nucleus_sampling(probs, max_p):
    sorted_probs = probs.sort(descending=True)
    cum_prob = 0.0
    sample_indices = []
    sample_probs = []
    for i in range(0, len(sorted_probs.values)):
        p = sorted_probs.values[i]
        cum_prob += p
        sample_probs.append(p)
        sample_indices.append(sorted_probs.indices[i])
        if cum_prob >= max_p:
            break

    index = torch.multinomial(torch.tensor(sample_probs), num_samples=1)
    return sample_indices[index]

In [None]:
for max_p in (0.1, 0.3, 0.5, 0.6, 0.8, 1.0):
    sample_fn = lambda probs: nucleus_sampling(probs, max_p=max_p)
    result = generate(model, vocab, sample_fn, allow_unk=False)
    print(f"#{max_p}: {result}")
    print()

In [None]:
# Яке значення p здається вам оптимальним?
report("best_p", ...)

## Start from prompt

До цього моменту ми генерували текст з нуля. Однак значно кориснішим є задача генерації тексту від певного префікса або "підказки" -- в англійській мові це називається "prompt".

In [None]:
!pip install youtokentome
!wget http://134.209.248.229:8081/bpe.model

In [None]:
import youtokentome
bpe = youtokentome.BPE("bpe.model")

In [None]:
@torch.no_grad()
def generate(model, vocab, sample_fn, max_len=50, temperature=1.0, allow_unk=True, prompt=None):
    if prompt is None:
        prompt = random.choice(vocab.idx2word)
    prompt = [vocab.idx2word[x] for x in bpe.encode(prompt)]
        
    # force decode prompt
    result = []
    hidden = model.init_hidden(1)
    for i, token in enumerate(prompt[:-1]):
        index = vocab.word2idx[token]
        input_ = torch.LongTensor([[index]])
        output, hidden = model(input_, hidden)
        
    input_ = torch.LongTensor([[vocab.word2idx[prompt[-1]]]])
    result = prompt[:]
    
    # Generate continuation
    for _ in range(max_len):
        output, hidden = model(input_, hidden)
        probs = output[-1].squeeze().div(temperature).exp()
        
        if not allow_unk:
            probs[vocab.word2idx["<UNK>"]] = 0.
        
        token_index = sample_fn(probs)
        input_.fill_(token_index)
        
        token = vocab.idx2word[token_index]
        if token == "<EOS>":
            break
        result.append(token)
        
    return bpe_decode("".join(result))

In [None]:
k = 20
sample_fn = lambda probs: top_k_sampling(probs, k=k)
for i in range(1, 5):
    # президент україни
    # така несподівана заява
    result = generate(model, vocab, sample_fn, allow_unk=False, prompt="магазини відкрили свої")
    print(f"#{i}: {result}")
    print()

Спробуйте продовжити інші префікси:

In [None]:
report("prompt1", generate(model, vocab, sample_fn, allow_unk=False, prompt="студенти зустрілися з"))

In [None]:
report("prompt2", generate(model, vocab, sample_fn, allow_unk=False, prompt="президент україни"))

In [None]:
report("prompt3", generate(model, vocab, sample_fn, allow_unk=False, prompt="така несподівана заява"))

Придумайте свій початок речення:

In [None]:
report("prompt3", generate(model, vocab, sample_fn, allow_unk=False, prompt="Все буде добре"))

# Autocompletion

In [None]:
@torch.no_grad()
def autocomplete(model, vocab, tokens, n=10):
    if isinstance(tokens, str):
        tokens = [vocab.idx2word[x] for x in bpe.encode(tokens)]

    # force decode prompt
    result = []
    hidden = model.init_hidden(1)
    for i, token in enumerate(tokens):
        index = vocab.word2idx[token]
        input_ = torch.LongTensor([[index]])
        output, hidden = model(input_, hidden)
    
    topk = output.squeeze().topk(n)
    words = [vocab.idx2word[x.item()] for x in topk.indices]
    probs = [math.exp(p) for p in topk.values]
    return list(zip(words, probs))

In [None]:
autocomplete(model, vocab, "сьогодні я побачив кілька")

In [None]:
@torch.no_grad()
def score_sentence(model, vocab, tokens):
    if isinstance(tokens, str):
        tokens = ["<BOS>"] + [vocab.idx2word[x] for x in bpe.encode(tokens)]
        
    probs = []

    # force decode prompt
    result = []
    hidden = model.init_hidden(1)
    for i, token in enumerate(tokens):
        index = vocab.word2idx[token]
        input_ = torch.LongTensor([[index]])
        output, hidden = model(input_, hidden)
        log_prob = output[-1].squeeze()[index]
        print(f"{token:<20} {log_prob.item()}")
        probs.append(log_prob.item())
        
    return sum(probs) / len(probs)

# Score sentence

In [None]:
score_sentence(model, vocab, "президент україни борщ")

In [None]:
score_sentence(model, vocab, "президент україни заявив")

In [None]:
report("ALL DONE", "💪")