Импортируем библоитеки

In [None]:
from google.colab import drive
import pandas as pd
from nltk.stem.snowball import RussianStemmer
from pymorphy2 import MorphAnalyzer
import numpy as np
import torch.nn as nn
import torch
import multiprocessing
from gensim.models import Word2Vec
from torch.utils.data import Dataset, DataLoader
import torch.utils.data as data_utils
import torch.optim as optim
import time
from sklearn.metrics import classification_report

#Часть 1. Чтение данных и первичная предобработка

В программе будет три режима работы:
1.   Без уменьшения словаря
2.   Стемминг
3.   Лемматизация

In [None]:
REDUCTION_MODE = 0 #0 - no reduction, 1 - stemming, 2 - lemmatize

Подключаем гугл диск, где содержится файл с корпусом, который был получен ранее

In [None]:
drive.mount('/content/drive/')

dir = 'drive/MyDrive/BS/DATA_EXTRACTION/'
corp_cased = dir + 'corp_cased.csv'

Считываем csv файл корпуса и выводим первые пять элементов

In [None]:
df = pd.read_csv(corp_cased, sep='\t', header=None, on_bad_lines='warn')

df.head()

In [None]:
stemmer = RussianStemmer()
lemmatizer = MorphAnalyzer()

Определяем функцию, которая исходя из режима работы делает соотвестующие преобразования слов

In [None]:
def reduction(x):
    x = str(x)
    if REDUCTION_MODE == 0:
        return x.split()
    elif REDUCTION_MODE == 1:
        return [stemmer.stem(token) for token in x.split(' ')]
    elif REDUCTION_MODE == 2:
        return [lemmatizer.normal_forms(token)[0] for token in x.split(' ')]

Используем функцию выше

In [None]:
sentences = df[df.columns[0]].to_numpy()

sentences = np.array(list(map(reduction, sentences)))

print(sentences[:5])

Также разделяем теги на токены

In [None]:
tags = df[df.columns[1]].to_numpy()

tags = np.array(list(map(lambda x: str(x).split(), tags)))

print(tags[:5])

Создаем Word2Vec модель из имеющихся предложений

In [None]:
size, window, min_cnt, sg = 30, 2, 2, 0 # Используем модель CBOW
workers = multiprocessing.cpu_count()
n_iter = 150
w2v_model = Word2Vec(sentences, size = size, window = window, min_count = min_cnt,
                    sg = sg, workers = workers, iter = n_iter)

# Часть 2. Подготовка датасетов для моделей

Создаем функции для создания словарей слов и тегов (чтобы перевести текст в цифру), а также создаем модель для классификатора\
[слово_до, слово, слово_после] -> часть речи

In [None]:
def build_voc_w(stoi):
    idx = 1

    for sentence in sentences:
        for word in sentence:
            if word not in stoi:
                stoi[word] = idx
                idx += 1


def build_voc_t(ttoi):
    idx = 0
    
    for tags_ in tags:
        for tag in tags_:
            if tag not in ttoi:
                ttoi[tag] = idx
                idx += 1

def creator(x, y, stoi, ttoi):
    for i in range(len(sentences)):
        for j in range(len(sentences[i])):
            x_elem = []
            #word before
            if j == 0:
                x_elem.append(0)
            else:
                x_elem.append(stoi[sentences[i][j - 1]])

            #current word
            x_elem.append(stoi[sentences[i][j]])

            #word after
            if j == len(sentences[i]) - 1:
                x_elem.append(0)
            else:
                x_elem.append(stoi[sentences[i][j + 1]])

            x.append(x_elem)
            y.append(ttoi[tags[i][j]])

Применяем определенные выше функции. Нулевой индекс оставляем для выравнивания

In [None]:
#sentences vocs
stoi = {None: 0}

#tags vocs
ttoi = {}

build_voc_w(stoi)
build_voc_t(ttoi)

x = []
y = []

creator(x, y, stoi, ttoi)

Пример данных

In [None]:
print(x[:5])
print(y[:5])

Определяем устройство на котором будет обучаться модель

In [None]:
device = torch.device('cuda' if torch.cuda.is_available() else 'cpu')

Переопределяем класс Dataset из torch

