# Свёрточные нейросети и POS-теггинг

POS-теггинг - определение частей речи (снятие частеречной неоднозначности)

In [5]:
# Если Вы запускаете ноутбук на colab или kaggle,
# выполните следующие строчки, чтобы подгрузить библиотеку dlnlputils:

# !git clone https://github.com/Samsung-IT-Academy/stepik-dl-nlp.git && pip install -r stepik-dl-nlp/requirements.txt
import sys; sys.path.append('./stepik-dl-nlp')

In [None]:
# !pip install pyconll
# !pip install spacy_udpipe

In [3]:
%load_ext autoreload
%autoreload 2

import warnings
warnings.filterwarnings('ignore')

from sklearn.metrics import classification_report

import numpy as np

import pyconll

import torch
from torch import nn
from torch.nn import functional as F
from torch.utils.data import TensorDataset

import dlnlputils
from dlnlputils.data import tokenize_corpus, build_vocabulary, \
    character_tokenize, pos_corpus_to_tensor, POSTagger
from dlnlputils.pipeline import train_eval_loop, predict_with_model, init_random_seed

init_random_seed()

The autoreload extension is already loaded. To reload it, use:
  %reload_ext autoreload


## Загрузка текстов и разбиение на обучающую и тестовую подвыборки

In [6]:
# Если Вы запускаете ноутбук на colab или kaggle, добавьте в начало пути ./stepik-dl-nlp
!wget -O ./datasets/ru_syntagrus-ud-train.conllu https://raw.githubusercontent.com/UniversalDependencies/UD_Russian-SynTagRus/master/ru_syntagrus-ud-train.conllu
!wget -O ./datasets/ru_syntagrus-ud-dev.conllu https://raw.githubusercontent.com/UniversalDependencies/UD_Russian-SynTagRus/master/ru_syntagrus-ud-dev.conllu

"wget" не является внутренней или внешней
командой, исполняемой программой или пакетным файлом.
"wget" не является внутренней или внешней
командой, исполняемой программой или пакетным файлом.


In [None]:
# Если Вы запускаете ноутбук на colab или kaggle, добавьте в начало пути ./stepik-dl-nlp
full_train = pyconll.load_from_file('./datasets/ru_syntagrus-ud-train.conllu')
full_test = pyconll.load_from_file('./datasets/ru_syntagrus-ud-dev.conllu')

In [None]:
for sent in full_train[:2]:
    for token in sent:
        print(token.form, token.upos)
    print()

In [None]:
MAX_SENT_LEN = max(len(sent) for sent in full_train)
MAX_ORIG_TOKEN_LEN = max(len(token.form) for sent in full_train for token in sent)
print('Наибольшая длина предложения', MAX_SENT_LEN)
print('Наибольшая длина токена', MAX_ORIG_TOKEN_LEN)

In [None]:
all_train_texts = [' '.join(token.form for token in sent) for sent in full_train]
print('\n'.join(all_train_texts[:10]))

In [None]:
train_char_tokenized = tokenize_corpus(all_train_texts, tokenizer=character_tokenize)
char_vocab, word_doc_freq = build_vocabulary(train_char_tokenized, max_doc_freq=1.0, min_count=5, pad_word='<PAD>')
print("Количество уникальных символов", len(char_vocab))
print(list(char_vocab.items())[:10])

In [None]:
UNIQUE_TAGS = ['<NOTAG>'] + sorted({token.upos for sent in full_train for token in sent if token.upos})
label2id = {label: i for i, label in enumerate(UNIQUE_TAGS)}
label2id

In [None]:
train_inputs, train_labels = pos_corpus_to_tensor(full_train, char_vocab, label2id, MAX_SENT_LEN, MAX_ORIG_TOKEN_LEN)
train_dataset = TensorDataset(train_inputs, train_labels)

test_inputs, test_labels = pos_corpus_to_tensor(full_test, char_vocab, label2id, MAX_SENT_LEN, MAX_ORIG_TOKEN_LEN)
test_dataset = TensorDataset(test_inputs, test_labels)

In [None]:
train_inputs[1][:5]

In [None]:
train_labels[1]

## Вспомогательная свёрточная архитектура

