# Классификация текстов
В этом ноутбуке ваша задача — разобраться с классификацией русскоязычных твитов на позитивные и негативные.

In [109]:
# Set to True on the first run so the cells guarded by `if FIRST_RUN:`
# (dataset download, package installation) are executed; leave it False
# afterwards to skip them.
FIRST_RUN = False

Для начала подготовим датасет к чтению:

In [110]:
if FIRST_RUN:
    import os
    import shutil
    import gdown

    # Google Drive ids of the dataset parts, keyed by local file name.
    drive_ids = {
        'train.csv': "1GujrcFzRdo3E7UtUkcrljzDS9czBBy3s",
        'val.csv': "1vvm-PrV0r2wuGbYYovZSuReYOXpu0JRK",
    }

    source_files = ['train.csv', 'val.csv']
    destination_dir = os.path.join('.', 'data')

    # Make sure the target directory is there before anything is moved into it.
    os.makedirs(destination_dir, exist_ok=True)

    for file in source_files:
        dest_file_path = os.path.join(destination_dir, file)

        # Nothing to do for files that were fetched on a previous run.
        if os.path.exists(dest_file_path):
            print(f"File {file} already exists in the destination directory.")
            continue

        # Download into the working directory first ...
        file_id = drive_ids.get(file)
        if file_id is not None:
            gdown.download(id=file_id, output=file, quiet=False)

        # ... then move the result into ./data, reporting a failed download.
        if os.path.exists(file):
            shutil.move(file, destination_dir)
        else:
            print(f"File {file} does not exist.")

In [111]:
# One-time environment setup: the %pip lines below are IPython magics and are
# only executed when FIRST_RUN is True, so the packages are not reinstalled on
# every run of the notebook.
if FIRST_RUN:
    %pip install torch torchtext --index-url https://download.pytorch.org/whl/cu121
    %pip install "numpy<2.0"
    %pip install nltk

import torch
# Use the first CUDA device when one is available, otherwise fall back to the CPU.
device = torch.device("cuda:0" if torch.cuda.is_available() else "cpu")
print("Used device: ", device)

Used device:  cuda:0


In [112]:
from csv import reader

def dataset_iter(part):
    """Lazily yield ``(cls, text)`` pairs from ``data/<part>.csv``.

    The first CSV record is treated as a header and skipped. Every following
    record must have exactly three columns; the first one is ignored, the
    second is the text and the third is the class label (returned as a string).

    Args:
        part: Base name of the CSV file inside the ``data`` directory
            (for example ``"train"``).

    Yields:
        Tuples ``(cls, text)``, label first.

    Raises:
        ValueError: If a record does not have exactly three columns.
    """
    # NOTE(review): no explicit encoding is given, so the platform default is
    # used -- confirm it matches the encoding of the CSV files.
    with open("data/" + part + ".csv", "rt", newline="") as f_in:
        records = reader(f_in)
        # Skip the header. The default keeps an empty file from raising
        # StopIteration inside the generator, which Python would turn into
        # a RuntimeError (PEP 479).
        next(records, None)
        for _, text, cls in records:
            yield cls, text

In [113]:
def dataset_rows_num(part):
    """Return the number of data records in ``data/<part>.csv``.

    Records are counted with the CSV reader rather than by physical lines, so
    a quoted field that contains line breaks still counts as a single row.
    The first record is treated as the header and is not counted.

    Args:
        part: Base name of the CSV file inside the ``data`` directory.

    Returns:
        Number of records after the header; ``0`` for an empty or
        header-only file.
    """
    with open("data/" + part + ".csv", "rt", newline="") as f_in:
        records = reader(f_in)
        # An empty file has no header to skip and therefore no rows at all.
        if next(records, None) is None:
            return 0
        return sum(1 for _ in records)

In [114]:
from torch.utils.data import IterableDataset

