In [None]:
import pandas as pd
import numpy as  np
import tensorflow as tf
import tflearn
import re

from pymorphy2 import MorphAnalyzer
from collections import Counter
from sklearn.model_selection import train_test_split
from tflearn.data_utils import to_categorical
from nltk.stem.snowball import RussianStemmer
from nltk.tokenize import TweetTokenizer

# ПЕРЕМЕННЫЕ

In [None]:
# Input CSV locations, relative to the notebook's working directory.
# Both paths use forward slashes: the original "data\positive.csv" relied on
# "\p" not being a recognised escape sequence and only resolved on Windows.
path_to_positive = "data/positive.csv"
path_to_neganive = "data/negative.csv"

# Number of most frequent stems kept in the vocabulary; it is also the length
# of every tweet feature vector.
VOCAB_SIZE = 5000

# ЗАГРУЖАЕМ ДАННЫЕ

In [None]:
# Column label (the files are read without a header row) that holds the tweet text.
tweets_col_number = 3

# Keep only the tweet-text column; the double brackets make each result a
# one-column DataFrame rather than a Series.
negative_tweets = pd.read_csv(path_to_neganive, header=None, delimiter=";")[[tweets_col_number]]
positive_tweets = pd.read_csv(path_to_positive, header=None, delimiter=";")[[tweets_col_number]]

# The path variables are deliberately left defined: deleting them here made
# this cell raise NameError whenever it was re-run without first re-running
# the cell that defines the paths.

# СОЗДАЕМ СТЕММЕР

In [None]:
# Cache of already computed stems, filled by get_stem().
stem_cache = dict()
# Matches every character that is not in а-я, А-Я or a space; matched
# characters are stripped from a token before it is stemmed/lemmatized.
# NOTE(review): the а-я / А-Я ranges do not include "ё"/"Ё", so that letter
# is stripped as well — confirm this is intended.
regex = re.compile('[^а-яА-Я ]')
# NLTK Snowball stemmer for Russian.
stemmer = RussianStemmer()

def get_stem(token):
    """Return the stem of ``token``, memoised in ``stem_cache``.

    The token is first stripped of every character matched by the
    module-level ``regex`` and lower-cased, then passed to ``stemmer.stem``.

    Args:
        token: raw token string as produced by the tokenizer.

    Returns:
        The stem string; it may be empty when nothing is left of the token
        after cleaning.
    """
    # Look up and store under the SAME key (the raw token). The original
    # stored under the cleaned token but looked up the raw one, so the cache
    # missed for any token that was changed by cleaning.
    cached = stem_cache.get(token)
    # Compare against None so that a cached empty stem still counts as a hit.
    if cached is not None:
        return cached
    cleaned = regex.sub('', token).lower()
    stem = stemmer.stem(cleaned)
    stem_cache[token] = stem
    return stem

# СОЗДАЕМ ЛЕММАТИЗАТОР

In [None]:
# Cache of already computed lemmas, filled by get_lemma().
lemma_cache = dict()
# pymorphy2 morphological analyzer used by get_lemma() to obtain normal forms.
lemmatizer = MorphAnalyzer()

def get_lemma(token):
    """Return the lemma (normal form) of ``token``, memoised in ``lemma_cache``.

    The token is first stripped of every character matched by the
    module-level ``regex`` and lower-cased; the lemma is the ``normal_form``
    of the first parse returned by ``lemmatizer.parse``.

    Args:
        token: raw token string as produced by the tokenizer.

    Returns:
        The lemma string for the cleaned token.
    """
    # Look up and store under the SAME key (the raw token). The original
    # stored under the cleaned token but looked up the raw one, so the cache
    # missed for any token that was changed by cleaning.
    cached = lemma_cache.get(token)
    # Compare against None so that a cached empty lemma still counts as a hit.
    if cached is not None:
        return cached
    cleaned = regex.sub('', token).lower()
    lemma = lemmatizer.parse(cleaned)[0].normal_form
    lemma_cache[token] = lemma
    return lemma

# СОЗДАНИЕ СЛОВАРЯ СТЕМ

In [None]:
# NLTK tweet tokenizer used to split tweet text into tokens.
tokenizer = TweetTokenizer()
# stem -> number of occurrences, filled by count_unique_stems_in_tweets().
stem_count = Counter()