In [None]:
class PosTagDataset(Dataset):
    def __init__(self, x, y):
        self.x = x
        self.y = y

        self.x = torch.LongTensor(self.x).to(device)
        self.y = torch.LongTensor(self.y).to(device)

    def __len__(self):
        return len(self.x)

    def __getitem__(self, idx):
        return (self.x[idx], self.y[idx])

Делим данные на три множества: train, validation, test

In [None]:
x_train, x_val, x_test = tuple(np.split(x, [int(.7 * len(x)), int(.8 * len(x))]))
y_train, y_val, y_test = tuple(np.split(y, [int(.7 * len(y)), int(.8 * len(y))]))

Создаем датасеты

In [None]:
dataset_train = PosTagDataset(x_train, y_train)
dataset_val = PosTagDataset(x_val, y_val)
dataset_test = PosTagDataset(x_test, y_test)

Проверяем корректность метода '__getitem __'

In [None]:
print(dataset_train.__getitem__(0))

Создаем DataLoader на основе созданные ранее датасетов

In [None]:
batch_size = 64

dataloader_train = DataLoader(dataset_train, batch_size=batch_size, shuffle=False)
dataloader_val = DataLoader(dataset_val, batch_size=batch_size, shuffle=False)
dataloader_test = DataLoader(dataset_test, batch_size=batch_size, shuffle=False)

Определяем функцию для получения весов из W2V модели, чтобы в дальнейшем

In [None]:
#перевод word2vec в массив весов для слоя Embedding    
def make_e_weights():
    # wv.index2word - список слов словаря
    # wv.vectors - массив координат слов
    dict_w2v = dict(zip(w2v_model.wv.index2word, w2v_model.wv.vectors))
    e_weights = np.zeros((len(stoi), size))
    for w, t in stoi.items(): # Слово и его код
        w_coords = dict_w2v.get(w) # Координаты слова
        if w_coords is not None:
            e_weights[t] = w_coords
    return torch.FloatTensor(e_weights)

Определяем модель

In [None]:
class W2VPoSTagger(nn.Module):
    def __init__(self, size_w2v, hidden_layer_s):
        
        super().__init__()
        
        weights = make_e_weights()
        weights.to(device)
        self.embedding = nn.Embedding.from_pretrained(weights, freeze=False)
        self.flatten = nn.Flatten()
        self.dropout = nn.Dropout(p=0.3)
        self.fc1 = nn.Linear(size_w2v * 3, hidden_layer_s)
        self.act1 = nn.ReLU()
        self.fc2 = nn.Linear(hidden_layer_s, len(ttoi))
        self.act2 = nn.Softmax()
        
    def forward(self, x):
        x = self.embedding(x)
        x = self.flatten(x)
        x = self.dropout(x)
        x = self.fc1(x)
        x = self.act1(x)
        x = self.fc2(x)
        x = self.act2(x)

        return x

Создаем экземпляр модели

In [None]:
model = W2VPoSTagger(size_w2v=size, hidden_layer_s=30)
print(model)

Считаем количество параметров

In [None]:
def count_parameters(model):
    return sum(p.numel() for p in model.parameters() if p.requires_grad)

print(f'Модель имеет {count_parameters(model):,} обучаемых параметров')

# Часть 3. Обучение модели

Определяем оптимизатор и функцию потерь

In [None]:
optimizer = optim.Adam(model.parameters())
criterion = nn.CrossEntropyLoss()

Помещаем модель и функцию потерь на `device`

In [None]:
model = model.to(device)
criterion = criterion.to(device)

Определяем функцию обучения модели

In [None]:
def train(model, iterator, optimizer, criterion):
    
    epoch_loss = 0

    model.train()

    all_preds = []
    all_tags = []
    
    for batch in iterator:
        
        text = batch[0]
        tags = batch[1]
                
        optimizer.zero_grad()
        
        predictions = model(text)

        all_preds.append(predictions.detach().cpu().numpy())
        all_tags.append(tags.detach().cpu().numpy())

        loss = criterion(predictions, tags)
        
        loss.backward()
        
        optimizer.step()
        
        epoch_loss += loss.item()

        
    return epoch_loss / len(iterator), np.concatenate(all_preds, 0).argmax(1).reshape(-1), np.concatenate(all_tags, 0)