class RawTextIterableDataset(IterableDataset):
    """Simple iterator over a text dataset.

    Wraps an already created iterator and keeps track of how many items have
    been handed out, stopping once the position reaches ``num_lines - 1``.
    """

    def __init__(self, full_num_lines, current_pos, iterator):
        """Store the wrapped iterator and the position bookkeeping.

        Args:
            full_num_lines: Total number of lines; also reported by ``len()``.
            current_pos: Starting position. ``None`` means nothing has been
                consumed yet; the first item then moves the position to 0.
            iterator: Iterator the items are pulled from.
        """
        super().__init__()
        self._iterator = iterator
        self.full_num_lines = full_num_lines
        self.num_lines = full_num_lines
        self.current_pos = current_pos

    def __iter__(self):
        # The dataset is its own (single-pass) iterator.
        return self

    def __next__(self):
        last_index = self.num_lines - 1
        if self.current_pos == last_index:
            raise StopIteration
        # May itself raise StopIteration when the wrapped iterator runs dry.
        item = next(self._iterator)
        self.current_pos = 0 if self.current_pos is None else self.current_pos + 1
        return item

    def __len__(self):
        return self.num_lines

    def pos(self):
        """Return the current position in the dataset."""
        return self.current_pos



In [115]:
def RU_TW(part):
    """Build a fresh single-pass dataset over ``data/<part>.csv``.

    Args:
        part: Base name of the CSV file (for example ``"train"`` or ``"val"``).

    Returns:
        A ``RawTextIterableDataset`` yielding the items of ``dataset_iter(part)``.
    """
    # The start position must be None ("nothing consumed yet"): the dataset
    # stops once its position equals num_lines - 1, so starting at 0 would end
    # the iteration one item early and silently drop the last row.
    return RawTextIterableDataset(dataset_rows_num(part), None, dataset_iter(part))

Теперь сделаем словарь:

In [116]:
from torchtext.data.utils import get_tokenizer
from collections import Counter, OrderedDict
from torchtext.vocab import vocab as _vocab

tokenizer = get_tokenizer('toktok', 'ru')
train_iter = RU_TW("train")

# Token frequencies over all training texts (labels are not needed here).
counter = Counter()
for (label, line) in train_iter:
    counter.update(tokenizer(line))

# Most frequent tokens first; the vocabulary is built in this order.
sorted_by_freq_tuples = counter.most_common()
ordered_dict = OrderedDict(sorted_by_freq_tuples)

# Tokens seen fewer than min_freq times are left out of the vocabulary and
# resolve to the <unk> index through the default index set below.
unk_token = '<unk>'
vocab = _vocab(ordered_dict, min_freq=1000, specials=[unk_token])
vocab.set_default_index(vocab[unk_token])


Зададим функции предобработки датасета:

In [117]:
def text_pipeline(x):
    """Tokenize ``x`` and return the vocabulary index of every token."""
    return [vocab[token] for token in tokenizer(x)]


def label_pipeline(x):
    """Convert a raw label value to ``int``."""
    return int(x)

Сделаем загрузчик датасета (на жаргоне "батчеварку"):

In [118]:
import torch
from torch.utils.data import DataLoader

def collate_batch(batch):
    """Turn a list of ``(label, text)`` samples into tensors on ``device``.

    All texts are converted to token indices and concatenated into one flat
    tensor; ``offsets`` holds the index at which each sample starts in it.

    Args:
        batch: Iterable of ``(label, text)`` pairs.

    Returns:
        Tuple ``(labels, tokens, offsets)`` of int64 tensors moved to ``device``.
    """
    labels, token_tensors, lengths = [], [], []
    for raw_label, raw_text in batch:
        labels.append(label_pipeline(raw_label))
        tokens = torch.tensor(text_pipeline(raw_text), dtype=torch.int64)
        token_tensors.append(tokens)
        lengths.append(tokens.size(0))

    label_tensor = torch.tensor(labels, dtype=torch.int64)
    # A sample starts where all previous ones end, so the offsets are the
    # running sum of [0, len_0, ..., len_{n-2}].
    start_lengths = ([0] + lengths)[:-1]
    offsets = torch.tensor(start_lengths).cumsum(dim=0)
    flat_tokens = torch.cat(token_tensors)
    return label_tensor.to(device), flat_tokens.to(device), offsets.to(device)

# Example loader that streams the training CSV in batches of 8 through
# collate_batch; the training cell further down creates its own loaders.
train_iter = RU_TW("train")
dataloader = DataLoader(train_iter, batch_size=8, shuffle=False, collate_fn=collate_batch)