In [None]:
class StackedConv1d(nn.Module):
    def __init__(self, features_num, layers_n=1, kernel_size=3, conv_layer=nn.Conv1d, dropout=0.0):
        super().__init__()
        layers = []
        for _ in range(layers_n):
            layers.append(nn.Sequential(
                conv_layer(features_num, features_num, kernel_size, padding=kernel_size//2),
                nn.Dropout(dropout),
                nn.LeakyReLU()))
        self.layers = nn.ModuleList(layers)
    
    def forward(self, x):
        """x - BatchSize x FeaturesNum x SequenceLen"""
        for layer in self.layers:
            x = x + layer(x)
        return x

## Предсказание частей речи на уровне отдельных токенов

In [None]:
class SingleTokenPOSTagger(nn.Module):
    def __init__(self, vocab_size, labels_num, embedding_size=32, **kwargs):
        super().__init__()
        self.char_embeddings = nn.Embedding(vocab_size, embedding_size, padding_idx=0)
        self.backbone = StackedConv1d(embedding_size, **kwargs)
        self.global_pooling = nn.AdaptiveMaxPool1d(1)
        self.out = nn.Linear(embedding_size, labels_num)
        self.labels_num = labels_num
    
    def forward(self, tokens):
        """tokens - BatchSize x MaxSentenceLen x MaxTokenLen"""
        batch_size, max_sent_len, max_token_len = tokens.shape
        tokens_flat = tokens.view(batch_size * max_sent_len, max_token_len)
        
        char_embeddings = self.char_embeddings(tokens_flat)  # BatchSize*MaxSentenceLen x MaxTokenLen x EmbSize
        char_embeddings = char_embeddings.permute(0, 2, 1)  # BatchSize*MaxSentenceLen x EmbSize x MaxTokenLen
        
        features = self.backbone(char_embeddings)
        
        global_features = self.global_pooling(features).squeeze(-1)  # BatchSize*MaxSentenceLen x EmbSize
        
        logits_flat = self.out(global_features)  # BatchSize*MaxSentenceLen x LabelsNum
        logits = logits_flat.view(batch_size, max_sent_len, self.labels_num)  # BatchSize x MaxSentenceLen x LabelsNum
        logits = logits.permute(0, 2, 1)  # BatchSize x LabelsNum x MaxSentenceLen
        return logits

In [None]:
single_token_model = SingleTokenPOSTagger(len(char_vocab), len(label2id), embedding_size=64, layers_n=3, kernel_size=3, dropout=0.3)
print('Количество параметров', sum(np.product(t.shape) for t in single_token_model.parameters()))

In [None]:
(best_val_loss,
 best_single_token_model) = train_eval_loop(single_token_model,
                                            train_dataset,
                                            test_dataset,
                                            F.cross_entropy,
                                            lr=5e-3,
                                            epoch_n=10,
                                            batch_size=64,
                                            device='cuda',
                                            early_stopping_patience=5,
                                            max_batches_per_epoch_train=500,
                                            max_batches_per_epoch_val=100,
                                            lr_scheduler_ctor=lambda optim: torch.optim.lr_scheduler.ReduceLROnPlateau(optim, patience=2,
                                                                                                                       factor=0.5,
                                                                                                                       verbose=True))

In [None]:
# Если Вы запускаете ноутбук на colab или kaggle, добавьте в начало пути ./stepik-dl-nlp
torch.save(best_single_token_model.state_dict(), './models/single_token_pos.pth')

In [None]:
# Если Вы запускаете ноутбук на colab или kaggle, добавьте в начало пути ./stepik-dl-nlp
single_token_model.load_state_dict(torch.load('./models/single_token_pos.pth'))

In [None]:
train_pred = predict_with_model(single_token_model, train_dataset)
train_loss = F.cross_entropy(torch.tensor(train_pred),
                             torch.tensor(train_labels))
print('Среднее значение функции потерь на обучении', float(train_loss))
print(classification_report(train_labels.view(-1), train_pred.argmax(1).reshape(-1), target_names=UNIQUE_TAGS))
print()

test_pred = predict_with_model(single_token_model, test_dataset)
test_loss = F.cross_entropy(torch.tensor(test_pred),
                            torch.tensor(test_labels))
print('Среднее значение функции потерь на валидации', float(test_loss))
print(classification_report(test_labels.view(-1), test_pred.argmax(1).reshape(-1), target_names=UNIQUE_TAGS))

## Предсказание частей речи на уровне предложений (с учётом контекста)

In [None]:
class SentenceLevelPOSTagger(nn.Module):
    def __init__(self, vocab_size, labels_num, embedding_size=32, single_backbone_kwargs={}, context_backbone_kwargs={}):
        super().__init__()
        self.embedding_size = embedding_size
        self.char_embeddings = nn.Embedding(vocab_size, embedding_size, padding_idx=0)
        self.single_token_backbone = StackedConv1d(embedding_size, **single_backbone_kwargs)
        self.context_backbone = StackedConv1d(embedding_size, **context_backbone_kwargs)
        self.global_pooling = nn.AdaptiveMaxPool1d(1)
        self.out = nn.Conv1d(embedding_size, labels_num, 1)
        self.labels_num = labels_num
    
    def forward(self, tokens):
        """tokens - BatchSize x MaxSentenceLen x MaxTokenLen"""
        batch_size, max_sent_len, max_token_len = tokens.shape
        tokens_flat = tokens.view(batch_size * max_sent_len, max_token_len)
        
        char_embeddings = self.char_embeddings(tokens_flat)  # BatchSize*MaxSentenceLen x MaxTokenLen x EmbSize
        char_embeddings = char_embeddings.permute(0, 2, 1)  # BatchSize*MaxSentenceLen x EmbSize x MaxTokenLen
        char_features = self.single_token_backbone(char_embeddings)
        
        token_features_flat = self.global_pooling(char_features).squeeze(-1)  # BatchSize*MaxSentenceLen x EmbSize

        token_features = token_features_flat.view(batch_size, max_sent_len, self.embedding_size)  # BatchSize x MaxSentenceLen x EmbSize
        token_features = token_features.permute(0, 2, 1)  # BatchSize x EmbSize x MaxSentenceLen
        context_features = self.context_backbone(token_features)  # BatchSize x EmbSize x MaxSentenceLen

        logits = self.out(context_features)  # BatchSize x LabelsNum x MaxSentenceLen
        return logits

In [None]:
sentence_level_model = SentenceLevelPOSTagger(len(char_vocab), len(label2id), embedding_size=64,
                                              single_backbone_kwargs=dict(layers_n=3, kernel_size=3, dropout=0.3),
                                              context_backbone_kwargs=dict(layers_n=3, kernel_size=3, dropout=0.3))
print('Количество параметров', sum(np.product(t.shape) for t in sentence_level_model.parameters()))

In [None]:
(best_val_loss,
 best_sentence_level_model) = train_eval_loop(sentence_level_model,
                                              train_dataset,
                                              test_dataset,
                                              F.cross_entropy,
                                              lr=5e-3,
                                              epoch_n=10,
                                              batch_size=64,
                                              device='cuda',
                                              early_stopping_patience=5,
                                              max_batches_per_epoch_train=500,
                                              max_batches_per_epoch_val=100,
                                              lr_scheduler_ctor=lambda optim: torch.optim.lr_scheduler.ReduceLROnPlateau(optim, patience=2,
                                                                                                                         factor=0.5,
                                                                                                                         verbose=True))

In [None]:
# Если Вы запускаете ноутбук на colab или kaggle, добавьте в начало пути ./stepik-dl-nlp
torch.save(best_sentence_level_model.state_dict(), './models/sentence_level_pos.pth')

In [None]:
# Если Вы запускаете ноутбук на colab или kaggle, добавьте в начало пути ./stepik-dl-nlp
sentence_level_model.load_state_dict(torch.load('./models/sentence_level_pos.pth'))

In [None]:
train_pred = predict_with_model(sentence_level_model, train_dataset)
train_loss = F.cross_entropy(torch.tensor(train_pred),
                             torch.tensor(train_labels))
print('Среднее значение функции потерь на обучении', float(train_loss))
print(classification_report(train_labels.view(-1), train_pred.argmax(1).reshape(-1), target_names=UNIQUE_TAGS))
print()

test_pred = predict_with_model(sentence_level_model, test_dataset)
test_loss = F.cross_entropy(torch.tensor(test_pred),
                            torch.tensor(test_labels))
print('Среднее значение функции потерь на валидации', float(test_loss))
print(classification_report(test_labels.view(-1), test_pred.argmax(1).reshape(-1), target_names=UNIQUE_TAGS))

## Применение полученных теггеров и сравнение

In [None]:
single_token_pos_tagger = POSTagger(single_token_model, char_vocab, UNIQUE_TAGS, MAX_SENT_LEN, MAX_ORIG_TOKEN_LEN)
sentence_level_pos_tagger = POSTagger(sentence_level_model, char_vocab, UNIQUE_TAGS, MAX_SENT_LEN, MAX_ORIG_TOKEN_LEN)

In [None]:
test_sentences = [
    'Мама мыла раму.',
    'Косил косой косой косой.',
    'Глокая куздра штеко будланула бокра и куздрячит бокрёнка.',
    'Сяпала Калуша с Калушатами по напушке.',
    'Пирожки поставлены в печь, мама любит печь.',
    'Ведро дало течь, вода стала течь.',
    'Три да три, будет дырка.',
    'Три да три, будет шесть.',
    'Сорок сорок'
]
test_sentences_tokenized = tokenize_corpus(test_sentences, min_token_size=1)

In [None]:
for sent_tokens, sent_tags in zip(test_sentences_tokenized, single_token_pos_tagger(test_sentences)):
    print(' '.join('{}-{}'.format(tok, tag) for tok, tag in zip(sent_tokens, sent_tags)))
    print()

In [None]:
for sent_tokens, sent_tags in zip(test_sentences_tokenized, sentence_level_pos_tagger(test_sentences)):
    print(' '.join('{}-{}'.format(tok, tag) for tok, tag in zip(sent_tokens, sent_tags)))
    print()

## Свёрточный модуль своими руками

In [None]:
class MyConv1d(nn.Module):
    def __init__(self, in_channels, out_channels, kernel_size, padding=0):
        super().__init__()
        self.in_channels = in_channels
        self.out_channels = out_channels
        self.kernel_size = kernel_size
        self.padding = padding
        self.weight = nn.Parameter(torch.randn(in_channels * kernel_size, out_channels) / (in_channels * kernel_size),
                                   requires_grad=True)
        self.bias = nn.Parameter(torch.zeros(out_channels), requires_grad=True)
    
    def forward(self, x):
        """x - BatchSize x InChannels x SequenceLen"""

        batch_size, src_channels, sequence_len = x.shape        
        if self.padding > 0:
            pad = x.new_zeros(batch_size, src_channels, self.padding)
            x = torch.cat((pad, x, pad), dim=-1)
            sequence_len = x.shape[-1]

        chunks = []
        chunk_size = sequence_len - self.kernel_size + 1
        for offset in range(self.kernel_size):
            chunks.append(x[:, :, offset:offset + chunk_size])

        in_features = torch.cat(chunks, dim=1)  # BatchSize x InChannels * KernelSize x ChunkSize
        in_features = in_features.permute(0, 2, 1)  # BatchSize x ChunkSize x InChannels * KernelSize
        out_features = torch.bmm(in_features, self.weight.unsqueeze(0).expand(batch_size, -1, -1)) + self.bias.unsqueeze(0).unsqueeze(0)
        out_features = out_features.permute(0, 2, 1)  # BatchSize x OutChannels x ChunkSize
        return out_features

In [None]:
sentence_level_model_my_conv = SentenceLevelPOSTagger(len(char_vocab), len(label2id), embedding_size=64,
                                                      single_backbone_kwargs=dict(layers_n=3, kernel_size=3, dropout=0.3, conv_layer=MyConv1d),
                                                      context_backbone_kwargs=dict(layers_n=3, kernel_size=3, dropout=0.3, conv_layer=MyConv1d))
print('Количество параметров', sum(np.product(t.shape) for t in sentence_level_model_my_conv.parameters()))

In [None]:
(best_val_loss,
 best_sentence_level_model_my_conv) = train_eval_loop(sentence_level_model_my_conv,
                                                      train_dataset,
                                                      test_dataset,
                                                      F.cross_entropy,
                                                      lr=5e-3,
                                                      epoch_n=10,
                                                      batch_size=64,
                                                      device='cuda',
                                                      early_stopping_patience=5,
                                                      max_batches_per_epoch_train=500,
                                                      max_batches_per_epoch_val=100,
                                                      lr_scheduler_ctor=lambda optim: torch.optim.lr_scheduler.ReduceLROnPlateau(optim, patience=2,
                                                                                                                                 factor=0.5,
                                                                                                                                 verbose=True))

In [None]:
train_pred = predict_with_model(best_sentence_level_model_my_conv, train_dataset)
train_loss = F.cross_entropy(torch.tensor(train_pred),
                             torch.tensor(train_labels))
print('Среднее значение функции потерь на обучении', float(train_loss))
print(classification_report(train_labels.view(-1), train_pred.argmax(1).reshape(-1), target_names=UNIQUE_TAGS))
print()

test_pred = predict_with_model(best_sentence_level_model_my_conv, test_dataset)
test_loss = F.cross_entropy(torch.tensor(test_pred),
                            torch.tensor(test_labels))
print('Среднее значение функции потерь на валидации', float(test_loss))
print(classification_report(test_labels.view(-1), test_pred.argmax(1).reshape(-1), target_names=UNIQUE_TAGS))