In [1]:
import re
import time
import json
import torch
import nltk
import pandas as pd
import torch.nn as nn
import torch.nn.functional as F
from typing import List, Callable, Union, Tuple
from pymystem3 import Mystem
from torchtext.data.utils import get_tokenizer
from torch.utils.data import Dataset, DataLoader
from nltk.stem import WordNetLemmatizer
from nltk.stem.snowball import SnowballStemmer
from gensim import corpora
from nltk.corpus import stopwords
from torch.utils.data.dataset import random_split

  from .autonotebook import tqdm as notebook_tqdm


In [2]:
nltk.download("stopwords")
russian_stopwords = stopwords.words("russian")

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/nikolai/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


# Data processing

In [4]:
with open('/Users/nikolai/Downloads/mini_wiki_cats.jsonl(1)', 'r') as json_file:
    json_list = list(json_file)

print('Length of data: ',len(json_list), '\n')
#n = 0
categories = []#{'история'}
for json_str in json_list:
    result = json.loads(json_str)
    categories.append(result['cats'][0])
categories = set(categories)
print('Categories:')
print(categories)

Length of data:  6336 

Categories:
{'промышленность', 'игры', 'история', 'литература', 'искусства', 'технологии', 'политика', 'география', 'достопримечательности', 'здравоохранение', 'инфраструктура', 'физика', 'математика'}


In [5]:
wiki_data = [[json.loads(json_str).get('text'), json.loads(json_str).get('cats')[0]] for json_str in json_list]

In [6]:
cats_dict = {cat: 0 for cat in categories}
for json_str in json_list:
    result = json.loads(json_str)
    cats_dict[result['cats'][0]] += 1
print('Number of elements in categories:')
cats_dict

Number of elements in categories:


{'промышленность': 83,
 'игры': 760,
 'история': 3127,
 'литература': 301,
 'искусства': 76,
 'технологии': 277,
 'политика': 145,
 'география': 898,
 'достопримечательности': 354,
 'здравоохранение': 12,
 'инфраструктура': 22,
 'физика': 261,
 'математика': 20}

In [7]:
tokenizer = get_tokenizer(tokenizer=None)
mystem = Mystem()
def tokenize(text, tokenizer=tokenizer):
    clean_text = re.sub(r'[^\w\s]', '', text)
    clean_text = re.sub(r'[^\sА-Яа-я]', '', clean_text)
    clean_text = clean_text.lower()
    tokenized_text = tokenizer(clean_text)
    lemmatized_text = [mystem.lemmatize(token)[0] for token in tokenized_text]
    clean_doc = [re.sub(r'\b[0-9]+\b', '<NUM>', token) for token in lemmatized_text]
    clean_doc = [token for token in clean_doc if token not in russian_stopwords]
    return clean_doc

In [None]:
tokenised_corpus = [tokenize(article[0], tokenizer) for article in wiki_data]
tokenised_corpus[0]

In [10]:
def make_bow_vector(sentence, dict):
    """Make vector with ones and zeros like [1., 1., 0., 0., 1.]"""
    vec = torch.zeros(len(dict))
    for word in sentence:
        vec[dict.token2id[word]] += 1
    return vec.view(-1)

In [11]:
class Wiki_Dataset_BoW(Dataset):
    def __init__(self, data: List = wiki_data) -> None:
        tokenizer = get_tokenizer(tokenizer=None)
        self.corpus = [tokenize(article[0], tokenizer) for article in data]
        great_dictionary = corpora.Dictionary(self.corpus)
        self.bow_corpus = [make_bow_vector(doc, great_dictionary) for doc in self.corpus]
        cats_dict = {cat: i for cat, i in zip(categories, range(len(categories)))}
        self.labels = [cats_dict.get(article[1]) for article in data]

    def __len__(self) -> int:
        return len(self.labels)

    def __getitem__(self, i) -> Union[List, List, int]:
        return (
            self.bow_corpus[i],
            self.labels[i],
        )

In [19]:
class Wiki_Dataset_BoW_Vector(Dataset):
    def __init__(self, data: List) -> None:
        tokenizer = get_tokenizer(tokenizer=None)
        self.corpus = [tokenize(article[0], tokenizer) for article in data]
        great_dictionary = corpora.Dictionary(self.corpus)
        self.bow_corpus = [great_dictionary.doc2bow(doc) for doc in self.corpus]
        cats_dict = {cat: i for cat, i in zip(categories, range(len(categories)))}
        self.labels = [cats_dict.get(article[1]) for article in cats_dict]

    def __len__(self) -> int:
        return len(self.labels)

    def __getitem__(self, i) -> Union[List, List, int]:
        return (
            self.corpus[i],
            self.bow_corpus[i][0],
            self.labels[i],
        )

In [12]:
dataset_bow = Wiki_Dataset_BoW(wiki_data)