def count_unique_stems_in_tweets(tweets, column=3):
    """Add the stem frequencies of every tweet in ``tweets`` to ``stem_count``.

    Args:
        tweets: DataFrame whose ``column`` holds the tweet texts.
        column: label of the text column. Defaults to 3, the label that was
            previously hard-coded in the loop body.

    Returns:
        None. The module-level ``stem_count`` Counter is updated in place.
    """
    # Iterate the text column directly instead of DataFrame.iterrows(), which
    # materialises a Series per row only to read a single cell from it.
    for tweet in tweets[column]:
        stem_count.update(get_stem(token) for token in tokenizer.tokenize(tweet))

# Accumulate stem frequencies over both corpora: negative first, then positive.
for tweet_frame in (negative_tweets, positive_tweets):
    count_unique_stems_in_tweets(tweet_frame)

In [None]:
# Report how many distinct stems were collected in `stem_count`.
print(f"Найдено {len(stem_count)} уникальных стем")

In [None]:
# Order the stems from most to least frequent, then keep the first VOCAB_SIZE.
stems_by_frequency = sorted(stem_count, key=stem_count.get, reverse=True)
vocab = stems_by_frequency[:VOCAB_SIZE]
# Show the 20 most frequent stems.
print(vocab[:20])

In [None]:
# Map each vocabulary stem to its position in the feature vector.
# Enumerating `vocab` itself (instead of indexing it with range(VOCAB_SIZE))
# avoids an IndexError when fewer than VOCAB_SIZE distinct stems were found.
token_2_idx = {stem: idx for idx, stem in enumerate(vocab)}
len(token_2_idx)

# СОЗДАНИЕ СЛОВАРЯ ЛЕММ

In [None]:
# NLTK tweet tokenizer used to split tweet text into tokens.
tokenizer = TweetTokenizer()
# lemma -> number of occurrences, filled by count_unique_lemm_in_tweets().
lemm_count = Counter()

def count_unique_lemm_in_tweets(tweets, column=3):
    """Add the lemma frequencies of every tweet in ``tweets`` to ``lemm_count``.

    Args:
        tweets: DataFrame whose ``column`` holds the tweet texts.
        column: label of the text column. Defaults to 3, the label that was
            previously hard-coded in the loop body.

    Returns:
        None. The module-level ``lemm_count`` Counter is updated in place.
    """
    # Iterate the text column directly instead of DataFrame.iterrows(), which
    # materialises a Series per row only to read a single cell from it.
    for tweet in tweets[column]:
        lemm_count.update(get_lemma(token) for token in tokenizer.tokenize(tweet))

# count_unique_lemm_in_tweets(negative_tweets)
# count_unique_lemm_in_tweets(positive_tweets)

In [None]:
# Report how many distinct lemmas were collected in `lemm_count`.
# NOTE(review): the count_unique_lemm_in_tweets(...) calls are commented out
# in this notebook, so as written `lemm_count` is still empty at this point.
print(f"Найдено {len(lemm_count)} уникальных лемм")

In [None]:
# The VOCAB_SIZE most frequent lemmas, most frequent first.
lemm_vocab = sorted(lemm_count, key=lemm_count.get, reverse=True)[:VOCAB_SIZE]
# Show the lemma vocabulary built on the line above. The original printed the
# stem vocabulary (`vocab`) here, which was a copy-paste slip.
print(lemm_vocab[:20])

In [None]:
def tweet_to_vector(tweet, show_unknowns=False):
    """Encode a tweet as a binary presence vector over the stem vocabulary.

    Args:
        tweet: tweet text to encode.
        show_unknowns: when True, print every token whose stem has no entry
            in ``token_2_idx``.

    Returns:
        A 1-D integer NumPy array of length ``VOCAB_SIZE`` holding 1 at the
        index of every vocabulary stem found in the tweet and 0 elsewhere.
    """
    vector = np.zeros(VOCAB_SIZE, dtype=np.int_)
    for raw_token in tokenizer.tokenize(tweet):
        position = token_2_idx.get(get_stem(raw_token))
        if position is None:
            # Out-of-vocabulary stem: optionally report it, never encode it.
            if show_unknowns:
                print(f"Unknow token {raw_token}")
            continue
        vector[position] = 1
    return vector

In [None]:
# Sanity check on the second negative tweet: show its text and the first ten
# components of its feature vector.
tweet = negative_tweets[3].iloc[1]
print(f"tweet: {tweet}")
print(f"vector: {tweet_to_vector(tweet)[:10]}")
# Stem stored at vocabulary index 2, i.e. the stem behind vector component 2.
print(vocab[2])

