# RNN

Привет! Это семинарский ноутбук для курса DL Basic для Тинькофф. В этом ноутбуке мы рассмотрим рекуррентные нейронные сети (RNN) и их разновидности. Мы будем использовать библиотеку PyTorch.

Для начала установим все нужные библиотеки. Если вы используете Google Colab, то просто запустите следующую ячейку. Если вы используете свой компьютер, то установите все библиотеки, перечисленные в следующей ячейке.

In [1]:
import subprocess
import sys


# Treat the session as Google Colab when the `google.colab` module has already
# been loaded into this interpreter.
IN_COLAB = 'google.colab' in sys.modules

if IN_COLAB:
    # Run pip / the NLTK downloader through the interpreter that executes this
    # notebook, so the packages are installed for this kernel. Argument lists
    # avoid going through a shell; check=True makes a failed install raise
    # here instead of surfacing later as an ImportError.
    subprocess.run(
        [sys.executable, "-m", "pip", "install", "datasets", "nltk", "gensim"],
        check=True,
    )
    subprocess.run(
        [sys.executable, "-m", "nltk.downloader", "punkt"],
        check=True,
    )

In [2]:
import gensim
import nltk
import torch

import gensim.downloader as api

from datasets import load_dataset, load_metric

## `datasets` lib

Познакомимся с библиотекой datasets. Эта библиотека содержит наборы данных, которые можно использовать для обучения моделей. В этом ноутбуке мы будем использовать набор данных [SST-2](https://nlp.stanford.edu/sentiment/index.html). Это набор данных, который содержит отзывы на фильмы и их оценки (положительные или отрицательные). Давайте загрузим этот набор данных и посмотрим на него.

In [4]:
# Load the "sst2" dataset; the bare name on the last line makes the notebook
# display the returned object (a DatasetDict with train/validation/test splits
# in the recorded output).
sst2_dataset = load_dataset("sst2")
sst2_dataset



  0%|          | 0/3 [00:00<?, ?it/s]

DatasetDict({
    train: Dataset({
        features: ['idx', 'sentence', 'label'],
        num_rows: 67349
    })
    validation: Dataset({
        features: ['idx', 'sentence', 'label'],
        num_rows: 872
    })
    test: Dataset({
        features: ['idx', 'sentence', 'label'],
        num_rows: 1821
    })
})

In [7]:
# Peek at the first ten labels of the training split.
sst2_dataset["train"]["label"][:10]

[0, 0, 1, 0, 0, 0, 1, 1, 0, 1]

In [None]:
# Add a "length_sentence" column holding len() of each example's "sentence"
# and show it for the training split.
# NOTE(review): presumably "sentence" is a plain string, which would make this
# a character count rather than a token count - confirm.
sst2_dataset.map(lambda x: {"length_sentence": len(x["sentence"])})["train"]["length_sentence"]

## Эмбеддинги и токенизация

Для токенизации мы будем использовать библиотеку [NLTK](https://www.nltk.org/). Для работы с эмбеддингами мы будем использовать библиотеку [Gensim](https://radimrehurek.com/gensim/).

In [11]:
# Module-level tokenizer; tokenize_pipeline() calls its .tokenize() method.
tokenizer = nltk.tokenize.WordPunctTokenizer()

In [13]:
# Sanity check: tokenize a short sentence and display the resulting tokens.
input_text = "I am a sentence?"
tokenizer.tokenize(input_text)

['I', 'am', 'a', 'sentence', '?']

In [17]:
# Download the NLTK data packages 'omw-1.4' and 'wordnet'.
# NOTE(review): presumably these back the WordNet lemmatizer used in this
# notebook - confirm.
nltk.download('omw-1.4')
nltk.download('wordnet')

[nltk_data] Downloading package omw-1.4 to /root/nltk_data...
[nltk_data]   Package omw-1.4 is already up-to-date!
[nltk_data] Downloading package wordnet to /root/nltk_data...


True

In [15]:
# Module-level lemmatizer; tokenize_pipeline() applies it to every kept token.
lemmatizer = nltk.stem.WordNetLemmatizer()

Лемматизация приводит слово к его словарной форме (лемме), а стемминг просто отсекает окончание, оставляя основу:

- lemmatize("разные") = "разный"
- stem("разные") = "разн"

In [24]:
input_text = "writing"
# No part-of-speech hint is passed here; in the recorded run the word comes
# back unchanged ('writing').
# NOTE(review): lemmatize() accepts an optional `pos` argument - pos="v" is
# presumably what is needed to obtain "write"; confirm against the NLTK docs.
lemmatizer.lemmatize(input_text) # will it work?

'writing'

In [19]:
# Load the 'word2vec-google-news-300' vectors through gensim.downloader and
# check the shape of a single word vector ((300,) in the recorded output).
wv = api.load('word2vec-google-news-300')
wv["king"].shape



(300,)

In [None]:
# Display the embedding vector stored for a single word.
wv["writing"]

In [28]:
# Default embedding / hidden width of the models below; equals the length of
# the word vectors loaded above (shape (300,)).
HIDDEN_SIZE = 300
# NOTE(review): not referenced anywhere in the visible part of this notebook.
MAX_TEXT_LENGTH = 32

## BoW

In [29]:
class SST2Dataset(torch.utils.data.Dataset):
    """Dataset wrapper that yields ``(tokens, label)`` pairs.

    Tokenization happens once, at construction time: the wrapped dataset's
    ``map`` method is called with a function that produces a ``"tokens"``
    entry from each example's ``"sentence"`` entry.

    Args:
        tokenizer: Callable applied to the ``"sentence"`` value of an example.
        dataset: Object providing ``map``; the value ``map`` returns must
            support ``len()`` and integer indexing, and its examples must
            expose ``"tokens"`` and ``"label"``.
    """

    def __init__(self, tokenizer, dataset):
        self.tokenizer = tokenizer
        self.dataset = dataset.map(
            lambda row: {"tokens": self.tokenizer(row["sentence"])}
        )

    def __len__(self):
        """Return the number of examples in the mapped dataset."""
        return len(self.dataset)

    def __getitem__(self, index):
        """Return the ``(tokens, label)`` pair stored at *index*."""
        row = self.dataset[index]
        return row["tokens"], row["label"]

In [30]:
def tokenize_pipeline(sentence):
    """Tokenize *sentence* and lemmatize its purely alphabetic tokens.

    Uses the module-level ``tokenizer`` and ``lemmatizer`` objects. Tokens for
    which ``str.isalpha()`` is false (punctuation, numbers, mixed tokens) are
    dropped before lemmatization.

    Returns:
        A list with the lemmatized tokens in their original order.
    """
    lemmas = []
    for word in tokenizer.tokenize(sentence):
        if word.isalpha():
            lemmas.append(lemmatizer.lemmatize(word))
    return lemmas

In [31]:
class BoW(torch.nn.Module):
    """Bag-of-words classifier over fixed word vectors.

    The vectors of all tokens found in ``wv`` are summed, and the sum is passed
    through a single linear layer followed by a sigmoid.

    Args:
        wv: Word-vector lookup supporting ``token in wv`` and ``wv[token]``.
        embedding_dim: Length of the word vectors (input width of the linear
            layer).
    """

    def __init__(self, wv, embedding_dim=HIDDEN_SIZE):
        super().__init__()

        self.wv = wv
        self.embedding_dim = embedding_dim
        self.linear_cls = torch.nn.Linear(embedding_dim, 1)

    def forward(self, input_ids):
        """Return a one-element tensor with the sigmoid score for the tokens.

        ``input_ids`` is an iterable of tokens. Tokens missing from ``wv`` are
        skipped; if none remain, an all-zero vector is classified instead.
        """
        known_vectors = [self.wv[token] for token in input_ids if token in self.wv]
        if known_vectors:
            features = torch.tensor(sum(known_vectors))
        else:
            features = torch.zeros((self.embedding_dim,))
        return torch.sigmoid(self.linear_cls(features))

In [32]:
# Bag-of-words baseline trained with plain SGD. The model already applies a
# sigmoid to its output, hence BCELoss on probabilities.
model = BoW(wv)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
criterion = torch.nn.BCELoss()

In [33]:
# Wrap the train and validation splits. Tokenization runs once here, inside
# SST2Dataset.__init__ via dataset.map.
train_dataset = SST2Dataset(tokenize_pipeline, sst2_dataset["train"])
valid_dataset = SST2Dataset(tokenize_pipeline, sst2_dataset["validation"])

Map:   0%|          | 0/67349 [00:00<?, ? examples/s]

Map:   0%|          | 0/872 [00:00<?, ? examples/s]

In [34]:
# Inspect one (tokens, label) training example.
train_dataset[0]

(['hide', 'new', 'secretion', 'from', 'the', 'parental', 'unit'], 0)

In [35]:
# One pass over the training set with one optimizer step per example
# (no batching).
for example_idx in range(len(train_dataset)):
    optimizer.zero_grad()
    example, y_target = train_dataset[example_idx]
    # BCELoss needs a float target; the label is converted to a 0-dim float32
    # tensor.
    y_target = torch.tensor(y_target).to(torch.float32)
    y_pred = model(example)
    # The model returns a one-element tensor; [0] takes the scalar entry to
    # pair with the 0-dim target.
    loss = criterion(y_pred[0], y_target)
    loss.backward()
    optimizer.step()

KeyboardInterrupt: ignored

In [38]:
# Score every validation example, then show the first prediction next to its
# (tokens, label) example.
# NOTE(review): predictions are computed with autograd enabled (the displayed
# tensor carries a grad_fn); consider torch.no_grad() for pure evaluation.
valid_y_pred = [model(example) for example, _ in valid_dataset]
valid_y_pred[0], valid_dataset[0]

(tensor([0.9360], grad_fn=<SigmoidBackward0>),
 (['it', 's', 'a', 'charming', 'and', 'often', 'affecting', 'journey'], 1))

## Simple RNN

$$
h_t = \tanh(W_{hh}h_{t-1} + W_{xh}x_t + b_h)
$$

In [40]:
class RNNCell(torch.nn.Module):
    """Simple tanh recurrent cell that consumes a whole sequence per call.

    For every input vector ``x`` the state is updated as
    ``hidden = tanh(hidden_linear(hidden) + input_linear(x))``.
    Input vectors and the hidden state share the same width ``hidden_dim``.

    Args:
        hidden_dim: Width of both the input vectors and the hidden state.
    """

    def __init__(self, hidden_dim=HIDDEN_SIZE):
        super().__init__()

        self.hidden_linear = torch.nn.Linear(hidden_dim, hidden_dim)
        self.input_linear = torch.nn.Linear(hidden_dim, hidden_dim)

        self.hidden_dim = hidden_dim

    def forward(self, input_vectors, hidden=None):
        """Run the cell over ``input_vectors`` and return the last hidden state.

        Args:
            input_vectors: Iterable of tensors accepted by ``input_linear``.
            hidden: Initial hidden state; ``None`` (the default) starts from
                a zero vector of length ``hidden_dim``.

        Returns:
            The hidden state after the last input. If ``input_vectors`` is
            empty, the initial state is returned unchanged.
        """
        # Identity check: `hidden == None` would go through Tensor.__eq__ when
        # a tensor is passed, which is not a reliable "was it supplied?" test.
        if hidden is None:
            hidden = torch.zeros((self.hidden_dim,))
        for vector in input_vectors:
            hidden = torch.tanh(
                self.hidden_linear(hidden) + self.input_linear(vector)
            )
        return hidden

In [41]:
class RNN(torch.nn.Module):
    """Sequence classifier: word vectors -> RNNCell -> linear layer -> sigmoid.

    Args:
        wv: Word-vector lookup supporting ``token in wv`` and ``wv[token]``.
        hidden_dim: Width of the word vectors and of the recurrent state.
        output_dim: Number of outputs of the classification layer.
        n_layers: Accepted for interface compatibility; currently unused.
        dropout: Accepted for interface compatibility; currently unused.
    """

    def __init__(self, wv, hidden_dim=HIDDEN_SIZE, output_dim=1, n_layers=1, dropout=0.0):
        super().__init__()

        self.wv = wv
        self.embedding_dim = hidden_dim
        self.rnn_cell = RNNCell(hidden_dim)

        # Honour `output_dim` instead of hard-coding a single output; the
        # default of 1 keeps the previous layer shape.
        self.linear_cls = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, input_ids):
        """Return sigmoid scores for an iterable of tokens.

        Tokens missing from ``wv`` are skipped; if none remain, a single
        all-zero vector is fed to the recurrent cell instead.
        """
        # torch.tensor copies each looked-up vector into a new float32 tensor.
        # The previous torch.Tensor(emb) conversion emitted a warning on this
        # line in the recorded run (presumably because the looked-up vectors
        # are read-only NumPy arrays).
        tensors = [
            torch.tensor(self.wv[token], dtype=torch.float32)
            for token in input_ids
            if token in self.wv
        ]
        if not tensors:
            tensors = [torch.zeros((self.embedding_dim,))]
        output_state = self.rnn_cell(tensors, None)
        return torch.sigmoid(self.linear_cls(output_state))

In [43]:
# Rebind `model` to the RNN classifier, with a fresh SGD optimizer over its
# parameters; BCELoss again because the model output is already a sigmoid.
model = RNN(wv)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
criterion = torch.nn.BCELoss()

In [44]:
# One pass over the training set with one optimizer step per example
# (no batching).
for example_idx in range(len(train_dataset)):
    optimizer.zero_grad()
    example, y_target = train_dataset[example_idx]
    # BCELoss needs a float target; the label is converted to a 0-dim float32
    # tensor.
    y_target = torch.tensor(y_target).to(torch.float32)
    y_pred = model(example)
    loss = criterion(y_pred[0], y_target)
    try:
        loss.backward()
    except RuntimeError:
        # Debugging aid: show which example broke the backward pass, then
        # re-raise the original error.
        print(example)
        raise
    optimizer.step()

  tensors = [torch.Tensor(emb) for emb in embs]


KeyboardInterrupt: ignored

In [46]:
# Score every validation example, then show the second prediction next to its
# (tokens, label) example.
# NOTE(review): predictions are computed with autograd enabled (the displayed
# tensor carries a grad_fn); consider torch.no_grad() for pure evaluation.
valid_y_pred = [model(example) for example, _ in valid_dataset]
valid_y_pred[1], valid_dataset[1]

(tensor([0.4013], grad_fn=<SigmoidBackward0>),
 (['unflinchingly', 'bleak', 'and', 'desperate'], 0))

## LSTM


\begin{align}
i_t &= \sigma(W_{ii}x_t + W_{hi}h_{t-1} + b_i) \\
f_t &= \sigma(W_{if}x_t + W_{hf}h_{t-1} + b_f) \\
g_t &= \tanh(W_{ig}x_t + W_{hg}h_{t-1} + b_g) \\
o_t &= \sigma(W_{io}x_t + W_{ho}h_{t-1} + b_o) \\
c_t &= f_t \odot c_{t-1} + i_t \odot g_t \\
h_t &= o_t \odot \tanh(c_t)
\end{align}

In [63]:
from tqdm.auto import trange

In [50]:
class LSTMCell(torch.nn.Module):
    """LSTM cell that consumes a whole sequence per call.

    Each gate (input ``i``, forget ``f``, candidate ``g``, output ``o``) is
    the sum of a linear map of the current input and a linear map of the
    previous hidden state. Only the hidden-state maps carry a bias, so every
    gate has exactly one bias vector. Inputs, hidden state and cell state all
    have width ``hidden_dim``.

    Args:
        hidden_dim: Width of the input vectors, hidden state and cell state.
    """

    def __init__(self, hidden_dim):
        super().__init__()

        self.input_i_linear = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.input_h_linear = torch.nn.Linear(hidden_dim, hidden_dim)
        self.forget_i_linear = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.forget_h_linear = torch.nn.Linear(hidden_dim, hidden_dim)
        self.gate_i_linear = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.gate_h_linear = torch.nn.Linear(hidden_dim, hidden_dim)
        self.out_i_linear = torch.nn.Linear(hidden_dim, hidden_dim, bias=False)
        self.out_h_linear = torch.nn.Linear(hidden_dim, hidden_dim)

        self.hidden_dim = hidden_dim

    def forward(self, input_vectors, hidden=None, context=None):
        """Run the cell over ``input_vectors``.

        Args:
            input_vectors: Iterable of tensors accepted by the input linears.
            hidden: Initial hidden state; ``None`` starts from zeros.
            context: Initial cell state; ``None`` starts from zeros.

        Returns:
            Tuple ``(hidden, context)`` after the last input. If
            ``input_vectors`` is empty, the initial states are returned
            unchanged.
        """
        # Identity checks: `x == None` would go through Tensor.__eq__ when a
        # tensor is passed, which is not a reliable "was it supplied?" test.
        if hidden is None:
            hidden = torch.zeros((self.hidden_dim,))
        if context is None:
            context = torch.zeros((self.hidden_dim,))

        for vector in input_vectors:
            i = torch.sigmoid(
                self.input_i_linear(vector) + self.input_h_linear(hidden)
            )
            f = torch.sigmoid(
                self.forget_i_linear(vector) + self.forget_h_linear(hidden)
            )
            g = torch.tanh(
                self.gate_i_linear(vector) + self.gate_h_linear(hidden)
            )
            o = torch.sigmoid(
                self.out_i_linear(vector) + self.out_h_linear(hidden)
            )

            # Cell state: keep part of the old state, add gated candidate.
            context = f * context + i * g
            hidden = o * torch.tanh(context)
        return hidden, context

In [54]:
class LSTM(torch.nn.Module):
    """Sequence classifier: word vectors -> LSTMCell -> linear layer -> sigmoid.

    Args:
        wv: Word-vector lookup supporting ``token in wv`` and ``wv[token]``.
        hidden_dim: Width of the word vectors and of the recurrent state.
        output_dim: Number of outputs of the classification layer.
        n_layers: Accepted for interface compatibility; currently unused.
        dropout: Accepted for interface compatibility; currently unused.
    """

    def __init__(self, wv, hidden_dim=HIDDEN_SIZE, output_dim=1, n_layers=1, dropout=0.0):
        super().__init__()

        self.wv = wv
        self.embedding_dim = hidden_dim
        self.lstm_cell = LSTMCell(hidden_dim)

        # Honour `output_dim` instead of hard-coding a single output; the
        # default of 1 keeps the previous layer shape.
        self.linear_cls = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, input_ids):
        """Return sigmoid scores for an iterable of tokens.

        Tokens missing from ``wv`` are skipped; if none remain, a single
        all-zero vector is fed to the recurrent cell instead.
        """
        # torch.tensor copies each looked-up vector into a new float32 tensor,
        # replacing the legacy torch.Tensor(emb) conversion (which presumably
        # warns on read-only NumPy arrays - confirm).
        tensors = [
            torch.tensor(self.wv[token], dtype=torch.float32)
            for token in input_ids
            if token in self.wv
        ]
        if not tensors:
            tensors = [torch.zeros((self.embedding_dim,))]
        # The cell returns (hidden, context); only the hidden state is
        # classified.
        last_hidden, _ = self.lstm_cell(tensors, None, None)
        return torch.sigmoid(self.linear_cls(last_hidden))