Пришло время сделать модель для классификации. Вот ее графическое изображение:

<img src="https://pytorch.org/tutorials/_images/text_sentiment_ngrams_model.png" width="800" height="400">

А вот код:

In [119]:
import torch.nn as nn
import torch.optim as optim
import time

class CNN_TextClassificationModel(nn.Module):
    """Text classifier: summed bag of embeddings, two Conv1d layers, two linear layers.

    Every text is reduced by ``nn.EmbeddingBag`` (mode ``'sum'``) to a single
    ``embed_dim`` vector. That vector is fed to the convolutions as a
    length-1 sequence with ``embed_dim`` channels, max-pooled, flattened and
    classified by two fully connected layers.

    Args:
        vocab_size: Number of rows in the embedding table.
        embed_dim: Size of the embedding vectors.
        num_class: Number of output classes (size of the returned logits).
    """

    def __init__(self, vocab_size, embed_dim, num_class):
        super(CNN_TextClassificationModel, self).__init__()

        # Number of feature maps produced by both convolutions.
        conv_channels = 128

        # Embedding layer
        self.embedding = nn.EmbeddingBag(vocab_size, embed_dim, sparse=False, mode='sum')

        # Convolutional Layers
        self.conv1 = nn.Conv1d(in_channels=embed_dim, out_channels=conv_channels, kernel_size=3, padding=1)
        self.conv2 = nn.Conv1d(in_channels=conv_channels, out_channels=conv_channels, kernel_size=4, padding=2)

        # Max pooling layer
        self.pool = nn.MaxPool1d(padding=3, kernel_size=6)

        # Fully connected layers. The flattened pooling output has
        # conv_channels features (sequence length collapses to 1), so fc1 must
        # be sized by conv_channels; sizing it by embed_dim only works when
        # embed_dim happens to equal conv_channels.
        self.fc1 = nn.Linear(conv_channels, 256)
        self.fc2 = nn.Linear(256, num_class)

        self.dropout = nn.Dropout(0.5)

        self.relu = nn.ReLU()

        self.init_weights()

    def init_weights(self):
        """Initialise embedding and linear weights uniformly in [-0.5, 0.5] and zero the linear biases.

        The convolution layers keep their default initialisation.
        """
        initrange = 0.5
        self.embedding.weight.data.uniform_(-initrange, initrange)
        self.fc1.weight.data.uniform_(-initrange, initrange)
        self.fc1.bias.data.zero_()
        self.fc2.weight.data.uniform_(-initrange, initrange)
        self.fc2.bias.data.zero_()

    def forward(self, text, offsets):
        """Compute class logits for a batch of concatenated texts.

        Args:
            text: 1-D tensor with the token indices of all texts, concatenated.
            offsets: 1-D tensor with the start index of every text in ``text``.

        Returns:
            Tensor of shape ``(batch, num_class)`` with unnormalised scores.
        """
        embedded = self.embedding(text, offsets)   # (batch, embed_dim)
        embedded = embedded.unsqueeze(1)           # (batch, 1, embed_dim)
        embedded = embedded.permute(0, 2, 1)       # (batch, embed_dim, 1)

        # Apply convolutional layers
        conv_out1 = self.relu(self.conv1(embedded))   # (batch, 128, 1)
        conv_out2 = self.relu(self.conv2(conv_out1))  # (batch, 128, 2)

        pooled = self.pool(conv_out2)                 # (batch, 128, 1)
        pooled_flat = pooled.view(pooled.size(0), -1)

        # Apply fully connected layers
        fc1_out = self.dropout(self.relu(self.fc1(pooled_flat)))
        output = self.fc2(fc1_out)

        return output



Создадим объект модели:

In [120]:
# Training configuration.
EPOCHS = 5
LR = 1e-3
BATCH_SIZE = 64
EMBED_DIM = 128

# Model dimensions come from the data: vocabulary size and the number of
# distinct labels found in one pass over the training set.
vocab_size = len(vocab)
num_class = len({label for label, _ in RU_TW("train")})
model = CNN_TextClassificationModel(vocab_size, EMBED_DIM, num_class).to(device)