# ПРЕОБРАЗОВАНИЕ ТВИТОВ В ВЕКТОРНОЕ ПРЕДСТАВЛЕНИЕ

In [None]:
# Raw tweet texts, negatives first and then positives; row i of
# `tweet_vectors` is the encoding of tweets[i].
# The text column is read directly (one pass, no per-row Series from
# iterrows), which also replaces the two duplicated loops and no longer
# overwrites the notebook-level `tweet` variable with a row Series.
tweets = list(negative_tweets[tweets_col_number]) + list(positive_tweets[tweets_col_number])

# One binary presence vector of length VOCAB_SIZE per tweet.
tweet_vectors = np.zeros((len(tweets), VOCAB_SIZE), dtype=np.int_)
for row, text in enumerate(tweets):
    tweet_vectors[row] = tweet_to_vector(text)

# ПОДГОТОВКА РАЗМЕТКИ

In [None]:
# Class labels in the same order as the rows of the feature matrix:
# 0 for every negative tweet, followed by 1 for every positive tweet.
negative_labels = np.zeros(len(negative_tweets), dtype=np.int_)
positive_labels = np.ones(len(positive_tweets), dtype=np.int_)
labels = np.concatenate((negative_labels, positive_labels))

# ПОДГОТОВИМ ДАННЫЕ К ОБУЧЕНИЮ

In [None]:
# Feature matrix: one row per tweet.
X = tweet_vectors
# Encode the 0/1 labels as a two-column class matrix for the softmax output.
y = to_categorical(labels, 2)
# Hold out 10% of the samples for testing. A fixed random_state makes the
# split reproducible across kernel restarts; without it every run evaluated
# on a different test set, so accuracies were not comparable between runs.
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.1, random_state=42)

In [None]:
# Peek at the first ten encoded test labels.
print(y_test[:10])

# СТРОИМ НЕЙРОННУЮ СЕТЬ

In [None]:
def build_model(learning_rate=0.1):
    """Assemble the feed-forward classifier and wrap it in a ``tflearn.DNN``.

    Layers: input of width ``VOCAB_SIZE`` -> 125 ReLU units -> 25 ReLU units
    -> 2 softmax outputs. Training is configured with the 'sgd' optimizer and
    the 'categorical_crossentropy' loss.

    Args:
        learning_rate: learning rate handed to ``tflearn.regression``.

    Returns:
        The ``tflearn.DNN`` instance built from the network.
    """
    # Reset the default TensorFlow graph before defining any layer.
    tf.compat.v1.reset_default_graph()

    net = tflearn.input_data([None, VOCAB_SIZE])
    for hidden_units in (125, 25):
        net = tflearn.fully_connected(net, hidden_units, activation="ReLU")
    net = tflearn.fully_connected(net, 2, activation="softmax")

    # The return value is deliberately not kept: the DNN below is built from
    # `net`, exactly as before (the old `regression` local was never read).
    # NOTE(review): tflearn examples usually pass the regression output to
    # DNN — confirm that passing `net` is equivalent.
    tflearn.regression(
        net,
        optimizer='sgd',
        learning_rate=learning_rate,
        loss='categorical_crossentropy'
    )

    return tflearn.DNN(net)

In [None]:
# Build the network with a learning rate of 0.75 (overrides the 0.1 default).
model = build_model(learning_rate=0.75)

In [None]:
# Train on the training split for 2 epochs with mini-batches of 128 samples,
# reporting the metric during training.
# NOTE(review): validation_set=0.01 is presumably the fraction of the training
# data held out for validation — confirm against the tflearn documentation.
model.fit(
    X_train,
    y_train,
    validation_set=0.01,
    show_metric=True,
    batch_size=128,
    n_epoch=2
)

# Тестирование

In [None]:
# Threshold the model's first output column at 0.5 and compare it with the
# first column of the encoded test labels.
# NOTE(review): column 0 presumably corresponds to label 0 (negative tweets),
# so `predict` is 1 where the model favours that class — confirm.
first_column_scores = np.array(model.predict(X_test))[:, 0]
predict = (first_column_scores >= 0.5).astype(np.int_)
# Share of test samples on which the thresholded prediction matches the label.
accuracy = np.mean(predict == y_test[:, 0], axis=0)
print(f"Accuracy: {accuracy}")

In [None]:
# Display the 0/1 predictions computed for the test split.
predict