In [13]:
n = 0
for bow, label in dataset_bow:
    print(bow)
    print(label)
    n += 1
    if n > 3:
        break

tensor([3., 1., 1.,  ..., 0., 0., 0.])
2
tensor([0., 0., 0.,  ..., 0., 0., 0.])
2
tensor([0., 0., 0.,  ..., 0., 0., 0.])
2
tensor([0., 0., 0.,  ..., 0., 0., 0.])
11


In [14]:
def split_train_valid_test(corpus: Dataset, valid_ratio: float = 0.1,
                           test_ratio: float = 0.1):
    """Split dataset into train, validation, and test."""
    test_length = int(len(corpus) * test_ratio)
    valid_length = int(len(corpus) * valid_ratio)
    train_length = len(corpus) - valid_length - test_length
    return random_split(
        corpus, lengths=[train_length, valid_length, test_length],
    )

In [15]:
BATCH_SIZE = 250

train_dataset, valid_dataset, test_dataset = split_train_valid_test(
    dataset_bow, valid_ratio=0.1, test_ratio=0.1
)

train_loader = DataLoader(train_dataset, batch_size=BATCH_SIZE)#, collate_fn=lambda x: x )
valid_loader = DataLoader(valid_dataset, batch_size=BATCH_SIZE)#, collate_fn=lambda x: x )
test_loader = DataLoader(test_dataset, batch_size=BATCH_SIZE)#, collate_fn=lambda x: x )

In [16]:
for vectors, labels in train_loader:
    print(vectors.size())
    print(labels.size())
    break

torch.Size([250, 129435])
torch.Size([250])


# Model

In [17]:
class Linear_Classifier(nn.Module):
    def __init__(
        self,
        vocab_size: int = 129435,
        num_labels: int = 13
    ) -> None:
        super(Linear_Classifier, self).__init__()
        fc1 = nn.Linear(vocab_size, vocab_size//10)
        fc2 = nn.Linear(vocab_size//10, vocab_size//100)
        fc3 = nn.Linear(vocab_size//100, vocab_size//1000)
        fc4 = nn.Linear(vocab_size//1000, num_labels)
        leaky_relu = nn.LeakyReLU(0.2)
        self.model = nn.Sequential(
            fc1, leaky_relu, fc2, leaky_relu, fc3, leaky_relu, fc4)

    def forward(self, x):
        output = self.model(x)
        return F.log_softmax(output, dim=1)

In [18]:
model = Linear_Classifier()
model

Linear_Classifier(
  (model): Sequential(
    (0): Linear(in_features=129435, out_features=12943, bias=True)
    (1): LeakyReLU(negative_slope=0.2)
    (2): Linear(in_features=12943, out_features=1294, bias=True)
    (3): LeakyReLU(negative_slope=0.2)
    (4): Linear(in_features=1294, out_features=129, bias=True)
    (5): LeakyReLU(negative_slope=0.2)
    (6): Linear(in_features=129, out_features=13, bias=True)
  )
)

In [19]:
def accuracy(pred: torch.Tensor,
             ground: torch.Tensor) -> float:
    pred_class = pred.argmax(dim=1)
    return sum(pred_class==ground).item()


def train(model: nn.Sequential,
          device: torch.device,
          train_loader: DataLoader,
          optimizer,
          criterion) -> None:
    model.train()
    for bow_vectors, labels in train_loader:
        bow_vectors = bow_vectors.to(device)
        labels = labels.to(device)
        optimizer.zero_grad()

        output = model(bow_vectors)
        loss = criterion(output, labels)
        loss.backward()
        optimizer.step()


def test(model: nn.Sequential,
         device: torch.device,
         test_loader: DataLoader,
         criterion) -> Tuple[float, float]:
    model.eval()
    test_loss = 0
    acc = 0
    with torch.no_grad():
        for bow_vectors, labels in test_loader:
            bow_vectors = bow_vectors.to(device)
            labels = labels.to(device)
            output = model(bow_vectors)
            acc += accuracy(output, labels)
            loss = criterion(output, labels)
            test_loss += loss.item()
    return (test_loss / len(test_loader.dataset),
            acc / len(test_loader.dataset))

In [22]:
device = torch.device('cpu')
t0 = time.time()
acc_list =[]
test_loss_list = []
epochs = 1
lr = 1e-3
criterion = nn.CrossEntropyLoss()
optimizer = torch.optim.Adam(model.parameters(), lr=lr)
for epoch in range(epochs):
    train(model, device, train_loader, optimizer, criterion)
    test_loss, acc = test(model, device, test_loader, criterion)
    acc_list.append(acc)
    test_loss_list.append(test_loss)
    t1 = (time.time() - t0) / 60
    print('Epoch: {}, test loss: {:.3f}, accuracy: {:.3f}, ' + 
            'time: {:.2f} min'.format(epoch+1, test_loss, acc, t1))

KeyboardInterrupt: 