<h1><center>Языковые модели и генерация текста </center></h1>

1. Вероятностные модели
2. Нейросетевые модели (RNN)

#### Построим простую вероятностную языковую модель для генерации названий динозавров. 
Возьмем данные с реальными названиями:

In [1]:
!head -n 5 dinos.txt

Aachenosaurus
Aardonyx
Abdallahsaurus
Abelisaurus
Abrictosaurus


In [2]:
dinos = []
with open('dinos.txt', 'r') as f:
    for line in f.readlines():
        dinos.append((line.strip()))

dinos[:5]

['Aachenosaurus', 'Aardonyx', 'Abdallahsaurus', 'Abelisaurus', 'Abrictosaurus']

Мы будем учить модель для биграмм, поэтому нужно будет получить список биграмм для каждого слова:

In [6]:
from nltk.util import bigrams, pad_sequence

print(dinos[0])
list(bigrams(dinos[0]))

Aachenosaurus


[('A', 'a'),
 ('a', 'c'),
 ('c', 'h'),
 ('h', 'e'),
 ('e', 'n'),
 ('n', 'o'),
 ('o', 's'),
 ('s', 'a'),
 ('a', 'u'),
 ('u', 'r'),
 ('r', 'u'),
 ('u', 's')]

Помимо этого, нужно сообщить модели о начале и конце слова, поэтому добавим соответствующие символы:

In [7]:
list(pad_sequence(dinos[0], 
                  pad_left=True, 
                  left_pad_symbol="<",
                  pad_right=True, 
                  right_pad_symbol='>',
                  n = 2))

['<', 'A', 'a', 'c', 'h', 'e', 'n', 'o', 's', 'a', 'u', 'r', 'u', 's', '>']

Все это вместе можно сделать с помощью __padded_everygram_pipeline__ : этот метод создает итераторы для данных и словаря

In [21]:
from nltk.lm.preprocessing import padded_everygram_pipeline
data, vocab = padded_everygram_pipeline(2, dinos)

Выделим часть данных для тестирования модели:

In [22]:
len(dinos)

1536

In [23]:
from random import shuffle

shuffle(dinos)
dinos[:10]

['Neuquensaurus',
 'Kitadanisaurus',
 'Mei',
 'Rajasaurus',
 'Omosaurus',
 'KlamelisaurusKol',
 'Tachiraptor',
 'Aoniraptor',
 'Tianzhenosaurus',
 'Yangchuanosaurus']

In [24]:
spl = int(98*len(dinos)/100)
train = dinos[:spl]
test = dinos[spl:]
len(train), len(test)

(1505, 31)

In [25]:
train_data, vocab = padded_everygram_pipeline(2, train)

Теперь можно обучать модель:

In [26]:
from nltk.lm import MLE

lm = MLE(2) # 2 = наибольший размер используемых n-грамм
len(lm.vocab)

0

In [27]:
lm.fit(train_data, vocab)
len(lm.vocab)

55

In [29]:
print(*sorted(list(lm.vocab)))

</s> <UNK> <s> A B C D E F G H I J K L M N O P Q R S T U V W X Y Z a b c d e f g h i j k l m n o p q r s t u v w x y z


Абсолютная частота униграммы:

In [30]:
lm.counts['A']

165

Абсолютная совместная частота биграммы 'Ab':

In [31]:
lm.counts[['A']]['b']

5

Частота (вероятность) униграммы:

In [32]:
lm.score('A')

0.007850040439602265

Частота (вероятность) биграмм 'bA' и 'Ab':

In [33]:
lm.score("A", ["b"]), lm.score("b", ["A"])

(0.0, 0.030303030303030304)

In [34]:
lm.score("s", ["<s>"])

0.0

Можно генериовать строки заданной длины:

In [35]:
lm.generate(1, random_seed=42)

'o'

In [36]:
lm.generate(6, random_seed=42)

['o', 'c', 'e', 'l', 'o', 's']

In [37]:
lm.generate(10, random_seed=34)

['n', 'k', 'u', 's', 'h', 'i', 'u', 'r', 'i', 'a']

И посчитать перплексию можели на (искусственном) тестовом множестве:

In [38]:
test_data = [('<s>', 'A'), ('D', 'a')]

lm.perplexity(test_data)

5.684927328248404

Обратите внимание, что перплексия для биграмм, которые не встречались в обучающией выборке, будет ожидаемо бесконечной:

In [39]:
print(lm.perplexity([('ф', 'r')]))

inf