Определяем функцию валидации модели

In [None]:
def evaluate(model, iterator, criterion):
    
    epoch_loss = 0
    
    model.eval()

    all_preds = []
    all_tags = []
    
    with torch.no_grad():
    
        for batch in iterator:

            text = batch[0]
            tags = batch[1]
            
            predictions = model(text)

            all_preds.append(predictions.detach().cpu().numpy())
            all_tags.append(tags.detach().cpu().numpy())
            
            loss = criterion(predictions, tags)

            epoch_loss += loss.item()
        
    return epoch_loss / len(iterator), np.concatenate(all_preds, 0).argmax(1).reshape(-1), np.concatenate(all_tags, 0)

Определяем функцию для подсчета времени выполнения

In [None]:
def epoch_time(start_time, end_time):
    elapsed_time = end_time - start_time
    elapsed_mins = int(elapsed_time / 60)
    elapsed_secs = int(elapsed_time - (elapsed_mins * 60))
    return elapsed_mins, elapsed_secs

Задаем параметры для `classification_report`

In [None]:
cr_labels = []
cr_names = []

for name, label in ttoi.items():
    cr_labels.append(label)
    cr_names.append(name)

print(cr_labels)
print(cr_names)

Обучаем модель

In [None]:
N_EPOCHS = 15

best_valid_loss = float('inf')

for epoch in range(N_EPOCHS):

    start_time = time.time()
    
    train_loss, train_preds, train_tags = train(model, dataloader_train, optimizer, criterion)
    valid_loss, _, __ = evaluate(model, dataloader_val, criterion)
    
    end_time = time.time()

    epoch_mins, epoch_secs = epoch_time(start_time, end_time)
    
    if valid_loss < best_valid_loss:
        best_valid_loss = valid_loss
        torch.save(model.state_dict(), 'tut2-model.pt')
    
    print(f'Epoch: {epoch+1:02} | Epoch Time: {epoch_mins}m {epoch_secs}s')
    print(f'\tTrain Loss: {train_loss:.3f}')

    print(classification_report(train_tags, train_preds, labels=cr_labels, target_names=cr_names))

    print(f'\t Val Loss: {valid_loss:.3f}')

Тестируем модель на отложенной выборке

In [None]:
model.load_state_dict(torch.load('tut2-model.pt'))

test_loss, test_preds, test_tags = evaluate(model, dataloader_test, criterion)

print(f'Test Loss: {test_loss:.3f}')

print(test_preds[10:])
print(test_tags[10:])

print(classification_report(test_tags, test_preds, labels=cr_labels, target_names=cr_names))

# Часть 4. Использование модели keras

Импортируем необходимые библиотеки

In [None]:
import keras
from keras.models import Model
from keras.layers import Input, Dense, Flatten, Embedding, Dropout, Reshape
import time
from sklearn import model_selection

In [None]:
print(x[:10])
print(y[:10])

Создаем модель

In [None]:
#создание модели          
def create_model(sq, num_words, size, num_classes):
    inp = Input(shape = (sq, ), dtype = 'int32')
    e_weights = make_e_weights()
    x = Embedding(num_words, output_dim = size, input_length = sq, 
                weights = [e_weights], trainable = True)(inp)
    x = Flatten()(x)
    x = Dropout(0.3)(x)
    x = Dense(size, activation = 'relu', use_bias = True)(x)
    output = Dense(num_classes, activation = 'softmax', use_bias = True)(x)
    model = Model(inp, output)
    model.summary()
    return model

Определяем функцию-обертку для `clasification_report`

In [None]:
def cl_rep(y_true, y_pred):
    print(classification_report(y_true.numpy(), y_pred.numpy()))

Обучаем модель

In [None]:
#обучение НС
model = create_model(3, len(stoi), 30, len(ttoi))

model.compile(optimizer = 'adam', loss = losses.sparse_categorical_crossentropy, metrics = ['accuracy', cl_rep])

x_train, x_test, y_train, y_test = model_selection.train_test_split(x, y, test_size = 0.25)

startTime = time.time()
history = model.fit(x_train, y_train, batch_size = 128, epochs = 15, verbose = 2, validation_data = (x_test, y_test))
print('Full time:', time.time() - startTime)