In [1]:
# Подключим необходимые библиотеки
from typing import Union, Optional, Tuple, List
import pandas as pd
import numpy as np
import nltk
from nltk.lm.vocabulary import Vocabulary
from sklearn.model_selection import train_test_split as TTsplit
from sklearn.naive_bayes import MultinomialNB
import torch
import torch.nn as nn
from torch.optim import Adam
from tqdm import trange

np.random.seed(42)
torch.manual_seed(42);

In [2]:
# По имени файла формирует признаки, записывая их в переменные
def split_twitter_data(filename: str,
                       target: bool = True) -> Tuple[List, List, List, Optional[List]]:
    df = pd.read_csv(filename)
    keywords, locations, texts = list(df.iloc[:, 1]), list(df.iloc[:, 2]), list(df.iloc[:, 3])
    targets = None
    if target:
        targets = list(df.iloc[:, 4])
    return keywords, locations, texts, targets

In [3]:
# Производит токенизацию списка строк выбранным токенизатором
def tokenize(strings: List[Optional[str]],
             tokenizer, callable_ : bool = False) -> List[List[str]]:
    result = []
    for s in strings:
        if s is np.nan:
            result.append('')
        else:
            result.append(s)
    result = [tokenizer(s) for s in result] if callable_ else [tokenizer.tokenize(s) for s in result]
    return result

In [4]:
# Обрезает слишком длинные токенизированные строки, удлинняяет короткие
# В результате каждая токенизированная строка состоит из одинакового количества токенов
def to_one_len(tokenized_strings: List[List[str]]) -> List[List[str]]:
    result = []
    lens = np.array([len(s) for s in tokenized_strings])
    tokenized_string_max_len = int(np.ceil(np.quantile(lens, 0.95)))
    result = [s[:tokenized_string_max_len] for s in tokenized_strings]
    for s in result:
        if len(s) < tokenized_string_max_len:
            s.extend(['<UNK>'] * (tokenized_string_max_len - len(s)))
    return result

In [5]:
# По списку токенизированных строк получает словарь
def get_vocab(*features, unk_cutoff=1) -> Vocabulary:
    tokens = []
    for feature in features:
        tokens.extend([token for observation in feature for token in observation])
    tokens = np.random.permutation(tokens)
    vocab = Vocabulary(tokens, unk_cutoff)
    return vocab

In [6]:
# Переводит каждый токен в его индекс в словаре
def to_idx(vocab, *features) -> List[np.array]:
    result = []
    for feature in features:
        result.append(
            np.array(
                [[vocab[token] for token in observation] for observation in feature],
                dtype=np.int32
            )
        )
    return result

In [7]:
# Загружает данные из файла с именем filename
# После вызова функции получаем данные, готовые для обучения модели
def twitter_data_preprocessing(filename: str,
                               vocab: Optional[Vocabulary] = None,
                               target: bool = True,
                               unk_cutoff: int = 1,
                               test_size: float = 0.2,
                              ) -> Tuple[Vocabulary, torch.tensor, Optional[torch.tensor]]:
    # Чтение и разбиение файла на признаки
    keywords, locations, texts, targets = split_twitter_data(filename, target)
    if targets is not None:
        targets = np.array(targets, dtype=np.int64)
    
    # Задание токенизаторов
    regular_tokenizer = nltk.tokenize.wordpunct_tokenize
    tweet_tokenizer = nltk.tokenize.TweetTokenizer(preserve_case=False, reduce_len=True,
                                                   strip_handles=True, match_phone_numbers=False)
    
    # Токенизация признаков
    keywords = tokenize(keywords, regular_tokenizer, callable_=True)
    locations = tokenize(locations, regular_tokenizer, callable_=True)
    texts = tokenize(texts, tweet_tokenizer, callable_=False)
    
    # Разбиение на train и test
    if test_size != 0 and target:
        k_train, k_test, l_train, l_test, t_train, t_test, tg_train, tg_test = TTsplit(keywords,
                                                                                       locations,
                                                                                       texts,
                                                                                       targets,
                                                                                       test_size=test_size)
    elif test_size != 0:
        k_train, k_test, l_train, l_test, t_train, t_test = TTsplit(keywords,
                                                                    locations,
                                                                    texts,
                                                                    test_size=test_size)
    
    # Приведение размерности признаков к одной длине
    if test_size == 0:
        keywords = to_one_len(keywords)
        locations = to_one_len(locations)
        texts = to_one_len(texts)
    else:
        k_train, k_test = to_one_len(k_train), to_one_len(k_test)
        l_train, l_test = to_one_len(l_train), to_one_len(l_test)
        t_train, t_test = to_one_len(t_train), to_one_len(t_test)
    
    # Получение словаря
    if vocab is None:
        if test_size == 0:
            vocab = get_vocab(keywords, locations, texts, unk_cutoff=unk_cutoff)
        else:
            vocab = get_vocab(k_train, l_train, t_train, unk_cutoff=unk_cutoff)
    
    # Преобразование признаков в векторы (токен -> его индекс в словаре vocab)
    if test_size == 0:
        keywords, locations, texts = to_idx(vocab, keywords, locations, texts)
    else:
        k_train, l_train, t_train = to_idx(vocab, k_train, l_train, t_train)
        k_test, l_test, t_test = to_idx(vocab, k_test, l_test, t_test) 
    
    # Конкатенация признаков
    if test_size == 0:
        data = torch.hstack((keywords, locations, texts))
    else:
        data_train = np.hstack((k_train, l_train, t_train))
        data_test = np.hstack((k_test, l_test, t_test))
    
    if test_size == 0:
        return vocab, data, targets
    
    if target:
        return vocab, data_train, data_test, tg_train, tg_test
    
    return vocab, data_train, data_test, None, None