In [40]:
for t in train:
    t_ = ''.join(t)
    if t_.startswith('I'):
        print(t_)

Ischyrosaurus
Isisaurus
Iuticosaurus
Ichthyovenator
Isaberrysaura
Ischisaurus
Issasaurus
Ischioceratops
Indosuchus
Iguanacolossus
Iguanosaurus
Isanosaurus
Ignavusaurus
Inosaurus
Incisivosaurus
Irritator
Illustration
Ingenia
Itemirus
Iguanodon
Iguanoides
Iliosuchus
Indosaurus


In [41]:
test[:3]

['Tatisaurus', 'Eucercosaurus', 'Kryptops']

In [67]:
dino = test[0]
dino

'Tatisaurus'

In [None]:
test_sample = ()

In [70]:
lm.perplexity(list(bigrams(dino)))

4.14842953311193

In [52]:
padded_test = [['<s>']+list(t)+['</s>'] for t in test]
print(*padded_test[0], sep=' ')

<s> T a t i s a u r u s </s>


In [103]:
test_bigrams = [list(bigrams(t)) for t in padded_test]

test_bigrams[0]

[('<s>', 'T'),
 ('T', 'a'),
 ('a', 't'),
 ('t', 'i'),
 ('i', 's'),
 ('s', 'a'),
 ('a', 'u'),
 ('u', 'r'),
 ('r', 'u'),
 ('u', 's'),
 ('s', '</s>')]

In [105]:
test_bigrams[0]

[('<s>', 'T'),
 ('T', 'a'),
 ('a', 't'),
 ('t', 'i'),
 ('i', 's'),
 ('s', 'a'),
 ('a', 'u'),
 ('u', 'r'),
 ('r', 'u'),
 ('u', 's'),
 ('s', '</s>')]

In [110]:
perplexities = [lm.perplexity(t) for t in test_bigrams]

avg_perplexity = sum(perplexities) / len(perplexities)

avg_perplexity

inf

В тестовой выборке встречаются биграммы, которых не было в обучающей выборке, что приводит к бесконечной перплексии. 

Если исключить такие семплы из тестовой выборки:

In [113]:
inf = 10**5

perplexities = [lm.perplexity(t) for t in test_bigrams if lm.perplexity(t) < inf]

avg_perplexity = sum(perplexities) / len(perplexities)

avg_perplexity

10.139349435641014

#### Теперь обучим нейросетевую языковую модель, а именно - рекуррентную нейросеть (RNN).

In [126]:
import numpy as np
import random
import torch
import torch.nn as nn
import torch.optim as optim
import pdb
from torch.utils.data import Dataset, DataLoader

%load_ext autoreload
%autoreload 2

In [127]:
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
hidden_size = 50

Подготовим данные для обучения:

In [128]:
class DinosDataset(Dataset):
    def __init__(self, path):
        super().__init__()
        self.dinos = []
        with open(path, 'r') as f:
            for line in f.readlines():
                self.dinos.append('<'+line.strip()+'>')
        self.vocab = sorted(set(''.join(self.dinos)))
        self.vocab_size = len(self.vocab)
        self.ch_to_idx = {c:i for i, c in enumerate(self.vocab)}
        self.idx_to_ch = {i:c for i, c in enumerate(self.vocab)}
    
    def __getitem__(self, index):
        line = self.dinos[index]
        x_str = line[:-1] 
        y_str = line[1:]
        x = torch.zeros([len(x_str), self.vocab_size], dtype=torch.float)
        y = torch.empty(len(x_str), dtype=torch.long)
        for i, (x_ch, y_ch) in enumerate(zip(x_str, y_str)):
            x[i][self.ch_to_idx[x_ch]] = 1
            y[i] = self.ch_to_idx[y_ch]
        
        return x, y
    
    def __len__(self):
        return len(self.dinos)

In [129]:
trn_ds = DinosDataset('dinos.txt')

In [130]:
trn_ds.dinos[0]

'<Aachenosaurus>'

In [131]:
trn_ds.ch_to_idx

