# Generování textu znakovou RNN

In [None]:
import numpy as np
import matplotlib.pyplot as plt
import os
import pickle
import random
import tqdm

from IPython.core.debugger import set_trace

plt.rcParams['figure.figsize'] = (12., 8.)
plt.rcParams['image.interpolation'] = 'nearest'
plt.rcParams['image.cmap'] = 'gray'

In [None]:
import torch
import torch.nn.functional as F
from torch.autograd import Variable
from torch import nn
from torch import optim

V tomto cvičení nebudeme používat GPU, protože budeme zpracovávat znaky po jednom a v takto malých dávkách overhead způsobený neustálými přesuny dat mezi GPU a RAM výpočty pouze zpomalí.

# Data

Namísto obrazu tentokrát použijeme textová data. Konkrétně se jedná o novinové nadpisy, které se budeme snažit generovat automaticky. Všechna data jsou v jediném souboru, který si stáhněte [odsud](https://1drv.ms/t/s!AotVPA94wWKxoWLULaBqvPXiNS5t) a uložte jako `data/headlines.txt`.

Z textu byly odstraneny hacky, carky a vsechny nestandardni znaky. Neni tedy potreba resit kodovani apod.

In [None]:
data = open('data/headlines.txt').read()
lines = [line for line in data.split('\n') if line]

Ukázka dat:

In [None]:
for i in range(10):
    print(i, random.choice(lines))

Sada znaků = náš slovník:

In [None]:
chars = list(sorted(set(data)))
print(len(chars), chars)

Následující tabulka (`dict`) nám usnadní převod znaku na index.

In [None]:
chr2idx = {c: i for i, c in enumerate(chars)}

Podíváme se na statistické rozložení prvních znaků ve větách.

In [None]:
counts = {c: 0 for c in chars}
for line in lines:
    counts[line[0]] += 1
counts = np.array([counts[c] for c in chars], dtype=np.float)
p0 = counts / counts.sum()

In [None]:
plt.figure(figsize=(16, 8))
rects = plt.bar(range(len(chars)), 100. * p0)
plt.xticks(range(len(chars)), ['{}'.format(repr(c)) for c in chars])
for r in rects:
    x, w, h = r.get_x(), r.get_width(), r.get_height()
    plt.text(x + w / 2., h + 0.1, '{:.1f}'.format(h), ha='center', va='bottom', fontsize=8)
plt.ylabel('počet')
plt.show()

Funkce pro zobrazení průběhu lossu:

In [None]:
def plot_history(history, avg_range=1):
    if avg_range > 1:
        y = np.mean(np.reshape(history[:avg_range * (len(history) // avg_range)], (-1, avg_range)), axis=1)
    else:
        y = history
    plt.plot(y)
    plt.show()

# Sekvenční data a PyTorch

Následující funkce převede řetězec na sekvenci čísel odpovídajících indexům znaků v tabulce. Pokud např. `chars = ['a', 'b', 'c']`, pak řetězec `'acba'` převede na `[0, 2, 1, 0]`. Výsledek vrátí jako PyTorch `Variable`.

In [None]:
def char_tensor(string):
    tensor = torch.zeros(len(string)).long()
    for c in range(len(string)):
        tensor[c] = chr2idx[string[c]]
    x = Variable(tensor)
    return x

In [None]:
x = char_tensor('abca')
x

Další funkce bude dělat opak: převede sekvenci indexů na řetězec.

In [None]:
def to_string(indices):
    if isinstance(indices, Variable):
        indices = indices.data
    return ''.join([chars[i] for i in indices])

In [None]:
to_string(x)

Sekvenci čísel potřebujeme převést na vektory jednotlivých znaků. Tento proces se v anglické literatuře označuje jako embedding a PyTorch ho implementuje jako vrstvu třídou `Embedding`. Vyjádřením této operace diferencovatelnou vrstvou umožňuje učení vektorů, které tedy nemusejí být fixní. O tom ale až příště.

In [None]:
# velikost slovniku je `len(chars)`
# dimenze znakoveho vektoru bude napr. 30
emb = nn.Embedding(len(chars), 30)

# dopredny pruchod
e = emb(x)
e

PyTorch implementuje tři z nejrozšířenějších typů sítí třídami `RNN`, `LSTM` a `GRU`. API je pro všechny stejné: dopředný průchod `forward` očekává "zespodu" nějaký vstup `input` a "zleva" minulý stav `h0`. U `LSTM` je tento stav dvouvektorový. Výstupem je `output`, což je vlastně sekvence skrytých stavů poslední vrstvy rekurentní sítě pro jednotlivé kroky v čase, a nový stav `hn` po provedení celého průchodu. Vše vystihuje následující obrázek.

![](https://i.stack.imgur.com/SjnTl.png)

Zdroj: https://stackoverflow.com/a/48305882/9418551

V nejjednoušším případě máme pouze jednu vrstvu sítě a jeden krok. Potom `output` a `hn` jsou stejné. `output` tedy **neprochází žádnou lineární vrstvou**, jak by se mohlo na první pohled zdát. Transformaci na skóre/pravděpodobnost jednotlivých znaků tedy musíme provést sami.

**Příklad:** porovnejme `output` a `hidden`.
tensory by měly být tvaru `(seq, batch, dim)`
- `seq` ... jak jdou znaky ve "věte" za sebou
- `batch` ... počet paralelně zpracovávaných sekvencí, nezávisle na sobě
- `dim` ... příznaky na vstupu

Například tedy: `(10, 3, 5)` by znamenalo:
- 3 paralelně zpracovávané
- 10-znakové věty,
- kde každý znak reprezentuje 5dimenzionální vektor

In [None]:
# do site posleme pouze jeden znak
e0 = e[0].view(1, 1, -1)
e0.shape

In [None]:
# RNN ocekava na vstupu vektor o rozmeru 6 a skryty stav bude mit rozmer 8
rnn = nn.RNN(30, 8)

# inicializace skryteho stavu a vstupu
# tensory by mely byt tvaru (seq, batch, dim)
h = Variable(torch.rand(8))
o, h = rnn(e0)

print(o)
print(h)

Nyní už více samostatně. Zadefinujeme vlastní třídu, která bude řešit jednotlivé kroky sama ve svém dopředném průchodu. Vstupem tedy bude sekvence čísel, výstupem skóre jednotlivých kroků a skrytý stav z posledního kroku.

In [None]:
class RNN(nn.Module):
    def __init__(self, voc_size, emb_dim, hidden_size, output_size, n_layers=1):
        super(RNN, self).__init__()

        self.emb = ...
        self.rnn = ...
        self.fc = ...

    def forward(self, x, hidden):
        ...
        return score, hidden

    def init_hidden(self):
        h = Variable(torch.zeros(self.rnn.num_layers, 1, self.rnn.hidden_size))
        c = Variable(torch.zeros(self.rnn.num_layers, 1, self.rnn.hidden_size))
        
        return h, c

In [None]:
voc_size = ...
emb_dim = ...
hidden_dim = ...
output_dim = ...

rnn = RNN(voc_size, emb_dim, hidden_dim, output_dim, n_layers=1)
rnn_history = []
example_history = []

Vytvoříme trénovací data. `y_train` je v tomto případě stejného rozměru jako `X_train` a ke každému znaku udává následjící. Poslední znak má jako label `\n`, značící konec sekvence. Data vytvoříme jako seznamy, tj. `list`, kde každý prvek je jedna věta, už převedená na indexy znaků metodou `char_tensor`.

In [None]:
X_train = ...
y_train = ...

In [None]:
idx = random.randrange(len(X_train))
print(to_string(X_train[idx].data))
print('data:  {} ... {}'.format(to_string(X_train[idx].data.numpy()[:10]), to_string(X_train[idx].data.numpy()[-10:])))
print('label: {} ... {}'.format(to_string(y_train[idx].data.numpy()[:10]), to_string(y_train[idx].data.numpy()[-10:])))

Vytvoříme si také funkci pro samplování z naší sítě. Funkce přijme náš model `rnn`, nějaký inicializační text `init_text`, příp. i inicializační `hidden`, a vygeneruje text - vrací tedy string.

In [None]:
def sample(rnn, init_text='', hidden=None, maxlen=150, mode='multinomial'):
    # vystupni text bude pole (na konci prevedeme na str)
    out_text = list(init_text)
    
    # pokud nezadan, inicializujeme nahodne, dle rozlozeni prvnich znaku
    if not out_text:
        s = np.random.choice(len(chars), p=p0)
        out_text = [chars[s]]
    
    # to same hidden
    if hidden is None:
        hidden = rnn.init_hidden()
        
        # sit projedeme vstupem, abychom ziskali aktualni hidden stav
        x = char_tensor(out_text)
        for i in range(len(out_text)):
            score, hidden = rnn(x[i], hidden)
    
    # nasledujici znak je posledni znak prozatimniho vystupu
    x = char_tensor(out_text[-1])

    while True:
        # dopredny pruchod
        ...
        
        # pravdepodobnosti znaku
        ...
        
        # vybrat nasledujici znak --> index do `k`
        if mode == 'multinomial':
            k = torch.multinomial(score.data.view(-1).div(0.9).exp(), 1)[0]
        elif mode == 'argmax':
            k = ...
        elif mode == 'proportional':
            k = ...
        
        # zastavit, pokud end-token
        ...
        
        # pridat znak
        text.append(chars[k])
        
        # zastavit, pokud text je moc dlouhy
        ...
        
        # novy vstupni znak
        ...
    
    return ''.join(text)

In [None]:
print(sample(rnn, init_text='prezident', mode='multinomial'))

# Trénování

In [None]:
optimizer = torch.optim.Adam(rnn.parameters(), lr=0.005)
criterion = nn.CrossEntropyLoss()

In [None]:
example = sample(rnn, mode='argmax')
smooth_loss = -np.log(1. / len(chars))
max_per_epoch = 10000

for epoch in range(1):
    if max_per_epoch < len(lines):
        perm = np.random.permutation(len(lines))
    else:
        perm = np.arange(len(lines))
    
    pb = tqdm.tqdm_notebook(perm, desc='ep {:03d}'.format(epoch))
    
    for li in enumerate(pb):
        hidden = rnn.init_hidden()
        rnn.zero_grad()
        loss = 0.
    
        x = ...
        y = ...
        
        for ic, c in enumerate(line):
            # dopredny pruchod
            ...
            
            # loss
            ...
        
        loss /= len(x)

        loss.backward()
        optimizer.step()
        
        if li % 100 == 0:
            example = sample(rnn)
            example_history.append(example)
        
        rnn_history.append(float(loss))
        smooth_loss = 0.99 * smooth_loss + 0.01 * float(loss)
        pb.set_postfix(loss='{:.3f}'.format(smooth_loss), ex=example[:40])

In [None]:
plot_history(rnn_history, avg_range=100)

In [None]:
print(sample(rnn, init_text='prezident', mode='multinomial'))