In [64]:
# Rebind `model` to the LSTM classifier, with a fresh SGD optimizer over its
# parameters; BCELoss again because the model output is already a sigmoid.
model = LSTM(wv)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
criterion = torch.nn.BCELoss()

In [65]:
# One pass over the training set with one optimizer step per example
# (no batching); trange wraps the index range with a progress bar.
for example_idx in trange(len(train_dataset)):
    optimizer.zero_grad()
    example, y_target = train_dataset[example_idx]
    # BCELoss needs a float target; the label is converted to a 0-dim float32
    # tensor.
    y_target = torch.tensor(y_target).to(torch.float32)
    y_pred = model(example)
    loss = criterion(y_pred[0], y_target)
    try:
        loss.backward()
    except RuntimeError:
        # Debugging aid: show which example broke the backward pass, then
        # re-raise the original error.
        print(example)
        raise
    optimizer.step()

  0%|          | 0/67349 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

In [66]:
# Score every validation example, then show the first prediction next to its
# (tokens, label) example.
# NOTE(review): predictions are computed with autograd enabled (the displayed
# tensor carries a grad_fn); consider torch.no_grad() for pure evaluation.
valid_y_pred = [model(example) for example, _ in valid_dataset]
valid_y_pred[0], valid_dataset[0]

(tensor([0.5212], grad_fn=<SigmoidBackward0>),
 (['it', 's', 'a', 'charming', 'and', 'often', 'affecting', 'journey'], 1))