# Loss, optimizer, and a scheduler that multiplies the learning rate by 0.9
# each time it is stepped.
criterion = nn.CrossEntropyLoss()
optimizer = optim.Adam(model.parameters(), lr=LR)
scheduler = optim.lr_scheduler.StepLR(optimizer, 1, gamma=0.9)

Зададим функции тренировки и проверки модели:

In [121]:
def train_cnn(dataloader):
    """Train the global ``model`` for one pass over ``dataloader``.

    Args:
        dataloader: Iterable of ``(label, text, offsets)`` batches.

    Returns:
        Fraction of samples classified correctly during this pass.
    """
    model.train()
    correct = 0
    seen = 0
    for label, text, offsets in dataloader:
        optimizer.zero_grad()
        logits = model(text, offsets)
        batch_loss = criterion(logits, label)
        batch_loss.backward()
        optimizer.step()
        correct += (logits.argmax(1) == label).sum().item()
        seen += label.size(0)
    return correct / seen

In [122]:
def evaluate_cnn(dataloader):
    """Measure the accuracy of the global ``model`` on ``dataloader``.

    The model is switched to evaluation mode and no gradients are recorded.

    Args:
        dataloader: Iterable of ``(label, text, offsets)`` batches.

    Returns:
        Fraction of samples classified correctly.
    """
    model.eval()
    correct = 0
    seen = 0
    with torch.no_grad():
        for label, text, offsets in dataloader:
            logits = model(text, offsets)
            correct += (logits.argmax(1) == label).sum().item()
            seen += label.size(0)
    return correct / seen

Наконец, обучение:

In [123]:
# Dataset and DataLoader
# Both CSV parts are materialised into lists so they can be split and shuffled;
# the "val" part is used as the held-out test set.
train_iter = RU_TW("train")
test_iter = RU_TW("val")
train_dataset = list(train_iter)
test_dataset = list(test_iter)

# Split the training set for validation
# (95% of the rows for training, the remainder for validation).
num_train = int(len(train_dataset) * 0.95)
split_train_, split_valid_ = torch.utils.data.random_split(train_dataset, [num_train, len(train_dataset) - num_train])