{'<': 0,
 '>': 1,
 'A': 2,
 'B': 3,
 'C': 4,
 'D': 5,
 'E': 6,
 'F': 7,
 'G': 8,
 'H': 9,
 'I': 10,
 'J': 11,
 'K': 12,
 'L': 13,
 'M': 14,
 'N': 15,
 'O': 16,
 'P': 17,
 'Q': 18,
 'R': 19,
 'S': 20,
 'T': 21,
 'U': 22,
 'V': 23,
 'W': 24,
 'X': 25,
 'Y': 26,
 'Z': 27,
 'a': 28,
 'b': 29,
 'c': 30,
 'd': 31,
 'e': 32,
 'f': 33,
 'g': 34,
 'h': 35,
 'i': 36,
 'j': 37,
 'k': 38,
 'l': 39,
 'm': 40,
 'n': 41,
 'o': 42,
 'p': 43,
 'q': 44,
 'r': 45,
 's': 46,
 't': 47,
 'u': 48,
 'v': 49,
 'w': 50,
 'x': 51,
 'y': 52,
 'z': 53}

In [132]:
trn_dl = DataLoader(trn_ds, shuffle=True)

In [133]:
x, y = trn_ds[0]
x, y

(tensor([[1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 1., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 1., 0., 0., 0., 0., 0.,
          0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0., 0.],
         [0.

In [134]:
x.shape, y.shape

(torch.Size([14, 54]), torch.Size([14]))

Опишем модель, функцию потерь и алгоритм оптимизации:

In [135]:
class RNN(nn.Module):
    def __init__(self, input_size, hidden_size, output_size):
        super().__init__()
        self.i2h = nn.Linear(input_size + hidden_size, hidden_size)
        self.dropout = nn.Dropout(0.2)
        self.i2o = nn.Linear(hidden_size, output_size)
    
    def forward(self, h_prev, x):
        combined = torch.cat([h_prev, x], dim = 1) 
        h = torch.tanh(self.dropout(self.i2h(combined)))
        y = self.i2o(h)
        return h, y

In [136]:
model = RNN(trn_ds.vocab_size, hidden_size, trn_ds.vocab_size).to(device)
loss_fn = nn.CrossEntropyLoss()
optimizer = optim.SGD(model.parameters(), lr=0.01)

In [137]:
def sample(model):
    model.eval()
    word_size=0
    newline_idx = trn_ds.ch_to_idx['>']
    with torch.no_grad():
        h_prev = torch.zeros([1, hidden_size], dtype=torch.float, device=device)
        x = h_prev.new_zeros([1, trn_ds.vocab_size])
        start_char_idx = trn_ds.ch_to_idx['<']
        indices = [start_char_idx]
        x[0, start_char_idx] = 1
        predicted_char_idx = start_char_idx
        
        while predicted_char_idx != newline_idx and word_size != 50:
            h_prev, y_pred = model(h_prev, x)
            y_softmax_scores = torch.softmax(y_pred, dim=1)
            
            np.random.seed(np.random.randint(1, 5000))
            idx = np.random.choice(np.arange(trn_ds.vocab_size), p=y_softmax_scores.cpu().numpy().ravel())
            indices.append(idx)
            
            x = (y_pred == y_pred.max(1)[0]).float()
            
            predicted_char_idx = idx
            
            word_size += 1
        
        if word_size == 50:
            indices.append(newline_idx)
    return indices

In [138]:
def print_sample(sample_idxs):
    [print(trn_ds.idx_to_ch[x], end ='') for x in sample_idxs]
    print()

In [144]:
def train_one_epoch(model, loss_fn, optimizer, to_print = False):
    model.train()
    for line_num, (x, y) in enumerate(trn_dl):
        loss = 0
        optimizer.zero_grad()
        h_prev = torch.zeros([1, hidden_size], dtype=torch.float, device=device)
        x, y = x.to(device), y.to(device)
        for i in range(x.shape[1]):
            h_prev, y_pred = model(h_prev, x[:, i])
            loss += loss_fn(y_pred, y[:, i])

           
        loss.backward()
        optimizer.step()
    
    if to_print:
        perplexity = torch.exp(loss)
        print(f'Perplexity:{perplexity}')  

In [147]:
def train(model, loss_fn, optimizer, to_print, epochs=1):
    for e in range(1, epochs+1):
        if to_print:
            print('\nEpoch:{}'.format(e))
        train_one_epoch(model, loss_fn, optimizer, to_print)

In [148]:
train(model, loss_fn, optimizer, to_print = True, epochs = 50)


Epoch:1
Perplexity:5.105245947638579e+16

Epoch:2
Perplexity:1413456.375

Epoch:3
Perplexity:7683404.0

Epoch:4
Perplexity:42315952.0

Epoch:5
Perplexity:46993477632.0

Epoch:6
Perplexity:14090447872.0

Epoch:7


KeyboardInterrupt: 

In [149]:
print_sample(sample(model))

<Bgachertifaurus>