In [None]:
#evaluation

In [58]:
?torch.nn.LSTMCell

In [59]:
class BidirectionalLSTM(torch.nn.Module):
    """Classifier that reads the token sequence in both directions.

    Two independent ``torch.nn.LSTMCell`` instances process the word vectors
    left-to-right and right-to-left. Their final hidden states are summed and
    passed through a linear layer followed by a sigmoid.

    Args:
        wv: Word-vector lookup supporting ``token in wv`` and ``wv[token]``.
        hidden_dim: Width of the word vectors and of the recurrent states.
        output_dim: Number of outputs of the classification layer.
        n_layers: Accepted for interface compatibility; currently unused.
        dropout: Accepted for interface compatibility; currently unused.
    """

    def __init__(self, wv, hidden_dim=HIDDEN_SIZE, output_dim=1, n_layers=1, dropout=0.0):
        super().__init__()

        self.forward_cell = torch.nn.LSTMCell(hidden_dim, hidden_dim)
        self.backward_cell = torch.nn.LSTMCell(hidden_dim, hidden_dim)

        self.wv = wv
        self.embedding_dim = hidden_dim
        # Honour `output_dim` instead of hard-coding a single output; the
        # default of 1 keeps the previous layer shape.
        self.linear_cls = torch.nn.Linear(hidden_dim, output_dim)

    def forward(self, input_ids):
        """Return sigmoid scores for an iterable of tokens.

        Tokens missing from ``wv`` are skipped; if none remain, a single
        all-zero vector is processed instead.
        """
        # torch.tensor copies each looked-up vector into a new float32 tensor,
        # replacing the legacy torch.Tensor(emb) conversion (which presumably
        # warns on read-only NumPy arrays - confirm).
        tensors = [
            torch.tensor(self.wv[token], dtype=torch.float32)
            for token in input_ids
            if token in self.wv
        ]
        if not tensors:
            tensors = [torch.zeros((self.embedding_dim,))]

        # Left-to-right pass, starting from zero hidden and cell states.
        forward_hidden = torch.zeros((self.embedding_dim,))
        forward_context = torch.zeros((self.embedding_dim,))
        for token in tensors:
            forward_hidden, forward_context = self.forward_cell(
                token, (forward_hidden, forward_context)
            )

        # Right-to-left pass with its own zero-initialised states.
        backward_hidden = torch.zeros((self.embedding_dim,))
        backward_context = torch.zeros((self.embedding_dim,))
        for token in reversed(tensors):
            backward_hidden, backward_context = self.backward_cell(
                token, (backward_hidden, backward_context)
            )

        # The two directions are combined by summation (not concatenation),
        # so the classifier input keeps width `hidden_dim`.
        return torch.sigmoid(self.linear_cls(forward_hidden + backward_hidden))