train_dataloader = DataLoader(split_train_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
valid_dataloader = DataLoader(split_valid_, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)
test_dataloader = DataLoader(test_dataset, batch_size=BATCH_SIZE, shuffle=True, collate_fn=collate_batch)

# Reference validation accuracy; None until the first epoch has finished.
total_accu = None

# Training loop
for epoch in range(1, EPOCHS + 1):
    epoch_start_time = time.time()
    # The training accuracy returned by train_cnn is not used here.
    train_cnn(train_dataloader)
    accu_val = evaluate_cnn(valid_dataloader)
    # Step the scheduler only when validation accuracy fell below the
    # reference value; otherwise the reference is updated to the current one.
    if total_accu is not None and total_accu > accu_val:
      scheduler.step()
    else:
       total_accu = accu_val
    print('-' * 59)
    print('| end of epoch {:3d} | time: {:5.2f}s | '
          'valid accuracy {:8.3f} '.format(epoch,
                                           time.time() - epoch_start_time,
                                           accu_val))
    print('-' * 59)


-----------------------------------------------------------
| end of epoch   1 | time: 17.81s | valid accuracy    0.646 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   2 | time: 17.64s | valid accuracy    0.653 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   3 | time: 16.92s | valid accuracy    0.656 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   4 | time: 16.08s | valid accuracy    0.654 
-----------------------------------------------------------
-----------------------------------------------------------
| end of epoch   5 | time: 15.95s | valid accuracy    0.652 
-----------------------------------------------------------


И проверка:

In [124]:
# Final check: accuracy of the trained model on test_dataloader.
print('Testing the model...')
test_acc = evaluate_cnn(test_dataloader)
print(f'Test Accuracy: {test_acc:.3f}')

Testing the model...
Test Accuracy: 0.653


А также финальная, т.н. ручная проверка. Здесь можно задать любой текст, который вы хотите проверить:

In [125]:
def predict(text, text_pipeline):
    """Predict the class index of a single text with the global ``model``.

    Args:
        text: Raw text to classify.
        text_pipeline: Callable that converts the text into a list of
            token indices.

    Returns:
        Index of the highest-scoring class as a Python ``int``.
    """
    # Inference must not apply dropout, whatever mode the model was left in
    # by earlier cells.
    model.eval()
    with torch.no_grad():
        # An explicit integer dtype matches collate_batch and keeps an empty
        # token list from becoming a float tensor, which is not a valid
        # index tensor for the embedding layer.
        token_ids = torch.tensor(text_pipeline(text), dtype=torch.int64).to(device)
        # A single text forms one bag that starts at position 0.
        offsets = torch.tensor([0], dtype=torch.int64).to(device)
        output = model(token_ids, offsets)
        return output.argmax(1).item()

def predict_batch(dataset, text_pipeline):
    """Predict a class index for every sample of ``dataset``.

    Args:
        dataset: Iterable of indexable samples whose element ``[1]`` is the text.
        text_pipeline: Callable forwarded to ``predict`` for tokenization.

    Returns:
        List of predicted class indices, in dataset order.
    """
    return [predict(sample[1], text_pipeline) for sample in dataset]

# Read the texts for the manual check: skip the header record and keep the
# second column of every two-column row; the label slot is left as None.
with open("data/Soc_Net_Task_2_test_5.csv", "rt", newline="") as f_in:
    r = reader(f_in)
    next(r)
    test_data = [(None, line) for _, line in r]
predictions = predict_batch(test_data, text_pipeline)

# Emit the predictions on one line, separated by commas.
print(*predictions, sep=',')


1,1,0,0,0,1,0,1,0,1,1,1,1,1,0,1,0,1,1,1,1,1,0,0,1,0,1,0,0,0,1,1,1,0,1,1,1,0,0,1,1,0,0,1,0,0,1,1,1,0,0,1,1,0,1,1,1,1,1,0,0,1,1,1,0,0,1,0,0,0,1,0,0,1,0,1,1,0,0,1,1,1,1,1,1,0,1,1,0,1,1,0,0,1,1,1,1,1,0,0,0,1,0,0,0,1,0,1,0,0,0,0,0,0,1,0,0,0,0,0,0,0,1,0,0,0,0,0,1,0,1,1,0,1,0,1,1,0,0,1,0,1,0,1,0,1,1,1,1,1,1,1,1,1,1,1,1,1,0,1,0,0,1,1,0,1,1,0,0,1,0,0,0,1,0,1,0,1,0,1,1,0,1,1,1,0,1,0,1,1,0,0,0,1,0,1,0,1,0,1,1,0,0,0,1,1,0,1,1,0,1,1,0,1,1,1,1,0,1,1,1,1,0,1,1,1,1,1,0,0,0,1,1,0,0,0,0,0,1,1,1,1,1,0,1,1,0,1,0,1,1,1,1,0,1,1,0,1,0,1,1,1,1,0,1,1,1,1,1,1,0,1,1,1,1,1,1,0,0,1,1,0,1,0,0,1,0,0,1,1,0,1,1,1,0,1,0,1,1,1,1,1,1,0,0,1,1,1,1,1,1,0,0,0,1,1,1,1,1,0,1,0,1,1,1,1,0,0,0,1,1,1,0,1,1,1,0,0,0,1,0,0,0,0,1,0,0,0,1,0,1,1,0,1,1,0,0,1,1,0,1,1,1,0,1,1,0,0,0,1,0,1,1,1,0,0,0,0,0,1,1,1,1,0,0,0,0,1,1,1,1,1,1,1,0,1,0,0,0,1,1,0,0,0,1,0,1,1,1,0,1,1,1,0,0,1,1,1,1,0,1,1,0,1,1,0,1,1,1,0,0,0,0,1,1,0,0,0,0,1,0,0,1,0,1,0,1,1,0,0,0,1,1,1,1,0,1,1,0,1,1,1,1,1,1,1,0,1,1,1,0,1,0,0,1,1,1,0,1,1,1,0,0,0,1,0,1,1,0,0,0,1,1,1,1,0,1,1,1,1,

Получил Test Accuracy: 0.653. Для теста с `Soc_Net_Task_2_test_5.csv` получил "Точность вашей модели: 0.673. Превосходно!".