In [8]:
%%time
# Предобработка данных из файла train.csv, перенос на видеопамять
vocab, X_train, X_test, y_train, y_test = twitter_data_preprocessing('train.csv',
                                                                     target=True, unk_cutoff=4)
len(vocab)

CPU times: total: 1.25 s
Wall time: 1.25 s


3430

Решим задачу с помощью наивного байесовского классификатора

In [9]:
%%time
model = MultinomialNB(class_prior=np.array([0.6, 0.4]))
model.fit(X_train, y_train)
print('Model accuracy on train data after training: {}'.format(model.score(X_train, y_train)))
print('Model accuracy on test data after training: {}'.format(model.score(X_test, y_test)))

Model accuracy on train data after training: 0.5801313628899836
Model accuracy on test data after training: 0.5666447800393959
CPU times: total: 62.5 ms
Wall time: 8.14 ms


Теперь решим задачу с помощью нейросетевой модели LSTM

In [10]:
# Описание модели нейронной сети LSTM для классификации
class LSTM(nn.Module):
    def __init__(self, num_embeddings, embedding_dim, hidden_size, num_layers=1):
        super().__init__()
        self.emb = nn.Embedding(num_embeddings, embedding_dim)
        self.lstm = nn.LSTM(embedding_dim, hidden_size, num_layers=num_layers, batch_first=True)
        self.dropout1 = nn.Dropout(0.5)
        self.linear1 = nn.Linear(hidden_size, hidden_size//4)
        self.relu2 = nn.ReLU()
        self.dropout2 = nn.Dropout(0.5)
        self.linear2 = nn.Linear(hidden_size//4, hidden_size//8)
        self.relu3 = nn.ReLU()
        self.dropout3 = nn.Dropout(0.5)
        self.linear3 = nn.Linear(hidden_size//8, 2)
    
    def forward(self, inp):
        emb_out = self.emb(inp)
        lstm_out, _ = self.lstm(emb_out)
        lin1_out = self.linear1(self.dropout1(lstm_out[:, -1]))
        lin2_out = self.linear2(self.dropout2(self.relu2(lin1_out)))
        lin3_out = self.linear3(self.dropout3(self.relu3(lin2_out)))
        return lin3_out

In [11]:
# Описание процесса тренировки нейронной сети
def train(model, loss_function, X, y, epochs, optimizer, **params):
    print('Training: ')
    optimizer_ = optimizer(model.parameters(), **params)
    for _ in trange(epochs):
        out = model(X)
        loss = loss_function(out, y)
        optimizer_.zero_grad()
        loss.backward()
        optimizer_.step()

In [12]:
def accuracy(model, X, y):
    model.eval()
    y_pred = model(X).detach().argmax(axis=1)
    score = (y_pred == y).sum() / len(y)
    model.train()
    return score

In [13]:
X_train, X_test = torch.tensor(X_train, device='cuda'), torch.tensor(X_test, device='cuda')
y_train, y_test = torch.tensor(y_train, device='cuda'), torch.tensor(y_test, device='cuda')

In [14]:
# Задание модели и функции потерь
model = LSTM(len(vocab), 500, 500).cuda()
loss_function = nn.CrossEntropyLoss().cuda()

In [None]:
%%time
# Тренировка модели, проверка её качества на тренировочных данных после обучения
train(model, loss_function, X_train, y_train, epochs=1000, optimizer=Adam, lr=1e-3)
print('Model accuracy on train data after training: {}'.format(accuracy(model, X_train, y_train)))
print('Model accuracy on test data after training: {}'.format(accuracy(model, X_test, y_test)))

Training: 


 77%|████████████████████████████████████████████████████████████▉                  | 772/1000 [03:13<00:57,  3.96it/s]

In [None]:
# Часть кода для формирования таблицы на kaggle

"""
%%time
_, X_test, _ = twitter_data_preprocessing('test.csv', vocab, target=False, test_size=0)
indices = np.array(pd.read_csv('test.csv').iloc[:, 0], dtype=np.int32)

# Считает предсказания модели и записывает в таблицу
def predict_and_save(model, data, indices, batch_size=40, filename='sample_submission.csv'):
    data_ = data.cuda()
    out = model(data_).argmax(axis=1).cpu().detach().numpy().astype(np.int32)
    table = pd.DataFrame({'id': indices, 'target': np.hstack(outs)})
    table.to_csv(filename, index=False)

predict_and_save(model, X_test, indices)
""";