In [67]:
# Rebind `model` to the bidirectional LSTM classifier, with a fresh SGD
# optimizer over its parameters; BCELoss again because the model output is
# already a sigmoid.
model = BidirectionalLSTM(wv)
optimizer = torch.optim.SGD(model.parameters(), lr=1e-3)
criterion = torch.nn.BCELoss()

In [68]:
# One pass over the training set with one optimizer step per example
# (no batching); trange wraps the index range with a progress bar.
for example_idx in trange(len(train_dataset)):
    optimizer.zero_grad()
    example, y_target = train_dataset[example_idx]
    # BCELoss needs a float target; the label is converted to a 0-dim float32
    # tensor.
    y_target = torch.tensor(y_target).to(torch.float32)
    y_pred = model(example)
    loss = criterion(y_pred[0], y_target)
    try:
        loss.backward()
    except RuntimeError:
        # Debugging aid: show which example broke the backward pass, then
        # re-raise the original error.
        print(example)
        raise
    optimizer.step()

  0%|          | 0/67349 [00:00<?, ?it/s]

KeyboardInterrupt: ignored

In [62]:
# Score every validation example, then show the first prediction next to its
# (tokens, label) example.
# NOTE(review): predictions are computed with autograd enabled (the displayed
# tensor carries a grad_fn); consider torch.no_grad() for pure evaluation.
valid_y_pred = [model(example) for example, _ in valid_dataset]
valid_y_pred[0], valid_dataset[0]

(tensor([0.5327], grad_fn=<SigmoidBackward0>),
 (['it', 's', 'a', 'charming', 'and', 'often', 'affecting', 'journey'], 1))

## Compare results